]> pd.if.org Git - btree/blob - threadskv10h.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv10h.c
1 // btree version threadskv10h futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      ACID batched key-value updates
8 //      LSM B-trees for write optimization
9 //      larger sized leaf pages than non-leaf
10 //      and LSM B-tree find & count operations
11
12 // 15 DEC 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <xmmintrin.h>
39 #include <linux/futex.h>
40 #include <sys/syscall.h>
41 #endif
42
43 #ifdef unix
44 #include <unistd.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <fcntl.h>
48 #include <sys/time.h>
49 #include <sys/mman.h>
50 #include <errno.h>
51 #include <pthread.h>
52 #include <limits.h>
53 #else
54 #define WIN32_LEAN_AND_MEAN
55 #include <windows.h>
56 #include <stdio.h>
57 #include <stdlib.h>
58 #include <time.h>
59 #include <fcntl.h>
60 #include <process.h>
61 #include <intrin.h>
62 #endif
63
64 #include <memory.h>
65 #include <string.h>
66 #include <stddef.h>
67
68 typedef unsigned long long      uid;
69 typedef unsigned long long      logseqno;
70
71 #ifndef unix
72 typedef unsigned long long      off64_t;
73 typedef unsigned short          ushort;
74 typedef unsigned int            uint;
75 #endif
76
77 #define BT_ro 0x6f72    // ro
78 #define BT_rw 0x7772    // rw
79
80 #define BT_maxbits              26                                      // maximum page size in bits
81 #define BT_minbits              9                                       // minimum page size in bits
82 #define BT_minpage              (1 << BT_minbits)       // minimum page size
83 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
84
85 //  BTree page number constants
86 #define ALLOC_page              0       // allocation page
87 #define ROOT_page               1       // root of the btree
88 #define LATCH_page              2       // first page of latches
89
90 #define SEG_bits                16      // number of leaf pages in a segment in bits
91 #define MIN_seg                 32      // initial number of mapping segments
92
93 //      Number of levels to create in a new BTree
94 #define MIN_lvl                 2
95
96 /*
97 There are six lock types for each node in four independent sets: 
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification. 
104 */
105
106 typedef enum{
107         BtLockAccess = 1,
108         BtLockDelete = 2,
109         BtLockRead   = 4,
110         BtLockWrite  = 8,
111         BtLockParent = 16,
112         BtLockLink   = 32
113 } BtLock;
114
115 typedef struct {
116   union {
117         struct {
118           volatile unsigned char xcl[1];
119           volatile unsigned char filler;
120           volatile ushort waiters[1];
121         } bits[1];
122         uint value[1];
123   };
124 } MutexLatch;
125
126 //      definition for reader/writer reentrant lock implementation
127
128 typedef struct {
129   MutexLatch xcl[1];
130   MutexLatch wrt[1];
131   ushort readers;       // number of readers holding lock
132 #ifdef DEBUG
133   ushort line;          // owner source line number
134 #endif
135   ushort dup;           // re-entrant lock count
136   pid_t tid;            // owner pid
137 } RWLock;
138
139 //  hash table entries
140
141 typedef struct {
142         MutexLatch latch[1];
143         uint entry;             // Latch table entry at head of chain
144 } BtHashEntry;
145
146 //      latch manager table structure
147
148 typedef struct {
149         uid page_no;                    // latch set page number
150         MutexLatch modify[1];   // modify entry lite latch
151         RWLock readwr[1];               // read/write page lock
152         RWLock access[1];               // Access Intent/Page delete
153         RWLock parent[1];               // Posting of fence key in parent
154         RWLock link[1];                 // left link update in progress
155         uint split;                             // right split page atomic insert
156         uint next;                              // next entry in hash table chain
157         uint prev;                              // prev entry in hash table chain
158         uint pin;                               // number of accessing threads
159 } BtLatchSet;
160
161 //      Define the length of the page record numbers
162
163 #define BtId 6
164
165 //      Page key slot definition.
166
167 //      Keys are marked dead, but remain on the page until
168 //      it cleanup is called. The fence key (highest key) for
169 //      a leaf page is always present, even after cleanup.
170
171 //      Slot types
172
173 //      In addition to the Unique keys that occupy slots
174 //      there are Librarian and Duplicate key
175 //      slots occupying the key slot array.
176
177 //      The Librarian slots are dead keys that
178 //      serve as filler, available to add new Unique
179 //      or Dup slots that are inserted into the B-tree.
180
181 //      The Duplicate slots have had their key bytes extended
182 //      by 6 bytes to contain a binary duplicate key uniqueifier.
183
184 typedef enum {
185         Unique,
186         Update,
187         Librarian,
188         Duplicate,
189         Delete
190 } BtSlotType;
191
192 typedef struct {
193         uint off:BT_maxbits;    // page offset for key start
194         uint type:3;                    // type of slot
195         uint dead:1;                    // set for deleted slot
196 } BtSlot;
197
198 //      The key structure occupies space at the upper end of
199 //      each page.  It's a length byte followed by the key
200 //      bytes.
201
202 typedef struct {
203         unsigned char len;              // this can be changed to a ushort or uint
204         unsigned char key[0];
205 } BtKey;
206
207 //      the value structure also occupies space at the upper
208 //      end of the page. Each key is immediately followed by a value.
209
210 typedef struct {
211         unsigned char len;              // this can be changed to a ushort or uint
212         unsigned char value[0];
213 } BtVal;
214
215 #define BT_maxkey       255             // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
217
218 //      The first part of an index page.
219 //      It is immediately followed
220 //      by the BtSlot array of keys.
221
222 typedef struct BtPage_ {
223         uint cnt;                                       // count of keys in page
224         uint act;                                       // count of active keys
225         uint min;                                       // next key/value offset
226         uint fence;                                     // page fence key offset
227         uint garbage;                           // page garbage in bytes
228         unsigned char lvl;                      // level of page, zero = leaf
229         unsigned char free;                     // page is on the free chain
230         unsigned char kill;                     // page is being deleted
231         unsigned char nopromote;        // page is being constructed
232         uid right, left;                        // page numbers to right and left
233 } *BtPage;
234
235 //  The loadpage interface object
236
237 typedef struct {
238         BtPage page;            // current page pointer
239         BtLatchSet *latch;      // current page latch set
240 } BtPageSet;
241
242 //      structure for latch manager on shared ALLOC_page
243
244 typedef struct {
245         uid allocpage;                                  // page number of first available page
246         uid freechain;                                  // head of free page_nos chain
247         uid leafchain;                                  // head of leaf page_nos chain
248         uid leaf_page;                                  // page number of leftmost leaf
249         uid rightleaf;                                  // page number of rightmost leaf
250         uid leafpromote;                                // next leaf page to try promotion
251         unsigned long long leafpages;   // number of active leaf pages
252         unsigned long long upperpages;  // number of active upper pages
253         unsigned char leaf_xtra;                // leaf page size in xtra bits
254         unsigned char page_bits;                // base page size in bits
255         uint nlatchpage;                                // size of buffer pool & latchsets
256         uint latchtotal;                                // number of page latch entries
257         uint latchvictim;                               // next latch entry to test for pin
258         uint latchhash;                                 // number of latch hash table slots
259         MutexLatch lock[1];                             // allocation area lite latch
260         MutexLatch promote[1];                  // promotion lite latch
261 } BtPageZero;
262
263 //      The object structure for Btree access
264
265 typedef struct {
266         uint page_size;                         // base page size       
267         uint page_bits;                         // base page size in bits       
268         uint leaf_xtra;                         // leaf xtra bits       
269 #ifdef unix
270         int idx;
271 #else
272         HANDLE idx;
273 #endif
274         BtPageZero *pagezero;           // mapped allocation page
275         BtHashEntry *hashtable;         // the buffer pool hash table entries
276         BtLatchSet *latchsets;          // mapped latch set from buffer pool
277         uint maxleaves;                         // leaf page count to begin promote
278         int err;                                        // last error
279         int line;                                       // last error line no
280         int found;                                      // number of keys found by delete
281         int type;                                       // type of LSM tree 0=cache, 1=main
282         uint maxseg;                            // max number of memory mapped segments
283         uint segments;                          // number of memory mapped segments in use
284         MutexLatch maps[1];                     // segment table mutex
285         unsigned char **pages;          // memory mapped segments of b-tree
286 } BtMgr;
287
288 typedef struct {
289         BtMgr *mgr;                                     // buffer manager for entire process
290         BtMgr *main;                            // buffer manager for main btree
291         pid_t tid;                                      // thread-id of thread
292         BtPageSet cacheset[1];          // cached page frame for cache btree
293         BtPageSet mainset[1];           // cached page frame for main btree
294         uint cacheslot;                         // slot number in cacheset
295         uint mainslot;                          // slot number in mainset
296         ushort phase;                           // 1 = main btree 0 = cache btree 2 = both
297         BtSlot *cachenode;
298         BtSlot *mainnode;
299         BtKey *cachekey;
300         BtKey *mainkey;
301         BtVal *cacheval;
302         BtVal *mainval;
303 } BtDb;
304
305 typedef struct {
306         uint entry:31;          // latch table entry number
307         uint reuse:1;           // reused previous page
308         uint slot;                      // slot on page
309         uint src;                       // source slot
310 } AtomicTxn;
311
312 //      Catastrophic errors
313
314 typedef enum {
315         BTERR_ok = 0,
316         BTERR_struct,
317         BTERR_ovflw,
318         BTERR_lock,
319         BTERR_map,
320         BTERR_read,
321         BTERR_wrt,
322         BTERR_atomic
323 } BTERR;
324
325 // B-Tree functions
326
327 extern void bt_close (BtDb *bt);
328 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
329 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
330 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, pid_t tid, uint line);
331 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, uint line);
332 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type);
333 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl);
334
335 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
336
337 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
338 extern BTERR bt_nextkey (BtDb *bt);
339
340 extern uint bt_lastkey (BtDb *bt);
341 extern uint bt_prevkey (BtDb *bt);
342
343 //      manager functions
344 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize);
345 extern void bt_mgrclose (BtMgr *mgr);
346
347 //      atomic transaction functions
348 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, pid_t tid);
349 BTERR bt_promote (BtDb *bt);
350
351 //  The page is allocated from low and hi ends.
352 //  The key slots are allocated from the bottom,
353 //      while the text and value of the key
354 //  are allocated from the top.  When the two
355 //  areas meet, the page is split into two.
356
357 //  A key consists of a length byte, two bytes of
358 //  index number (0 - 65535), and up to 253 bytes
359 //  of key value.
360
361 //  Associated with each key is a value byte string
362 //      containing any value desired.
363
364 //  The b-tree root is always located at page 1.
365 //      The first leaf page of level zero is always
366 //      located on page 2.
367
368 //      The b-tree pages are linked with next
369 //      pointers to facilitate enumerators,
370 //      and provide for concurrency.
371
372 //      When to root page fills, it is split in two and
373 //      the tree height is raised by a new root at page
374 //      one with two keys.
375
376 //      Deleted keys are marked with a dead bit until
377 //      page cleanup. The fence key for a leaf node is
378 //      always present
379
380 //  To achieve maximum concurrency one page is locked at a time
381 //  as the tree is traversed to find leaf key in question. The right
382 //  page numbers are used in cases where the page is being split,
383 //      or consolidated.
384
385 //  Page 0 is dedicated to lock for new page extensions,
386 //      and chains empty pages together for reuse. It also
387 //      contains the latch manager hash table.
388
389 //      The ParentModification lock on a node is obtained to serialize posting
390 //      or changing the fence key for a node.
391
392 //      Empty pages are chained together through the ALLOC page and reused.
393
394 //      Access macros to address slot and key values from the page
395 //      Page slots use 1 based indexing.
396
397 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
398 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
399 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
400 #define fenceptr(page) ((BtKey*)((unsigned char*)(page) + page->fence))
401
402 void bt_putid(unsigned char *dest, uid id)
403 {
404 int i = BtId;
405
406         while( i-- )
407                 dest[i] = (unsigned char)id, id >>= 8;
408 }
409
410 uid bt_getid(unsigned char *src)
411 {
412 uid id = 0;
413 int i;
414
415         for( i = 0; i < BtId; i++ )
416                 id <<= 8, id |= *src++; 
417
418         return id;
419 }
420
421 //      lite weight spin lock Latch Manager
422
423 pid_t sys_gettid ()
424 {
425         return syscall(SYS_gettid);
426 }
427
428 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
429 {
430         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
431 }
432
433 void bt_mutexlock(MutexLatch *latch)
434 {
435 uint idx, waited = 0;
436 MutexLatch prev[1];
437
438  while( 1 ) {
439   for( idx = 0; idx < 100; idx++ ) {
440         *prev->value = __sync_fetch_and_or (latch->value, 1);
441         if( !*prev->bits->xcl ) {
442           if( waited )
443                 __sync_fetch_and_sub (latch->bits->waiters, 1);
444           return;
445         }
446   }
447
448   if( !waited ) {
449         __sync_fetch_and_add (latch->bits->waiters, 1);
450         *prev->bits->waiters += 1;
451         waited++;
452   }
453
454   sys_futex (latch->value, FUTEX_WAIT, *prev->value, NULL, NULL, 0);
455  }
456 }
457
458 int bt_mutextry(MutexLatch *latch)
459 {
460         return !__sync_lock_test_and_set (latch->bits->xcl, 1);
461 }
462
463 void bt_releasemutex(MutexLatch *latch)
464 {
465 MutexLatch prev[1];
466
467         *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
468
469         if( *prev->bits->waiters )
470                 sys_futex( latch->value, FUTEX_WAKE, 1, NULL, NULL, 0 );
471 }
472
473 //      reader/writer lock implementation
474
475 void WriteLock (RWLock *lock, pid_t tid, uint line)
476 {
477         if( tid && lock->tid == tid ) {
478                 lock->dup++;
479                 return;
480         }
481         bt_mutexlock (lock->xcl);
482         bt_mutexlock (lock->wrt);
483         bt_releasemutex (lock->xcl);
484         lock->tid = tid;
485 #ifdef DEBUG
486         lock->line = line;
487 #endif
488 }
489
490 void WriteRelease (RWLock *lock)
491 {
492         if( lock->dup ) {
493                 lock->dup--;
494                 return;
495         }
496         lock->tid = 0;
497         bt_releasemutex (lock->wrt);
498 }
499
500 void ReadLock (RWLock *lock)
501 {
502         bt_mutexlock (lock->xcl);
503
504         if( !__sync_fetch_and_add (&lock->readers, 1) )
505                 bt_mutexlock (lock->wrt);
506
507         bt_releasemutex (lock->xcl);
508 }
509
510 void ReadRelease (RWLock *lock)
511 {
512         if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
513                 bt_releasemutex (lock->wrt);
514 }
515
516 //      read page into buffer pool from permanent location in Btree file
517
518 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
519 {
520 uint page_size = mgr->page_size;
521
522   if( leaf )
523         page_size <<= mgr->leaf_xtra;
524
525   if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
526         return mgr->err = BTERR_read;
527
528   return 0;
529 }
530
531 //      write page to location in Btree file
532
533 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
534 {
535 uint page_size = mgr->page_size;
536
537   if( leaf )
538         page_size <<= mgr->leaf_xtra;
539
540   if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
541         return mgr->err = BTERR_wrt;
542
543   return 0;
544 }
545
546 //      decrement pin count
547
548 void bt_unpinlatch (BtLatchSet *latch)
549 {
550         bt_mutexlock(latch->modify);
551         latch->pin--;
552         bt_releasemutex(latch->modify);
553 }
554
555 //  return the btree cached page address
556
557 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
558 {
559 uint segment = latch->page_no >> SEG_bits;
560 int flag = PROT_READ | PROT_WRITE;
561 uid mask = (uid)1 << SEG_bits;
562 BtPage page;
563
564   bt_mutexlock (mgr->maps);
565   mask--;
566
567   while( 1 ) {
568         if( segment < mgr->segments ) {
569           page = (BtPage)(mgr->pages[segment] + ((latch->page_no & mask) << mgr->page_bits));
570
571           bt_releasemutex (mgr->maps);
572           return page;
573         }
574
575         if( mgr->segments < mgr->maxseg ) {
576           mgr->pages[mgr->segments] = mmap (0, (uid)mgr->page_size << SEG_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << mgr->page_bits << SEG_bits);
577           mgr->segments++;
578           continue;
579         }
580
581         mgr->maxseg <<= 1;
582         mgr->pages = realloc (mgr->pages, mgr->maxseg * sizeof(void *));
583   }
584 }
585
586 //  return next available latch entry
587 //        and with latch entry locked
588
589 uint bt_availnext (BtMgr *mgr)
590 {
591 BtLatchSet *latch;
592 uint entry;
593
594   while( 1 ) {
595 #ifdef unix
596         entry = __sync_fetch_and_add (&mgr->pagezero->latchvictim, 1) + 1;
597 #else
598         entry = _InterlockedIncrement (&mgr->pagezero->latchvictim);
599 #endif
600         entry %= mgr->pagezero->latchtotal;
601
602         if( !entry )
603                 continue;
604
605         latch = mgr->latchsets + entry;
606
607         if( !bt_mutextry(latch->modify) )
608                 continue;
609
610         //  return this entry if it is not pinned
611
612         if( !latch->pin )
613                 return entry;
614
615         bt_releasemutex(latch->modify);
616   }
617 }
618
619 //      pin latch in latch pool
620
621 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no)
622 {
623 uint hashidx = page_no % mgr->pagezero->latchhash;
624 uint entry, oldidx;
625 BtLatchSet *latch;
626 BtPage page;
627
628   //  try to find our entry
629
630   bt_mutexlock(mgr->hashtable[hashidx].latch);
631
632   if( entry = mgr->hashtable[hashidx].entry ) do
633   {
634         latch = mgr->latchsets + entry;
635
636         if( page_no == latch->page_no )
637                 break;
638   } while( entry = latch->next );
639
640   //  found our entry: increment pin
641
642   if( entry ) {
643         latch = mgr->latchsets + entry;
644         bt_mutexlock(latch->modify);
645         latch->pin++;
646         bt_releasemutex(latch->modify);
647         bt_releasemutex(mgr->hashtable[hashidx].latch);
648         return latch;
649   }
650
651   //  find and reuse unpinned entry
652
653 trynext:
654   entry = bt_availnext (mgr);
655   latch = mgr->latchsets + entry;
656   oldidx = latch->page_no % mgr->pagezero->latchhash;
657
658   //  skip over this entry if latch not available
659
660   if( latch->page_no )
661    if( oldidx != hashidx )
662     if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
663           bt_releasemutex(latch->modify);
664           goto trynext;
665         }
666
667   //  if latch is on a different hash chain
668   //    unlink from the old page_no chain
669
670   if( latch->page_no )
671    if( oldidx != hashidx ) {
672         if( latch->prev )
673           mgr->latchsets[latch->prev].next = latch->next;
674         else
675           mgr->hashtable[oldidx].entry = latch->next;
676
677         if( latch->next )
678           mgr->latchsets[latch->next].prev = latch->prev;
679
680     bt_releasemutex (mgr->hashtable[oldidx].latch);
681    }
682
683   //  link page as head of hash table chain
684   //  if this is a never before used entry,
685   //  or it was previously on a different
686   //  hash table chain. Otherwise, just
687   //  leave it in its current hash table
688   //  chain position.
689
690   if( !latch->page_no || hashidx != oldidx ) {
691         if( latch->next = mgr->hashtable[hashidx].entry )
692           mgr->latchsets[latch->next].prev = entry;
693
694         mgr->hashtable[hashidx].entry = entry;
695     latch->prev = 0;
696   }
697
698   //  fill in latch structure
699
700   latch->page_no = page_no;
701   latch->pin = 1;
702
703   bt_releasemutex (latch->modify);
704   bt_releasemutex (mgr->hashtable[hashidx].latch);
705   return latch;
706 }
707   
708 void bt_mgrclose (BtMgr *mgr)
709 {
710 char *name = mgr->type ? "Main" : "Cache";
711 BtLatchSet *latch;
712 uint num = 0;
713 BtPage page;
714 uint entry;
715
716         //      flush previously written dirty pages
717         //      and write recovery buffer to disk
718
719         fdatasync (mgr->idx);
720
721 #ifdef unix
722         while( mgr->segments )
723                 munmap (mgr->pages[--mgr->segments], (uid)mgr->page_size << SEG_bits);
724 #else
725         while( mgr->segments ) {
726                 FlushViewOfFile(mgr->pages[--mgr->segments], 0);
727                 UnmapViewOfFile(mgr->pages[mgr->Segments]);
728         }
729 #endif
730 #ifdef unix
731         close (mgr->idx);
732         free (mgr);
733 #else
734         FlushFileBuffers(mgr->idx);
735         CloseHandle(mgr->idx);
736         GlobalFree (mgr);
737 #endif
738 }
739
740 //      close and release memory
741
742 void bt_close (BtDb *bt)
743 {
744         free (bt);
745 }
746
747 void bt_initpage (BtMgr *mgr, BtPage page, uid leaf_page_no, uint lvl)
748 {
749 BtSlot *node = slotptr(page, 1);
750 unsigned char value[BtId];
751 uid page_no;
752 BtKey* key;
753 BtVal *val;
754
755         page_no = lvl ? ROOT_page : leaf_page_no;
756         node->off = mgr->page_size;
757
758         if( !lvl )
759                 node->off <<= mgr->leaf_xtra;
760
761         node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
762         node->type = Librarian;
763         node++->dead = 1;
764
765         node->off = node[-1].off;
766         key = keyptr(page, 2);
767         key = keyptr(page, 1);
768         key->len = 2;           // create stopper key
769         key->key[0] = 0xff;
770         key->key[1] = 0xff;
771
772         bt_putid(value, leaf_page_no);
773         val = valptr(page, 1);
774         val->len = lvl ? BtId : 0;
775         memcpy (val->value, value, val->len);
776
777         page->fence = node->off;
778         page->min = node->off;
779         page->lvl = lvl;
780         page->cnt = 2;
781         page->act = 1;
782
783         if( bt_writepage (mgr, page, page_no, !lvl) ) {
784                 fprintf (stderr, "Unable to create btree page %d\n", page_no);
785                 exit(0);
786         }
787 }
788
789 //  open/create new btree buffer manager
790
791 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
792 //              extra bits for leaves (e.g. 4) size of latch pool (e.g. 500)
793
794 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax)
795 {
796 uint lvl, attr, last, slot, idx, blk;
797 int flag, initit = 0;
798 BtPageZero *pagezero;
799 struct flock lock[1];
800 BtLatchSet *latch;
801 uid leaf_page;
802 off64_t size;
803 BtPage page;
804 uint amt[1];
805 BtMgr* mgr;
806
807         // determine sanity of page size and buffer pool
808
809         if( leafxtra | pagebits )
810           if( leafxtra + pagebits > BT_maxbits )
811                 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
812
813         if( pagebits )
814           if( pagebits < BT_minbits )
815                 fprintf (stderr, "pagebits < minbits\n"), exit(1);
816
817 #ifdef unix
818         mgr = calloc (1, sizeof(BtMgr));
819
820         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
821
822         if( mgr->idx == -1 ) {
823                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
824                 return free(mgr), NULL;
825         }
826
827         memset (lock, 0, sizeof(lock));
828         lock->l_len = sizeof(struct BtPage_);
829         lock->l_type = F_WRLCK;
830
831         if( fcntl (mgr->idx, F_SETLKW, lock) < 0 ) {
832                 fprintf(stderr, "unable to lock record zero %s\n", name);
833                 exit(1);
834         }
835 #else
836         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
837         attr = FILE_ATTRIBUTE_NORMAL;
838         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
839
840         if( mgr->idx == INVALID_HANDLE_VALUE ) {
841                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
842                 return GlobalFree(mgr), NULL;
843         }
844 #endif
845
846 #ifdef unix
847         pagezero = valloc (BT_maxpage);
848         page = (BtPage)pagezero;
849         *amt = 0;
850
851         // read minimum page size to get root info
852         //      to support raw disk partition files
853         //      check if page_bits == 0 on the disk.
854
855         if( size = lseek (mgr->idx, 0L, 2) )
856                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
857                         if( pagezero->page_bits ) {
858                                 pagebits = pagezero->page_bits;
859                                 leafxtra = pagezero->leaf_xtra;
860                         } else
861                                 initit = 1;
862                 else
863                         return free(mgr), free(pagezero), NULL;
864         else
865                 initit = 1;
866 #else
867         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
868         size = GetFileSize(mgr->idx, amt);
869
870         if( size || *amt ) {
871                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
872                         return bt_mgrclose (mgr), NULL;
873                 pagebits = pagezero->page_bits;
874                 leafxtra = pagezero->leaf_xtra;
875         } else
876                 initit = 1;
877 #endif
878
879         mgr->page_size = 1 << pagebits;
880         mgr->page_bits = pagebits;
881         mgr->leaf_xtra = leafxtra;
882
883         if( !initit )
884                 goto mgrlatch;
885
886         //  calculate number of latch table & hash entries
887
888         memset (pagezero, 0, 1 << pagebits);
889         pagezero->nlatchpage = nodemax/16 * sizeof(BtHashEntry);
890
891         pagezero->nlatchpage += sizeof(BtLatchSet) * nodemax + mgr->page_size - 1;
892         pagezero->nlatchpage >>= mgr->page_bits;
893         pagezero->latchtotal = nodemax;
894
895         pagezero->latchhash = (((uid)pagezero->nlatchpage<< mgr->page_bits) - nodemax * sizeof(BtLatchSet)) / sizeof(BtHashEntry);
896
897         // initialize an empty b-tree with alloc page, root page, leaf page
898         // and page(s) of latches and page pool cache
899
900         pagezero->page_bits = mgr->page_bits;
901         pagezero->leaf_xtra = leafxtra;
902         pagezero->upperpages = 1;
903         pagezero->leafpages = 1;
904
905         leaf_page = pagezero->leaf_page = pagezero->nlatchpage + LATCH_page;
906
907         //      round first leafpage up to leafxtra boundary
908
909         if( pagezero->leaf_page & ((1 << leafxtra) - 1)) {
910                 blk = pagezero->leaf_page;
911                 pagezero->leaf_page |= (1 << leafxtra) - 1;
912                 pagezero->freechain = pagezero->leaf_page++;
913                 leaf_page = pagezero->leaf_page;
914         } else
915                 blk = 0;
916
917         pagezero->rightleaf = pagezero->leaf_page;
918         pagezero->allocpage = pagezero->leaf_page + (1 << leafxtra);
919
920         if( pwrite (mgr->idx, pagezero, 1 << pagebits, 0) < 1 << pagebits) {
921                 fprintf (stderr, "Unable to create btree page zero\n");
922                 return bt_mgrclose (mgr), NULL;
923         }
924
925         //      initialize root level 1 page
926
927         memset (page, 0, 1 << pagebits);
928         bt_initpage (mgr, page, leaf_page, 1);
929
930         //  chain unused pages as first freelist
931
932         memset (page, 0, 1 << pagebits);
933
934         while( blk & ((1 << leafxtra) - 1) ) {
935           if( bt_writepage (mgr, page, blk, 0) ) {
936                 fprintf(stderr, "unable to write initial free blk %d\r\n", blk);
937                 exit(1);
938           }
939           page->right = blk++;
940         }
941
942         // initialize first page of leaves
943
944         memset (page, 0, 1 << pagebits);
945         bt_initpage (mgr, page, leaf_page, 0);
946
947 mgrlatch:
948 #ifdef unix
949         free (pagezero);
950 #else
951         VirtualFree (pagezero, 0, MEM_RELEASE);
952 #endif
953
954         lock->l_type = F_UNLCK;
955
956         if( fcntl (mgr->idx, F_SETLK, lock) < 0 ) {
957                 fprintf (stderr, "Unable to unlock page zero\n");
958                 exit(1);
959         }
960
961         //      map first segment
962
963         mgr->segments = 1;
964         mgr->maxseg = MIN_seg;
965         mgr->pages = calloc (MIN_seg, sizeof(unsigned char *));
966
967         flag = PROT_READ | PROT_WRITE;
968         mgr->pages[0] = mmap (0, (uid)mgr->page_size << SEG_bits, flag, MAP_SHARED, mgr->idx, 0);
969
970         if( mgr->pages[0] == MAP_FAILED ) {
971                 fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
972                 return bt_mgrclose (mgr), NULL;
973         }
974
975         mgr->pagezero = (BtPageZero *)mgr->pages[0];
976 //      mlock (mgr->pagezero, mgr->page_size);
977
978         //      allocate latch pool
979
980         mgr->latchsets = (BtLatchSet *)(mgr->pages[0] + ((uid)LATCH_page << mgr->page_bits));
981         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->pagezero->latchtotal);
982
983         return mgr;
984 }
985
986 //      open BTree access method
987 //      based on buffer manager
988
989 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
990 {
991 BtDb *bt = malloc (sizeof(*bt));
992
993         memset (bt, 0, sizeof(*bt));
994         bt->tid = sys_gettid();
995         bt->main = main;
996         bt->mgr = mgr;
997         return bt;
998 }
999
1000 //  compare two keys, return > 0, = 0, or < 0
1001 //  =0: keys are same
1002 //  -1: key2 > key1
1003 //  +1: key2 < key1
1004 //  as the comparison value
1005
1006 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1007 {
1008 uint len1 = key1->len;
1009 int ans;
1010
1011         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1012                 return ans;
1013
1014         if( len1 > len2 )
1015                 return 1;
1016         if( len1 < len2 )
1017                 return -1;
1018
1019         return 0;
1020 }
1021
1022 // place write, read, or parent lock on requested page_no.
1023
1024 void bt_lockpage(BtLock mode, BtLatchSet *latch, pid_t tid, uint line)
1025 {
1026         switch( mode ) {
1027         case BtLockRead:
1028                 ReadLock (latch->readwr);
1029                 break;
1030         case BtLockWrite:
1031                 WriteLock (latch->readwr, tid, line);
1032                 break;
1033         case BtLockAccess:
1034                 ReadLock (latch->access);
1035                 break;
1036         case BtLockDelete:
1037                 WriteLock (latch->access, tid, line);
1038                 break;
1039         case BtLockParent:
1040                 WriteLock (latch->parent, tid, line);
1041                 break;
1042         case BtLockLink:
1043                 WriteLock (latch->link, tid, line);
1044                 break;
1045         }
1046 }
1047
1048 // remove write, read, or parent lock on requested page
1049
1050 void bt_unlockpage(BtLock mode, BtLatchSet *latch, uint line)
1051 {
1052         switch( mode ) {
1053         case BtLockRead:
1054                 ReadRelease (latch->readwr);
1055                 break;
1056         case BtLockWrite:
1057                 WriteRelease (latch->readwr);
1058                 break;
1059         case BtLockAccess:
1060                 ReadRelease (latch->access);
1061                 break;
1062         case BtLockDelete:
1063                 WriteRelease (latch->access);
1064                 break;
1065         case BtLockParent:
1066                 WriteRelease (latch->parent);
1067                 break;
1068         case BtLockLink:
1069                 WriteRelease (latch->link);
1070                 break;
1071         }
1072 }
1073
1074 //      allocate a new page
1075 //      return with page latched, but unlocked.
1076 //      contents is cleared for lvl > 0
1077
1078 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents)
1079 {
1080 uint page_size = mgr->page_size, blk;
1081 uid *freechain;
1082 uid page_no;
1083
1084         //      lock allocation page
1085
1086         bt_mutexlock(mgr->pagezero->lock);
1087
1088         if( contents->lvl ) {
1089                 freechain = &mgr->pagezero->freechain;
1090                 mgr->pagezero->upperpages++;
1091         } else {
1092                 freechain = &mgr->pagezero->leafchain;
1093                 mgr->pagezero->leafpages++;
1094                 page_size <<= mgr->leaf_xtra;
1095         }
1096
1097         // use empty chain first
1098         // else allocate new page
1099
1100         if( page_no = *freechain ) {
1101                 if( set->latch = bt_pinlatch (mgr, page_no) )
1102                         set->page = bt_mappage (mgr, set->latch);
1103                 else
1104                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1105
1106                 *freechain = set->page->right;
1107
1108                 //  the page is currently nopromote and this
1109                 //  will keep bt_promote out.
1110
1111                 //      contents will replace this bit
1112                 //  and pin will keep bt_promote out
1113
1114                 contents->nopromote = 0;
1115
1116                 memcpy (set->page, contents, page_size);
1117
1118 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1119 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1120
1121                 bt_releasemutex(mgr->pagezero->lock);
1122                 return 0;
1123         }
1124
1125         //  obtain next available page number
1126         //      suitable for leaf or higher level
1127
1128         page_no = mgr->pagezero->allocpage;
1129         mgr->pagezero->allocpage += 1 << mgr->leaf_xtra;
1130
1131         //      keep bt_promote out of this page
1132
1133         contents->nopromote = 1;
1134
1135         // unlock allocation latch and
1136         //      extend file into new page.
1137
1138 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1139 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1140
1141         if( bt_writepage (mgr, contents, page_no, !contents->lvl) )
1142                 fprintf(stderr, "Write %lld error %d\n", page_no + blk, errno);
1143
1144         //      chain together unused non-leaf allocation
1145
1146         if( contents->lvl ) {
1147           memset (contents, 0, mgr->page_size);
1148
1149           for( blk = 1; blk < 1 << mgr->leaf_xtra; blk++ ) {
1150                 if( bt_writepage (mgr, contents, page_no + blk, 0) )
1151                   fprintf(stderr, "Write %lld error %d\n", page_no + blk, errno);
1152                 contents->right = page_no + blk;
1153                 *freechain = page_no + blk;
1154           }
1155         }
1156
1157         bt_releasemutex(mgr->pagezero->lock);
1158
1159         if( set->latch = bt_pinlatch (mgr, page_no) )
1160                 set->page = bt_mappage (mgr, set->latch);
1161         else
1162                 return mgr->err;
1163
1164         // now pin will keep bt_promote out
1165
1166         set->page->nopromote = 0;
1167         return 0;
1168 }
1169
1170 //  find slot in page for given key at a given level
1171
1172 int bt_findslot (BtPage page, unsigned char *key, uint len)
1173 {
1174 uint diff, higher = page->cnt, low = 1, slot;
1175 uint good = 0;
1176
1177         //        make stopper key an infinite fence value
1178
1179         if( page->right )
1180                 higher++;
1181         else
1182                 good++;
1183
1184         //      low is the lowest candidate.
1185         //  loop ends when they meet
1186
1187         //  higher is already
1188         //      tested as .ge. the passed key.
1189
1190         while( diff = higher - low ) {
1191                 slot = low + ( diff >> 1 );
1192                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1193                         low = slot + 1;
1194                 else
1195                         higher = slot, good++;
1196         }
1197
1198         //      return zero if key is on right link page
1199
1200         return good ? higher : 0;
1201 }
1202
1203 //  find and load page at given level for given key
1204 //      leave page rd or wr locked as requested
1205
1206 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, pid_t tid)
1207 {
1208 uid page_no = ROOT_page, prevpage_no = 0;
1209 uint drill = 0xff, slot;
1210 uint mode, prevmode;
1211 BtPageSet prev[1];
1212 BtVal *val;
1213 BtKey *ptr;
1214
1215   //  start at root of btree and drill down
1216
1217   do {
1218         if( set->latch = bt_pinlatch (mgr, page_no) )
1219           set->page = bt_mappage (mgr, set->latch);
1220         else
1221           return 0;
1222
1223         if( page_no > ROOT_page )
1224           bt_lockpage(BtLockAccess, set->latch, tid, __LINE__);
1225
1226         //      release & unpin parent or left sibling page
1227
1228         if( prevpage_no ) {
1229           bt_unlockpage(prevmode, prev->latch, __LINE__);
1230           bt_unpinlatch (prev->latch);
1231           prevpage_no = 0;
1232         }
1233
1234         // obtain mode lock using lock coupling through AccessLock
1235         // determine lock mode of drill level
1236
1237         mode = (drill == lvl) ? lock : BtLockRead; 
1238         bt_lockpage(mode, set->latch, tid, __LINE__);
1239
1240         // grab our fence key
1241
1242         ptr=fenceptr(set->page);
1243
1244         if( set->page->free )
1245                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1246
1247         if( page_no > ROOT_page )
1248           bt_unlockpage(BtLockAccess, set->latch, __LINE__);
1249
1250         // re-read and re-lock root after determining actual level of root
1251
1252         if( set->page->lvl != drill) {
1253                 if( set->latch->page_no != ROOT_page )
1254                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1255                         
1256                 drill = set->page->lvl;
1257
1258                 if( lock != BtLockRead && drill == lvl ) {
1259                   bt_unlockpage(mode, set->latch, __LINE__);
1260                   bt_unpinlatch (set->latch);
1261                   continue;
1262                 }
1263         }
1264
1265         prevpage_no = set->latch->page_no;
1266         prevmode = mode;
1267         *prev = *set;
1268
1269         //  if requested key is beyond our fence,
1270         //      slide to the right
1271
1272         if( keycmp (ptr, key, len) < 0 )
1273           if( page_no = set->page->right )
1274                 continue;
1275
1276         //  if page is part of a delete operation,
1277         //      slide to the left;
1278
1279         if( set->page->kill ) {
1280           bt_lockpage(BtLockLink, set->latch, tid, __LINE__);
1281           page_no = set->page->left;
1282           bt_unlockpage(BtLockLink, set->latch, __LINE__);
1283           continue;
1284         }
1285
1286         //  find key on page at this level
1287         //  and descend to requested level
1288
1289         if( slot = bt_findslot (set->page, key, len) ) {
1290           if( drill == lvl )
1291                 return slot;
1292
1293           // find next non-dead slot -- the fence key if nothing else
1294
1295           while( slotptr(set->page, slot)->dead )
1296                 if( slot++ < set->page->cnt )
1297                   continue;
1298                 else
1299                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1300
1301           val = valptr(set->page, slot);
1302
1303           if( val->len == BtId )
1304                 page_no = bt_getid(val->value);
1305           else
1306                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1307
1308           drill--;
1309           continue;
1310          }
1311
1312         //  slide right into next page
1313
1314         page_no = set->page->right;
1315
1316   } while( page_no );
1317
1318   // return error on end of right chain
1319
1320   mgr->line = __LINE__, mgr->err = BTERR_struct;
1321   return 0;     // return error
1322 }
1323
1324 //      return page to free list
1325 //      page must be delete, link & write locked
1326 //      and have no keys pointing to it.
1327
1328 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1329 {
1330 uid *freechain;
1331
1332         //      lock allocation page
1333
1334         bt_mutexlock (mgr->pagezero->lock);
1335
1336         if( set->page->lvl ) {
1337                 freechain = &mgr->pagezero->freechain;
1338                 mgr->pagezero->upperpages--;
1339         } else {
1340                 freechain = &mgr->pagezero->leafchain;
1341                 mgr->pagezero->leafpages--;
1342         }
1343
1344         //      store chain link
1345
1346         set->page->right = *freechain;
1347         *freechain = set->latch->page_no;
1348         set->page->free = 1;
1349
1350 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1351 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1352
1353         // unlock released page
1354         // and unlock allocation page
1355
1356         bt_unlockpage (BtLockDelete, set->latch, __LINE__);
1357         bt_unlockpage (BtLockWrite, set->latch, __LINE__);
1358         bt_unlockpage (BtLockLink, set->latch, __LINE__);
1359         bt_unpinlatch (set->latch);
1360         bt_releasemutex (mgr->pagezero->lock);
1361 }
1362
1363 //      a fence key was deleted from an interiour level page
1364 //      push new fence value upwards
1365
1366 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl)
1367 {
1368 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1369 unsigned char value[BtId];
1370 BtKey* ptr;
1371 uint idx;
1372
1373         //      remove the old fence value
1374
1375         ptr = fenceptr(set->page);
1376         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1377         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1378         set->page->fence = slotptr(set->page, set->page->cnt)->off;
1379
1380         //  cache new fence value
1381
1382         ptr = fenceptr(set->page);
1383         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1384
1385         bt_lockpage (BtLockParent, set->latch, 0, __LINE__);
1386         bt_unlockpage (BtLockWrite, set->latch, __LINE__);
1387
1388         //      insert new (now smaller) fence key
1389
1390         bt_putid (value, set->latch->page_no);
1391         ptr = (BtKey*)leftkey;
1392
1393         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique) )
1394           return mgr->err;
1395
1396         //      now delete old fence key
1397
1398         ptr = (BtKey*)rightkey;
1399
1400         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1) )
1401                 return mgr->err;
1402
1403         bt_unlockpage (BtLockParent, set->latch, __LINE__);
1404         bt_unpinlatch(set->latch);
1405         return 0;
1406 }
1407
1408 //      root has a single child
1409 //      collapse a level from the tree
1410
1411 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root)
1412 {
1413 BtPageSet child[1];
1414 uid page_no;
1415 BtVal *val;
1416 uint idx;
1417
1418   // find the child entry and promote as new root contents
1419
1420   do {
1421         for( idx = 0; idx++ < root->page->cnt; )
1422           if( !slotptr(root->page, idx)->dead )
1423                 break;
1424
1425         val = valptr(root->page, idx);
1426
1427         if( val->len == BtId )
1428                 page_no = bt_getid (valptr(root->page, idx)->value);
1429         else
1430                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1431
1432         if( child->latch = bt_pinlatch (mgr, page_no) )
1433                 child->page = bt_mappage (mgr, child->latch);
1434         else
1435                 return mgr->err;
1436
1437         bt_lockpage (BtLockDelete, child->latch, 0, __LINE__);
1438         bt_lockpage (BtLockWrite, child->latch, 0, __LINE__);
1439
1440         memcpy (root->page, child->page, mgr->page_size);
1441         bt_freepage (mgr, child);
1442
1443   } while( root->page->lvl > 1 && root->page->act == 1 );
1444
1445   bt_unlockpage (BtLockWrite, root->latch, __LINE__);
1446   bt_unpinlatch (root->latch);
1447   return 0;
1448 }
1449
1450 //  delete a page and manage key
1451 //  call with page writelocked
1452
1453 //      returns with page unpinned
1454 //      from the page pool.
1455
1456 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, uint lvl)
1457 {
1458 unsigned char higherfence[BT_keyarray], lowerfence[BT_keyarray];
1459 uint page_size = mgr->page_size, kill;
1460 BtPageSet right[1], temp[1];
1461 unsigned char value[BtId];
1462 uid page_no, right2;
1463 BtKey *ptr;
1464
1465         if( !lvl )
1466                 page_size <<= mgr->leaf_xtra;
1467
1468         //  cache original copy of original fence key
1469         //      that is going to be deleted.
1470
1471         ptr = fenceptr(set->page);
1472         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1473
1474         //      pin and lock our right page
1475
1476         page_no = set->page->right;
1477
1478         if( right->latch = bt_pinlatch (mgr, page_no) )
1479                 right->page = bt_mappage (mgr, right->latch);
1480         else
1481                 return 0;
1482
1483         bt_lockpage (BtLockWrite, right->latch, 0, __LINE__);
1484
1485         if( right->page->kill || set->page->kill )
1486                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1487
1488         // pull contents of right sibling over our empty page
1489         //      preserving our left page number, and its right page number.
1490
1491         bt_lockpage (BtLockLink, set->latch, 0, __LINE__);
1492         page_no = set->page->left;
1493         memcpy (set->page, right->page, page_size);
1494         set->page->left = page_no;
1495         bt_unlockpage (BtLockLink, set->latch, __LINE__);
1496
1497         //  fix left link from far right page
1498
1499         if( right2 = set->page->right ) {
1500           if( temp->latch = bt_pinlatch (mgr, right2) )
1501                 temp->page = bt_mappage (mgr, temp->latch);
1502           else
1503                 return 0;
1504
1505           bt_lockpage (BtLockAccess, temp->latch, 0, __LINE__);
1506       bt_lockpage(BtLockLink, temp->latch, 0, __LINE__);
1507           temp->page->left = set->latch->page_no;
1508           bt_unlockpage(BtLockLink, temp->latch, __LINE__);
1509           bt_unlockpage(BtLockAccess, temp->latch, __LINE__);
1510           bt_unpinlatch (temp->latch);
1511         } else if( !lvl ) {     // our page is now rightmost leaf
1512           bt_mutexlock (mgr->pagezero->lock);
1513           mgr->pagezero->rightleaf = set->latch->page_no;
1514           bt_releasemutex(mgr->pagezero->lock);
1515         }
1516
1517         ptr = fenceptr(set->page);
1518         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1519
1520         // mark right page as being deleted and release lock
1521         //      keep lock on parent modification.
1522
1523         right->page->kill = 1;
1524         bt_lockpage (BtLockParent, right->latch, 0, __LINE__);
1525         bt_unlockpage (BtLockWrite, right->latch, __LINE__);
1526
1527         bt_lockpage (BtLockParent, set->latch, 0, __LINE__);
1528         bt_unlockpage (BtLockWrite, set->latch, __LINE__);
1529
1530         // redirect the new higher key directly to our new node
1531
1532         ptr = (BtKey *)higherfence;
1533         bt_putid (value, set->latch->page_no);
1534
1535         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update) )
1536           return mgr->err;
1537
1538         //      delete our original fence key in parent
1539
1540         ptr = (BtKey *)lowerfence;
1541
1542         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1) )
1543           return mgr->err;
1544
1545         //  wait for all access to drain away with delete lock,
1546         //      then obtain write lock to right node and free it.
1547
1548         bt_lockpage (BtLockDelete, right->latch, 0, __LINE__);
1549         bt_lockpage (BtLockWrite, right->latch, 0, __LINE__);
1550         bt_lockpage (BtLockLink, right->latch, 0, __LINE__);
1551         bt_unlockpage (BtLockParent, right->latch, __LINE__);
1552         bt_freepage (mgr, right);
1553
1554         //      release parent lock to our node
1555
1556         bt_unlockpage (BtLockParent, set->latch, __LINE__);
1557         bt_unpinlatch (set->latch);
1558         return 0;
1559 }
1560
1561 //  find and delete key on page by marking delete flag bit
1562 //  if page becomes empty, delete it from the btree
1563
1564 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl)
1565 {
1566 uint slot, idx, found, fence;
1567 BtPageSet set[1];
1568 BtSlot *node;
1569 BtKey *ptr;
1570 BtVal *val;
1571
1572         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, 0) ) {
1573                 node = slotptr(set->page, slot);
1574                 ptr = keyptr(set->page, slot);
1575         } else
1576                 return mgr->err;
1577
1578         // if librarian slot, advance to real slot
1579
1580         if( node->type == Librarian ) {
1581                 ptr = keyptr(set->page, ++slot);
1582                 node = slotptr(set->page, slot);
1583         }
1584
1585         fence = slot == set->page->cnt;
1586
1587         // delete the key, ignore request if already dead
1588
1589         if( found = !keycmp (ptr, key, len) )
1590           if( found = node->dead == 0 ) {
1591                 val = valptr(set->page,slot);
1592                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1593                 set->page->act--;
1594                 node->dead = 1;
1595
1596                 // collapse empty slots beneath the fence
1597                 // on interiour nodes
1598
1599                 if( lvl )
1600                  while( idx = set->page->cnt - 1 )
1601                   if( slotptr(set->page, idx)->dead ) {
1602                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1603                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1604                   } else
1605                         break;
1606           }
1607
1608         if( !found )
1609                 return 0;
1610
1611         //      did we delete a fence key in an upper level?
1612
1613         if( lvl && set->page->act && fence )
1614           return bt_fixfence (mgr, set, lvl);
1615
1616         //      do we need to collapse root?
1617
1618         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1619           return bt_collapseroot (mgr, set);
1620
1621         //      delete empty page
1622
1623         if( !set->page->act )
1624           return bt_deletepage (mgr, set, set->page->lvl);
1625
1626         bt_unlockpage(BtLockWrite, set->latch, __LINE__);
1627         bt_unpinlatch (set->latch);
1628         return 0;
1629 }
1630
1631 //      check page for space available,
1632 //      clean if necessary and return
1633 //      0 - page needs splitting
1634 //      >0  new slot value
1635
1636 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
1637 {
1638 uint page_size = mgr->page_size;
1639 BtPage page = set->page, frame;
1640 uint cnt = 0, idx = 0;
1641 uint max = page->cnt;
1642 uint newslot = max;
1643 BtKey *key;
1644 BtVal *val;
1645
1646         if( !set->page->lvl )
1647                 page_size <<= mgr->leaf_xtra;
1648
1649         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1650                 return slot;
1651
1652         //      skip cleanup and proceed to split
1653         //      if there's not enough garbage
1654         //      to bother with.
1655
1656         if( page->garbage < page_size / 5 )
1657                 return 0;
1658
1659         frame = malloc (page_size);
1660         memcpy (frame, page, page_size);
1661
1662         // skip page info and set rest of page to zero
1663
1664         memset (page+1, 0, page_size - sizeof(*page));
1665
1666         page->min = page_size;
1667         page->garbage = 0;
1668         page->act = 0;
1669
1670         // clean up page first by
1671         // removing dead keys
1672
1673         while( cnt++ < max ) {
1674                 if( cnt == slot )
1675                         newslot = idx + 2;
1676
1677                 if( cnt < max || frame->lvl )
1678                   if( slotptr(frame,cnt)->dead )
1679                         continue;
1680
1681                 // copy the value across
1682
1683                 val = valptr(frame, cnt);
1684                 page->min -= val->len + sizeof(BtVal);
1685                 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
1686
1687                 // copy the key across
1688
1689                 key = keyptr(frame, cnt);
1690                 page->min -= key->len + sizeof(BtKey);
1691                 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
1692
1693                 // make a librarian slot
1694
1695                 slotptr(page, ++idx)->off = page->min;
1696                 slotptr(page, idx)->type = Librarian;
1697                 slotptr(page, idx)->dead = 1;
1698
1699                 // set up the slot
1700
1701                 slotptr(page, ++idx)->off = page->min;
1702                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
1703
1704                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
1705                   page->act++;
1706         }
1707
1708         page->fence = page->min;
1709         page->cnt = idx;
1710         free (frame);
1711
1712         //      see if page has enough space now, or does it need splitting?
1713
1714         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1715                 return newslot;
1716
1717         return 0;
1718 }
1719
1720 // split the root and raise the height of the btree
1721
1722 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right)
1723 {  
1724 unsigned char leftkey[BT_keyarray];
1725 uint nxt = mgr->page_size;
1726 unsigned char value[BtId];
1727 BtPage frame, page;
1728 BtPageSet left[1];
1729 uid left_page_no;
1730 BtKey *ptr;
1731 BtVal *val;
1732
1733         frame = malloc (mgr->page_size);
1734         memcpy (frame, root->page, mgr->page_size);
1735
1736         //      save left page fence key for new root
1737
1738         ptr = fenceptr(root->page);
1739         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1740
1741         //  Obtain an empty page to use, and copy the current
1742         //  root contents into it, e.g. lower keys
1743
1744         if( bt_newpage(mgr, left, frame) )
1745                 return mgr->err;
1746
1747         left_page_no = left->latch->page_no;
1748         bt_unpinlatch (left->latch);
1749         free (frame);
1750
1751         //      left link the pages together
1752
1753         page = bt_mappage (mgr, right);
1754         page->left = left_page_no;
1755
1756         // preserve the page info at the bottom
1757         // of higher keys and set rest to zero
1758
1759         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
1760
1761         // insert stopper key at top of newroot page
1762         // and increase the root height
1763
1764         nxt -= BtId + sizeof(BtVal);
1765         bt_putid (value, right->page_no);
1766         val = (BtVal *)((unsigned char *)root->page + nxt);
1767         memcpy (val->value, value, BtId);
1768         val->len = BtId;
1769
1770         nxt -= 2 + sizeof(BtKey);
1771         root->page->fence = nxt;
1772
1773         slotptr(root->page, 2)->off = nxt;
1774         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1775         ptr->len = 2;
1776         ptr->key[0] = 0xff;
1777         ptr->key[1] = 0xff;
1778
1779         // insert lower keys page fence key on newroot page as first key
1780
1781         nxt -= BtId + sizeof(BtVal);
1782         bt_putid (value, left_page_no);
1783         val = (BtVal *)((unsigned char *)root->page + nxt);
1784         memcpy (val->value, value, BtId);
1785         val->len = BtId;
1786
1787         ptr = (BtKey *)leftkey;
1788         nxt -= ptr->len + sizeof(BtKey);
1789         slotptr(root->page, 1)->off = nxt;
1790         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1791         
1792         root->page->right = 0;
1793         root->page->min = nxt;          // reset lowest used offset and key count
1794         root->page->cnt = 2;
1795         root->page->act = 2;
1796         root->page->lvl++;
1797
1798         // release and unpin root pages
1799
1800         bt_unlockpage(BtLockWrite, root->latch, __LINE__);
1801         bt_unpinlatch (root->latch);
1802
1803         bt_unpinlatch (right);
1804         return 0;
1805 }
1806
1807 //  split already locked full node
1808 //      leave it locked.
1809 //      return pool entry for new right
1810 //      page, pinned & unlocked
1811
1812 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, uint linkleft)
1813 {
1814 uint page_size = mgr->page_size;
1815 BtPageSet right[1], temp[1];
1816 uint cnt = 0, idx = 0, max;
1817 uint lvl = set->page->lvl;
1818 BtPage frame;
1819 BtKey *key;
1820 BtVal *val;
1821 uid right2;
1822 uint entry;
1823 uint prev;
1824
1825         if( !set->page->lvl )
1826                 page_size <<= mgr->leaf_xtra;
1827
1828         //  split higher half of keys to frame
1829
1830         frame = malloc (page_size);
1831         memset (frame, 0, page_size);
1832         frame->min = page_size;
1833         max = set->page->cnt;
1834         cnt = max / 2;
1835         idx = 0;
1836
1837         while( cnt++ < max ) {
1838                 if( cnt < max || set->page->lvl )
1839                   if( slotptr(set->page, cnt)->dead )
1840                         continue;
1841
1842                 val = valptr(set->page, cnt);
1843                 frame->min -= val->len + sizeof(BtVal);
1844                 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
1845
1846                 key = keyptr(set->page, cnt);
1847                 frame->min -= key->len + sizeof(BtKey);
1848                 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
1849
1850                 //      add librarian slot
1851
1852                 slotptr(frame, ++idx)->off = frame->min;
1853                 slotptr(frame, idx)->type = Librarian;
1854                 slotptr(frame, idx)->dead = 1;
1855
1856                 //  add actual slot
1857
1858                 slotptr(frame, ++idx)->off = frame->min;
1859                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
1860
1861                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1862                   frame->act++;
1863         }
1864
1865         frame->fence = frame->min;
1866         frame->cnt = idx;
1867         frame->lvl = lvl;
1868
1869         // link right node
1870
1871         if( set->latch->page_no > ROOT_page ) {
1872                 right2 = set->page->right;
1873                 frame->right = right2;
1874
1875                 if( linkleft )
1876                         frame->left = set->latch->page_no;
1877         }
1878
1879         //      get new free page and write higher keys to it.
1880
1881         if( bt_newpage(mgr, right, frame) )
1882                 return 0;
1883
1884         //      link far right's left pointer to new page
1885
1886         if( linkleft && set->latch->page_no > ROOT_page )
1887          if( right2 ) {
1888           if( temp->latch = bt_pinlatch (mgr, right2) )
1889                 temp->page = bt_mappage (mgr, temp->latch);
1890           else
1891                 return 0;
1892
1893       bt_lockpage(BtLockLink, temp->latch, 0, __LINE__);
1894           temp->page->left = right->latch->page_no;
1895           bt_unlockpage(BtLockLink, temp->latch, __LINE__);
1896           bt_unpinlatch (temp->latch);
1897          } else if( !lvl ) {    // page is rightmost leaf
1898           bt_mutexlock (mgr->pagezero->lock);
1899           mgr->pagezero->rightleaf = right->latch->page_no;
1900           bt_releasemutex(mgr->pagezero->lock);
1901          }
1902
1903         // process lower keys
1904
1905         memcpy (frame, set->page, page_size);
1906         memset (set->page+1, 0, page_size - sizeof(*set->page));
1907
1908         set->page->min = page_size;
1909         set->page->garbage = 0;
1910         set->page->act = 0;
1911         max /= 2;
1912         cnt = 0;
1913         idx = 0;
1914
1915         //  assemble page of smaller keys
1916
1917         while( cnt++ < max ) {
1918                 if( slotptr(frame, cnt)->dead )
1919                         continue;
1920                 val = valptr(frame, cnt);
1921                 set->page->min -= val->len + sizeof(BtVal);
1922                 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
1923
1924                 key = keyptr(frame, cnt);
1925                 set->page->min -= key->len + sizeof(BtKey);
1926                 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
1927
1928                 //      add librarian slot
1929
1930                 slotptr(set->page, ++idx)->off = set->page->min;
1931                 slotptr(set->page, idx)->type = Librarian;
1932                 slotptr(set->page, idx)->dead = 1;
1933
1934                 //      add actual slot
1935
1936                 slotptr(set->page, ++idx)->off = set->page->min;
1937                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
1938                 set->page->act++;
1939         }
1940
1941         set->page->right = right->latch->page_no;
1942         set->page->fence = set->page->min;
1943         set->page->cnt = idx;
1944         free(frame);
1945
1946         entry = right->latch - mgr->latchsets;
1947         return entry;
1948 }
1949
1950 //      fix keys for newly split page
1951 //      call with both pages pinned & locked
1952 //  return unlocked and unpinned
1953
1954 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right)
1955 {
1956 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1957 unsigned char value[BtId];
1958 uint lvl = set->page->lvl;
1959 BtPageSet temp[1];
1960 BtPage page;
1961 BtKey *ptr;
1962 uid right2;
1963
1964         // if current page is the root page, split it
1965
1966         if( set->latch->page_no == ROOT_page )
1967                 return bt_splitroot (mgr, set, right);
1968
1969         ptr = fenceptr(set->page);
1970         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1971
1972         page = bt_mappage (mgr, right);
1973
1974         ptr = fenceptr(page);
1975         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1976
1977         //      splice in far right page's left page_no
1978
1979         if( right2 = page->right ) {
1980           if( temp->latch = bt_pinlatch (mgr, right2) )
1981                 temp->page = bt_mappage (mgr, temp->latch);
1982           else
1983                 return 0;
1984
1985       bt_lockpage(BtLockLink, temp->latch, 0, __LINE__);
1986           temp->page->left = right->page_no;
1987           bt_unlockpage(BtLockLink, temp->latch, __LINE__);
1988           bt_unpinlatch (temp->latch);
1989         } else if( !lvl ) {  // right page is far right page
1990           bt_mutexlock (mgr->pagezero->lock);
1991           mgr->pagezero->rightleaf = right->page_no;
1992           bt_releasemutex(mgr->pagezero->lock);
1993         }
1994         // insert new fences in their parent pages
1995
1996         bt_lockpage (BtLockParent, right, 0, __LINE__);
1997
1998         bt_lockpage (BtLockParent, set->latch, 0, __LINE__);
1999         bt_unlockpage (BtLockWrite, set->latch, __LINE__);
2000
2001         // insert new fence for reformulated left block of smaller keys
2002
2003         bt_putid (value, set->latch->page_no);
2004         ptr = (BtKey *)leftkey;
2005
2006         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique) )
2007                 return mgr->err;
2008
2009         // switch fence for right block of larger keys to new right page
2010
2011         bt_putid (value, right->page_no);
2012         ptr = (BtKey *)rightkey;
2013
2014         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique) )
2015                 return mgr->err;
2016
2017         bt_unlockpage (BtLockParent, set->latch, __LINE__);
2018         bt_unpinlatch (set->latch);
2019
2020         bt_unlockpage (BtLockParent, right, __LINE__);
2021         bt_unpinlatch (right);
2022         return 0;
2023 }
2024
2025 //      install new key and value onto page
2026 //      page must already be checked for
2027 //      adequate space
2028
2029 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2030 {
2031 uint idx, librarian;
2032 BtSlot *node;
2033 BtKey *ptr;
2034 BtVal *val;
2035 int rate;
2036
2037         //      if previous slot is a librarian slot, use it
2038
2039         if( slot > 1 )
2040           if( slotptr(set->page, slot-1)->type == Librarian )
2041                 slot--;
2042
2043         // copy value onto page
2044
2045         set->page->min -= vallen + sizeof(BtVal);
2046         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2047         memcpy (val->value, value, vallen);
2048         val->len = vallen;
2049
2050         // copy key onto page
2051
2052         set->page->min -= keylen + sizeof(BtKey);
2053         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2054         memcpy (ptr->key, key, keylen);
2055         ptr->len = keylen;
2056         
2057         //      find first empty slot at or above our insert slot
2058
2059         for( idx = slot; idx < set->page->cnt; idx++ )
2060           if( slotptr(set->page, idx)->dead )
2061                 break;
2062
2063         // now insert key into array before slot.
2064
2065         //      if we're going all the way to the top,
2066         //      add as many librarian slots as
2067         //      makes sense.
2068
2069         if( idx == set->page->cnt ) {
2070         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2071
2072                 librarian = ++idx - slot;
2073                 avail /= sizeof(BtSlot);
2074
2075                 if( avail < 0 )
2076                         avail = 0;
2077
2078                 if( librarian > avail )
2079                         librarian = avail;
2080
2081                 if( librarian ) {
2082                         rate = (idx - slot) / librarian;
2083                         set->page->cnt += librarian;
2084                         idx += librarian;
2085                 } else
2086                         rate = 0;
2087         } else
2088                 librarian = 0, rate = 0;
2089
2090         //  transfer slots and add librarian slots
2091
2092         while( idx > slot ) {
2093                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2094
2095                 //      add librarian slot per rate
2096
2097                 if( librarian )
2098                  if( (idx - slot)/2 <= librarian * rate ) {
2099                         node = slotptr(set->page, --idx);
2100                         node->off = node[1].off;
2101                         node->type = Librarian;
2102                         node->dead = 1;
2103                         librarian--;
2104                   }
2105
2106                 --idx;
2107         }
2108
2109         set->page->act++;
2110
2111         //      fill in new slot
2112
2113         node = slotptr(set->page, slot);
2114         node->off = set->page->min;
2115         node->type = type;
2116         node->dead = 0;
2117         return 0;
2118 }
2119
2120 //  Insert new key into the btree at given level.
2121 //      either add a new key or update/add an existing one
2122
2123 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type)
2124 {
2125 uint slot, idx, len, entry;
2126 BtPageSet set[1];
2127 BtSlot *node;
2128 BtKey *ptr;
2129 BtVal *val;
2130
2131   while( 1 ) { // find the page and slot for the current key
2132         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, 0) ) {
2133                 node = slotptr(set->page, slot);
2134                 ptr = keyptr(set->page, slot);
2135         } else
2136                 return mgr->err;
2137
2138         // if librarian slot == found slot, advance to real slot
2139
2140         if( node->type == Librarian ) {
2141                 node = slotptr(set->page, ++slot);
2142                 ptr = keyptr(set->page, slot);
2143         }
2144
2145         //  if inserting a duplicate key or unique
2146         //      key that doesn't exist on the page,
2147         //      check for adequate space on the page
2148         //      and insert the new key before slot.
2149
2150         switch( type ) {
2151         case Unique:
2152         case Duplicate:
2153           if( !keycmp (ptr, key, keylen) )
2154                 break;
2155
2156           if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2157             if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2158                   return mgr->err;
2159                 else
2160                   goto insxit;
2161
2162           if( entry = bt_splitpage (mgr, set, 1) )
2163             if( !bt_splitkeys (mgr, set, entry + mgr->latchsets) )
2164                   continue;
2165
2166           return mgr->err;
2167
2168         case Update:
2169           if( keycmp (ptr, key, keylen) )
2170                 goto insxit;
2171
2172           break;
2173         }
2174
2175         // if key already exists, update value and return
2176
2177         val = valptr(set->page, slot);
2178
2179         if( val->len >= vallen ) {
2180           if( node->dead )
2181                 set->page->act++;
2182           node->type = type;
2183           node->dead = 0;
2184
2185           set->page->garbage += val->len - vallen;
2186           val->len = vallen;
2187           memcpy (val->value, value, vallen);
2188           goto insxit;
2189         }
2190
2191         //  new update value doesn't fit in existing value area
2192         //      make sure page has room
2193
2194         if( !node->dead )
2195           set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2196         else
2197           set->page->act++;
2198
2199         node->type = type;
2200         node->dead = 0;
2201
2202         if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2203           break;
2204
2205         if( entry = bt_splitpage (mgr, set, 1) )
2206           if( !bt_splitkeys (mgr, set, entry + mgr->latchsets) )
2207                 continue;
2208
2209         return mgr->err;
2210   }
2211
2212   //  copy key and value onto page and update slot
2213
2214   set->page->min -= vallen + sizeof(BtVal);
2215   val = (BtVal*)((unsigned char *)set->page + set->page->min);
2216   memcpy (val->value, value, vallen);
2217   val->len = vallen;
2218
2219   set->page->min -= keylen + sizeof(BtKey);
2220   ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2221   memcpy (ptr->key, key, keylen);
2222   ptr->len = keylen;
2223         
2224   slotptr(set->page,slot)->off = set->page->min;
2225
2226 insxit:
2227   bt_unlockpage(BtLockWrite, set->latch, __LINE__);
2228   bt_unpinlatch (set->latch);
2229   return 0;
2230 }
2231
2232 //      determine actual page where key is located
2233 //  return slot number
2234
2235 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, BtPageSet *set)
2236 {
2237 BtKey *key = keyptr(source,locks[idx].src), *ptr;
2238 uint slot = locks[idx].slot;
2239 uint entry;
2240
2241         if( locks[idx].reuse )
2242           entry = locks[idx-1].entry;
2243         else
2244           entry = locks[idx].entry;
2245
2246         if( slot ) {
2247                 set->latch = mgr->latchsets + entry;
2248                 set->page = bt_mappage (mgr, set->latch);
2249                 return slot;
2250         }
2251
2252         //      find where our key is located 
2253         //      on current page or pages split on
2254         //      same page txn operations.
2255
2256         do {
2257                 set->latch = mgr->latchsets + entry;
2258                 set->page = bt_mappage (mgr, set->latch);
2259
2260                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2261                   if( slotptr(set->page, slot)->type == Librarian )
2262                         slot++;
2263                   if( locks[idx].reuse )
2264                         locks[idx].entry = entry;
2265                   return slot;
2266                 }
2267         } while( entry = set->latch->split );
2268
2269         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2270         return 0;
2271 }
2272
2273 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx)
2274 {
2275 BtKey *key = keyptr(source, locks[idx].src);
2276 BtVal *val = valptr(source, locks[idx].src);
2277 BtLatchSet *latch;
2278 BtPageSet set[1];
2279 uint entry, slot;
2280
2281   while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
2282         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2283           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,locks[idx].src)->type) )
2284                 return mgr->err;
2285
2286           return 0;
2287         }
2288
2289         //  split page
2290
2291         if( entry = bt_splitpage (mgr, set, 0) )
2292           latch = mgr->latchsets + entry;
2293         else
2294           return mgr->err;
2295
2296         //      splice right page into split chain
2297         //      and WriteLock it
2298
2299         bt_lockpage(BtLockWrite, latch, 0, __LINE__);
2300         latch->split = set->latch->split;
2301         set->latch->split = entry;
2302
2303         // clear slot number for atomic page
2304
2305         locks[idx].slot = 0;
2306   }
2307
2308   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2309 }
2310
2311 //      perform delete from smaller btree
2312 //  insert a delete slot if not found there
2313
2314 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx)
2315 {
2316 BtKey *key = keyptr(source, locks[idx].src);
2317 BtLatchSet *latch;
2318 uint slot, entry;
2319 BtPageSet set[1];
2320 BtSlot *node;
2321 BtKey *ptr;
2322 BtVal *val;
2323
2324   while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
2325         node = slotptr(set->page, slot);
2326         ptr = keyptr(set->page, slot);
2327         val = valptr(set->page, slot);
2328
2329         //  if slot is not found on cache btree, insert a delete slot
2330         //      otherwise ignore the request.
2331
2332         if( keycmp (ptr, key->key, key->len) )
2333          if( !mgr->type )
2334           if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
2335                 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
2336           else { //  split page before inserting Delete slot
2337                 if( entry = bt_splitpage (mgr, set, 0) )
2338                   latch = mgr->latchsets + entry;
2339                 else
2340               return mgr->err;
2341
2342                 //      splice right page into split chain
2343                 //      and WriteLock it
2344
2345                 bt_lockpage(BtLockWrite, latch, 0, __LINE__);
2346                 latch->split = set->latch->split;
2347                 set->latch->split = entry;
2348
2349                 // clear slot number for atomic page
2350
2351                 locks[idx].slot = 0;
2352                 continue;
2353           }
2354          else
2355            return 0;
2356
2357         //      if node is already dead,
2358         //      ignore the request.
2359
2360         if( node->type == Delete || node->dead )
2361                 return 0;
2362
2363         //  if main LSM btree, delete the slot
2364         //      else change to delete type.
2365
2366         if( mgr->type ) {
2367                 set->page->act--;
2368                 node->dead = 1;
2369         } else
2370                 node->type = Delete;
2371
2372         __sync_fetch_and_add(&mgr->found, 1);
2373         return 0;
2374   }
2375
2376   return mgr->line = __LINE__, mgr->err = BTERR_struct;
2377 }
2378
2379 //      release master's splits from right to left
2380
2381 void bt_atomicrelease (BtMgr *mgr, uint entry)
2382 {
2383 BtLatchSet *latch = mgr->latchsets + entry;
2384
2385         if( latch->split )
2386                 bt_atomicrelease (mgr, latch->split);
2387
2388         latch->split = 0;
2389         bt_unlockpage(BtLockWrite, latch, __LINE__);
2390         bt_unpinlatch(latch);
2391 }
2392
2393 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2394 {
2395 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2396 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2397
2398         return keycmp (key1, key2->key, key2->len);
2399 }
2400 //      atomic modification of a batch of keys.
2401
2402 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2403 {
2404 uint src, idx, slot, samepage, entry, que = 0;
2405 BtKey *key, *ptr, *key2;
2406 int result = 0;
2407 BtSlot temp[1];
2408 int type;
2409
2410   // stable sort the list of keys into order to
2411   //    prevent deadlocks between threads.
2412 /*
2413   for( src = 1; src++ < source->cnt; ) {
2414         *temp = *slotptr(source,src);
2415         key = keyptr (source,src);
2416
2417         for( idx = src; --idx; ) {
2418           key2 = keyptr (source,idx);
2419           if( keycmp (key, key2->key, key2->len) < 0 ) {
2420                 *slotptr(source,idx+1) = *slotptr(source,idx);
2421                 *slotptr(source,idx) = *temp;
2422           } else
2423                 break;
2424         }
2425   }
2426 */
2427         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2428
2429   //  perform the individual actions in the transaction
2430
2431   if( bt_atomicexec (bt->mgr, source, source->cnt, bt->tid) )
2432         return bt->mgr->err;
2433
2434   // if number of active pages
2435   // is greater than the buffer pool
2436   // promote page into larger btree
2437
2438   if( bt->main )
2439    if( bt->mgr->pagezero->leafpages > bt->mgr->maxleaves )
2440         if( bt_promote (bt) )
2441           return bt->mgr->err;
2442
2443   // return success
2444
2445   return 0;
2446 }
2447
2448 //      execute the source list of inserts/deletes
2449
2450 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, pid_t tid)
2451 {
2452 uint slot, src, idx, samepage, entry, outidx;
2453 BtPageSet set[1], prev[1];
2454 unsigned char value[BtId];
2455 BtLatchSet *latch;
2456 uid right_page_no;
2457 AtomicTxn *locks;
2458 BtKey *key, *ptr;
2459 BtPage page;
2460 BtVal *val;
2461
2462   locks = calloc (count, sizeof(AtomicTxn));
2463   memset (set, 0, sizeof(BtPageSet));
2464   outidx = 0;
2465
2466   // Load the leaf page for each key
2467   // group same page references with reuse bit
2468
2469   for( src = 0; src++ < count; ) {
2470         if( slotptr(source,src)->dead )
2471           continue;
2472
2473         key = keyptr(source, src);
2474
2475         // first determine if this modification falls
2476         // on the same page as the previous modification
2477         //      note that the far right leaf page is a special case
2478
2479         if( samepage = !!set->page )
2480           samepage = !set->page->right || keycmp (ptr, key->key, key->len) >= 0;
2481
2482         if( !samepage )
2483           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, tid) )
2484                 ptr = fenceptr(set->page), set->latch->split = 0;
2485           else
2486                 return mgr->err;
2487         else
2488           slot = 0;
2489
2490         if( slot )
2491          if( slotptr(set->page, slot)->type == Librarian )
2492           slot++;
2493
2494         entry = set->latch - mgr->latchsets;
2495         locks[outidx].reuse = samepage;
2496         locks[outidx].entry = entry;
2497         locks[outidx].slot = slot;
2498         locks[outidx].src = src;
2499         outidx++;
2500   }
2501
2502   // insert or delete each key
2503   // process any splits or merges
2504   // run through txn list backwards
2505
2506   samepage = outidx;
2507
2508   for( src = outidx; src--; ) {
2509         if( locks[src].reuse )
2510           continue;
2511
2512         //  perform the txn operations
2513         //      from smaller to larger on
2514         //  the same page
2515
2516         for( idx = src; idx < samepage; idx++ )
2517          switch( slotptr(source,locks[idx].src)->type ) {
2518          case Delete:
2519           if( bt_atomicdelete (mgr, source, locks, idx) )
2520                 return mgr->err;
2521           break;
2522
2523         case Duplicate:
2524         case Unique:
2525           if( bt_atomicinsert (mgr, source, locks, idx) )
2526                 return mgr->err;
2527           break;
2528
2529         default:
2530           bt_atomicpage (mgr, source, locks, idx, set);
2531           break;
2532         }
2533
2534         //      after the same page operations have finished,
2535         //  process master page for splits or deletion.
2536
2537         latch = prev->latch = mgr->latchsets + locks[src].entry;
2538         prev->page = bt_mappage (mgr, prev->latch);
2539         samepage = src;
2540
2541         //  pick-up all splits from master page
2542         //      each one is already pinned & WriteLocked.
2543
2544         while( entry = prev->latch->split ) {
2545           set->latch = mgr->latchsets + entry;
2546           set->page = bt_mappage (mgr, set->latch);
2547
2548           // delete empty master page by undoing its split
2549           //  (this is potentially another empty page)
2550           //  note that there are no pointers to it yet
2551
2552           if( !prev->page->act ) {
2553                 set->page->left = prev->page->left;
2554                 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
2555                 bt_lockpage (BtLockDelete, set->latch, 0, __LINE__);
2556                 bt_lockpage (BtLockLink, set->latch, 0, __LINE__);
2557                 prev->latch->split = set->latch->split;
2558                 bt_freepage (mgr, set);
2559                 continue;
2560           }
2561
2562           // remove empty split page from the split chain
2563           // and return it to the free list. No other
2564           // thread has its page number yet.
2565
2566           if( !set->page->act ) {
2567                 prev->page->right = set->page->right;
2568                 prev->latch->split = set->latch->split;
2569
2570                 bt_lockpage (BtLockDelete, set->latch, 0, __LINE__);
2571                 bt_lockpage (BtLockLink, set->latch, 0, __LINE__);
2572                 bt_freepage (mgr, set);
2573                 continue;
2574           }
2575
2576           //  update prev's fence key
2577
2578           ptr = fenceptr(prev->page);
2579           bt_putid (value, prev->latch->page_no);
2580
2581           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique) )
2582                 return mgr->err;
2583
2584           // splice in the left link into the split page
2585
2586           set->page->left = prev->latch->page_no;
2587           *prev = *set;
2588         }
2589
2590         //  update left pointer in next right page from last split page
2591         //      (if all splits were reversed or none occurred, latch->split == 0)
2592
2593         if( latch->split ) {
2594           //  fix left pointer in master's original (now split)
2595           //  far right sibling or set rightmost page in page zero
2596
2597           if( right_page_no = prev->page->right ) {
2598                 if( set->latch = bt_pinlatch (mgr, right_page_no) )
2599                   set->page = bt_mappage (mgr, set->latch);
2600                 else
2601                   return mgr->err;
2602
2603             bt_lockpage (BtLockLink, set->latch, 0, __LINE__);
2604             set->page->left = prev->latch->page_no;
2605             bt_unlockpage (BtLockLink, set->latch, __LINE__);
2606                 bt_unpinlatch (set->latch);
2607           } else {      // prev is rightmost page
2608             bt_mutexlock (mgr->pagezero->lock);
2609                 mgr->pagezero->rightleaf = prev->latch->page_no;
2610             bt_releasemutex(mgr->pagezero->lock);
2611           }
2612
2613           //  switch the original fence key from the
2614           //  master page to the last split page.
2615
2616           ptr = fenceptr(prev->page);
2617           bt_putid (value, prev->latch->page_no);
2618
2619           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update) )
2620                 return mgr->err;
2621
2622           //  unlock and unpin the split pages
2623
2624           bt_atomicrelease (mgr, latch->split);
2625
2626           //  unlock and unpin the master page
2627
2628           latch->split = 0;
2629           bt_unlockpage(BtLockWrite, latch, __LINE__);
2630           bt_unpinlatch(latch);
2631           continue;
2632         }
2633
2634         //      since there are no splits remaining, we're
2635         //  finished if master page occupied
2636
2637         if( prev->page->act ) {
2638           bt_unlockpage(BtLockWrite, prev->latch, __LINE__);
2639           bt_unpinlatch(prev->latch);
2640           continue;
2641         }
2642
2643         // any and all splits were reversed, and the
2644         // master page located in prev is empty, delete it
2645
2646         if( bt_deletepage (mgr, prev, 0) )
2647                 return mgr->err;
2648   }
2649
2650   // delete the slots
2651
2652   for( idx = 0; idx++ < count; ) {
2653         if( slotptr(source,idx)->dead )
2654           continue;
2655
2656         slotptr(source,idx)->dead = 1;
2657         source->act--;
2658   }
2659
2660   free (locks);
2661   return 0;
2662 }
2663
2664 //  pick & promote a page into the larger btree
2665
2666 BTERR bt_promote (BtDb *bt)
2667 {
2668 BtPageSet set[1];
2669 uint slot, idx;
2670 BtSlot *node;
2671 uid page_no;
2672 BtKey *ptr;
2673 BtVal *val;
2674
2675   bt_mutexlock(bt->mgr->pagezero->promote);
2676
2677   while( 1 ) {
2678         if( bt->mgr->pagezero->leafpromote < bt->mgr->pagezero->allocpage )
2679                 page_no = bt->mgr->pagezero->leafpromote;
2680         else
2681                 page_no = bt->mgr->pagezero->leaf_page;
2682
2683         bt->mgr->pagezero->leafpromote = page_no + (1 << bt->mgr->leaf_xtra);
2684
2685         if( page_no < bt->mgr->pagezero->leaf_page )
2686                 continue;
2687
2688         if( set->latch = bt_pinlatch (bt->mgr, page_no) )
2689                 set->page = bt_mappage (bt->mgr,set->latch);
2690
2691         //      skip upper level pages
2692
2693         if( set->page->lvl ) {
2694           set->latch->pin--;
2695           bt_releasemutex(set->latch->modify);
2696           continue;
2697         }
2698
2699         if( !bt_mutextry(set->latch->modify) ) {
2700           set->latch->pin--;
2701           bt_releasemutex(set->latch->modify);
2702           continue;
2703         }
2704
2705         //  skip this page if it was pinned
2706
2707         if( set->latch->pin > 1 ) {
2708           set->latch->pin--;
2709           bt_releasemutex(set->latch->modify);
2710           continue;
2711         }
2712
2713         // page has no right sibling
2714
2715         if( !set->page->right ) {
2716           set->latch->pin--;
2717           bt_releasemutex(set->latch->modify);
2718           continue;
2719         }
2720
2721         // page is being killed or constructed
2722
2723         if( set->page->nopromote || set->page->kill ) {
2724           set->latch->pin--;
2725           bt_releasemutex(set->latch->modify);
2726           continue;
2727         }
2728
2729         //      leave it locked for the
2730         //      duration of the promotion.
2731
2732         bt_releasemutex(bt->mgr->pagezero->promote);
2733         bt_lockpage (BtLockWrite, set->latch, 0, __LINE__);
2734         bt_releasemutex(set->latch->modify);
2735
2736         // transfer slots in our selected page to the main btree
2737
2738 if( !((page_no>>bt->mgr->leaf_xtra)%100) )
2739 fprintf(stderr, "Promote page %lld, %d keys\n", page_no, set->page->act);
2740
2741         if( bt_atomicexec (bt->main, set->page, set->page->cnt, bt->tid) ) {
2742                 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
2743                 return bt->main->err;
2744         }
2745
2746         //  now delete the page
2747
2748         if( bt_deletepage (bt->mgr, set, 0) )
2749                 fprintf (stderr, "Promote: delete page err = %d\n", bt->mgr->err);
2750
2751         return bt->mgr->err;
2752   }
2753 }
2754
2755 //      find unique key == given key, or first duplicate key in
2756 //      leaf level and return number of value bytes
2757 //      or (-1) if not found.
2758
2759 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2760 {
2761 int ret = -1, type;
2762 BtPageSet set[1];
2763 BtSlot *node;
2764 BtKey *ptr;
2765 BtVal *val;
2766 uint slot;
2767
2768  for( type = 0; type < 2; type++ )
2769   if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, 0) ) {
2770         node = slotptr(set->page, slot);
2771
2772         //      skip librarian slot place holder
2773
2774         if( node->type == Librarian )
2775           node = slotptr(set->page, ++slot);
2776
2777         ptr = keyptr(set->page, slot);
2778
2779         //      not there if we reach the stopper key
2780         //  or the key doesn't match what's on the page.
2781
2782         if( slot == set->page->cnt )
2783           if( !set->page->right ) {
2784                 bt_unlockpage (BtLockRead, set->latch, __LINE__);
2785                 bt_unpinlatch (set->latch);
2786                 continue;
2787         }
2788
2789         if( keycmp (ptr, key, keylen) ) {
2790                 bt_unlockpage (BtLockRead, set->latch, __LINE__);
2791                 bt_unpinlatch (set->latch);
2792                 continue;
2793         }
2794
2795         // key matches, return >= 0 value bytes copied
2796         //      or -1 if not there.
2797
2798         if( node->type == Delete || node->dead ) {
2799                 ret = -1;
2800                 goto findxit;
2801         }
2802
2803         val = valptr (set->page,slot);
2804
2805         if( valmax > val->len )
2806                 valmax = val->len;
2807
2808         memcpy (value, val->value, valmax);
2809         ret = valmax;
2810         goto findxit;
2811   }
2812
2813   ret = -1;
2814
2815 findxit:
2816   if( type < 2 ) {
2817         bt_unlockpage (BtLockRead, set->latch, __LINE__);
2818         bt_unpinlatch (set->latch);
2819   }
2820   return ret;
2821 }
2822
2823 //      set cursor to highest slot on right-most page
2824
2825 BTERR bt_lastkey (BtDb *bt)
2826 {
2827 uid cache_page_no = bt->mgr->pagezero->rightleaf;
2828 uid main_page_no = bt->main->pagezero->rightleaf;
2829
2830         if( bt->cacheset->latch = bt_pinlatch (bt->mgr, cache_page_no) )
2831                 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
2832         else
2833                 return bt->mgr->err;
2834
2835     bt_lockpage(BtLockRead, bt->cacheset->latch, 0, __LINE__);
2836         bt->cacheslot = bt->cacheset->page->cnt;
2837
2838         if( bt->mainset->latch = bt_pinlatch (bt->main, main_page_no) )
2839                 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
2840         else
2841                 return bt->main->err;
2842
2843     bt_lockpage(BtLockRead, bt->mainset->latch, 0, __LINE__);
2844         bt->mainslot = bt->mainset->page->cnt;
2845         bt->phase = 2;
2846         return 0;
2847 }
2848
2849 //      return previous slot on cursor page
2850
2851 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot)
2852 {
2853 uid next, us = set->latch->page_no;
2854
2855   while( 1 ) {
2856         while( --slot )
2857           if( slotptr(set->page, slot)->dead )
2858                 continue;
2859           else
2860                 return slot;
2861
2862         next = set->page->left;
2863
2864         if( !next )
2865                 return 0;
2866
2867         do {
2868           bt_unlockpage(BtLockRead, set->latch, __LINE__);
2869           bt_unpinlatch (set->latch);
2870
2871           if( set->latch = bt_pinlatch (mgr, next) )
2872             set->page = bt_mappage (mgr, set->latch);
2873           else
2874             return 0;
2875
2876       bt_lockpage(BtLockRead, set->latch, 0, __LINE__);
2877           next = set->page->right;
2878
2879         } while( next != us );
2880
2881     slot = set->page->cnt + 1;
2882   }
2883 }
2884
2885 //  advance to previous key
2886
2887 BTERR bt_prevkey (BtDb *bt)
2888 {
2889 int cmp;
2890
2891   // first advance last key(s) one previous slot
2892
2893   while( 1 ) {
2894         switch( bt->phase ) {
2895         case 0:
2896           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot);
2897           break;
2898         case 1:
2899           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot);
2900           break;
2901         case 2:
2902           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot);
2903           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot);
2904         break;
2905         }
2906
2907   // return next key
2908
2909         if( bt->cacheslot ) {
2910           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
2911           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
2912           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
2913         }
2914
2915         if( bt->mainslot ) {
2916           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
2917           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
2918           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
2919         }
2920
2921         if( bt->mainslot && bt->cacheslot )
2922           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
2923         else if( bt->cacheslot )
2924           cmp = 1;
2925         else if( bt->mainslot )
2926           cmp = -1;
2927         else
2928           return 0;
2929
2930         //  cache key is larger
2931
2932         if( cmp > 0 ) {
2933           bt->phase = 0;
2934           if( bt->cachenode->type == Delete )
2935                 continue;
2936           return bt->cacheslot;
2937         }
2938
2939         //  main key is larger
2940
2941         if( cmp < 0 ) {
2942           bt->phase = 1;
2943           return bt->mainslot;
2944         }
2945
2946         //      keys are equal
2947
2948         bt->phase = 2;
2949
2950         if( bt->cachenode->type == Delete )
2951                 continue;
2952
2953         return bt->cacheslot;
2954   }
2955 }
2956
2957 //      advance to next slot in cache or main btree
2958 //      return 0 for EOF/error
2959
2960 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot)
2961 {
2962 BtPage page;
2963 uid page_no;
2964
2965   while( 1 ) {
2966         while( slot++ < set->page->cnt )
2967           if( slotptr(set->page, slot)->dead )
2968                 continue;
2969           else if( slot < set->page->cnt || set->page->right )
2970                 return slot;
2971           else
2972                 return 0;
2973
2974         bt_unlockpage(BtLockRead, set->latch, __LINE__);
2975         bt_unpinlatch (set->latch);
2976
2977         if( page_no = set->page->right )
2978           if( set->latch = bt_pinlatch (mgr, page_no) )
2979                 set->page = bt_mappage (mgr, set->latch);
2980           else
2981                 return 0;
2982         else
2983           return 0; // EOF
2984
2985         // obtain access lock using lock chaining with Access mode
2986
2987         bt_lockpage(BtLockAccess, set->latch, 0, __LINE__);
2988         bt_lockpage(BtLockRead, set->latch, 0, __LINE__);
2989         bt_unlockpage(BtLockAccess, set->latch, __LINE__);
2990         slot = 0;
2991   }
2992 }
2993
2994 //  advance to next key
2995
2996 BTERR bt_nextkey (BtDb *bt)
2997 {
2998 int cmp;
2999
3000   // first advance last key(s) one next slot
3001
3002   while( 1 ) {
3003         switch( bt->phase ) {
3004         case 0:
3005           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot);
3006           break;
3007         case 1:
3008           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot);
3009           break;
3010         case 2:
3011           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot);
3012           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot);
3013         break;
3014         }
3015
3016   // return next key
3017
3018         if( bt->cacheslot ) {
3019           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3020           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3021           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3022         }
3023
3024         if( bt->mainslot ) {
3025           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3026           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3027           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3028         }
3029
3030         if( bt->mainslot && bt->cacheslot )
3031           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3032         else if( bt->mainslot )
3033           cmp = 1;
3034         else if( bt->cacheslot )
3035           cmp = -1;
3036         else
3037           return 0;
3038
3039         //  main key is larger
3040         //      return smaller key
3041
3042         if( cmp < 0 ) {
3043           bt->phase = 0;
3044           if( bt->cachenode->type == Delete )
3045                 continue;
3046           return bt->cacheslot;
3047         }
3048
3049         //  cache key is larger
3050
3051         if( cmp > 0 ) {
3052           bt->phase = 1;
3053           return bt->mainslot;
3054         }
3055
3056         //      keys are equal
3057
3058         bt->phase = 2;
3059
3060         if( bt->cachenode->type == Delete )
3061                 continue;
3062
3063         return bt->cacheslot;
3064   }
3065 }
3066
3067 //  start sweep of keys
3068
3069 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3070 {
3071 BtPageSet set[1];
3072 uint slot;
3073
3074         // cache btree page
3075
3076         if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, 0) )
3077                 bt->cacheslot = slot - 1;
3078         else
3079           return bt->mgr->err;
3080
3081         // main btree page
3082
3083         if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, 0) )
3084                 bt->mainslot = slot - 1;
3085         else
3086           return bt->mgr->err;
3087
3088         bt->phase = 2;
3089         return 0;
3090 }
3091
3092 //      flush cache pages to main btree
3093
3094 BTERR bt_flushmain (BtDb *bt)
3095 {
3096 uint count, cnt = 0;
3097 BtPageSet set[1];
3098
3099   while( bt->mgr->pagezero->leafpages > 0 ) {
3100         if( set->latch = bt_pinlatch (bt->mgr, bt->mgr->pagezero->leaf_page) )
3101           set->page = bt_mappage (bt->mgr, set->latch);
3102         else
3103           return bt->mgr->err;
3104
3105         bt_lockpage(BtLockWrite, set->latch, 0, __LINE__);
3106         count = set->page->cnt;
3107
3108         if( !set->page->right )
3109                 count--;
3110
3111 if( !(cnt++ % 100) )
3112 fprintf(stderr, "Promote LEAF_page %d with %d keys\n", cnt, set->page->act);
3113
3114         if( bt_atomicexec (bt->main, set->page, count, bt->tid) )
3115           return bt->mgr->line = bt->main->line, bt->mgr->err = bt->main->err;
3116
3117         if( set->page->right )
3118           if( bt_deletepage (bt->mgr, set, 0) )
3119                 return bt->mgr->err;
3120           else
3121                 continue;
3122
3123         bt_unlockpage(BtLockWrite, set->latch, __LINE__);
3124         bt_unpinlatch (set->latch);
3125         return 0;
3126   }
3127
3128   //  leaf page count is off
3129
3130   bt->mgr->line = __LINE__;
3131   return bt->mgr->err = BTERR_ovflw;
3132 }
3133
3134 #ifdef STANDALONE
3135
3136 #ifndef unix
3137 double getCpuTime(int type)
3138 {
3139 FILETIME crtime[1];
3140 FILETIME xittime[1];
3141 FILETIME systime[1];
3142 FILETIME usrtime[1];
3143 SYSTEMTIME timeconv[1];
3144 double ans = 0;
3145
3146         memset (timeconv, 0, sizeof(SYSTEMTIME));
3147
3148         switch( type ) {
3149         case 0:
3150                 GetSystemTimeAsFileTime (xittime);
3151                 FileTimeToSystemTime (xittime, timeconv);
3152                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3153                 break;
3154         case 1:
3155                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3156                 FileTimeToSystemTime (usrtime, timeconv);
3157                 break;
3158         case 2:
3159                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3160                 FileTimeToSystemTime (systime, timeconv);
3161                 break;
3162         }
3163
3164         ans += (double)timeconv->wHour * 3600;
3165         ans += (double)timeconv->wMinute * 60;
3166         ans += (double)timeconv->wSecond;
3167         ans += (double)timeconv->wMilliseconds / 1000;
3168         return ans;
3169 }
3170 #else
3171 #include <time.h>
3172 #include <sys/resource.h>
3173
3174 double getCpuTime(int type)
3175 {
3176 struct rusage used[1];
3177 struct timeval tv[1];
3178
3179         switch( type ) {
3180         case 0:
3181                 gettimeofday(tv, NULL);
3182                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3183
3184         case 1:
3185                 getrusage(RUSAGE_SELF, used);
3186                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3187
3188         case 2:
3189                 getrusage(RUSAGE_SELF, used);
3190                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3191         }
3192
3193         return 0;
3194 }
3195 #endif
3196
3197 void bt_poolaudit (BtMgr *mgr, char *type)
3198 {
3199 BtLatchSet *latch, test[1];
3200 uint entry;
3201
3202         memset (test, 0, sizeof(test));
3203
3204         if( memcmp (test, mgr->latchsets, sizeof(test)) )
3205                 fprintf(stderr, "%s latchset zero overwritten\n", type);
3206
3207         for( entry = 0; ++entry < mgr->pagezero->latchtotal; ) {
3208                 latch = mgr->latchsets + entry;
3209
3210                 if( *latch->modify->value )
3211                         fprintf(stderr, "%s latchset %d modifylocked for page %lld\n", type, entry, latch->page_no);
3212
3213                 if( latch->pin )
3214                         fprintf(stderr, "%s latchset %d pinned %d times for page %lld\n", type, entry, latch->pin, latch->page_no);
3215         }
3216 }
3217
3218 typedef struct {
3219         char idx;
3220         char *type;
3221         char *infile;
3222         BtMgr *main;
3223         BtMgr *mgr;
3224         int num;
3225 } ThreadArg;
3226
3227 //  standalone program to index file of keys
3228 //  then list them onto std-out
3229
3230 #ifdef unix
3231 void *index_file (void *arg)
3232 #else
3233 uint __stdcall index_file (void *arg)
3234 #endif
3235 {
3236 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3237 int ch, len = 0, slot, type = 0;
3238 unsigned char key[BT_maxkey];
3239 unsigned char buff[65536];
3240 uint nxt = sizeof(buff);
3241 ThreadArg *args = arg;
3242 uint counts[8][2];
3243 BtPageSet set[1];
3244 BtPage page;
3245 uid page_no;
3246 int vallen;
3247 BtKey *ptr;
3248 BtVal *val;
3249 uint size;
3250 BtDb *bt;
3251 FILE *in;
3252
3253         bt = bt_open (args->mgr, args->main);
3254         page = (BtPage)buff;
3255
3256         if( args->idx < strlen (args->type) )
3257                 ch = args->type[args->idx];
3258         else
3259                 ch = args->type[strlen(args->type) - 1];
3260
3261         switch(ch | 0x20)
3262         {
3263         case 'a':
3264           bt_poolaudit(bt->mgr, "cache");
3265           bt_poolaudit(bt->main, "main");
3266           break;
3267
3268         case 'm':
3269           fprintf(stderr, "started flushing cache to main btree\n");
3270
3271           if( bt->main )
3272                 if( bt_flushmain(bt) )
3273                   fprintf(stderr, "Error %d Line: %d\n", bt->mgr->err, bt->mgr->line), exit(0);
3274
3275           break;
3276
3277         case 'd':
3278                 type = Delete;
3279
3280         case 'p':
3281                 if( !type )
3282                         type = Unique;
3283
3284                 if( args->num )
3285                  if( type == Delete )
3286                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3287                  else
3288                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3289                 else
3290                  if( type == Delete )
3291                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3292                  else
3293                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3294
3295                 if( in = fopen (args->infile, "rb") )
3296                   while( ch = getc(in), ch != EOF )
3297                         if( ch == '\n' )
3298                         {
3299                           line++;
3300
3301                           if( !args->num ) {
3302                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique) )
3303                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3304                             len = 0;
3305                                 continue;
3306                           }
3307
3308                           nxt -= len - 10;
3309                           memcpy (buff + nxt, key + 10, len - 10);
3310                           nxt -= 1;
3311                           buff[nxt] = len - 10;
3312                           nxt -= 10;
3313                           memcpy (buff + nxt, key, 10);
3314                           nxt -= 1;
3315                           buff[nxt] = 10;
3316                           slotptr(page,++cnt)->off  = nxt;
3317                           slotptr(page,cnt)->type = type;
3318                           slotptr(page,cnt)->dead = 0;
3319                           len = 0;
3320
3321                           if( cnt < args->num )
3322                                 continue;
3323
3324                           page->cnt = cnt;
3325                           page->act = cnt;
3326                           page->min = nxt;
3327
3328                           if( bt_atomictxn (bt, page) )
3329                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3330                           nxt = sizeof(buff);
3331                           cnt = 0;
3332
3333                         }
3334                         else if( len < BT_maxkey )
3335                                 key[len++] = ch;
3336                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3337                 break;
3338
3339         case 'w':
3340                 fprintf(stderr, "started indexing for %s\n", args->infile);
3341                 if( in = fopen (args->infile, "r") )
3342                   while( ch = getc(in), ch != EOF )
3343                         if( ch == '\n' )
3344                         {
3345                           line++;
3346
3347                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique) )
3348                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3349                           len = 0;
3350                         }
3351                         else if( len < BT_maxkey )
3352                                 key[len++] = ch;
3353                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3354                 break;
3355
3356         case 'f':
3357                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3358                 if( in = fopen (args->infile, "rb") )
3359                   while( ch = getc(in), ch != EOF )
3360                         if( ch == '\n' )
3361                         {
3362                           line++;
3363                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3364                                 found++;
3365                           else if( bt->mgr->err )
3366                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3367                           len = 0;
3368                         }
3369                         else if( len < BT_maxkey )
3370                                 key[len++] = ch;
3371                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3372                 break;
3373
3374         case 's':
3375                 fprintf(stderr, "started forward scan\n");
3376                 if( bt_startkey (bt, NULL, 0) )
3377                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3378
3379                 while( bt_nextkey (bt) ) {
3380                   if( bt->phase == 1 ) {
3381                         len = bt->mainkey->len;
3382
3383                         if( bt->mainnode->type == Duplicate )
3384                                 len -= BtId;
3385
3386                         fwrite (bt->mainkey->key, len, 1, stdout);
3387                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3388                   } else {
3389                         len = bt->cachekey->len;
3390
3391                         if( bt->cachenode->type == Duplicate )
3392                                 len -= BtId;
3393
3394                         fwrite (bt->cachekey->key, len, 1, stdout);
3395                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3396                   }
3397
3398                   fputc ('\n', stdout);
3399                   cnt++;
3400             }
3401
3402                 bt_unlockpage(BtLockRead, bt->cacheset->latch, __LINE__);
3403                 bt_unpinlatch (bt->cacheset->latch);
3404
3405                 bt_unlockpage(BtLockRead, bt->mainset->latch, __LINE__);
3406                 bt_unpinlatch (bt->mainset->latch);
3407
3408                 fprintf(stderr, " Total keys read %d\n", cnt);
3409                 break;
3410
3411         case 'r':
3412                 fprintf(stderr, "started reverse scan\n");
3413                 if( bt_lastkey (bt) )
3414                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3415
3416                 while( bt_prevkey (bt) ) {
3417                   if( bt->phase == 1 ) {
3418                         len = bt->mainkey->len;
3419
3420                         if( bt->mainnode->type == Duplicate )
3421                                 len -= BtId;
3422
3423                         fwrite (bt->mainkey->key, len, 1, stdout);
3424                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3425                   } else {
3426                         len = bt->cachekey->len;
3427
3428                         if( bt->cachenode->type == Duplicate )
3429                                 len -= BtId;
3430
3431                         fwrite (bt->cachekey->key, len, 1, stdout);
3432                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3433                   }
3434
3435                   fputc ('\n', stdout);
3436                   cnt++;
3437             }
3438
3439                 bt_unlockpage(BtLockRead, bt->cacheset->latch, __LINE__);
3440                 bt_unpinlatch (bt->cacheset->latch);
3441
3442                 bt_unlockpage(BtLockRead, bt->mainset->latch, __LINE__);
3443                 bt_unpinlatch (bt->mainset->latch);
3444
3445                 fprintf(stderr, " Total keys read %d\n", cnt);
3446                 break;
3447
3448         case 'c':
3449                 fprintf(stderr, "started counting LSM cache btree\n");
3450                 memset (counts, 0, sizeof(counts));
3451                 page_no = bt->mgr->pagezero->leaf_page;
3452
3453                 size = bt->mgr->page_size << bt->mgr->leaf_xtra;
3454                 page = malloc(size);
3455
3456 #ifdef unix
3457                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3458 #endif
3459                 while( page_no < bt->mgr->pagezero->allocpage ) {
3460                   if( bt_readpage (bt->mgr, page, page_no, 0) )
3461                         fprintf(stderr, "Unable to read page %lld from cache\n", page_no), exit(1);
3462                   if( !page->lvl && !page->free ) {
3463                     cnt += page->act;
3464
3465                         for( idx = 0; idx++ < page->cnt; ) {
3466                         BtSlot *node = slotptr (page, idx);
3467                           counts[node->type][node->dead]++;
3468                         }
3469                   }
3470                   page_no += 1 << bt->mgr->leaf_xtra;
3471                 }
3472                 
3473                 cachecnt = --cnt;       // remove stopper key
3474                 counts[Unique][0]--;
3475
3476                 fprintf(stderr, "  Unique    : %d  dead: %d\n", counts[Unique][0], counts[Unique][1]);
3477                 fprintf(stderr, "  Duplicates: %d  dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
3478                 fprintf(stderr, "  Librarian : %d  dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
3479                 fprintf(stderr, "  Deletion  : %d  dead: %d\n", counts[Delete][0], counts[Delete][1]);
3480                 fprintf(stderr, "total cache keys count: %d\n", cachecnt);
3481                 free (page);
3482
3483                 fprintf(stderr, "started counting LSM main btree\n");
3484                 memset (counts, 0, sizeof(counts));
3485                 size = bt->main->page_size << bt->main->leaf_xtra;
3486                 page_no = bt->mgr->pagezero->leaf_page;
3487                 page = malloc(size);
3488                 cnt = 0;
3489
3490 #ifdef unix
3491                 posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3492 #endif
3493                 while( page_no < bt->main->pagezero->allocpage ) {
3494                   if( bt_readpage (bt->main, page, page_no, 0) )
3495                         fprintf(stderr, "Unable to read page %lld from main\n", page_no), exit(1);
3496                   if( !page->lvl && !page->free ) {
3497                     cnt += page->act;
3498
3499                         for( idx = 0; idx++ < page->cnt; ) {
3500                         BtSlot *node = slotptr (page, idx);
3501                           counts[node->type][node->dead]++;
3502                         }
3503                   }
3504                   page_no += 1 << bt->main->leaf_xtra;
3505                 }
3506                 
3507                 cnt--;  // remove stopper key
3508                 counts[Unique][0]--;
3509
3510                 fprintf(stderr, "  Unique    : %d  dead: %d\n", counts[Unique][0], counts[Unique][1]);
3511                 fprintf(stderr, "  Duplicates: %d  dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
3512                 fprintf(stderr, "  Librarian : %d  dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
3513                 fprintf(stderr, "  Deletion  : %d  dead: %d\n", counts[Delete][0], counts[Delete][1]);
3514                 fprintf(stderr, "total main keys count : %d\n", cnt);
3515                 fprintf(stderr, "Total keys counted    : %d\n", cnt + cachecnt);
3516                 free (page);
3517                 break;
3518         }
3519
3520         bt_close (bt);
3521 #ifdef unix
3522         return NULL;
3523 #else
3524         return 0;
3525 #endif
3526 }
3527
3528 typedef struct timeval timer;
3529
3530 int main (int argc, char **argv)
3531 {
3532 int idx, cnt, len, slot, err;
3533 double start, stop;
3534 #ifdef unix
3535 pthread_t *threads;
3536 #else
3537 HANDLE *threads;
3538 #endif
3539 ThreadArg *args;
3540 uint mainleafpool = 0;
3541 uint mainleafxtra = 0;
3542 uint maxleaves = 0;
3543 uint poolsize = 0;
3544 uint leafpool = 0;
3545 uint leafxtra = 0;
3546 uint mainpool = 0;
3547 uint mainbits = 0;
3548 int bits = 16;
3549 float elapsed;
3550 int num = 0;
3551 char key[1];
3552 BtMgr *main;
3553 BtMgr *mgr;
3554 BtKey *ptr;
3555
3556         if( argc < 3 ) {
3557                 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize txnsize mainbits mainleafbits mainpool maxleaves src_file1 src_file2 ... ]\n", argv[0]);
3558                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3559                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3560                 fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort/(c)ount/(m)ainflush/(a)udit, with a one character command for each input src_file. A command can also be given with no input file\n");
3561                 fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
3562                 fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
3563                 fprintf (stderr, "  poolsize is the number of latches in latch pool for the cache btree\n");
3564                 fprintf (stderr, "  txnsize = n to block transactions into n units, or zero for no transactions\n");
3565                 fprintf (stderr, "  mainbits is the page size of the main btree in bits\n");
3566                 fprintf (stderr, "  mainpool is the number of latches in the main latch pool\n");
3567                 fprintf (stderr, "  maxleaves is the threashold for LSM leaf page promotion\n");
3568                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3569                 exit(0);
3570         }
3571
3572         start = getCpuTime(0);
3573
3574         if( argc > 4 )
3575                 bits = atoi(argv[4]);
3576
3577         if( argc > 5 )
3578                 leafxtra = atoi(argv[5]);
3579
3580         if( argc > 6 )
3581                 poolsize = atoi(argv[6]);
3582
3583         if( argc > 7 )
3584                 num = atoi(argv[7]);
3585
3586         if( argc > 8 )
3587                 mainbits = atoi(argv[8]);
3588
3589         if( argc > 9 )
3590                 mainleafxtra = atoi(argv[9]);
3591
3592         if( argc > 10 )
3593                 mainpool = atoi(argv[10]);
3594
3595         if( argc > 11 )
3596                 maxleaves = atoi(argv[11]);
3597
3598         if( argc > 12 )
3599                 cnt = argc - 12;
3600         else
3601                 cnt = 0;
3602
3603 #ifdef unix
3604         threads = malloc (cnt * sizeof(pthread_t));
3605 #else
3606         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3607 #endif
3608         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3609
3610         mgr = bt_mgr (argv[1], bits, leafxtra, poolsize);
3611
3612         if( !mgr ) {
3613                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3614                 exit (1);
3615         } else {
3616                 mgr->maxleaves = maxleaves;
3617                 mgr->type = 0;
3618         }
3619
3620         main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool);
3621
3622         if( !main ) {
3623                 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3624                 exit (1);
3625         } else
3626                 main->type = 1;
3627
3628         //      fire off threads
3629
3630         if( cnt > 1 )
3631           for( idx = 0; idx < cnt; idx++ ) {
3632                 args[idx].infile = argv[idx + 12];
3633                 args[idx].type = argv[3];
3634                 args[idx].main = main;
3635                 args[idx].mgr = mgr;
3636                 args[idx].num = num;
3637                 args[idx].idx = idx;
3638 #ifdef unix
3639                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3640                         fprintf(stderr, "Error creating thread %d\n", err);
3641 #else
3642                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3643 #endif
3644           }
3645         else {
3646                 args[0].infile = argv[12];
3647                 args[0].type = argv[3];
3648                 args[0].main = main;
3649                 args[0].mgr = mgr;
3650                 args[0].num = num;
3651                 args[0].idx = 0;
3652                 index_file (args);
3653         }
3654
3655         //      wait for termination
3656
3657 #ifdef unix
3658         if( cnt > 1 )
3659           for( idx = 0; idx < cnt; idx++ )
3660                 pthread_join (threads[idx], NULL);
3661 #else
3662         if( cnt > 1 )
3663           WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3664
3665         if( cnt > 1 )
3666           for( idx = 0; idx < cnt; idx++ )
3667                 CloseHandle(threads[idx]);
3668 #endif
3669         fprintf(stderr, "cache %lld leaves %lld upper %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->found);
3670         if( main )
3671                 fprintf(stderr, "main %lld leaves %lld upper %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->found);
3672
3673         bt_mgrclose (mgr);
3674
3675         if( main )
3676                 bt_mgrclose (main);
3677
3678         elapsed = getCpuTime(0) - start;
3679         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3680         elapsed = getCpuTime(1);
3681         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3682         elapsed = getCpuTime(2);
3683         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3684 }
3685
3686 BtKey *bt_fence (BtPage page)
3687 {
3688 return fenceptr(page);
3689 }
3690
3691 BtKey *bt_key (BtPage page, uint slot)
3692 {
3693 return keyptr(page,slot);
3694 }
3695
3696 BtSlot *bt_slot (BtPage page, uint slot)
3697 {
3698 return slotptr(page,slot);
3699 }
3700 #endif  //STANDALONE