]> pd.if.org Git - btree/blob - threadskv10b.c
Rewrite re-entrant phase-fair latching, continue LSM btree code
[btree] / threadskv10b.c
1 // btree version threadskv10b futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer locks,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      and LSM B-trees for write optimization
11
12 // 17 OCT 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <linux/futex.h>
39 #define SYS_futex 202
40 #endif
41
42 #ifdef unix
43 #include <unistd.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <fcntl.h>
47 #include <sys/time.h>
48 #include <sys/mman.h>
49 #include <errno.h>
50 #include <pthread.h>
51 #include <limits.h>
52 #else
53 #define WIN32_LEAN_AND_MEAN
54 #include <windows.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <time.h>
58 #include <fcntl.h>
59 #include <process.h>
60 #include <intrin.h>
61 #endif
62
63 #include <memory.h>
64 #include <string.h>
65 #include <stddef.h>
66
67 typedef unsigned long long      uid;
68 typedef unsigned long long      logseqno;
69
70 #ifndef unix
71 typedef unsigned long long      off64_t;
72 typedef unsigned short          ushort;
73 typedef unsigned int            uint;
74 #endif
75
76 #define BT_ro 0x6f72    // ro
77 #define BT_rw 0x7772    // rw
78
79 #define BT_maxbits              26                                      // maximum page size in bits
80 #define BT_minbits              9                                       // minimum page size in bits
81 #define BT_minpage              (1 << BT_minbits)       // minimum page size
82 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
83
84 //  BTree page number constants
85 #define ALLOC_page              0       // allocation page
86 #define ROOT_page               1       // root of the btree
87 #define LEAF_page               2       // first page of leaves
88 #define REDO_page               3       // first page of redo buffer
89
90 //      Number of levels to create in a new BTree
91
92 #define MIN_lvl                 2
93
94 /*
95 There are six lock types for each node in four independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
102 */
103
104 typedef enum{
105         BtLockAccess = 1,
106         BtLockDelete = 2,
107         BtLockRead   = 4,
108         BtLockWrite  = 8,
109         BtLockParent = 16,
110         BtLockAtomic = 32,
111         BtLockLink   = 64
112 } BtLock;
113
114 typedef struct {
115   union {
116         struct {
117           volatile ushort xlock[1];             // one writer has exclusive lock
118           volatile ushort wrt[1];               // count of other writers waiting
119         } bits[1];
120         uint value[1];
121   };
122 } BtMutexLatch;
123
124 #define XCL 1
125 #define WRT 65536
126
127 //      definition for reader/writer reentrant lock implementation
128
129 typedef struct {
130   BtMutexLatch xcl[1];
131   union {
132         struct {
133                 volatile ushort tid[1];
134                 volatile ushort readers[1];
135         } bits[1];
136         uint value[1];
137   };
138   volatile ushort waitwrite[1];
139   volatile ushort waitread[1];
140   volatile ushort phase[1];             // phase == 1 for reading after write
141   volatile ushort dup[1];               // reentrant counter
142 } RWLock;
143
144 //      write only reentrant lock
145
146 typedef struct {
147   BtMutexLatch xcl[1];
148   union {
149         struct {
150                 volatile ushort tid[1];
151                 volatile ushort dup[1];
152         } bits[1];
153         uint value[1];
154   };
155   volatile uint waiters[1];
156 } WOLock;
157
158 //      mode & definition for lite latch implementation
159
160 enum {
161         QueRd = 1,              // reader queue
162         QueWr = 2               // writer queue
163 } RWQueue;
164
165 //  hash table entries
166
167 typedef struct {
168         uint entry;             // Latch table entry at head of chain
169         BtMutexLatch latch[1];
170 } BtHashEntry;
171
172 //      latch manager table structure
173
174 typedef struct {
175         uid page_no;                    // latch set page number
176         RWLock readwr[1];               // read/write page lock
177         RWLock access[1];               // Access Intent/Page delete
178         RWLock parent[1];               // Posting of fence key in parent
179         RWLock atomic[1];               // Atomic update in progress
180         RWLock link[1];                 // left link being updated
181         uint split;                             // right split page atomic insert
182         uint next;                              // next entry in hash table chain
183         uint prev;                              // prev entry in hash table chain
184         ushort pin;                             // number of accessing threads
185         unsigned char dirty;    // page in cache is dirty (atomic setable)
186         BtMutexLatch modify[1]; // modify entry lite latch
187 } BtLatchSet;
188
189 //      Define the length of the page record numbers
190
191 #define BtId 6
192
193 //      Page key slot definition.
194
195 //      Keys are marked dead, but remain on the page until
196 //      it cleanup is called. The fence key (highest key) for
197 //      a leaf page is always present, even after cleanup.
198
199 //      Slot types
200
201 //      In addition to the Unique keys that occupy slots
202 //      there are Librarian and Duplicate key
203 //      slots occupying the key slot array.
204
205 //      The Librarian slots are dead keys that
206 //      serve as filler, available to add new Unique
207 //      or Dup slots that are inserted into the B-tree.
208
209 //      The Duplicate slots have had their key bytes extended
210 //      by 6 bytes to contain a binary duplicate key uniqueifier.
211
212 typedef enum {
213         Unique,
214         Librarian,
215         Duplicate,
216         Delete
217 } BtSlotType;
218
219 typedef struct {
220         uint off:BT_maxbits;    // page offset for key start
221         uint type:3;                    // type of slot
222         uint dead:1;                    // set for deleted slot
223 } BtSlot;
224
225 //      The key structure occupies space at the upper end of
226 //      each page.  It's a length byte followed by the key
227 //      bytes.
228
229 typedef struct {
230         unsigned char len;              // this can be changed to a ushort or uint
231         unsigned char key[0];
232 } BtKey;
233
234 //      the value structure also occupies space at the upper
235 //      end of the page. Each key is immediately followed by a value.
236
237 typedef struct {
238         unsigned char len;              // this can be changed to a ushort or uint
239         unsigned char value[0];
240 } BtVal;
241
242 #define BT_maxkey       255             // maximum number of bytes in a key
243 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
244
245 //      The first part of an index page.
246 //      It is immediately followed
247 //      by the BtSlot array of keys.
248
249 //      note that this structure size
250 //      must be a multiple of 8 bytes
251 //      in order to place PageZero correctly.
252
253 typedef struct BtPage_ {
254         uint cnt;                                       // count of keys in page
255         uint act;                                       // count of active keys
256         uint min;                                       // next key offset
257         uint garbage;                           // page garbage in bytes
258         unsigned char bits:7;           // page size in bits
259         unsigned char free:1;           // page is on free chain
260         unsigned char lvl:7;            // level of page
261         unsigned char kill:1;           // page is being deleted
262         unsigned char right[BtId];      // page number to right
263         unsigned char left[BtId];       // page number to left
264         unsigned char filler[2];        // padding to multiple of 8
265         logseqno lsn;                           // log sequence number applied
266         uid page_no;                            // this page number
267 } *BtPage;
268
269 //  The loadpage interface object
270
271 typedef struct {
272         BtPage page;            // current page pointer
273         BtLatchSet *latch;      // current page latch set
274 } BtPageSet;
275
276 //      structure for latch manager on ALLOC_page
277
278 typedef struct {
279         struct BtPage_ alloc[1];                // next page_no in right ptr
280         unsigned char freechain[BtId];  // head of free page_nos chain
281         unsigned long long activepages; // number of active pages
282         uint redopages;                                 // number of redo pages in file
283 } BtPageZero;
284
285 //      The object structure for Btree access
286
287 typedef struct {
288         uint page_size;                         // page size    
289         uint page_bits;                         // page size in bits    
290 #ifdef unix
291         int idx;
292 #else
293         HANDLE idx;
294 #endif
295         BtPageZero *pagezero;           // mapped allocation page
296         BtHashEntry *hashtable;         // the buffer pool hash table entries
297         BtLatchSet *latchsets;          // mapped latch set from buffer pool
298         unsigned char *pagepool;        // mapped to the buffer pool pages
299         unsigned char *redobuff;        // mapped recovery buffer pointer
300         logseqno lsn, flushlsn;         // current & first lsn flushed
301         BtMutexLatch redo[1];           // redo area lite latch
302         BtMutexLatch lock[1];           // allocation area lite latch
303         BtMutexLatch maps[1];           // mapping segments lite latch
304         ushort thread_no[1];            // next thread number
305         uint nlatchpage;                        // number of latch pages at BT_latch
306         uint latchtotal;                        // number of page latch entries
307         uint latchhash;                         // number of latch hash table slots
308         uint latchvictim;                       // next latch entry to examine
309         uint latchpromote;                      // next latch entry to promote
310         uint redolast;                          // last msync size of recovery buff
311         uint redoend;                           // eof/end element in recovery buff
312         int err;                                        // last error
313         int line;                                       // last error line no
314         int found;                                      // number of keys found by delete
315         int reads, writes;                      // number of reads and writes
316 #ifndef unix
317         HANDLE halloc;                          // allocation handle
318         HANDLE hpool;                           // buffer pool handle
319 #endif
320         uint segments;                          // number of memory mapped segments
321         unsigned char *pages[64000];// memory mapped segments of b-tree
322 } BtMgr;
323
324 typedef struct {
325         BtMgr *mgr;                                     // buffer manager for entire process
326         BtMgr *main;                            // buffer manager for main btree
327         BtPage frame;                           // cached page frame for promote
328         BtPage cursor;                          // cached page frame for start/next
329         ushort thread_no;                       // thread number
330         unsigned char key[BT_keyarray]; // last found complete key
331 } BtDb;
332
333 //      atomic txn structures
334
335 typedef struct {
336         logseqno reqlsn;        // redo log seq no required
337         uint entry;                     // latch table entry number
338         uint slot:31;           // page slot number
339         uint reuse:1;           // reused previous page
340 } AtomicTxn;
341
342 //      Catastrophic errors
343
344 typedef enum {
345         BTERR_ok = 0,
346         BTERR_struct,
347         BTERR_ovflw,
348         BTERR_lock,
349         BTERR_map,
350         BTERR_read,
351         BTERR_wrt,
352         BTERR_atomic,
353         BTERR_recovery
354 } BTERR;
355
356 #define CLOCK_bit 0x8000
357
358 // recovery manager entry types
359
360 typedef enum {
361         BTRM_eof = 0,   // rest of buffer is emtpy
362         BTRM_add,               // add a unique key-value to btree
363         BTRM_dup,               // add a duplicate key-value to btree
364         BTRM_del,               // delete a key-value from btree
365         BTRM_upd,               // update a key with a new value
366         BTRM_new,               // allocate a new empty page
367         BTRM_old                // reuse an old empty page
368 } BTRM;
369
370 // recovery manager entry
371 //      structure followed by BtKey & BtVal
372
373 typedef struct {
374         logseqno reqlsn;        // log sequence number required
375         logseqno lsn;           // log sequence number for entry
376         uint len;                       // length of entry
377         unsigned char type;     // type of entry
378         unsigned char lvl;      // level of btree entry pertains to
379 } BtLogHdr;
380
381 // B-Tree functions
382
383 extern void bt_close (BtDb *bt);
384 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
385 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
386 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
387 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
388 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
389 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
390 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
391
392 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
393
394 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
395 extern uint bt_nextkey   (BtDb *bt, uint slot);
396 extern uint bt_prevkey (BtDb *db, uint slot);
397 extern uint bt_lastkey (BtDb *db);
398
399 //      manager functions
400 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
401 extern void bt_mgrclose (BtMgr *mgr);
402 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
403 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
404
405 //      atomic transaction functions
406 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
407 BTERR bt_txnpromote (BtDb *bt);
408
409 //  The page is allocated from low and hi ends.
410 //  The key slots are allocated from the bottom,
411 //      while the text and value of the key
412 //  are allocated from the top.  When the two
413 //  areas meet, the page is split into two.
414
415 //  A key consists of a length byte, two bytes of
416 //  index number (0 - 65535), and up to 253 bytes
417 //  of key value.
418
419 //  Associated with each key is a value byte string
420 //      containing any value desired.
421
422 //  The b-tree root is always located at page 1.
423 //      The first leaf page of level zero is always
424 //      located on page 2.
425
426 //      The b-tree pages are linked with next
427 //      pointers to facilitate enumerators,
428 //      and provide for concurrency.
429
430 //      When to root page fills, it is split in two and
431 //      the tree height is raised by a new root at page
432 //      one with two keys.
433
434 //      Deleted keys are marked with a dead bit until
435 //      page cleanup. The fence key for a leaf node is
436 //      always present
437
438 //  To achieve maximum concurrency one page is locked at a time
439 //  as the tree is traversed to find leaf key in question. The right
440 //  page numbers are used in cases where the page is being split,
441 //      or consolidated.
442
443 //  Page 0 is dedicated to lock for new page extensions,
444 //      and chains empty pages together for reuse. It also
445 //      contains the latch manager hash table.
446
447 //      The ParentModification lock on a node is obtained to serialize posting
448 //      or changing the fence key for a node.
449
450 //      Empty pages are chained together through the ALLOC page and reused.
451
452 //      Access macros to address slot and key values from the page
453 //      Page slots use 1 based indexing.
454
455 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
456 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
457 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
458
459 void bt_putid(unsigned char *dest, uid id)
460 {
461 int i = BtId;
462
463         while( i-- )
464                 dest[i] = (unsigned char)id, id >>= 8;
465 }
466
467 uid bt_getid(unsigned char *src)
468 {
469 uid id = 0;
470 int i;
471
472         for( i = 0; i < BtId; i++ )
473                 id <<= 8, id |= *src++; 
474
475         return id;
476 }
477
478 //      lite weight spin lock Latch Manager
479
480 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
481 {
482         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
483 }
484
485 void bt_mutexlock(BtMutexLatch *latch)
486 {
487 BtMutexLatch prev[1];
488 uint slept = 0;
489
490   while( 1 ) {
491         *prev->value = __sync_fetch_and_or(latch->value, XCL);
492
493         if( !*prev->bits->xlock ) {                     // did we set XCL?
494             if( slept )
495                   __sync_fetch_and_sub(latch->value, WRT);
496                 return;
497         }
498
499         if( !slept ) {
500                 *prev->bits->wrt += 1;
501                 __sync_fetch_and_add(latch->value, WRT);
502         }
503
504         sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
505         slept = 1;
506   }
507 }
508
509 //      try to obtain write lock
510
511 //      return 1 if obtained,
512 //              0 otherwise
513
514 int bt_mutextry(BtMutexLatch *latch)
515 {
516 BtMutexLatch prev[1];
517
518         *prev->value = __sync_fetch_and_or(latch->value, XCL);
519
520         //      take write access if exclusive bit was clear
521
522         return !*prev->bits->xlock;
523 }
524
525 //      clear write mode
526
527 void bt_releasemutex(BtMutexLatch *latch)
528 {
529 BtMutexLatch prev[1];
530
531         *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
532
533         if( *prev->bits->wrt )
534           sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
535 }
536
537 //      reentrant reader/writer lock implementation
538
539 void WriteLock (RWLock *lock, ushort tid)
540 {
541 uint waited = 0;
542 RWLock prev[1];
543
544   while( 1 ) {
545         bt_mutexlock(lock->xcl);
546         *prev = *lock;
547
548         //      is this a re-entrant request?
549
550         if( *prev->bits->tid == tid )
551           *prev->dup += 1;
552
553         //      wait if write already taken, or there are readers
554
555         else if( *prev->bits->tid || *prev->bits->readers ) {
556           if( !waited )
557                 waited++, *lock->waitwrite += 1;
558
559         //      otherwise, we can take the lock
560
561         } else {
562           if( waited )
563                 *lock->waitwrite -= 1;
564
565           *lock->bits->tid = tid;
566           *lock->phase = 0;                     // set writing phase
567         }
568
569         bt_releasemutex(lock->xcl);
570
571         if( *lock->bits->tid == tid )
572           return;
573
574         sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr );
575   }
576 }
577
578 void WriteRelease (RWLock *lock)
579 {
580         bt_mutexlock(lock->xcl);
581
582         //      were we reentrant?
583
584         if( *lock->dup ) {
585           *lock->dup -= 1;
586           bt_releasemutex(lock->xcl);
587           return;
588         }
589
590         //      release write lock and
591         //      set reading after write phase
592
593         *lock->bits->tid = 0;
594
595         //      were readers waiting for a write cycle?
596
597         if( *lock->waitread ) {
598           *lock->phase = 1;
599           sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueRd );
600
601         //  otherwise were writers waiting
602
603         } else if( *lock->waitwrite ) {
604           *lock->phase = 0;
605           sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
606         }
607
608         bt_releasemutex(lock->xcl);
609 }
610
611 void ReadLock (RWLock *lock, ushort tid)
612 {
613 uint xit, waited = 0;
614 RWLock prev[1];
615
616   while( 1 ) {
617         bt_mutexlock(lock->xcl);
618         *prev = *lock;
619         xit = 0;
620
621         //      wait if a write lock is currenty active
622         //      or we are not in a new read cycle and
623         //      writers are waiting.
624
625         if( *prev->bits->tid || !*prev->phase && *prev->waitwrite ) {
626           if( !waited )
627                 waited++, *lock->waitread += 1;
628
629         //      else we can take the lock
630
631         } else {
632           if( waited )
633                 *lock->waitread -= 1;
634
635           *lock->bits->readers += 1;
636           xit = 1;
637         }
638
639         bt_releasemutex(lock->xcl);
640
641         //  did we increment readers?
642
643         if( xit )
644           return;
645
646         sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueRd );
647   }
648 }
649
650 void ReadRelease (RWLock *lock)
651 {
652 RWLock prev[1];
653
654         bt_mutexlock(lock->xcl);
655         *prev = *lock;
656
657         *prev->bits->readers = *lock->bits->readers -= 1;
658
659         if( !*lock->waitread && *lock->waitwrite )
660                 *prev->phase = *lock->phase = 0;        // stop accepting new readers
661
662         bt_releasemutex(lock->xcl);
663
664         //      were writers waiting for a read cycle to finish?
665
666         if( !*prev->phase && !*prev->bits->readers )
667          if( *prev->waitwrite )
668           sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
669 }
670
671 //      recovery manager -- flush dirty pages
672
673 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
674 {
675 uint cnt3 = 0, cnt2 = 0, cnt = 0;
676 uint entry, segment;
677 BtLatchSet *latch;
678 BtPage page;
679
680   //    flush dirty pool pages to the btree
681
682 fprintf(stderr, "Start flushlsn  ");
683   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
684         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
685         latch = mgr->latchsets + entry;
686         bt_mutexlock (latch->modify);
687         bt_lockpage(BtLockRead, latch, thread_no);
688
689         if( latch->dirty ) {
690           bt_writepage(mgr, page, latch->page_no);
691           latch->dirty = 0, cnt++;
692         }
693 if( latch->pin & ~CLOCK_bit )
694 cnt2++;
695         bt_unlockpage(BtLockRead, latch);
696         bt_releasemutex (latch->modify);
697   }
698 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
699 fprintf(stderr, "begin sync");
700         for( segment = 0; segment < mgr->segments; segment++ )
701           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
702                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
703 fprintf(stderr, " end sync\n");
704 }
705
706 //      recovery manager -- process current recovery buff on startup
707 //      this won't do much if previous session was properly closed.
708
709 BTERR bt_recoveryredo (BtMgr *mgr)
710 {
711 BtLogHdr *hdr, *eof;
712 uint offset = 0;
713 BtKey *key;
714 BtVal *val;
715
716   hdr = (BtLogHdr *)mgr->redobuff;
717   mgr->flushlsn = hdr->lsn;
718
719   while( 1 ) {
720         hdr = (BtLogHdr *)(mgr->redobuff + offset);
721         switch( hdr->type ) {
722         case BTRM_eof:
723                 mgr->lsn = hdr->lsn;
724                 return 0;
725         case BTRM_add:          // add a unique key-value to btree
726         
727         case BTRM_dup:          // add a duplicate key-value to btree
728         case BTRM_del:          // delete a key-value from btree
729         case BTRM_upd:          // update a key with a new value
730         case BTRM_new:          // allocate a new empty page
731         case BTRM_old:          // reuse an old empty page
732                 return 0;
733         }
734   }
735 }
736
737 //      recovery manager -- append new entry to recovery log
738 //      flush dirty pages to disk when it overflows.
739
740 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
741 {
742 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
743 uint amt = sizeof(BtLogHdr);
744 BtLogHdr *hdr, *eof;
745 uint last, end;
746
747         bt_mutexlock (mgr->redo);
748
749         if( key )
750           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
751
752         //      see if new entry fits in buffer
753         //      flush and reset if it doesn't
754
755         if( amt > size - mgr->redoend ) {
756           mgr->flushlsn = mgr->lsn;
757           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
758                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
759           mgr->redolast = 0;
760           mgr->redoend = 0;
761           bt_flushlsn(mgr, thread_no);
762         }
763
764         //      fill in new entry & either eof or end block
765
766         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
767
768         hdr->len = amt;
769         hdr->type = type;
770         hdr->lvl = lvl;
771         hdr->lsn = ++mgr->lsn;
772
773         mgr->redoend += amt;
774
775         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
776         memset (eof, 0, sizeof(BtLogHdr));
777
778         //  fill in key and value
779
780         if( key ) {
781           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
782           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
783         }
784
785         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
786         memset (eof, 0, sizeof(BtLogHdr));
787         eof->lsn = mgr->lsn;
788
789         last = mgr->redolast & ~0xfff;
790         end = mgr->redoend;
791
792         if( end - last + sizeof(BtLogHdr) >= 32768 )
793           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
794                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
795           else
796                 mgr->redolast = end;
797
798         bt_releasemutex(mgr->redo);
799         return hdr->lsn;
800 }
801
802 //      recovery manager -- append transaction to recovery log
803 //      flush dirty pages to disk when it overflows.
804
805 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
806 {
807 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
808 uint amt = 0, src, type;
809 BtLogHdr *hdr, *eof;
810 uint last, end;
811 logseqno lsn;
812 BtKey *key;
813 BtVal *val;
814
815         //      determine amount of redo recovery log space required
816
817         for( src = 0; src++ < source->cnt; ) {
818           key = keyptr(source,src);
819           val = valptr(source,src);
820           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
821           amt += sizeof(BtLogHdr);
822         }
823
824         bt_mutexlock (mgr->redo);
825
826         //      see if new entry fits in buffer
827         //      flush and reset if it doesn't
828
829         if( amt > size - mgr->redoend ) {
830           mgr->flushlsn = mgr->lsn;
831           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
832                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
833           mgr->redolast = 0;
834           mgr->redoend = 0;
835           bt_flushlsn (mgr, thread_no);
836         }
837
838         //      assign new lsn to transaction
839
840         lsn = ++mgr->lsn;
841
842         //      fill in new entries
843
844         for( src = 0; src++ < source->cnt; ) {
845           key = keyptr(source, src);
846           val = valptr(source, src);
847
848           switch( slotptr(source, src)->type ) {
849           case Unique:
850                 type = BTRM_add;
851                 break;
852           case Duplicate:
853                 type = BTRM_dup;
854                 break;
855           case Delete:
856                 type = BTRM_del;
857                 break;
858           }
859
860           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
861           amt += sizeof(BtLogHdr);
862
863           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
864           hdr->len = amt;
865           hdr->type = type;
866           hdr->lsn = lsn;
867           hdr->lvl = 0;
868
869           //  fill in key and value
870
871           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
872           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
873
874           mgr->redoend += amt;
875         }
876
877         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
878         memset (eof, 0, sizeof(BtLogHdr));
879         eof->lsn = lsn;
880
881         last = mgr->redolast & ~0xfff;
882         end = mgr->redoend;
883
884         if( end - last + sizeof(BtLogHdr) >= 32768 )
885           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
886                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
887           else
888                 mgr->redolast = end;
889
890         bt_releasemutex(mgr->redo);
891         return lsn;
892 }
893
894 //      sync a single btree page to disk
895
896 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
897 {
898 uint segment = latch->page_no >> 16;
899 BtPage perm;
900
901         if( bt_writepage (mgr, page, latch->page_no) )
902                 return mgr->err;
903
904         perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
905
906         if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
907           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
908
909         latch->dirty = 0;
910         return 0;
911 }
912
913 //      read page into buffer pool from permanent location in Btree file
914
915 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
916 {
917 int flag = PROT_READ | PROT_WRITE;
918 uint segment = page_no >> 16;
919 BtPage perm;
920
921   while( 1 ) {
922         if( segment < mgr->segments ) {
923           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
924
925           memcpy (page, perm, mgr->page_size);
926           mgr->reads++;
927           return 0;
928         }
929
930         bt_mutexlock (mgr->maps);
931
932         if( segment < mgr->segments ) {
933           bt_releasemutex (mgr->maps);
934           continue;
935         }
936
937         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
938         mgr->segments++;
939
940         bt_releasemutex (mgr->maps);
941   }
942 }
943
944 //      write page to permanent location in Btree file
945 //      clear the dirty bit
946
947 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
948 {
949 int flag = PROT_READ | PROT_WRITE;
950 uint segment = page_no >> 16;
951 BtPage perm;
952
953   while( 1 ) {
954         if( segment < mgr->segments ) {
955           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
956
957           memcpy (perm, page, mgr->page_size);
958           mgr->writes++;
959           return 0;
960         }
961
962         bt_mutexlock (mgr->maps);
963
964         if( segment < mgr->segments ) {
965           bt_releasemutex (mgr->maps);
966           continue;
967         }
968
969         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
970         bt_releasemutex (mgr->maps);
971         mgr->segments++;
972   }
973 }
974
975 //      set CLOCK bit in latch
976 //      decrement pin count
977
978 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
979 {
980         bt_mutexlock(latch->modify);
981         latch->pin |= CLOCK_bit;
982         latch->pin--;
983
984         bt_releasemutex(latch->modify);
985 }
986
987 //  return the btree cached page address
988
989 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
990 {
991 uid entry = latch - mgr->latchsets;
992 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
993
994   return page;
995 }
996
997 //  return next available latch entry
998 //        and with latch entry locked
999
1000 uint bt_availnext (BtMgr *mgr)
1001 {
1002 BtLatchSet *latch;
1003 uint entry;
1004
1005   while( 1 ) {
1006 #ifdef unix
1007         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1008 #else
1009         entry = _InterlockedIncrement (&mgr->latchvictim);
1010 #endif
1011         entry %= mgr->latchtotal;
1012
1013         if( !entry )
1014                 continue;
1015
1016         latch = mgr->latchsets + entry;
1017
1018         if( !bt_mutextry(latch->modify) )
1019                 continue;
1020
1021         //  return this entry if it is not pinned
1022
1023         if( !latch->pin )
1024                 return entry;
1025
1026         //      if the CLOCK bit is set
1027         //      reset it to zero.
1028
1029         latch->pin &= ~CLOCK_bit;
1030         bt_releasemutex(latch->modify);
1031   }
1032 }
1033
1034 //      pin page in buffer pool
1035 //      return with latchset pinned
1036
1037 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1038 {
1039 uint hashidx = page_no % mgr->latchhash;
1040 BtLatchSet *latch;
1041 uint entry, idx;
1042 BtPage page;
1043
1044   //  try to find our entry
1045
1046   bt_mutexlock(mgr->hashtable[hashidx].latch);
1047
1048   if( entry = mgr->hashtable[hashidx].entry ) do
1049   {
1050         latch = mgr->latchsets + entry;
1051         if( page_no == latch->page_no )
1052                 break;
1053   } while( entry = latch->next );
1054
1055   //  found our entry: increment pin
1056
1057   if( entry ) {
1058         latch = mgr->latchsets + entry;
1059         bt_mutexlock(latch->modify);
1060         latch->pin |= CLOCK_bit;
1061         latch->pin++;
1062
1063         bt_releasemutex(latch->modify);
1064         bt_releasemutex(mgr->hashtable[hashidx].latch);
1065         return latch;
1066   }
1067
1068   //  find and reuse unpinned entry
1069
1070 trynext:
1071
1072   entry = bt_availnext (mgr);
1073   latch = mgr->latchsets + entry;
1074
1075   idx = latch->page_no % mgr->latchhash;
1076
1077   //  if latch is on a different hash chain
1078   //    unlink from the old page_no chain
1079
1080   if( latch->page_no )
1081    if( idx != hashidx ) {
1082
1083         //  skip over this entry if latch not available
1084
1085     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1086           bt_releasemutex(latch->modify);
1087           goto trynext;
1088         }
1089
1090         if( latch->prev )
1091           mgr->latchsets[latch->prev].next = latch->next;
1092         else
1093           mgr->hashtable[idx].entry = latch->next;
1094
1095         if( latch->next )
1096           mgr->latchsets[latch->next].prev = latch->prev;
1097
1098     bt_releasemutex (mgr->hashtable[idx].latch);
1099    }
1100
1101   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1102
1103   //  update permanent page area in btree from buffer pool
1104   //    no read-lock is required since page is not pinned.
1105
1106   if( latch->dirty )
1107         if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1108           return mgr->line = __LINE__, NULL;
1109         else
1110           latch->dirty = 0;
1111
1112   if( contents ) {
1113         memcpy (page, contents, mgr->page_size);
1114         latch->dirty = 1;
1115   } else if( bt_readpage (mgr, page, page_no) )
1116         return mgr->line = __LINE__, NULL;
1117
1118   //  link page as head of hash table chain
1119   //  if this is a never before used entry,
1120   //  or it was previously on a different
1121   //  hash table chain. Otherwise, just
1122   //  leave it in its current hash table
1123   //  chain position.
1124
1125   if( !latch->page_no || hashidx != idx ) {
1126         if( latch->next = mgr->hashtable[hashidx].entry )
1127           mgr->latchsets[latch->next].prev = entry;
1128
1129         mgr->hashtable[hashidx].entry = entry;
1130     latch->prev = 0;
1131   }
1132
1133   //  fill in latch structure
1134
1135   latch->pin = CLOCK_bit | 1;
1136   latch->page_no = page_no;
1137   latch->split = 0;
1138
1139   bt_releasemutex (latch->modify);
1140   bt_releasemutex (mgr->hashtable[hashidx].latch);
1141   return latch;
1142 }
1143   
1144 void bt_mgrclose (BtMgr *mgr)
1145 {
1146 BtLatchSet *latch;
1147 BtLogHdr *eof;
1148 uint num = 0;
1149 BtPage page;
1150 uint slot;
1151
1152         //      flush previously written dirty pages
1153         //      and write recovery buffer to disk
1154
1155         fdatasync (mgr->idx);
1156
1157         if( mgr->redoend ) {
1158                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1159                 memset (eof, 0, sizeof(BtLogHdr));
1160         }
1161
1162         //      write remaining dirty pool pages to the btree
1163
1164         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1165                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1166                 latch = mgr->latchsets + slot;
1167
1168                 if( latch->dirty ) {
1169                         bt_writepage(mgr, page, latch->page_no);
1170                         latch->dirty = 0, num++;
1171                 }
1172         }
1173
1174         //      clear redo recovery buffer on disk.
1175
1176         if( mgr->pagezero->redopages ) {
1177                 eof = (BtLogHdr *)mgr->redobuff;
1178                 memset (eof, 0, sizeof(BtLogHdr));
1179                 eof->lsn = mgr->lsn;
1180                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1181                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1182         }
1183
1184         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1185
1186 #ifdef unix
1187         while( mgr->segments )
1188                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1189
1190         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1191         munmap (mgr->pagezero, mgr->page_size);
1192 #else
1193         FlushViewOfFile(mgr->pagezero, 0);
1194         UnmapViewOfFile(mgr->pagezero);
1195         UnmapViewOfFile(mgr->pagepool);
1196         CloseHandle(mgr->halloc);
1197         CloseHandle(mgr->hpool);
1198 #endif
1199 #ifdef unix
1200         close (mgr->idx);
1201         free (mgr);
1202 #else
1203         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1204         FlushFileBuffers(mgr->idx);
1205         CloseHandle(mgr->idx);
1206         GlobalFree (mgr);
1207 #endif
1208 }
1209
1210 //      close and release memory
1211
1212 void bt_close (BtDb *bt)
1213 {
1214 #ifdef unix
1215         if( bt->frame )
1216                 free (bt->frame);
1217         if( bt->cursor )
1218                 free (bt->cursor);
1219 #else
1220         if( bt->frame)
1221                 VirtualFree (bt->frame, 0, MEM_RELEASE);
1222         if( bt->cursor)
1223                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1224 #endif
1225         free (bt);
1226 }
1227
1228 //  open/create new btree buffer manager
1229
1230 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1231 //              size of page pool (e.g. 262144)
1232
1233 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1234 {
1235 uint lvl, attr, last, slot, idx;
1236 uint nlatchpage, latchhash;
1237 unsigned char value[BtId];
1238 int flag, initit = 0;
1239 BtPageZero *pagezero;
1240 BtLatchSet *latch;
1241 off64_t size;
1242 uint amt[1];
1243 BtMgr* mgr;
1244 BtKey* key;
1245 BtVal *val;
1246
1247         // determine sanity of page size and buffer pool
1248
1249         if( bits > BT_maxbits )
1250                 bits = BT_maxbits;
1251         else if( bits < BT_minbits )
1252                 bits = BT_minbits;
1253 #ifdef unix
1254         mgr = calloc (1, sizeof(BtMgr));
1255
1256         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1257
1258         if( mgr->idx == -1 ) {
1259                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1260                 return free(mgr), NULL;
1261         }
1262 #else
1263         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1264         attr = FILE_ATTRIBUTE_NORMAL;
1265         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1266
1267         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1268                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1269                 return GlobalFree(mgr), NULL;
1270         }
1271 #endif
1272
1273 #ifdef unix
1274         pagezero = valloc (BT_maxpage);
1275         *amt = 0;
1276
1277         // read minimum page size to get root info
1278         //      to support raw disk partition files
1279         //      check if bits == 0 on the disk.
1280
1281         if( size = lseek (mgr->idx, 0L, 2) )
1282                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1283                         if( pagezero->alloc->bits )
1284                                 bits = pagezero->alloc->bits;
1285                         else
1286                                 initit = 1;
1287                 else
1288                         return free(mgr), free(pagezero), NULL;
1289         else
1290                 initit = 1;
1291 #else
1292         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1293         size = GetFileSize(mgr->idx, amt);
1294
1295         if( size || *amt ) {
1296                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1297                         return bt_mgrclose (mgr), NULL;
1298                 bits = pagezero->alloc->bits;
1299         } else
1300                 initit = 1;
1301 #endif
1302
1303         mgr->page_size = 1 << bits;
1304         mgr->page_bits = bits;
1305
1306         //  calculate number of latch hash table entries
1307
1308         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1309
1310         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1311         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1312         mgr->latchtotal = nodemax;
1313
1314         if( !initit )
1315                 goto mgrlatch;
1316
1317         // initialize an empty b-tree with latch page, root page, page of leaves
1318         // and page(s) of latches and page pool cache
1319
1320         memset (pagezero, 0, 1 << bits);
1321         pagezero->alloc->lvl = MIN_lvl - 1;
1322         pagezero->alloc->bits = mgr->page_bits;
1323         pagezero->redopages = redopages;
1324
1325         bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1326         pagezero->activepages = 2;
1327
1328         //  initialize left-most LEAF page in
1329         //      alloc->left and count of active leaf pages.
1330
1331         bt_putid (pagezero->alloc->left, LEAF_page);
1332         ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1333
1334         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1335                 fprintf (stderr, "Unable to create btree page zero\n");
1336                 return bt_mgrclose (mgr), NULL;
1337         }
1338
1339         memset (pagezero, 0, 1 << bits);
1340         pagezero->alloc->bits = mgr->page_bits;
1341
1342         for( lvl=MIN_lvl; lvl--; ) {
1343         BtSlot *node = slotptr(pagezero->alloc, 1);
1344                 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1345                 key = keyptr(pagezero->alloc, 1);
1346                 key->len = 2;           // create stopper key
1347                 key->key[0] = 0xff;
1348                 key->key[1] = 0xff;
1349
1350                 bt_putid(value, MIN_lvl - lvl + 1);
1351                 val = valptr(pagezero->alloc, 1);
1352                 val->len = lvl ? BtId : 0;
1353                 memcpy (val->value, value, val->len);
1354
1355                 pagezero->alloc->min = node->off;
1356                 pagezero->alloc->lvl = lvl;
1357                 pagezero->alloc->cnt = 1;
1358                 pagezero->alloc->act = 1;
1359                 pagezero->alloc->page_no = MIN_lvl - lvl;
1360
1361                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1362                         fprintf (stderr, "Unable to create btree page\n");
1363                         return bt_mgrclose (mgr), NULL;
1364                 }
1365         }
1366
1367 mgrlatch:
1368 #ifdef unix
1369         free (pagezero);
1370 #else
1371         VirtualFree (pagezero, 0, MEM_RELEASE);
1372 #endif
1373 #ifdef unix
1374         // mlock the first segment of 64K pages
1375
1376         flag = PROT_READ | PROT_WRITE;
1377         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1378         mgr->segments = 1;
1379
1380         if( mgr->pages[0] == MAP_FAILED ) {
1381                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1382                 return bt_mgrclose (mgr), NULL;
1383         }
1384
1385         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1386         mlock (mgr->pagezero, mgr->page_size);
1387
1388         mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1389         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1390
1391         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1392         if( mgr->pagepool == MAP_FAILED ) {
1393                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1394                 return bt_mgrclose (mgr), NULL;
1395         }
1396 #else
1397         flag = PAGE_READWRITE;
1398         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1399         if( !mgr->halloc ) {
1400                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1401                 return bt_mgrclose (mgr), NULL;
1402         }
1403
1404         flag = FILE_MAP_WRITE;
1405         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1406         if( !mgr->pagezero ) {
1407                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1408                 return bt_mgrclose (mgr), NULL;
1409         }
1410
1411         flag = PAGE_READWRITE;
1412         size = (uid)mgr->nlatchpage << mgr->page_bits;
1413         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1414         if( !mgr->hpool ) {
1415                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1416                 return bt_mgrclose (mgr), NULL;
1417         }
1418
1419         flag = FILE_MAP_WRITE;
1420         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1421         if( !mgr->pagepool ) {
1422                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1423                 return bt_mgrclose (mgr), NULL;
1424         }
1425 #endif
1426
1427         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1428         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1429         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1430
1431         return mgr;
1432 }
1433
1434 //      open BTree access method
1435 //      based on buffer manager
1436
1437 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1438 {
1439 BtDb *bt = malloc (sizeof(*bt));
1440
1441         memset (bt, 0, sizeof(*bt));
1442         bt->main = main;
1443         bt->mgr = mgr;
1444 #ifdef unix
1445         bt->cursor = valloc (mgr->page_size);
1446         bt->frame = valloc (mgr->page_size);
1447 #else
1448         bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1449         bt->frame = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1450 #endif
1451 #ifdef unix
1452         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1453 #else
1454         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1455 #endif
1456         return bt;
1457 }
1458
1459 //  compare two keys, return > 0, = 0, or < 0
1460 //  =0: keys are same
1461 //  -1: key2 > key1
1462 //  +1: key2 < key1
1463 //  as the comparison value
1464
1465 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1466 {
1467 uint len1 = key1->len;
1468 int ans;
1469
1470         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1471                 return ans;
1472
1473         if( len1 > len2 )
1474                 return 1;
1475         if( len1 < len2 )
1476                 return -1;
1477
1478         return 0;
1479 }
1480
1481 // place write, read, or parent lock on requested page_no.
1482
1483 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1484 {
1485         switch( mode ) {
1486         case BtLockRead:
1487                 ReadLock (latch->readwr, thread_no);
1488                 break;
1489         case BtLockWrite:
1490                 WriteLock (latch->readwr, thread_no);
1491                 break;
1492         case BtLockAccess:
1493                 ReadLock (latch->access, thread_no);
1494                 break;
1495         case BtLockDelete:
1496                 WriteLock (latch->access, thread_no);
1497                 break;
1498         case BtLockParent:
1499                 WriteLock (latch->parent, thread_no);
1500                 break;
1501         case BtLockAtomic:
1502                 WriteLock (latch->atomic, thread_no);
1503                 break;
1504         case BtLockAtomic | BtLockRead:
1505                 WriteLock (latch->atomic, thread_no);
1506                 ReadLock (latch->readwr, thread_no);
1507                 break;
1508         case BtLockAtomic | BtLockWrite:
1509                 WriteLock (latch->atomic, thread_no);
1510                 WriteLock (latch->readwr, thread_no);
1511                 break;
1512         case BtLockLink:
1513                 WriteLock (latch->link, thread_no);
1514                 break;
1515         }
1516 }
1517
1518 // remove write, read, or parent lock on requested page
1519
1520 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1521 {
1522         switch( mode ) {
1523         case BtLockRead:
1524                 ReadRelease (latch->readwr);
1525                 break;
1526         case BtLockWrite:
1527                 WriteRelease (latch->readwr);
1528                 break;
1529         case BtLockAccess:
1530                 ReadRelease (latch->access);
1531                 break;
1532         case BtLockDelete:
1533                 WriteRelease (latch->access);
1534                 break;
1535         case BtLockParent:
1536                 WriteRelease (latch->parent);
1537                 break;
1538         case BtLockAtomic:
1539                 WriteRelease (latch->atomic);
1540                 break;
1541         case BtLockAtomic | BtLockRead:
1542                 WriteRelease (latch->atomic);
1543                 ReadRelease (latch->readwr);
1544                 break;
1545         case BtLockAtomic | BtLockWrite:
1546                 WriteRelease (latch->atomic);
1547                 WriteRelease (latch->readwr);
1548                 break;
1549         case BtLockLink:
1550                 WriteRelease (latch->link);
1551                 break;
1552         }
1553 }
1554
1555 //      allocate a new page
1556 //      return with page latched, but unlocked.
1557
1558 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1559 {
1560 uid page_no;
1561 int blk;
1562
1563         //      lock allocation page
1564
1565         bt_mutexlock(mgr->lock);
1566
1567         // use empty chain first
1568         // else allocate new page
1569
1570         if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1571                 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1572                         set->page = bt_mappage (mgr, set->latch);
1573                 else
1574                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1575
1576                 mgr->pagezero->activepages++;
1577                 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1578
1579                 //  the page is currently free and this
1580                 //  will keep bt_txnpromote out.
1581
1582                 //      contents will replace this bit
1583                 //  and pin will keep bt_txnpromote out
1584
1585                 contents->page_no = page_no;
1586                 set->latch->dirty = 1;
1587
1588                 memcpy (set->page, contents, mgr->page_size);
1589
1590 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1591 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1592
1593                 bt_releasemutex(mgr->lock);
1594                 return 0;
1595         }
1596
1597         page_no = bt_getid(mgr->pagezero->alloc->right);
1598         bt_putid(mgr->pagezero->alloc->right, page_no+1);
1599
1600         // unlock allocation latch and
1601         //      extend file into new page.
1602
1603         mgr->pagezero->activepages++;
1604 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1605 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1606         bt_releasemutex(mgr->lock);
1607
1608         //      keep bt_txnpromote out of this page
1609
1610         contents->free = 1;
1611         contents->page_no = page_no;
1612         pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1613
1614         //      don't load cache from btree page, load it from contents
1615
1616         if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1617                 set->page = bt_mappage (mgr, set->latch);
1618         else
1619                 return mgr->err;
1620
1621         // now pin will keep bt_txnpromote out
1622
1623         set->page->free = 0;
1624         return 0;
1625 }
1626
1627 //  find slot in page for given key at a given level
1628
1629 int bt_findslot (BtPage page, unsigned char *key, uint len)
1630 {
1631 uint diff, higher = page->cnt, low = 1, slot;
1632 uint good = 0;
1633
1634         //        make stopper key an infinite fence value
1635
1636         if( bt_getid (page->right) )
1637                 higher++;
1638         else
1639                 good++;
1640
1641         //      low is the lowest candidate.
1642         //  loop ends when they meet
1643
1644         //  higher is already
1645         //      tested as .ge. the passed key.
1646
1647         while( diff = higher - low ) {
1648                 slot = low + ( diff >> 1 );
1649                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1650                         low = slot + 1;
1651                 else
1652                         higher = slot, good++;
1653         }
1654
1655         //      return zero if key is on right link page
1656
1657         return good ? higher : 0;
1658 }
1659
1660 //  find and load page at given level for given key
1661 //      leave page rd or wr locked as requested
1662
1663 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1664 {
1665 uid page_no = ROOT_page, prevpage_no = 0;
1666 uint drill = 0xff, slot;
1667 BtLatchSet *prevlatch;
1668 uint mode, prevmode;
1669 BtPage prevpage;
1670 BtVal *val;
1671
1672   //  start at root of btree and drill down
1673
1674   do {
1675         // determine lock mode of drill level
1676         mode = (drill == lvl) ? lock : BtLockRead; 
1677
1678         if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1679           return 0;
1680
1681         // obtain access lock using lock chaining with Access mode
1682
1683         if( page_no > ROOT_page )
1684           bt_lockpage(BtLockAccess, set->latch, thread_no);
1685
1686         set->page = bt_mappage (mgr, set->latch);
1687
1688         //      release & unpin parent or left sibling page
1689
1690         if( prevpage_no ) {
1691           bt_unlockpage(prevmode, prevlatch);
1692           bt_unpinlatch (mgr, prevlatch);
1693           prevpage_no = 0;
1694         }
1695
1696         // obtain mode lock using lock chaining through AccessLock
1697
1698         bt_lockpage(mode, set->latch, thread_no);
1699
1700         if( set->page->free )
1701                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1702
1703         if( page_no > ROOT_page )
1704           bt_unlockpage(BtLockAccess, set->latch);
1705
1706         // re-read and re-lock root after determining actual level of root
1707
1708         if( set->page->lvl != drill) {
1709                 if( set->latch->page_no != ROOT_page )
1710                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1711                         
1712                 drill = set->page->lvl;
1713
1714                 if( lock != BtLockRead && drill == lvl ) {
1715                   bt_unlockpage(mode, set->latch);
1716                   bt_unpinlatch (mgr, set->latch);
1717                   continue;
1718                 }
1719         }
1720
1721         prevpage_no = set->latch->page_no;
1722         prevlatch = set->latch;
1723         prevpage = set->page;
1724         prevmode = mode;
1725
1726         //  find key on page at this level
1727         //  and descend to requested level
1728
1729         if( !set->page->kill )
1730          if( slot = bt_findslot (set->page, key, len) ) {
1731           if( drill == lvl )
1732                 return slot;
1733
1734           // find next non-dead slot -- the fence key if nothing else
1735
1736           while( slotptr(set->page, slot)->dead )
1737                 if( slot++ < set->page->cnt )
1738                   continue;
1739                 else
1740                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1741
1742           val = valptr(set->page, slot);
1743
1744           if( val->len == BtId )
1745                 page_no = bt_getid(valptr(set->page, slot)->value);
1746           else
1747                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1748
1749           drill--;
1750           continue;
1751          }
1752
1753         //  slide right into next page
1754
1755         page_no = bt_getid(set->page->right);
1756   } while( page_no );
1757
1758   // return error on end of right chain
1759
1760   mgr->line = __LINE__, mgr->err = BTERR_struct;
1761   return 0;     // return error
1762 }
1763
1764 //      return page to free list
1765 //      page must be delete & write locked
1766 //      and have no keys pointing to it.
1767
1768 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1769 {
1770         //      lock allocation page
1771
1772         bt_mutexlock (mgr->lock);
1773
1774         //      store chain
1775
1776         memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1777         bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1778         set->latch->dirty = 1;
1779         set->page->free = 1;
1780
1781         // decrement active page count
1782
1783         mgr->pagezero->activepages--;
1784
1785 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1786 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1787
1788         // unlock released page
1789         // and unlock allocation page
1790
1791         bt_unlockpage (BtLockDelete, set->latch);
1792         bt_unlockpage (BtLockWrite, set->latch);
1793         bt_unpinlatch (mgr, set->latch);
1794         bt_releasemutex (mgr->lock);
1795 }
1796
1797 //      a fence key was deleted from a page
1798 //      push new fence value upwards
1799
1800 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1801 {
1802 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1803 unsigned char value[BtId];
1804 BtKey* ptr;
1805 uint idx;
1806
1807         //      remove the old fence value
1808
1809         ptr = keyptr(set->page, set->page->cnt);
1810         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1811         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1812         set->latch->dirty = 1;
1813
1814         //  cache new fence value
1815
1816         ptr = keyptr(set->page, set->page->cnt);
1817         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1818
1819         bt_lockpage (BtLockParent, set->latch, thread_no);
1820         bt_unlockpage (BtLockWrite, set->latch);
1821
1822         //      insert new (now smaller) fence key
1823
1824         bt_putid (value, set->latch->page_no);
1825         ptr = (BtKey*)leftkey;
1826
1827         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1828           return mgr->err;
1829
1830         //      now delete old fence key
1831
1832         ptr = (BtKey*)rightkey;
1833
1834         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1835                 return mgr->err;
1836
1837         bt_unlockpage (BtLockParent, set->latch);
1838         bt_unpinlatch(mgr, set->latch);
1839         return 0;
1840 }
1841
1842 //      root has a single child
1843 //      collapse a level from the tree
1844
1845 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1846 {
1847 BtPageSet child[1];
1848 uid page_no;
1849 BtVal *val;
1850 uint idx;
1851
1852   // find the child entry and promote as new root contents
1853
1854   do {
1855         for( idx = 0; idx++ < root->page->cnt; )
1856           if( !slotptr(root->page, idx)->dead )
1857                 break;
1858
1859         val = valptr(root->page, idx);
1860
1861         if( val->len == BtId )
1862                 page_no = bt_getid (valptr(root->page, idx)->value);
1863         else
1864                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1865
1866         if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1867                 child->page = bt_mappage (mgr, child->latch);
1868         else
1869                 return mgr->err;
1870
1871         bt_lockpage (BtLockDelete, child->latch, thread_no);
1872         bt_lockpage (BtLockWrite, child->latch, thread_no);
1873
1874         memcpy (root->page, child->page, mgr->page_size);
1875         root->latch->dirty = 1;
1876
1877         bt_freepage (mgr, child);
1878
1879   } while( root->page->lvl > 1 && root->page->act == 1 );
1880
1881   bt_unlockpage (BtLockWrite, root->latch);
1882   bt_unpinlatch (mgr, root->latch);
1883   return 0;
1884 }
1885
1886 //  delete a page and manage keys
1887 //  call with page writelocked
1888
1889 //      returns the right page pool entry for freeing
1890 //      or zero on error.
1891
1892 uint bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, BtLock mode)
1893 {
1894 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1895 unsigned char value[BtId];
1896 uint lvl = set->page->lvl;
1897 BtPageSet right[1];
1898 uid page_no;
1899 BtKey *ptr;
1900
1901         //      cache copy of fence key
1902         //      to remove in parent
1903
1904         ptr = keyptr(set->page, set->page->cnt);
1905         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1906
1907         //      obtain lock on right page
1908
1909         page_no = bt_getid(set->page->right);
1910
1911         if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1912                 right->page = bt_mappage (mgr, right->latch);
1913         else
1914                 return 0;
1915
1916         bt_lockpage (mode, right->latch, thread_no);
1917
1918         // cache copy of key to update
1919
1920         ptr = keyptr(right->page, right->page->cnt);
1921         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1922
1923         if( right->page->kill )
1924                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1925
1926         // pull contents of right peer into our empty page
1927
1928         bt_lockpage (BtLockLink, set->latch, thread_no);
1929         memcpy (right->page->left, set->page->left, BtId);
1930         memcpy (set->page, right->page, mgr->page_size);
1931         set->page->page_no = set->latch->page_no;
1932         set->latch->dirty = 1;
1933         bt_unlockpage (BtLockLink, set->latch);
1934
1935         // mark right page deleted and point it to left page
1936         //      until we can post parent updates that remove access
1937         //      to the deleted page.
1938
1939         bt_putid (right->page->right, set->latch->page_no);
1940         right->latch->dirty = 1;
1941         right->page->kill = 1;
1942
1943         bt_lockpage (BtLockParent, right->latch, thread_no);
1944         bt_unlockpage (mode, right->latch);
1945
1946         bt_lockpage (BtLockParent, set->latch, thread_no);
1947         bt_unlockpage (BtLockWrite, set->latch);
1948
1949         // redirect higher key directly to our new node contents
1950
1951         bt_putid (value, set->latch->page_no);
1952         ptr = (BtKey*)higherfence;
1953
1954         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1955           return 0;
1956
1957         //      delete old lower key to our node
1958
1959         ptr = (BtKey*)lowerfence;
1960
1961         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1962           return 0;
1963
1964         bt_unlockpage (BtLockParent, set->latch);
1965         return right->latch - mgr->latchsets;
1966 }
1967
1968 //  find and delete key on page by marking delete flag bit
1969 //  if page becomes empty, delete it from the btree
1970
1971 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1972 {
1973 uint slot, idx, found, fence, entry;
1974 BtPageSet set[1], right[1];
1975 BtSlot *node;
1976 BtKey *ptr;
1977 BtVal *val;
1978
1979         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1980                 node = slotptr(set->page, slot);
1981                 ptr = keyptr(set->page, slot);
1982         } else
1983                 return mgr->err;
1984
1985         // if librarian slot, advance to real slot
1986
1987         if( node->type == Librarian ) {
1988                 ptr = keyptr(set->page, ++slot);
1989                 node = slotptr(set->page, slot);
1990         }
1991
1992         fence = slot == set->page->cnt;
1993
1994         // delete the key, ignore request if already dead
1995
1996         if( found = !keycmp (ptr, key, len) )
1997           if( found = node->dead == 0 ) {
1998                 val = valptr(set->page,slot);
1999                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2000                 set->page->act--;
2001
2002                 //  mark node type as delete
2003
2004                 node->type = Delete;
2005                 node->dead = 1;
2006
2007                 // collapse empty slots beneath the fence
2008                 // on interiour nodes
2009
2010                 if( lvl )
2011                  while( idx = set->page->cnt - 1 )
2012                   if( slotptr(set->page, idx)->dead ) {
2013                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2014                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2015                   } else
2016                         break;
2017           }
2018
2019         if( !found )
2020                 return 0;
2021
2022         //      did we delete a fence key in an upper level?
2023
2024         if( lvl && set->page->act && fence )
2025           if( bt_fixfence (mgr, set, lvl, thread_no) )
2026                 return mgr->err;
2027           else
2028                 return 0;
2029
2030         //      do we need to collapse root?
2031
2032         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2033           if( bt_collapseroot (mgr, set, thread_no) )
2034                 return mgr->err;
2035           else
2036                 return 0;
2037
2038         //      delete empty page
2039
2040         if( !set->page->act ) {
2041           if( entry = bt_deletepage (mgr, set, thread_no, BtLockWrite) )
2042                 right->latch = mgr->latchsets + entry;
2043           else
2044                  return mgr->err;
2045
2046           //    obtain delete and write locks to right node
2047
2048           bt_unlockpage (BtLockParent, right->latch);
2049           right->page = bt_mappage (mgr, right->latch);
2050           bt_lockpage (BtLockDelete, right->latch, thread_no);
2051           bt_lockpage (BtLockWrite, right->latch, thread_no);
2052           bt_freepage (mgr, right);
2053
2054           bt_unpinlatch (mgr, set->latch);
2055           return 0;
2056         }
2057
2058         set->latch->dirty = 1;
2059         bt_unlockpage(BtLockWrite, set->latch);
2060         bt_unpinlatch (mgr, set->latch);
2061         return 0;
2062 }
2063
2064 //      advance to next slot
2065
2066 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2067 {
2068 BtLatchSet *prevlatch;
2069 uid page_no;
2070
2071         if( slot < set->page->cnt )
2072                 return slot + 1;
2073
2074         prevlatch = set->latch;
2075
2076         if( page_no = bt_getid(set->page->right) )
2077           if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2078                 set->page = bt_mappage (bt->mgr, set->latch);
2079           else
2080                 return 0;
2081         else
2082           return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2083
2084         // obtain access lock using lock chaining with Access mode
2085
2086         bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2087
2088         bt_unlockpage(BtLockRead, prevlatch);
2089         bt_unpinlatch (bt->mgr, prevlatch);
2090
2091         bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2092         bt_unlockpage(BtLockAccess, set->latch);
2093         return 1;
2094 }
2095
2096 //      find unique key == given key, or first duplicate key in
2097 //      leaf level and return number of value bytes
2098 //      or (-1) if not found.
2099
2100 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2101 {
2102 BtPageSet set[1];
2103 uint len, slot;
2104 int ret = -1;
2105 BtKey *ptr;
2106 BtVal *val;
2107
2108   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2109    do {
2110         ptr = keyptr(set->page, slot);
2111
2112         //      skip librarian slot place holder
2113
2114         if( slotptr(set->page, slot)->type == Librarian )
2115                 ptr = keyptr(set->page, ++slot);
2116
2117         //      return actual key found
2118
2119         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2120         len = ptr->len;
2121
2122         if( slotptr(set->page, slot)->type == Duplicate )
2123                 len -= BtId;
2124
2125         //      not there if we reach the stopper key
2126
2127         if( slot == set->page->cnt )
2128           if( !bt_getid (set->page->right) )
2129                 break;
2130
2131         // if key exists, return >= 0 value bytes copied
2132         //      otherwise return (-1)
2133
2134         if( slotptr(set->page, slot)->dead )
2135                 continue;
2136
2137         if( keylen == len )
2138           if( !memcmp (ptr->key, key, len) ) {
2139                 val = valptr (set->page,slot);
2140                 if( valmax > val->len )
2141                         valmax = val->len;
2142                 memcpy (value, val->value, valmax);
2143                 ret = valmax;
2144           }
2145
2146         break;
2147
2148    } while( slot = bt_findnext (bt, set, slot) );
2149
2150   bt_unlockpage (BtLockRead, set->latch);
2151   bt_unpinlatch (bt->mgr, set->latch);
2152   return ret;
2153 }
2154
2155 //      check page for space available,
2156 //      clean if necessary and return
2157 //      0 - page needs splitting
2158 //      >0  new slot value
2159
2160 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2161 {
2162 BtPage page = set->page, frame;
2163 uint nxt = mgr->page_size;
2164 uint cnt = 0, idx = 0;
2165 uint max = page->cnt;
2166 uint newslot = max;
2167 BtKey *key;
2168 BtVal *val;
2169
2170         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2171                 return slot;
2172
2173         //      skip cleanup and proceed to split
2174         //      if there's not enough garbage
2175         //      to bother with.
2176
2177         if( page->garbage < nxt / 5 )
2178                 return 0;
2179
2180         frame = malloc (mgr->page_size);
2181         memcpy (frame, page, mgr->page_size);
2182
2183         // skip page info and set rest of page to zero
2184
2185         memset (page+1, 0, mgr->page_size - sizeof(*page));
2186         set->latch->dirty = 1;
2187
2188         page->garbage = 0;
2189         page->act = 0;
2190
2191         // clean up page first by
2192         // removing deleted keys
2193
2194         while( cnt++ < max ) {
2195                 if( cnt == slot )
2196                         newslot = idx + 2;
2197
2198                 if( cnt < max || frame->lvl )
2199                   if( slotptr(frame,cnt)->dead )
2200                         continue;
2201
2202                 // copy the value across
2203
2204                 val = valptr(frame, cnt);
2205                 nxt -= val->len + sizeof(BtVal);
2206                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2207
2208                 // copy the key across
2209
2210                 key = keyptr(frame, cnt);
2211                 nxt -= key->len + sizeof(BtKey);
2212                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2213
2214                 // make a librarian slot
2215
2216                 slotptr(page, ++idx)->off = nxt;
2217                 slotptr(page, idx)->type = Librarian;
2218                 slotptr(page, idx)->dead = 1;
2219
2220                 // set up the slot
2221
2222                 slotptr(page, ++idx)->off = nxt;
2223                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2224
2225                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2226                         page->act++;
2227         }
2228
2229         page->min = nxt;
2230         page->cnt = idx;
2231         free (frame);
2232
2233         //      see if page has enough space now, or does it need splitting?
2234
2235         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2236                 return newslot;
2237
2238         return 0;
2239 }
2240
2241 // split the root and raise the height of the btree
2242
2243 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2244 {  
2245 unsigned char leftkey[BT_keyarray];
2246 uint nxt = mgr->page_size;
2247 unsigned char value[BtId];
2248 BtPageSet left[1];
2249 uid left_page_no;
2250 BtPage frame;
2251 BtKey *ptr;
2252 BtVal *val;
2253
2254         frame = malloc (mgr->page_size);
2255         memcpy (frame, root->page, mgr->page_size);
2256
2257         //      save left page fence key for new root
2258
2259         ptr = keyptr(root->page, root->page->cnt);
2260         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2261
2262         //  Obtain an empty page to use, and copy the current
2263         //  root contents into it, e.g. lower keys
2264
2265         if( bt_newpage(mgr, left, frame, page_no) )
2266                 return mgr->err;
2267
2268         left_page_no = left->latch->page_no;
2269         bt_unpinlatch (mgr, left->latch);
2270         free (frame);
2271
2272         // preserve the page info at the bottom
2273         // of higher keys and set rest to zero
2274
2275         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2276
2277         // insert stopper key at top of newroot page
2278         // and increase the root height
2279
2280         nxt -= BtId + sizeof(BtVal);
2281         bt_putid (value, right->page_no);
2282         val = (BtVal *)((unsigned char *)root->page + nxt);
2283         memcpy (val->value, value, BtId);
2284         val->len = BtId;
2285
2286         nxt -= 2 + sizeof(BtKey);
2287         slotptr(root->page, 2)->off = nxt;
2288         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2289         ptr->len = 2;
2290         ptr->key[0] = 0xff;
2291         ptr->key[1] = 0xff;
2292
2293         // insert lower keys page fence key on newroot page as first key
2294
2295         nxt -= BtId + sizeof(BtVal);
2296         bt_putid (value, left_page_no);
2297         val = (BtVal *)((unsigned char *)root->page + nxt);
2298         memcpy (val->value, value, BtId);
2299         val->len = BtId;
2300
2301         ptr = (BtKey *)leftkey;
2302         nxt -= ptr->len + sizeof(BtKey);
2303         slotptr(root->page, 1)->off = nxt;
2304         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2305         
2306         bt_putid(root->page->right, 0);
2307         root->page->min = nxt;          // reset lowest used offset and key count
2308         root->page->cnt = 2;
2309         root->page->act = 2;
2310         root->page->lvl++;
2311
2312         mgr->pagezero->alloc->lvl = root->page->lvl;
2313
2314         // release and unpin root pages
2315
2316         bt_unlockpage(BtLockWrite, root->latch);
2317         bt_unpinlatch (mgr, root->latch);
2318
2319         bt_unpinlatch (mgr, right);
2320         return 0;
2321 }
2322
2323 //  split already locked full node
2324 //      leave it locked.
2325 //      return pool entry for new right
2326 //      page, pinned & unlocked
2327
2328 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2329 {
2330 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2331 BtPage frame = malloc (mgr->page_size);
2332 uint lvl = set->page->lvl;
2333 BtPageSet right[1];
2334 BtKey *key, *ptr;
2335 BtVal *val, *src;
2336 uid right2;
2337 uint prev;
2338
2339         //  split higher half of keys to frame
2340
2341         memset (frame, 0, mgr->page_size);
2342         max = set->page->cnt;
2343         cnt = max / 2;
2344         idx = 0;
2345
2346         while( cnt++ < max ) {
2347                 if( cnt < max || set->page->lvl )
2348                   if( slotptr(set->page, cnt)->dead )
2349                         continue;
2350
2351                 src = valptr(set->page, cnt);
2352                 nxt -= src->len + sizeof(BtVal);
2353                 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2354
2355                 key = keyptr(set->page, cnt);
2356                 nxt -= key->len + sizeof(BtKey);
2357                 ptr = (BtKey*)((unsigned char *)frame + nxt);
2358                 memcpy (ptr, key, key->len + sizeof(BtKey));
2359
2360                 //      add librarian slot
2361
2362                 slotptr(frame, ++idx)->off = nxt;
2363                 slotptr(frame, idx)->type = Librarian;
2364                 slotptr(frame, idx)->dead = 1;
2365
2366                 //  add actual slot
2367
2368                 slotptr(frame, ++idx)->off = nxt;
2369                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2370
2371                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2372                         frame->act++;
2373         }
2374
2375         frame->bits = mgr->page_bits;
2376         frame->min = nxt;
2377         frame->cnt = idx;
2378         frame->lvl = lvl;
2379
2380         // link right node
2381
2382         if( set->latch->page_no > ROOT_page )
2383                 bt_putid (frame->right, bt_getid (set->page->right));
2384
2385         //      get new free page and write higher keys to it.
2386
2387         if( bt_newpage(mgr, right, frame, thread_no) )
2388                 return 0;
2389
2390         // process lower keys
2391
2392         memcpy (frame, set->page, mgr->page_size);
2393         memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2394         set->latch->dirty = 1;
2395
2396         nxt = mgr->page_size;
2397         set->page->garbage = 0;
2398         set->page->act = 0;
2399         max /= 2;
2400         cnt = 0;
2401         idx = 0;
2402
2403         if( slotptr(frame, max)->type == Librarian )
2404                 max--;
2405
2406         //  assemble page of smaller keys
2407
2408         while( cnt++ < max ) {
2409                 if( slotptr(frame, cnt)->dead )
2410                         continue;
2411                 val = valptr(frame, cnt);
2412                 nxt -= val->len + sizeof(BtVal);
2413                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2414
2415                 key = keyptr(frame, cnt);
2416                 nxt -= key->len + sizeof(BtKey);
2417                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2418
2419                 //      add librarian slot
2420
2421                 slotptr(set->page, ++idx)->off = nxt;
2422                 slotptr(set->page, idx)->type = Librarian;
2423                 slotptr(set->page, idx)->dead = 1;
2424
2425                 //      add actual slot
2426
2427                 slotptr(set->page, ++idx)->off = nxt;
2428                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2429                 set->page->act++;
2430         }
2431
2432         bt_putid(set->page->right, right->latch->page_no);
2433         set->page->min = nxt;
2434         set->page->cnt = idx;
2435         free(frame);
2436
2437         return right->latch - mgr->latchsets;
2438 }
2439
2440 //      fix keys for newly split page
2441 //      call with both pages pinned & locked
2442 //  return unlocked and unpinned
2443
2444 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2445 {
2446 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2447 unsigned char value[BtId];
2448 uint lvl = set->page->lvl;
2449 BtPage page;
2450 BtKey *ptr;
2451
2452         // if current page is the root page, split it
2453
2454         if( set->latch->page_no == ROOT_page )
2455                 return bt_splitroot (mgr, set, right, thread_no);
2456
2457         ptr = keyptr(set->page, set->page->cnt);
2458         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2459
2460         page = bt_mappage (mgr, right);
2461
2462         ptr = keyptr(page, page->cnt);
2463         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2464
2465         // insert new fences in their parent pages
2466
2467         bt_lockpage (BtLockParent, right, thread_no);
2468
2469         bt_lockpage (BtLockParent, set->latch, thread_no);
2470         bt_unlockpage (BtLockWrite, set->latch);
2471
2472         // insert new fence for reformulated left block of smaller keys
2473
2474         bt_putid (value, set->latch->page_no);
2475         ptr = (BtKey *)leftkey;
2476
2477         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2478                 return mgr->err;
2479
2480         // switch fence for right block of larger keys to new right page
2481
2482         bt_putid (value, right->page_no);
2483         ptr = (BtKey *)rightkey;
2484
2485         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2486                 return mgr->err;
2487
2488         bt_unlockpage (BtLockParent, set->latch);
2489         bt_unpinlatch (mgr, set->latch);
2490
2491         bt_unlockpage (BtLockParent, right);
2492         bt_unpinlatch (mgr, right);
2493         return 0;
2494 }
2495
2496 //      install new key and value onto page
2497 //      page must already be checked for
2498 //      adequate space
2499
2500 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2501 {
2502 uint idx, librarian;
2503 BtSlot *node;
2504 BtKey *ptr;
2505 BtVal *val;
2506 int rate;
2507
2508         //      if found slot > desired slot and previous slot
2509         //      is a librarian slot, use it
2510
2511         if( slot > 1 )
2512           if( slotptr(set->page, slot-1)->type == Librarian )
2513                 slot--;
2514
2515         // copy value onto page
2516
2517         set->page->min -= vallen + sizeof(BtVal);
2518         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2519         memcpy (val->value, value, vallen);
2520         val->len = vallen;
2521
2522         // copy key onto page
2523
2524         set->page->min -= keylen + sizeof(BtKey);
2525         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2526         memcpy (ptr->key, key, keylen);
2527         ptr->len = keylen;
2528         
2529         //      find first empty slot
2530
2531         for( idx = slot; idx < set->page->cnt; idx++ )
2532           if( slotptr(set->page, idx)->dead )
2533                 break;
2534
2535         // now insert key into array before slot,
2536         //      adding as many librarian slots as
2537         //      makes sense.
2538
2539         if( idx == set->page->cnt ) {
2540         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2541
2542                 librarian = ++idx - slot;
2543                 avail /= sizeof(BtSlot);
2544
2545                 if( avail < 0 )
2546                         avail = 0;
2547
2548                 if( librarian > avail )
2549                         librarian = avail;
2550
2551                 if( librarian ) {
2552                         rate = (idx - slot) / librarian;
2553                         set->page->cnt += librarian;
2554                         idx += librarian;
2555                 } else
2556                         rate = 0;
2557         } else
2558                 librarian = 0, rate = 0;
2559
2560         while( idx > slot ) {
2561                 //  transfer slot
2562                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2563                 idx--;
2564
2565                 //      add librarian slot per rate
2566
2567                 if( librarian )
2568                  if( (idx - slot + 1)/2 <= librarian * rate ) {
2569 //                if( rate && !(idx % rate) ) {
2570                         node = slotptr(set->page, idx--);
2571                         node->off = node[1].off;
2572                         node->type = Librarian;
2573                         node->dead = 1;
2574                         librarian--;
2575                   }
2576         }
2577
2578         set->latch->dirty = 1;
2579         set->page->act++;
2580
2581         //      fill in new slot
2582
2583         node = slotptr(set->page, slot);
2584         node->off = set->page->min;
2585         node->type = type;
2586         node->dead = 0;
2587
2588         if( release ) {
2589                 bt_unlockpage (BtLockWrite, set->latch);
2590                 bt_unpinlatch (mgr, set->latch);
2591         }
2592
2593         return 0;
2594 }
2595
2596 //  Insert new key into the btree at given level.
2597 //      either add a new key or update/add an existing one
2598
2599 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2600 {
2601 uint slot, idx, len, entry;
2602 BtPageSet set[1];
2603 BtSlot *node;
2604 BtKey *ptr;
2605 BtVal *val;
2606
2607   while( 1 ) { // find the page and slot for the current key
2608         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2609                 node = slotptr(set->page, slot);
2610                 ptr = keyptr(set->page, slot);
2611         } else {
2612                 if( !mgr->err )
2613                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2614                 return mgr->err;
2615         }
2616
2617         // if librarian slot == found slot, advance to real slot
2618
2619         if( node->type == Librarian )
2620           if( !keycmp (ptr, key, keylen) ) {
2621                 ptr = keyptr(set->page, ++slot);
2622                 node = slotptr(set->page, slot);
2623           }
2624
2625         //  if inserting a duplicate key or unique
2626         //      key that doesn't exist on the page,
2627         //      check for adequate space on the page
2628         //      and insert the new key before slot.
2629
2630         switch( type ) {
2631         case Unique:
2632         case Duplicate:
2633           if( keycmp (ptr, key, keylen) )
2634            if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2635             return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2636            else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2637                 return mgr->err;
2638            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2639                 return mgr->err;
2640            else
2641                 continue;
2642
2643           // if key already exists, update value and return
2644
2645           val = valptr(set->page, slot);
2646
2647           if( val->len >= vallen ) {
2648                 if( slotptr(set->page, slot)->dead )
2649                         set->page->act++;
2650                 node->type = type;
2651                 node->dead = 0;
2652
2653                 set->page->garbage += val->len - vallen;
2654                 set->latch->dirty = 1;
2655                 val->len = vallen;
2656                 memcpy (val->value, value, vallen);
2657                 bt_unlockpage(BtLockWrite, set->latch);
2658                 bt_unpinlatch (mgr, set->latch);
2659                 return 0;
2660           }
2661
2662           //  new update value doesn't fit in existing value area
2663           //    make sure page has room
2664
2665           if( !node->dead )
2666                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2667           else
2668                 set->page->act++;
2669
2670           node->type = type;
2671           node->dead = 0;
2672
2673           if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2674            if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2675                 return mgr->err;
2676            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2677                 return mgr->err;
2678            else
2679                 continue;
2680
2681           //  copy key and value onto page and update slot
2682
2683           set->page->min -= vallen + sizeof(BtVal);
2684           val = (BtVal*)((unsigned char *)set->page + set->page->min);
2685           memcpy (val->value, value, vallen);
2686           val->len = vallen;
2687
2688           set->latch->dirty = 1;
2689           set->page->min -= keylen + sizeof(BtKey);
2690           ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2691           memcpy (ptr->key, key, keylen);
2692           ptr->len = keylen;
2693         
2694           node->off = set->page->min;
2695           bt_unlockpage(BtLockWrite, set->latch);
2696           bt_unpinlatch (mgr, set->latch);
2697           return 0;
2698     }
2699   }
2700   return 0;
2701 }
2702
2703 //      determine actual page where key is located
2704 //  return slot number
2705
2706 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2707 {
2708 BtKey *key = keyptr(source,src);
2709 uint slot = locks[src].slot;
2710 uint entry;
2711
2712         if( src > 1 && locks[src].reuse )
2713           entry = locks[src-1].entry, slot = 0;
2714         else
2715           entry = locks[src].entry;
2716
2717         if( slot ) {
2718                 set->latch = mgr->latchsets + entry;
2719                 set->page = bt_mappage (mgr, set->latch);
2720                 return slot;
2721         }
2722
2723         //      is locks->reuse set? or was slot zeroed?
2724         //      if so, find where our key is located 
2725         //      on current page or pages split on
2726         //      same page txn operations.
2727
2728         do {
2729                 set->latch = mgr->latchsets + entry;
2730                 set->page = bt_mappage (mgr, set->latch);
2731
2732                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2733                   if( slotptr(set->page, slot)->type == Librarian )
2734                         slot++;
2735                   if( locks[src].reuse )
2736                         locks[src].entry = entry;
2737                   return slot;
2738                 }
2739         } while( entry = set->latch->split );
2740
2741         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2742         return 0;
2743 }
2744
2745 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2746 {
2747 BtKey *key = keyptr(source, src);
2748 BtVal *val = valptr(source, src);
2749 BtLatchSet *latch;
2750 BtPageSet set[1];
2751 uint entry, slot;
2752
2753   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2754         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2755           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2756                 return mgr->err;
2757           set->page->lsn = lsn;
2758           return 0;
2759         }
2760
2761         //  split page
2762
2763         if( entry = bt_splitpage (mgr, set, thread_no) )
2764           latch = mgr->latchsets + entry;
2765         else
2766           return mgr->err;
2767
2768         //      splice right page into split chain
2769         //      and WriteLock it
2770
2771         bt_lockpage(BtLockWrite, latch, thread_no);
2772         latch->split = set->latch->split;
2773         set->latch->split = entry;
2774         locks[src].slot = 0;
2775   }
2776
2777   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2778 }
2779
2780 //      perform delete from smaller btree
2781 //  insert a delete slot if not found there
2782
2783 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2784 {
2785 BtKey *key = keyptr(source, src);
2786 BtPageSet set[1];
2787 uint idx, slot;
2788 BtSlot *node;
2789 BtKey *ptr;
2790 BtVal *val;
2791
2792         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2793           node = slotptr(set->page, slot);
2794           ptr = keyptr(set->page, slot);
2795           val = valptr(set->page, slot);
2796         } else
2797           return mgr->line = __LINE__, mgr->err = BTERR_struct;
2798
2799         //  if slot is not found, insert a delete slot
2800
2801         if( keycmp (ptr, key->key, key->len) )
2802           return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2803
2804         //      if node is already dead,
2805         //      ignore the request.
2806
2807         if( node->dead )
2808                 return 0;
2809
2810         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2811         set->latch->dirty = 1;
2812         set->page->lsn = lsn;
2813         set->page->act--;
2814
2815         node->dead = 0;
2816         __sync_fetch_and_add(&mgr->found, 1);
2817         return 0;
2818 }
2819
2820 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2821 {
2822 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2823 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2824
2825         return keycmp (key1, key2->key, key2->len);
2826 }
2827 //      atomic modification of a batch of keys.
2828
2829 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2830 {
2831 uint src, idx, slot, samepage, entry, que = 0;
2832 BtKey *key, *ptr, *key2;
2833 int result = 0;
2834 BtSlot temp[1];
2835 logseqno lsn;
2836 int type;
2837
2838   // stable sort the list of keys into order to
2839   //    prevent deadlocks between threads.
2840 /*
2841   for( src = 1; src++ < source->cnt; ) {
2842         *temp = *slotptr(source,src);
2843         key = keyptr (source,src);
2844
2845         for( idx = src; --idx; ) {
2846           key2 = keyptr (source,idx);
2847           if( keycmp (key, key2->key, key2->len) < 0 ) {
2848                 *slotptr(source,idx+1) = *slotptr(source,idx);
2849                 *slotptr(source,idx) = *temp;
2850           } else
2851                 break;
2852         }
2853   }
2854 */
2855         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2856   //  add entries to redo log
2857
2858   if( bt->mgr->pagezero->redopages )
2859         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2860   else
2861         lsn = 0;
2862
2863   //  perform the individual actions in the transaction
2864
2865   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2866         return bt->mgr->err;
2867
2868   // if number of active pages
2869   // is greater than the buffer pool
2870   // promote page into larger btree
2871
2872   if( bt->main )
2873    while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
2874         if( bt_txnpromote (bt) )
2875           return bt->mgr->err;
2876
2877   // return success
2878
2879   return 0;
2880 }
2881
2882 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2883 {
2884 uint src, idx, slot, samepage, entry, que = 0;
2885 BtPageSet set[1], prev[1], right[1];
2886 unsigned char value[BtId];
2887 uid right_page_no;
2888 BtLatchSet *latch;
2889 AtomicTxn *locks;
2890 BtKey *key, *ptr;
2891 BtPage page;
2892 BtVal *val;
2893
2894   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2895
2896   // Load the leaf page for each key
2897   // group same page references with reuse bit
2898   // and determine any constraint violations
2899
2900   for( src = 0; src++ < source->cnt; ) {
2901         key = keyptr(source, src);
2902         slot = 0;
2903
2904         // first determine if this modification falls
2905         // on the same page as the previous modification
2906         //      note that the far right leaf page is a special case
2907
2908         if( samepage = src > 1 )
2909           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2910                 slot = bt_findslot(set->page, key->key, key->len);
2911
2912         if( !slot )
2913           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic + BtLockWrite, thread_no) )
2914                 set->latch->split = 0;
2915           else
2916                 return mgr->err;
2917
2918         if( slotptr(set->page, slot)->type == Librarian )
2919           slot++;
2920
2921         if( !samepage ) {
2922           locks[src].entry = set->latch - mgr->latchsets;
2923           locks[src].slot = slot;
2924           locks[src].reuse = 0;
2925         } else {
2926           locks[src].entry = 0;
2927           locks[src].slot = 0;
2928           locks[src].reuse = 1;
2929         }
2930
2931         //      capture current lsn for master page
2932
2933         locks[src].reqlsn = set->page->lsn;
2934   }
2935
2936   // insert or delete each key
2937   // process any splits or merges
2938   // run through txn list backwards
2939
2940   samepage = source->cnt + 1;
2941
2942   for( src = source->cnt; src; src-- ) {
2943         if( locks[src].reuse )
2944           continue;
2945
2946         //  perform the txn operations
2947         //      from smaller to larger on
2948         //  the same page
2949
2950         for( idx = src; idx < samepage; idx++ )
2951          switch( slotptr(source,idx)->type ) {
2952          case Delete:
2953           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2954                 return mgr->err;
2955           break;
2956
2957         case Duplicate:
2958         case Unique:
2959           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2960                 return mgr->err;
2961           break;
2962
2963         default:
2964           bt_atomicpage (mgr, source, locks, idx, set);
2965           break;
2966         }
2967
2968         //      after the same page operations have finished,
2969         //  process master page for splits or deletion.
2970
2971         latch = prev->latch = mgr->latchsets + locks[src].entry;
2972         prev->page = bt_mappage (mgr, prev->latch);
2973         samepage = src;
2974
2975         //  pick-up all splits from master page
2976         //      each one is already pinned & WriteLocked.
2977
2978         if( entry = latch->split ) do {
2979           set->latch = mgr->latchsets + entry;
2980           set->page = bt_mappage (mgr, set->latch);
2981
2982           // delete empty master page by undoing its split
2983           //  (this is potentially another empty page)
2984
2985           if( !prev->page->act ) {
2986                 memcpy (set->page->left, prev->page->left, BtId);
2987                 memcpy (prev->page, set->page, mgr->page_size);
2988                 bt_lockpage (BtLockDelete, set->latch, thread_no);
2989                 prev->latch->split = set->latch->split;
2990                 prev->latch->dirty = 1;
2991                 bt_freepage (mgr, set);
2992                 continue;
2993           }
2994
2995           // remove empty split page from the split chain
2996           // and return it to the free list. No other
2997           // thread has its page number yet.
2998
2999           if( !set->page->act ) {
3000                 memcpy (prev->page->right, set->page->right, BtId);
3001                 prev->latch->split = set->latch->split;
3002
3003                 bt_lockpage (BtLockDelete, set->latch, thread_no);
3004                 bt_freepage (mgr, set);
3005                 continue;
3006           }
3007
3008           //  update prev's fence key
3009
3010           ptr = keyptr(prev->page,prev->page->cnt);
3011           bt_putid (value, prev->latch->page_no);
3012
3013           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3014                 return mgr->err;
3015
3016           // splice in the left link into the split page
3017
3018           bt_putid (set->page->left, prev->latch->page_no);
3019
3020           if( lsm )
3021                 bt_syncpage (mgr, prev->page, prev->latch);
3022
3023           // page is unlocked & unpinned below to avoid bt_txnpromote
3024
3025           *prev = *set;
3026         } while( entry = prev->latch->split );
3027
3028         //  update left pointer in next right page from last split page
3029         //      (if all splits were reversed or none occurred, latch->split == 0)
3030
3031         if( latch->split ) {
3032           //  fix left pointer in master's original (now split)
3033           //  far right sibling or set rightmost page in page zero
3034
3035           if( right_page_no = bt_getid (prev->page->right) ) {
3036                 if( set->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
3037                   set->page = bt_mappage (mgr, set->latch);
3038                 else
3039                   return mgr->err;
3040
3041             bt_lockpage (BtLockLink, set->latch, thread_no);
3042             bt_putid (set->page->left, prev->latch->page_no);
3043                 set->latch->dirty = 1;
3044
3045             bt_unlockpage (BtLockLink, set->latch);
3046                 bt_unpinlatch (mgr, set->latch);
3047           } else {      // prev is rightmost page
3048             bt_mutexlock (mgr->lock);
3049                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3050             bt_releasemutex(mgr->lock);
3051           }
3052
3053           //  Process last page split in chain
3054           //  by switching the key from the master
3055           //  page to the last split.
3056
3057           ptr = keyptr(prev->page,prev->page->cnt);
3058           bt_putid (value, prev->latch->page_no);
3059
3060           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3061                 return mgr->err;
3062
3063           if( lsm )
3064                 bt_syncpage (mgr, prev->page, prev->latch);
3065
3066           // unlock and unpin master page
3067
3068           bt_unlockpage(BtLockAtomic, latch);
3069           bt_unlockpage(BtLockWrite, latch);
3070           bt_unpinlatch(mgr, latch);
3071
3072           // go through the list of splits and
3073           // release the locks and unpin
3074
3075           while( entry = latch->split ) {
3076                 latch = mgr->latchsets + entry;
3077             bt_unlockpage(BtLockWrite, latch);
3078                 bt_unpinlatch(mgr, latch);
3079           }
3080
3081           continue;
3082         }
3083
3084         //      since there are no splits, we're
3085         //  finished if master page occupied
3086
3087         bt_unlockpage(BtLockAtomic, prev->latch);
3088
3089         if( prev->page->act ) {
3090           bt_unlockpage(BtLockWrite, prev->latch);
3091
3092           if( lsm )
3093                 bt_syncpage (mgr, prev->page, prev->latch);
3094
3095           bt_unpinlatch(mgr, prev->latch);
3096           continue;
3097         }
3098
3099         // any and all splits were reversed, and the
3100         // master page located in prev is empty, delete it
3101
3102         if( entry = bt_deletepage (mgr, prev, thread_no, BtLockWrite) )
3103                 right->latch = mgr->latchsets + entry;
3104         else
3105                 return mgr->err;
3106
3107         //      obtain delete and write locks to right node
3108
3109         bt_unlockpage (BtLockParent, right->latch);
3110         right->page = bt_mappage (mgr, right->latch);
3111         bt_lockpage (BtLockDelete, right->latch, thread_no);
3112         bt_lockpage (BtLockWrite, right->latch, thread_no);
3113         bt_freepage (mgr, right);
3114
3115         bt_unpinlatch (mgr, prev->latch);
3116   }
3117
3118   free (locks);
3119   return 0;
3120 }
3121
3122 //  promote a page into the larger btree
3123
3124 BTERR bt_txnpromote (BtDb *bt)
3125 {
3126 BtPageSet set[1], right[1];
3127 uint entry, slot, idx;
3128 BtSlot *node;
3129 BtKey *ptr;
3130 BtVal *val;
3131
3132   while( 1 ) {
3133 #ifdef unix
3134         entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3135 #else
3136         entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3137 #endif
3138         entry %= bt->mgr->latchtotal;
3139
3140         if( !entry )
3141                 continue;
3142
3143         set->latch = bt->mgr->latchsets + entry;
3144
3145         if( !bt_mutextry(set->latch->modify) )
3146                 continue;
3147
3148         //  skip this entry if it is pinned
3149
3150         if( set->latch->pin & ~CLOCK_bit ) {
3151           bt_releasemutex(set->latch->modify);
3152           continue;
3153         }
3154
3155         set->page = bt_mappage (bt->mgr, set->latch);
3156
3157         // entry never used or has no right sibling
3158
3159         if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3160           bt_releasemutex(set->latch->modify);
3161           continue;
3162         }
3163
3164         // entry interiour node or being killed
3165
3166         if( set->page->lvl || set->page->free || set->page->kill ) {
3167           bt_releasemutex(set->latch->modify);
3168           continue;
3169         }
3170
3171         //  pin the page for our useage
3172
3173         set->latch->pin++;
3174         bt_releasemutex(set->latch->modify);
3175         bt_lockpage (BtLockAtomic | BtLockWrite, set->latch, bt->thread_no);
3176         memcpy (bt->frame, set->page, bt->mgr->page_size);
3177
3178 if( !(set->latch->page_no % 100) )
3179 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
3180
3181         if( entry = bt_deletepage (bt->mgr, set, bt->thread_no, BtLockAtomic | BtLockWrite) )
3182                 right->latch = bt->mgr->latchsets + entry;
3183         else
3184                 return bt->mgr->err;
3185
3186         //      obtain delete and write locks to right node
3187
3188         bt_unlockpage (BtLockParent, right->latch);
3189         right->page = bt_mappage (bt->mgr, right->latch);
3190
3191         //  release page with its new contents
3192
3193         bt_unlockpage (BtLockAtomic, set->latch);
3194         bt_unpinlatch (bt->mgr, set->latch);
3195
3196         // transfer slots in our selected page to larger btree
3197
3198         if( bt_atomicexec (bt->main, bt->frame, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3199                 return bt->main->err;
3200
3201         //  free the page we took over
3202
3203         bt_lockpage (BtLockDelete, right->latch, bt->thread_no);
3204         bt_lockpage (BtLockWrite, right->latch, bt->thread_no);
3205         bt_freepage (bt->mgr, right);
3206         return 0;
3207   }
3208 }
3209
3210 //      set cursor to highest slot on highest page
3211
3212 uint bt_lastkey (BtDb *bt)
3213 {
3214 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3215 BtPageSet set[1];
3216
3217         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3218                 set->page = bt_mappage (bt->mgr, set->latch);
3219         else
3220                 return 0;
3221
3222     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3223         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3224     bt_unlockpage(BtLockRead, set->latch);
3225         bt_unpinlatch (bt->mgr, set->latch);
3226         return bt->cursor->cnt;
3227 }
3228
3229 //      return previous slot on cursor page
3230
3231 uint bt_prevkey (BtDb *bt, uint slot)
3232 {
3233 uid cursor_page = bt->cursor->page_no;
3234 uid ourright, next, us = cursor_page;
3235 BtPageSet set[1];
3236
3237         if( --slot )
3238                 return slot;
3239
3240         ourright = bt_getid(bt->cursor->right);
3241
3242 goleft:
3243         if( !(next = bt_getid(bt->cursor->left)) )
3244                 return 0;
3245
3246 findourself:
3247         cursor_page = next;
3248
3249         if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3250                 set->page = bt_mappage (bt->mgr, set->latch);
3251         else
3252                 return 0;
3253
3254     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3255         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3256         bt_unlockpage(BtLockRead, set->latch);
3257         bt_unpinlatch (bt->mgr, set->latch);
3258         
3259         next = bt_getid (bt->cursor->right);
3260
3261         if( bt->cursor->kill )
3262                 goto findourself;
3263
3264         if( next != us )
3265           if( next == ourright )
3266                 goto goleft;
3267           else
3268                 goto findourself;
3269
3270         return bt->cursor->cnt;
3271 }
3272
3273 //  return next slot on cursor page
3274 //  or slide cursor right into next page
3275
3276 uint bt_nextkey (BtDb *bt, uint slot)
3277 {
3278 BtPageSet set[1];
3279 uid right;
3280
3281   do {
3282         right = bt_getid(bt->cursor->right);
3283
3284         while( slot++ < bt->cursor->cnt )
3285           if( slotptr(bt->cursor,slot)->dead )
3286                 continue;
3287           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3288                 return slot;
3289           else
3290                 break;
3291
3292         if( !right )
3293                 break;
3294
3295         if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3296                 set->page = bt_mappage (bt->mgr, set->latch);
3297         else
3298                 return 0;
3299
3300     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3301         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3302         bt_unlockpage(BtLockRead, set->latch);
3303         bt_unpinlatch (bt->mgr, set->latch);
3304         slot = 0;
3305
3306   } while( 1 );
3307
3308   return bt->mgr->err = 0;
3309 }
3310
3311 //  cache page of keys into cursor and return starting slot for given key
3312
3313 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3314 {
3315 BtPageSet set[1];
3316 uint slot;
3317
3318         // cache page for retrieval
3319
3320         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3321           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3322         else
3323           return 0;
3324
3325         bt_unlockpage(BtLockRead, set->latch);
3326         bt_unpinlatch (bt->mgr, set->latch);
3327         return slot;
3328 }
3329
3330 #ifdef STANDALONE
3331
3332 #ifndef unix
3333 double getCpuTime(int type)
3334 {
3335 FILETIME crtime[1];
3336 FILETIME xittime[1];
3337 FILETIME systime[1];
3338 FILETIME usrtime[1];
3339 SYSTEMTIME timeconv[1];
3340 double ans = 0;
3341
3342         memset (timeconv, 0, sizeof(SYSTEMTIME));
3343
3344         switch( type ) {
3345         case 0:
3346                 GetSystemTimeAsFileTime (xittime);
3347                 FileTimeToSystemTime (xittime, timeconv);
3348                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3349                 break;
3350         case 1:
3351                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3352                 FileTimeToSystemTime (usrtime, timeconv);
3353                 break;
3354         case 2:
3355                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3356                 FileTimeToSystemTime (systime, timeconv);
3357                 break;
3358         }
3359
3360         ans += (double)timeconv->wHour * 3600;
3361         ans += (double)timeconv->wMinute * 60;
3362         ans += (double)timeconv->wSecond;
3363         ans += (double)timeconv->wMilliseconds / 1000;
3364         return ans;
3365 }
3366 #else
3367 #include <time.h>
3368 #include <sys/resource.h>
3369
3370 double getCpuTime(int type)
3371 {
3372 struct rusage used[1];
3373 struct timeval tv[1];
3374
3375         switch( type ) {
3376         case 0:
3377                 gettimeofday(tv, NULL);
3378                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3379
3380         case 1:
3381                 getrusage(RUSAGE_SELF, used);
3382                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3383
3384         case 2:
3385                 getrusage(RUSAGE_SELF, used);
3386                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3387         }
3388
3389         return 0;
3390 }
3391 #endif
3392
3393 void bt_poolaudit (BtMgr *mgr)
3394 {
3395 BtLatchSet *latch;
3396 uint entry = 0;
3397
3398         while( ++entry < mgr->latchtotal ) {
3399                 latch = mgr->latchsets + entry;
3400
3401                 if( *latch->readwr->value )
3402                         fprintf(stderr, "latchset %d wrtlocked for page %d\n", entry, latch->page_no);
3403
3404                 if( *latch->access->value )
3405                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3406
3407                 if( *latch->parent->value )
3408                         fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3409
3410                 if( *latch->atomic->value )
3411                         fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3412
3413                 if( *latch->modify->value )
3414                         fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3415
3416                 if( latch->pin & ~CLOCK_bit )
3417                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3418         }
3419 }
3420
3421 typedef struct {
3422         char idx;
3423         char *type;
3424         char *infile;
3425         BtMgr *main;
3426         BtMgr *mgr;
3427         int num;
3428 } ThreadArg;
3429
3430 //  standalone program to index file of keys
3431 //  then list them onto std-out
3432
3433 #ifdef unix
3434 void *index_file (void *arg)
3435 #else
3436 uint __stdcall index_file (void *arg)
3437 #endif
3438 {
3439 int line = 0, found = 0, cnt = 0, idx;
3440 uid next, page_no = LEAF_page;  // start on first page of leaves
3441 int ch, len = 0, slot, type = 0;
3442 unsigned char key[BT_maxkey];
3443 unsigned char txn[65536];
3444 ThreadArg *args = arg;
3445 BtPage page, frame;
3446 BtPageSet set[1];
3447 uint nxt = 65536;
3448 BtKey *ptr;
3449 BtVal *val;
3450 BtDb *bt;
3451 FILE *in;
3452
3453         bt = bt_open (args->mgr, args->main);
3454         page = (BtPage)txn;
3455
3456         if( args->idx < strlen (args->type) )
3457                 ch = args->type[args->idx];
3458         else
3459                 ch = args->type[strlen(args->type) - 1];
3460
3461         switch(ch | 0x20)
3462         {
3463         case 'd':
3464                 type = Delete;
3465
3466         case 'p':
3467                 if( !type )
3468                         type = Unique;
3469
3470                 if( args->num )
3471                  if( type == Delete )
3472                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3473                  else
3474                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3475                 else
3476                  if( type == Delete )
3477                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3478                  else
3479                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3480
3481                 if( in = fopen (args->infile, "rb") )
3482                   while( ch = getc(in), ch != EOF )
3483                         if( ch == '\n' )
3484                         {
3485                           line++;
3486
3487                           if( !args->num ) {
3488                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3489                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3490                             len = 0;
3491                                 continue;
3492                           }
3493
3494                           nxt -= len - 10;
3495                           memcpy (txn + nxt, key + 10, len - 10);
3496                           nxt -= 1;
3497                           txn[nxt] = len - 10;
3498                           nxt -= 10;
3499                           memcpy (txn + nxt, key, 10);
3500                           nxt -= 1;
3501                           txn[nxt] = 10;
3502                           slotptr(page,++cnt)->off  = nxt;
3503                           slotptr(page,cnt)->type = type;
3504                           len = 0;
3505
3506                           if( cnt < args->num )
3507                                 continue;
3508
3509                           page->cnt = cnt;
3510                           page->act = cnt;
3511                           page->min = nxt;
3512
3513                           if( bt_atomictxn (bt, page) )
3514                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3515                           nxt = sizeof(txn);
3516                           cnt = 0;
3517
3518                         }
3519                         else if( len < BT_maxkey )
3520                                 key[len++] = ch;
3521                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3522                 break;
3523
3524         case 'w':
3525                 fprintf(stderr, "started indexing for %s\n", args->infile);
3526                 if( in = fopen (args->infile, "r") )
3527                   while( ch = getc(in), ch != EOF )
3528                         if( ch == '\n' )
3529                         {
3530                           line++;
3531
3532                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3533                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3534                           len = 0;
3535                         }
3536                         else if( len < BT_maxkey )
3537                                 key[len++] = ch;
3538                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3539                 break;
3540
3541         case 'f':
3542                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3543                 if( in = fopen (args->infile, "rb") )
3544                   while( ch = getc(in), ch != EOF )
3545                         if( ch == '\n' )
3546                         {
3547                           line++;
3548                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3549                                 found++;
3550                           else if( bt->mgr->err )
3551                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3552                           len = 0;
3553                         }
3554                         else if( len < BT_maxkey )
3555                                 key[len++] = ch;
3556                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3557                 break;
3558
3559         case 's':
3560                 fprintf(stderr, "started scanning\n");
3561                 
3562                 do {
3563                         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3564                                 set->page = bt_mappage (bt->mgr, set->latch);
3565                         else
3566                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3567
3568                         bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3569                         next = bt_getid (set->page->right);
3570
3571                         for( slot = 0; slot++ < set->page->cnt; )
3572                          if( next || slot < set->page->cnt )
3573                           if( !slotptr(set->page, slot)->dead ) {
3574                                 ptr = keyptr(set->page, slot);
3575                                 len = ptr->len;
3576
3577                             if( slotptr(set->page, slot)->type == Duplicate )
3578                                         len -= BtId;
3579
3580                                 fwrite (ptr->key, len, 1, stdout);
3581                                 val = valptr(set->page, slot);
3582                                 fwrite (val->value, val->len, 1, stdout);
3583                                 fputc ('\n', stdout);
3584                                 cnt++;
3585                            }
3586
3587                         bt_unlockpage (BtLockRead, set->latch);
3588                         bt_unpinlatch (bt->mgr, set->latch);
3589                 } while( page_no = next );
3590
3591                 fprintf(stderr, " Total keys read %d\n", cnt);
3592                 break;
3593
3594         case 'r':
3595                 fprintf(stderr, "started reverse scan\n");
3596                 if( slot = bt_lastkey (bt) )
3597                    while( slot = bt_prevkey (bt, slot) ) {
3598                         if( slotptr(bt->cursor, slot)->dead )
3599                           continue;
3600
3601                         ptr = keyptr(bt->cursor, slot);
3602                         len = ptr->len;
3603
3604                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3605                                 len -= BtId;
3606
3607                         fwrite (ptr->key, len, 1, stdout);
3608                         val = valptr(bt->cursor, slot);
3609                         fwrite (val->value, val->len, 1, stdout);
3610                         fputc ('\n', stdout);
3611                         cnt++;
3612                   }
3613
3614                 fprintf(stderr, " Total keys read %d\n", cnt);
3615                 break;
3616
3617         case 'c':
3618 #ifdef unix
3619                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3620 #endif
3621                 fprintf(stderr, "started counting\n");
3622                 next = REDO_page + bt->mgr->pagezero->redopages;
3623
3624                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3625                         if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3626                                 break;
3627
3628                         if( !bt->cursor->free && !bt->cursor->lvl )
3629                                 cnt += bt->cursor->act;
3630
3631                         bt->mgr->reads++;
3632                         page_no = next++;
3633                 }
3634                 
3635                 cnt--;  // remove stopper key
3636                 fprintf(stderr, " Total keys counted %d\n", cnt);
3637                 break;
3638         }
3639
3640         bt_close (bt);
3641 #ifdef unix
3642         return NULL;
3643 #else
3644         return 0;
3645 #endif
3646 }
3647
3648 typedef struct timeval timer;
3649
3650 int main (int argc, char **argv)
3651 {
3652 int idx, cnt, len, slot, err;
3653 int segsize, bits = 16;
3654 double start, stop;
3655 #ifdef unix
3656 pthread_t *threads;
3657 #else
3658 HANDLE *threads;
3659 #endif
3660 ThreadArg *args;
3661 uint redopages = 0;
3662 uint poolsize = 0;
3663 uint mainpool = 0;
3664 uint mainbits = 0;
3665 float elapsed;
3666 int num = 0;
3667 char key[1];
3668 BtMgr *main;
3669 BtMgr *mgr;
3670 BtKey *ptr;
3671
3672         if( argc < 3 ) {
3673                 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3674                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3675                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3676                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3677                 fprintf (stderr, "  page_bits is the page size in bits for the cache btree\n");
3678                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3679                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3680                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3681                 fprintf (stderr, "  main_bits is the page size of the main btree in bits\n");
3682                 fprintf (stderr, "  main_pool is the number of main pages in the main buffer pool\n");
3683                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3684                 exit(0);
3685         }
3686
3687         start = getCpuTime(0);
3688
3689         if( argc > 4 )
3690                 bits = atoi(argv[4]);
3691
3692         if( argc > 5 )
3693                 poolsize = atoi(argv[5]);
3694
3695         if( !poolsize )
3696                 fprintf (stderr, "Warning: no mapped_pool\n");
3697
3698         if( argc > 6 )
3699                 num = atoi(argv[6]);
3700
3701         if( argc > 7 )
3702                 redopages = atoi(argv[7]);
3703
3704         if( redopages + REDO_page > 65535 )
3705                 fprintf (stderr, "Warning: Recovery buffer too large\n");
3706
3707         if( argc > 8 )
3708                 mainbits = atoi(argv[8]);
3709
3710         if( argc > 9 )
3711                 mainpool = atoi(argv[9]);
3712
3713         cnt = argc - 10;
3714 #ifdef unix
3715         threads = malloc (cnt * sizeof(pthread_t));
3716 #else
3717         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3718 #endif
3719         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3720
3721         mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3722
3723         if( !mgr ) {
3724                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3725                 exit (1);
3726         }
3727
3728         if( mainbits ) {
3729                 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3730
3731                 if( !main ) {
3732                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3733                         exit (1);
3734                 }
3735         } else
3736                 main = NULL;
3737
3738         //      fire off threads
3739
3740         if( cnt )
3741           for( idx = 0; idx < cnt; idx++ ) {
3742                 args[idx].infile = argv[idx + 10];
3743                 args[idx].type = argv[3];
3744                 args[idx].main = main;
3745                 args[idx].mgr = mgr;
3746                 args[idx].num = num;
3747                 args[idx].idx = idx;
3748 #ifdef unix
3749                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3750                         fprintf(stderr, "Error creating thread %d\n", err);
3751 #else
3752                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3753 #endif
3754           }
3755         else {
3756                 args[0].infile = argv[idx + 10];
3757                 args[0].type = argv[3];
3758                 args[0].main = main;
3759                 args[0].mgr = mgr;
3760                 args[0].num = num;
3761                 args[0].idx = 0;
3762                 index_file (args);
3763         }
3764
3765         //      wait for termination
3766
3767 #ifdef unix
3768         for( idx = 0; idx < cnt; idx++ )
3769                 pthread_join (threads[idx], NULL);
3770 #else
3771         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3772
3773         for( idx = 0; idx < cnt; idx++ )
3774                 CloseHandle(threads[idx]);
3775 #endif
3776         bt_poolaudit(mgr);
3777
3778         if( main )
3779                 bt_poolaudit(main);
3780
3781         fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3782
3783         if( main )
3784                 bt_mgrclose (main);
3785         bt_mgrclose (mgr);
3786
3787         elapsed = getCpuTime(0) - start;
3788         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3789         elapsed = getCpuTime(1);
3790         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3791         elapsed = getCpuTime(2);
3792         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3793 }
3794
3795 #endif  //STANDALONE