]> pd.if.org Git - btree/blob - threadskv10b.c
Rewrite re-entrant phase-fair latching, continue LSM btree code
[btree] / threadskv10b.c
1 // btree version threadskv10b futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer locks,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      and LSM B-trees for write optimization
11
12 // 17 OCT 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <linux/futex.h>
39 #define SYS_futex 202
40 #endif
41
42 #ifdef unix
43 #include <unistd.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <fcntl.h>
47 #include <sys/time.h>
48 #include <sys/mman.h>
49 #include <errno.h>
50 #include <pthread.h>
51 #include <limits.h>
52 #else
53 #define WIN32_LEAN_AND_MEAN
54 #include <windows.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <time.h>
58 #include <fcntl.h>
59 #include <process.h>
60 #include <intrin.h>
61 #endif
62
63 #include <memory.h>
64 #include <string.h>
65 #include <stddef.h>
66
67 typedef unsigned long long      uid;
68 typedef unsigned long long      logseqno;
69
70 #ifndef unix
71 typedef unsigned long long      off64_t;
72 typedef unsigned short          ushort;
73 typedef unsigned int            uint;
74 #endif
75
76 #define BT_ro 0x6f72    // ro
77 #define BT_rw 0x7772    // rw
78
79 #define BT_maxbits              26                                      // maximum page size in bits
80 #define BT_minbits              9                                       // minimum page size in bits
81 #define BT_minpage              (1 << BT_minbits)       // minimum page size
82 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
83
84 //  BTree page number constants
85 #define ALLOC_page              0       // allocation page
86 #define ROOT_page               1       // root of the btree
87 #define LEAF_page               2       // first page of leaves
88 #define REDO_page               3       // first page of redo buffer
89
90 //      Number of levels to create in a new BTree
91
92 #define MIN_lvl                 2
93
94 /*
95 There are six lock types for each node in four independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
102 */
103
104 typedef enum{
105         BtLockAccess = 1,
106         BtLockDelete = 2,
107         BtLockRead   = 4,
108         BtLockWrite  = 8,
109         BtLockParent = 16,
110         BtLockAtomic = 32,
111         BtLockLink   = 64
112 } BtLock;
113
114 typedef struct {
115   union {
116         struct {
117           volatile ushort xlock[1];             // one writer has exclusive lock
118           volatile ushort wrt[1];               // count of other writers waiting
119         } bits[1];
120         uint value[1];
121   };
122 } BtMutexLatch;
123
124 #define XCL 1
125 #define WRT 65536
126
127 //      definition for reader/writer reentrant lock implementation
128
129 typedef struct {
130   BtMutexLatch xcl[1];
131   union {
132         struct {
133                 volatile ushort tid[1];
134                 volatile ushort readers[1];
135         } bits[1];
136         uint value[1];
137   };
138   volatile ushort waitwrite[1];
139   volatile ushort waitread[1];
140   volatile ushort phase[1];             // phase == 1 for reading after write
141   volatile ushort dup[1];               // reentrant counter
142 } RWLock;
143
144 //      write only reentrant lock
145
146 typedef struct {
147   BtMutexLatch xcl[1];
148   union {
149         struct {
150                 volatile ushort tid[1];
151                 volatile ushort dup[1];
152         } bits[1];
153         uint value[1];
154   };
155   volatile uint waiters[1];
156 } WOLock;
157
158 //      mode & definition for lite latch implementation
159
160 enum {
161         QueRd = 1,              // reader queue
162         QueWr = 2               // writer queue
163 } RWQueue;
164
165 //  hash table entries
166
167 typedef struct {
168         uint entry;             // Latch table entry at head of chain
169         BtMutexLatch latch[1];
170 } BtHashEntry;
171
172 //      latch manager table structure
173
174 typedef struct {
175         uid page_no;                    // latch set page number
176         RWLock readwr[1];               // read/write page lock
177         RWLock access[1];               // Access Intent/Page delete
178         RWLock parent[1];               // Posting of fence key in parent
179         RWLock atomic[1];               // Atomic update in progress
180         RWLock link[1];                 // left link being updated
181         uint split;                             // right split page atomic insert
182         uint next;                              // next entry in hash table chain
183         uint prev;                              // prev entry in hash table chain
184         ushort pin;                             // number of accessing threads
185         unsigned char dirty;    // page in cache is dirty (atomic setable)
186         BtMutexLatch modify[1]; // modify entry lite latch
187 } BtLatchSet;
188
189 //      Define the length of the page record numbers
190
191 #define BtId 6
192
193 //      Page key slot definition.
194
195 //      Keys are marked dead, but remain on the page until
196 //      it cleanup is called. The fence key (highest key) for
197 //      a leaf page is always present, even after cleanup.
198
199 //      Slot types
200
201 //      In addition to the Unique keys that occupy slots
202 //      there are Librarian and Duplicate key
203 //      slots occupying the key slot array.
204
205 //      The Librarian slots are dead keys that
206 //      serve as filler, available to add new Unique
207 //      or Dup slots that are inserted into the B-tree.
208
209 //      The Duplicate slots have had their key bytes extended
210 //      by 6 bytes to contain a binary duplicate key uniqueifier.
211
212 typedef enum {
213         Unique,
214         Librarian,
215         Duplicate,
216         Delete
217 } BtSlotType;
218
219 typedef struct {
220         uint off:BT_maxbits;    // page offset for key start
221         uint type:3;                    // type of slot
222         uint dead:1;                    // set for deleted slot
223 } BtSlot;
224
225 //      The key structure occupies space at the upper end of
226 //      each page.  It's a length byte followed by the key
227 //      bytes.
228
229 typedef struct {
230         unsigned char len;              // this can be changed to a ushort or uint
231         unsigned char key[0];
232 } BtKey;
233
234 //      the value structure also occupies space at the upper
235 //      end of the page. Each key is immediately followed by a value.
236
237 typedef struct {
238         unsigned char len;              // this can be changed to a ushort or uint
239         unsigned char value[0];
240 } BtVal;
241
242 #define BT_maxkey       255             // maximum number of bytes in a key
243 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
244
245 //      The first part of an index page.
246 //      It is immediately followed
247 //      by the BtSlot array of keys.
248
249 //      note that this structure size
250 //      must be a multiple of 8 bytes
251 //      in order to place PageZero correctly.
252
253 typedef struct BtPage_ {
254         uint cnt;                                       // count of keys in page
255         uint act;                                       // count of active keys
256         uint min;                                       // next key offset
257         uint garbage;                           // page garbage in bytes
258         unsigned char bits:7;           // page size in bits
259         unsigned char free:1;           // page is on free chain
260         unsigned char lvl:7;            // level of page
261         unsigned char kill:1;           // page is being deleted
262         unsigned char right[BtId];      // page number to right
263         unsigned char left[BtId];       // page number to left
264         unsigned char filler[2];        // padding to multiple of 8
265         logseqno lsn;                           // log sequence number applied
266         uid page_no;                            // this page number
267 } *BtPage;
268
269 //  The loadpage interface object
270
271 typedef struct {
272         BtPage page;            // current page pointer
273         BtLatchSet *latch;      // current page latch set
274 } BtPageSet;
275
276 //      structure for latch manager on ALLOC_page
277
278 typedef struct {
279         struct BtPage_ alloc[1];                // next page_no in right ptr
280         unsigned char freechain[BtId];  // head of free page_nos chain
281         unsigned long long activepages; // number of active pages
282         uint redopages;                                 // number of redo pages in file
283 } BtPageZero;
284
285 //      The object structure for Btree access
286
287 typedef struct {
288         uint page_size;                         // page size    
289         uint page_bits;                         // page size in bits    
290 #ifdef unix
291         int idx;
292 #else
293         HANDLE idx;
294 #endif
295         BtPageZero *pagezero;           // mapped allocation page
296         BtHashEntry *hashtable;         // the buffer pool hash table entries
297         BtLatchSet *latchsets;          // mapped latch set from buffer pool
298         unsigned char *pagepool;        // mapped to the buffer pool pages
299         unsigned char *redobuff;        // mapped recovery buffer pointer
300         logseqno lsn, flushlsn;         // current & first lsn flushed
301         BtMutexLatch redo[1];           // redo area lite latch
302         BtMutexLatch lock[1];           // allocation area lite latch
303         BtMutexLatch maps[1];           // mapping segments lite latch
304         ushort thread_no[1];            // next thread number
305         uint nlatchpage;                        // number of latch pages at BT_latch
306         uint latchtotal;                        // number of page latch entries
307         uint latchhash;                         // number of latch hash table slots
308         uint latchvictim;                       // next latch entry to examine
309         uint latchpromote;                      // next latch entry to promote
310         uint redolast;                          // last msync size of recovery buff
311         uint redoend;                           // eof/end element in recovery buff
312         int err;                                        // last error
313         int line;                                       // last error line no
314         int found;                                      // number of keys found by delete
315         int reads, writes;                      // number of reads and writes
316 #ifndef unix
317         HANDLE halloc;                          // allocation handle
318         HANDLE hpool;                           // buffer pool handle
319 #endif
320         uint segments;                          // number of memory mapped segments
321         unsigned char *pages[64000];// memory mapped segments of b-tree
322 } BtMgr;
323
324 typedef struct {
325         BtMgr *mgr;                                     // buffer manager for entire process
326         BtMgr *main;                            // buffer manager for main btree
327         BtPage frame;                           // cached page frame for promote
328         BtPage cursor;                          // cached page frame for start/next
329         ushort thread_no;                       // thread number
330         unsigned char key[BT_keyarray]; // last found complete key
331 } BtDb;
332
333 //      atomic txn structures
334
335 typedef struct {
336         logseqno reqlsn;        // redo log seq no required
337         uint entry;                     // latch table entry number
338         uint slot:31;           // page slot number
339         uint reuse:1;           // reused previous page
340 } AtomicTxn;
341
342 //      Catastrophic errors
343
344 typedef enum {
345         BTERR_ok = 0,
346         BTERR_struct,
347         BTERR_ovflw,
348         BTERR_lock,
349         BTERR_map,
350         BTERR_read,
351         BTERR_wrt,
352         BTERR_atomic,
353         BTERR_recovery
354 } BTERR;
355
356 #define CLOCK_bit 0x8000
357
358 // recovery manager entry types
359
360 typedef enum {
361         BTRM_eof = 0,   // rest of buffer is emtpy
362         BTRM_add,               // add a unique key-value to btree
363         BTRM_dup,               // add a duplicate key-value to btree
364         BTRM_del,               // delete a key-value from btree
365         BTRM_upd,               // update a key with a new value
366         BTRM_new,               // allocate a new empty page
367         BTRM_old                // reuse an old empty page
368 } BTRM;
369
370 // recovery manager entry
371 //      structure followed by BtKey & BtVal
372
373 typedef struct {
374         logseqno reqlsn;        // log sequence number required
375         logseqno lsn;           // log sequence number for entry
376         uint len;                       // length of entry
377         unsigned char type;     // type of entry
378         unsigned char lvl;      // level of btree entry pertains to
379 } BtLogHdr;
380
381 // B-Tree functions
382
383 extern void bt_close (BtDb *bt);
384 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
385 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
386 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
387 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
388 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
389 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
390 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
391
392 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
393
394 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
395 extern uint bt_nextkey   (BtDb *bt, uint slot);
396 extern uint bt_prevkey (BtDb *db, uint slot);
397 extern uint bt_lastkey (BtDb *db);
398
399 //      manager functions
400 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
401 extern void bt_mgrclose (BtMgr *mgr);
402 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
403 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
404
405 //      atomic transaction functions
406 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
407 BTERR bt_txnpromote (BtDb *bt);
408
409 //  The page is allocated from low and hi ends.
410 //  The key slots are allocated from the bottom,
411 //      while the text and value of the key
412 //  are allocated from the top.  When the two
413 //  areas meet, the page is split into two.
414
415 //  A key consists of a length byte, two bytes of
416 //  index number (0 - 65535), and up to 253 bytes
417 //  of key value.
418
419 //  Associated with each key is a value byte string
420 //      containing any value desired.
421
422 //  The b-tree root is always located at page 1.
423 //      The first leaf page of level zero is always
424 //      located on page 2.
425
426 //      The b-tree pages are linked with next
427 //      pointers to facilitate enumerators,
428 //      and provide for concurrency.
429
430 //      When to root page fills, it is split in two and
431 //      the tree height is raised by a new root at page
432 //      one with two keys.
433
434 //      Deleted keys are marked with a dead bit until
435 //      page cleanup. The fence key for a leaf node is
436 //      always present
437
438 //  To achieve maximum concurrency one page is locked at a time
439 //  as the tree is traversed to find leaf key in question. The right
440 //  page numbers are used in cases where the page is being split,
441 //      or consolidated.
442
443 //  Page 0 is dedicated to lock for new page extensions,
444 //      and chains empty pages together for reuse. It also
445 //      contains the latch manager hash table.
446
447 //      The ParentModification lock on a node is obtained to serialize posting
448 //      or changing the fence key for a node.
449
450 //      Empty pages are chained together through the ALLOC page and reused.
451
452 //      Access macros to address slot and key values from the page
453 //      Page slots use 1 based indexing.
454
455 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
456 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
457 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
458
459 void bt_putid(unsigned char *dest, uid id)
460 {
461 int i = BtId;
462
463         while( i-- )
464                 dest[i] = (unsigned char)id, id >>= 8;
465 }
466
467 uid bt_getid(unsigned char *src)
468 {
469 uid id = 0;
470 int i;
471
472         for( i = 0; i < BtId; i++ )
473                 id <<= 8, id |= *src++; 
474
475         return id;
476 }
477
478 //      lite weight spin lock Latch Manager
479
480 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
481 {
482         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
483 }
484
485 void bt_mutexlock(BtMutexLatch *latch)
486 {
487 BtMutexLatch prev[1];
488 uint slept = 0;
489
490   while( 1 ) {
491         *prev->value = __sync_fetch_and_or(latch->value, XCL);
492
493         if( !*prev->bits->xlock ) {                     // did we set XCL?
494             if( slept )
495                   __sync_fetch_and_sub(latch->value, WRT);
496                 return;
497         }
498
499         if( !slept ) {
500                 *prev->bits->wrt += 1;
501                 __sync_fetch_and_add(latch->value, WRT);
502         }
503
504         sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
505         slept = 1;
506   }
507 }
508
509 //      try to obtain write lock
510
511 //      return 1 if obtained,
512 //              0 otherwise
513
514 int bt_mutextry(BtMutexLatch *latch)
515 {
516 BtMutexLatch prev[1];
517
518         *prev->value = __sync_fetch_and_or(latch->value, XCL);
519
520         //      take write access if exclusive bit was clear
521
522         return !*prev->bits->xlock;
523 }
524
525 //      clear write mode
526
527 void bt_releasemutex(BtMutexLatch *latch)
528 {
529 BtMutexLatch prev[1];
530
531         *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
532
533         if( *prev->bits->wrt )
534           sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
535 }
536
537 //      reentrant reader/writer lock implementation
538
539 void WriteLock (RWLock *lock, ushort tid)
540 {
541 uint waited = 0;
542 RWLock prev[1];
543
544   while( 1 ) {
545         bt_mutexlock(lock->xcl);
546         *prev = *lock;
547
548         //      is this a re-entrant request?
549
550         if( *prev->bits->tid == tid )
551           *prev->dup += 1;
552
553         //      wait if write already taken, or there are readers
554
555         else if( *prev->bits->tid || *prev->bits->readers ) {
556           if( !waited )
557                 waited++, *lock->waitwrite += 1;
558
559         //      otherwise, we can take the lock
560
561         } else {
562           if( waited )
563                 *lock->waitwrite -= 1;
564
565           *lock->bits->tid = tid;
566           *lock->phase = 0;                     // set writing phase
567         }
568
569         bt_releasemutex(lock->xcl);
570
571         if( *lock->bits->tid == tid )
572           return;
573
574         sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr );
575   }
576 }
577
578 void WriteRelease (RWLock *lock)
579 {
580         bt_mutexlock(lock->xcl);
581
582         //      were we reentrant?
583
584         if( *lock->dup ) {
585           *lock->dup -= 1;
586           bt_releasemutex(lock->xcl);
587           return;
588         }
589
590         //      release write lock and
591         //      set reading after write phase
592
593         *lock->bits->tid = 0;
594
595         //      were readers waiting for a write cycle?
596
597         if( *lock->waitread ) {
598           *lock->phase = 1;
599           sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueRd );
600
601         //  otherwise were writers waiting
602
603         } else if( *lock->waitwrite ) {
604           *lock->phase = 0;
605           sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
606         }
607
608         bt_releasemutex(lock->xcl);
609 }
610
611 void ReadLock (RWLock *lock, ushort tid)
612 {
613 uint xit, waited = 0;
614 RWLock prev[1];
615
616   while( 1 ) {
617         bt_mutexlock(lock->xcl);
618         *prev = *lock;
619         xit = 0;
620
621         //      wait if a write lock is currenty active
622         //      or we are not in a new read cycle and
623         //      writers are waiting.
624
625         if( *prev->bits->tid || !*prev->phase && *prev->waitwrite ) {
626           if( !waited )
627                 waited++, *lock->waitread += 1;
628
629         //      else we can take the lock
630
631         } else {
632           if( waited )
633                 *lock->waitread -= 1;
634
635           *lock->bits->readers += 1;
636           xit = 1;
637         }
638
639         bt_releasemutex(lock->xcl);
640
641         //  did we increment readers?
642
643         if( xit )
644           return;
645
646         sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueRd );
647   }
648 }
649
650 void ReadRelease (RWLock *lock)
651 {
652 RWLock prev[1];
653
654         bt_mutexlock(lock->xcl);
655         *prev = *lock;
656
657         *prev->bits->readers = *lock->bits->readers -= 1;
658
659         if( !*lock->waitread && *lock->waitwrite )
660                 *prev->phase = *lock->phase = 0;        // stop accepting new readers
661
662         bt_releasemutex(lock->xcl);
663
664         //      were writers waiting for a read cycle to finish?
665
666         if( !*prev->phase && !*prev->bits->readers )
667          if( *prev->waitwrite )
668           sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
669 }
670
671 //      recovery manager -- flush dirty pages
672
673 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
674 {
675 uint cnt3 = 0, cnt2 = 0, cnt = 0;
676 uint entry, segment;
677 BtLatchSet *latch;
678 BtPage page;
679
680   //    flush dirty pool pages to the btree
681
682 fprintf(stderr, "Start flushlsn  ");
683   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
684         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
685         latch = mgr->latchsets + entry;
686         bt_mutexlock (latch->modify);
687         bt_lockpage(BtLockRead, latch, thread_no);
688
689         if( latch->dirty ) {
690           bt_writepage(mgr, page, latch->page_no);
691           latch->dirty = 0, cnt++;
692         }
693 if( latch->pin & ~CLOCK_bit )
694 cnt2++;
695         bt_unlockpage(BtLockRead, latch);
696         bt_releasemutex (latch->modify);
697   }
698 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
699 fprintf(stderr, "begin sync");
700         for( segment = 0; segment < mgr->segments; segment++ )
701           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
702                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
703 fprintf(stderr, " end sync\n");
704 }
705
706 //      recovery manager -- process current recovery buff on startup
707 //      this won't do much if previous session was properly closed.
708
709 BTERR bt_recoveryredo (BtMgr *mgr)
710 {
711 BtLogHdr *hdr, *eof;
712 uint offset = 0;
713 BtKey *key;
714 BtVal *val;
715
716   hdr = (BtLogHdr *)mgr->redobuff;
717   mgr->flushlsn = hdr->lsn;
718
719   while( 1 ) {
720         hdr = (BtLogHdr *)(mgr->redobuff + offset);
721         switch( hdr->type ) {
722         case BTRM_eof:
723                 mgr->lsn = hdr->lsn;
724                 return 0;
725         case BTRM_add:          // add a unique key-value to btree
726         
727         case BTRM_dup:          // add a duplicate key-value to btree
728         case BTRM_del:          // delete a key-value from btree
729         case BTRM_upd:          // update a key with a new value
730         case BTRM_new:          // allocate a new empty page
731         case BTRM_old:          // reuse an old empty page
732                 return 0;
733         }
734   }
735 }
736
737 //      recovery manager -- append new entry to recovery log
738 //      flush dirty pages to disk when it overflows.
739
740 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
741 {
742 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
743 uint amt = sizeof(BtLogHdr);
744 BtLogHdr *hdr, *eof;
745 uint last, end;
746
747         bt_mutexlock (mgr->redo);
748
749         if( key )
750           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
751
752         //      see if new entry fits in buffer
753         //      flush and reset if it doesn't
754
755         if( amt > size - mgr->redoend ) {
756           mgr->flushlsn = mgr->lsn;
757           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
758                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
759           mgr->redolast = 0;
760           mgr->redoend = 0;
761           bt_flushlsn(mgr, thread_no);
762         }
763
764         //      fill in new entry & either eof or end block
765
766         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
767
768         hdr->len = amt;
769         hdr->type = type;
770         hdr->lvl = lvl;
771         hdr->lsn = ++mgr->lsn;
772
773         mgr->redoend += amt;
774
775         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
776         memset (eof, 0, sizeof(BtLogHdr));
777
778         //  fill in key and value
779
780         if( key ) {
781           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
782           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
783         }
784
785         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
786         memset (eof, 0, sizeof(BtLogHdr));
787         eof->lsn = mgr->lsn;
788
789         last = mgr->redolast & ~0xfff;
790         end = mgr->redoend;
791
792         if( end - last + sizeof(BtLogHdr) >= 32768 )
793           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
794                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
795           else
796                 mgr->redolast = end;
797
798         bt_releasemutex(mgr->redo);
799         return hdr->lsn;
800 }
801
802 //      recovery manager -- append transaction to recovery log
803 //      flush dirty pages to disk when it overflows.
804
805 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
806 {
807 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
808 uint amt = 0, src, type;
809 BtLogHdr *hdr, *eof;
810 uint last, end;
811 logseqno lsn;
812 BtKey *key;
813 BtVal *val;
814
815         //      determine amount of redo recovery log space required
816
817         for( src = 0; src++ < source->cnt; ) {
818           key = keyptr(source,src);
819           val = valptr(source,src);
820           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
821           amt += sizeof(BtLogHdr);
822         }
823
824         bt_mutexlock (mgr->redo);
825
826         //      see if new entry fits in buffer
827         //      flush and reset if it doesn't
828
829         if( amt > size - mgr->redoend ) {
830           mgr->flushlsn = mgr->lsn;
831           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
832                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
833           mgr->redolast = 0;
834           mgr->redoend = 0;
835           bt_flushlsn (mgr, thread_no);
836         }
837
838         //      assign new lsn to transaction
839
840         lsn = ++mgr->lsn;
841
842         //      fill in new entries
843
844         for( src = 0; src++ < source->cnt; ) {
845           key = keyptr(source, src);
846           val = valptr(source, src);
847
848           switch( slotptr(source, src)->type ) {
849           case Unique:
850                 type = BTRM_add;
851                 break;
852           case Duplicate:
853                 type = BTRM_dup;
854                 break;
855           case Delete:
856                 type = BTRM_del;
857                 break;
858           }
859
860           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
861           amt += sizeof(BtLogHdr);
862
863           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
864           hdr->len = amt;
865           hdr->type = type;
866           hdr->lsn = lsn;
867           hdr->lvl = 0;
868
869           //  fill in key and value
870
871           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
872           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
873
874           mgr->redoend += amt;
875         }
876
877         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
878         memset (eof, 0, sizeof(BtLogHdr));
879         eof->lsn = lsn;
880
881         last = mgr->redolast & ~0xfff;
882         end = mgr->redoend;
883
884         if( end - last + sizeof(BtLogHdr) >= 32768 )
885           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
886                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
887           else
888                 mgr->redolast = end;
889
890         bt_releasemutex(mgr->redo);
891         return lsn;
892 }
893
894 //      sync a single btree page to disk
895
896 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
897 {
898 uint segment = latch->page_no >> 16;
899 BtPage perm;
900
901         if( bt_writepage (mgr, page, latch->page_no) )
902                 return mgr->err;
903
904         perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
905
906         if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
907           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
908
909         latch->dirty = 0;
910         return 0;
911 }
912
913 //      read page into buffer pool from permanent location in Btree file
914
915 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
916 {
917 int flag = PROT_READ | PROT_WRITE;
918 uint segment = page_no >> 16;
919 BtPage perm;
920
921   while( 1 ) {
922         if( segment < mgr->segments ) {
923           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
924
925           memcpy (page, perm, mgr->page_size);
926           mgr->reads++;
927           return 0;
928         }
929
930         bt_mutexlock (mgr->maps);
931
932         if( segment < mgr->segments ) {
933           bt_releasemutex (mgr->maps);
934           continue;
935         }
936
937         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
938         mgr->segments++;
939
940         bt_releasemutex (mgr->maps);
941   }
942 }
943
944 //      write page to permanent location in Btree file
945 //      clear the dirty bit
946
947 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
948 {
949 int flag = PROT_READ | PROT_WRITE;
950 uint segment = page_no >> 16;
951 BtPage perm;
952
953   while( 1 ) {
954         if( segment < mgr->segments ) {
955           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
956
957           memcpy (perm, page, mgr->page_size);
958           mgr->writes++;
959           return 0;
960         }
961
962         bt_mutexlock (mgr->maps);
963
964         if( segment < mgr->segments ) {
965           bt_releasemutex (mgr->maps);
966           continue;
967         }
968
969         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
970         bt_releasemutex (mgr->maps);
971         mgr->segments++;
972   }
973 }
974
975 //      set CLOCK bit in latch
976 //      decrement pin count
977
978 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
979 {
980         bt_mutexlock(latch->modify);
981         latch->pin |= CLOCK_bit;
982         latch->pin--;
983
984         bt_releasemutex(latch->modify);
985 }
986
987 //  return the btree cached page address
988
989 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
990 {
991 uid entry = latch - mgr->latchsets;
992 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
993
994   return page;
995 }
996
997 //  return next available latch entry
998 //        and with latch entry locked
999
1000 uint bt_availnext (BtMgr *mgr)
1001 {
1002 BtLatchSet *latch;
1003 uint entry;
1004
1005   while( 1 ) {
1006 #ifdef unix
1007         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1008 #else
1009         entry = _InterlockedIncrement (&mgr->latchvictim);
1010 #endif
1011         entry %= mgr->latchtotal;
1012
1013         if( !entry )
1014                 continue;
1015
1016         latch = mgr->latchsets + entry;
1017
1018         if( !bt_mutextry(latch->modify) )
1019                 continue;
1020
1021         //  return this entry if it is not pinned
1022
1023         if( !latch->pin )
1024                 return entry;
1025
1026         //      if the CLOCK bit is set
1027         //      reset it to zero.
1028
1029         latch->pin &= ~CLOCK_bit;
1030         bt_releasemutex(latch->modify);
1031   }
1032 }
1033
1034 //      pin page in buffer pool
1035 //      return with latchset pinned
1036
1037 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1038 {
1039 uint hashidx = page_no % mgr->latchhash;
1040 BtLatchSet *latch;
1041 uint entry, idx;
1042 BtPage page;
1043
1044   //  try to find our entry
1045
1046   bt_mutexlock(mgr->hashtable[hashidx].latch);
1047
1048   if( entry = mgr->hashtable[hashidx].entry ) do
1049   {
1050         latch = mgr->latchsets + entry;
1051         if( page_no == latch->page_no )
1052                 break;
1053   } while( entry = latch->next );
1054
1055   //  found our entry: increment pin
1056
1057   if( entry ) {
1058         latch = mgr->latchsets + entry;
1059         bt_mutexlock(latch->modify);
1060         latch->pin |= CLOCK_bit;
1061         latch->pin++;
1062
1063         bt_releasemutex(latch->modify);
1064         bt_releasemutex(mgr->hashtable[hashidx].latch);
1065         return latch;
1066   }
1067
1068   //  find and reuse unpinned entry
1069
1070 trynext:
1071
1072   entry = bt_availnext (mgr);
1073   latch = mgr->latchsets + entry;
1074
1075   idx = latch->page_no % mgr->latchhash;
1076
1077   //  if latch is on a different hash chain
1078   //    unlink from the old page_no chain
1079
1080   if( latch->page_no )
1081    if( idx != hashidx ) {
1082
1083         //  skip over this entry if latch not available
1084
1085     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1086           bt_releasemutex(latch->modify);
1087           goto trynext;
1088         }
1089
1090         if( latch->prev )
1091           mgr->latchsets[latch->prev].next = latch->next;
1092         else
1093           mgr->hashtable[idx].entry = latch->next;
1094
1095         if( latch->next )
1096           mgr->latchsets[latch->next].prev = latch->prev;
1097
1098     bt_releasemutex (mgr->hashtable[idx].latch);
1099    }
1100
1101   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1102
1103   //  update permanent page area in btree from buffer pool
1104   //    no read-lock is required since page is not pinned.
1105
1106   if( latch->dirty )
1107         if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1108           return mgr->line = __LINE__, NULL;
1109         else
1110           latch->dirty = 0;
1111
1112   if( contents ) {
1113         memcpy (page, contents, mgr->page_size);
1114         latch->dirty = 1;
1115   } else if( bt_readpage (mgr, page, page_no) )
1116         return mgr->line = __LINE__, NULL;
1117
1118   //  link page as head of hash table chain
1119   //  if this is a never before used entry,
1120   //  or it was previously on a different
1121   //  hash table chain. Otherwise, just
1122   //  leave it in its current hash table
1123   //  chain position.
1124
1125   if( !latch->page_no || hashidx != idx ) {
1126         if( latch->next = mgr->hashtable[hashidx].entry )
1127           mgr->latchsets[latch->next].prev = entry;
1128
1129         mgr->hashtable[hashidx].entry = entry;
1130     latch->prev = 0;
1131   }
1132
1133   //  fill in latch structure
1134
1135   latch->pin = CLOCK_bit | 1;
1136   latch->page_no = page_no;
1137   latch->split = 0;
1138
1139   bt_releasemutex (latch->modify);
1140   bt_releasemutex (mgr->hashtable[hashidx].latch);
1141   return latch;
1142 }
1143   
1144 void bt_mgrclose (BtMgr *mgr)
1145 {
1146 BtLatchSet *latch;
1147 BtLogHdr *eof;
1148 uint num = 0;
1149 BtPage page;
1150 uint slot;
1151
1152         //      flush previously written dirty pages
1153         //      and write recovery buffer to disk
1154
1155         fdatasync (mgr->idx);
1156
1157         if( mgr->redoend ) {
1158                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1159                 memset (eof, 0, sizeof(BtLogHdr));
1160         }
1161
1162         //      write remaining dirty pool pages to the btree
1163
1164         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1165                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1166                 latch = mgr->latchsets + slot;
1167
1168                 if( latch->dirty ) {
1169                         bt_writepage(mgr, page, latch->page_no);
1170                         latch->dirty = 0, num++;
1171                 }
1172         }
1173
1174         //      clear redo recovery buffer on disk.
1175
1176         if( mgr->pagezero->redopages ) {
1177                 eof = (BtLogHdr *)mgr->redobuff;
1178                 memset (eof, 0, sizeof(BtLogHdr));
1179                 eof->lsn = mgr->lsn;
1180                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1181                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1182         }
1183
1184         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1185
1186 #ifdef unix
1187         while( mgr->segments )
1188                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1189
1190         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1191         munmap (mgr->pagezero, mgr->page_size);
1192 #else
1193         FlushViewOfFile(mgr->pagezero, 0);
1194         UnmapViewOfFile(mgr->pagezero);
1195         UnmapViewOfFile(mgr->pagepool);
1196         CloseHandle(mgr->halloc);
1197         CloseHandle(mgr->hpool);
1198 #endif
1199 #ifdef unix
1200         close (mgr->idx);
1201         free (mgr);
1202 #else
1203         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1204         FlushFileBuffers(mgr->idx);
1205         CloseHandle(mgr->idx);
1206         GlobalFree (mgr);
1207 #endif
1208 }
1209
1210 //      close and release memory
1211
1212 void bt_close (BtDb *bt)
1213 {
1214 #ifdef unix
1215         if( bt->frame )
1216                 free (bt->frame);
1217         if( bt->cursor )
1218                 free (bt->cursor);
1219 #else
1220         if( bt->frame)
1221                 VirtualFree (bt->frame, 0, MEM_RELEASE);
1222         if( bt->cursor)
1223                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1224 #endif
1225         free (bt);
1226 }
1227
1228 //  open/create new btree buffer manager
1229
1230 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1231 //              size of page pool (e.g. 262144)
1232
1233 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1234 {
1235 uint lvl, attr, last, slot, idx;
1236 uint nlatchpage, latchhash;
1237 unsigned char value[BtId];
1238 int flag, initit = 0;
1239 BtPageZero *pagezero;
1240 BtLatchSet *latch;
1241 off64_t size;
1242 uint amt[1];
1243 BtMgr* mgr;
1244 BtKey* key;
1245 BtVal *val;
1246
1247         // determine sanity of page size and buffer pool
1248
1249         if( bits > BT_maxbits )
1250                 bits = BT_maxbits;
1251         else if( bits < BT_minbits )
1252                 bits = BT_minbits;
1253 #ifdef unix
1254         mgr = calloc (1, sizeof(BtMgr));
1255
1256         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1257
1258         if( mgr->idx == -1 ) {
1259                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1260                 return free(mgr), NULL;
1261         }
1262 #else
1263         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1264         attr = FILE_ATTRIBUTE_NORMAL;
1265         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1266
1267         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1268                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1269                 return GlobalFree(mgr), NULL;
1270         }
1271 #endif
1272
1273 #ifdef unix
1274         pagezero = valloc (BT_maxpage);
1275         *amt = 0;
1276
1277         // read minimum page size to get root info
1278         //      to support raw disk partition files
1279         //      check if bits == 0 on the disk.
1280
1281         if( size = lseek (mgr->idx, 0L, 2) )
1282                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1283                         if( pagezero->alloc->bits )
1284                                 bits = pagezero->alloc->bits;
1285                         else
1286                                 initit = 1;
1287                 else
1288                         return free(mgr), free(pagezero), NULL;
1289         else
1290                 initit = 1;
1291 #else
1292         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1293         size = GetFileSize(mgr->idx, amt);
1294
1295         if( size || *amt ) {
1296                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1297                         return bt_mgrclose (mgr), NULL;
1298                 bits = pagezero->alloc->bits;
1299         } else
1300                 initit = 1;
1301 #endif
1302
1303         mgr->page_size = 1 << bits;
1304         mgr->page_bits = bits;
1305
1306         //  calculate number of latch hash table entries
1307
1308         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1309
1310         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1311         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1312         mgr->latchtotal = nodemax;
1313
1314         if( !initit )
1315                 goto mgrlatch;
1316
1317         // initialize an empty b-tree with latch page, root page, page of leaves
1318         // and page(s) of latches and page pool cache
1319
1320         memset (pagezero, 0, 1 << bits);
1321         pagezero->alloc->lvl = MIN_lvl - 1;
1322         pagezero->alloc->bits = mgr->page_bits;
1323         pagezero->redopages = redopages;
1324
1325         bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1326         pagezero->activepages = 2;
1327
1328         //  initialize left-most LEAF page in
1329         //      alloc->left and count of active leaf pages.
1330
1331         bt_putid (pagezero->alloc->left, LEAF_page);
1332         ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1333
1334         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1335                 fprintf (stderr, "Unable to create btree page zero\n");
1336                 return bt_mgrclose (mgr), NULL;
1337         }
1338
1339         memset (pagezero, 0, 1 << bits);
1340         pagezero->alloc->bits = mgr->page_bits;
1341
1342         for( lvl=MIN_lvl; lvl--; ) {
1343         BtSlot *node = slotptr(pagezero->alloc, 1);
1344                 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1345                 key = keyptr(pagezero->alloc, 1);
1346                 key->len = 2;           // create stopper key
1347                 key->key[0] = 0xff;
1348                 key->key[1] = 0xff;
1349
1350                 bt_putid(value, MIN_lvl - lvl + 1);
1351                 val = valptr(pagezero->alloc, 1);
1352                 val->len = lvl ? BtId : 0;
1353                 memcpy (val->value, value, val->len);
1354
1355                 pagezero->alloc->min = node->off;
1356                 pagezero->alloc->lvl = lvl;
1357                 pagezero->alloc->cnt = 1;
1358                 pagezero->alloc->act = 1;
1359                 pagezero->alloc->page_no = MIN_lvl - lvl;
1360
1361                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1362                         fprintf (stderr, "Unable to create btree page\n");
1363                         return bt_mgrclose (mgr), NULL;
1364                 }
1365         }
1366
1367 mgrlatch:
1368 #ifdef unix
1369         free (pagezero);
1370 #else
1371         VirtualFree (pagezero, 0, MEM_RELEASE);
1372 #endif
1373 #ifdef unix
1374         // mlock the first segment of 64K pages
1375
1376         flag = PROT_READ | PROT_WRITE;
1377         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1378         mgr->segments = 1;
1379
1380         if( mgr->pages[0] == MAP_FAILED ) {
1381                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1382                 return bt_mgrclose (mgr), NULL;
1383         }
1384
1385         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1386         mlock (mgr->pagezero, mgr->page_size);
1387
1388         mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1389         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1390
1391         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1392         if( mgr->pagepool == MAP_FAILED ) {
1393                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1394                 return bt_mgrclose (mgr), NULL;
1395         }
1396 #else
1397         flag = PAGE_READWRITE;
1398         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1399         if( !mgr->halloc ) {
1400                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1401                 return bt_mgrclose (mgr), NULL;
1402         }
1403
1404         flag = FILE_MAP_WRITE;
1405         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1406         if( !mgr->pagezero ) {
1407                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1408                 return bt_mgrclose (mgr), NULL;
1409         }
1410
1411         flag = PAGE_READWRITE;
1412         size = (uid)mgr->nlatchpage << mgr->page_bits;
1413         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1414         if( !mgr->hpool ) {
1415                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1416                 return bt_mgrclose (mgr), NULL;
1417         }
1418
1419         flag = FILE_MAP_WRITE;
1420         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1421         if( !mgr->pagepool ) {
1422                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1423                 return bt_mgrclose (mgr), NULL;
1424         }
1425 #endif
1426
1427         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1428         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1429         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1430
1431         return mgr;
1432 }
1433
1434 //      open BTree access method
1435 //      based on buffer manager
1436
1437 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1438 {
1439 BtDb *bt = malloc (sizeof(*bt));
1440
1441         memset (bt, 0, sizeof(*bt));
1442         bt->main = main;
1443         bt->mgr = mgr;
1444 #ifdef unix
1445         bt->cursor = valloc (mgr->page_size);
1446         bt->frame = valloc (mgr->page_size);
1447 #else
1448         bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1449         bt->frame = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1450 #endif
1451 #ifdef unix
1452         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1453 #else
1454         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1455 #endif
1456         return bt;
1457 }
1458
1459 //  compare two keys, return > 0, = 0, or < 0
1460 //  =0: keys are same
1461 //  -1: key2 > key1
1462 //  +1: key2 < key1
1463 //  as the comparison value
1464
1465 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1466 {
1467 uint len1 = key1->len;
1468 int ans;
1469
1470         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1471                 return ans;
1472
1473         if( len1 > len2 )
1474                 return 1;
1475         if( len1 < len2 )
1476                 return -1;
1477
1478         return 0;
1479 }
1480
1481 // place write, read, or parent lock on requested page_no.
1482
1483 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1484 {
1485         switch( mode ) {
1486         case BtLockRead:
1487                 ReadLock (latch->readwr, thread_no);
1488                 break;
1489         case BtLockWrite:
1490                 WriteLock (latch->readwr, thread_no);
1491                 break;
1492         case BtLockAccess:
1493                 ReadLock (latch->access, thread_no);
1494                 break;
1495         case BtLockDelete:
1496                 WriteLock (latch->access, thread_no);
1497                 break;
1498         case BtLockParent:
1499                 WriteLock (latch->parent, thread_no);
1500                 break;
1501         case BtLockAtomic:
1502                 WriteLock (latch->atomic, thread_no);
1503                 break;
1504         case BtLockAtomic | BtLockRead:
1505                 WriteLock (latch->atomic, thread_no);
1506                 ReadLock (latch->readwr, thread_no);
1507                 break;
1508         case BtLockAtomic | BtLockWrite:
1509                 WriteLock (latch->atomic, thread_no);
1510                 WriteLock (latch->readwr, thread_no);
1511                 break;
1512         case BtLockLink:
1513                 WriteLock (latch->link, thread_no);
1514                 break;
1515         }
1516 }
1517
1518 // remove write, read, or parent lock on requested page
1519
1520 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1521 {
1522         switch( mode ) {
1523         case BtLockRead:
1524                 ReadRelease (latch->readwr);
1525                 break;
1526         case BtLockWrite:
1527                 WriteRelease (latch->readwr);
1528                 break;
1529         case BtLockAccess:
1530                 ReadRelease (latch->access);
1531                 break;
1532         case BtLockDelete:
1533                 WriteRelease (latch->access);
1534                 break;
1535         case BtLockParent:
1536                 WriteRelease (latch->parent);
1537                 break;
1538         case BtLockAtomic:
1539                 WriteRelease (latch->atomic);
1540                 break;
1541         case BtLockAtomic | BtLockRead:
1542                 WriteRelease (latch->atomic);
1543                 ReadRelease (latch->readwr);
1544                 break;
1545         case BtLockAtomic | BtLockWrite:
1546                 WriteRelease (latch->atomic);
1547                 WriteRelease (latch->readwr);
1548                 break;
1549         case BtLockLink:
1550                 WriteRelease (latch->link);
1551                 break;
1552         }
1553 }
1554
1555 //      allocate a new page
1556 //      return with page latched, but unlocked.
1557
1558 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1559 {
1560 uid page_no;
1561 int blk;
1562
1563         //      lock allocation page
1564
1565         bt_mutexlock(mgr->lock);
1566
1567         // use empty chain first
1568         // else allocate new page
1569
1570         if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1571                 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1572                         set->page = bt_mappage (mgr, set->latch);
1573                 else
1574                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1575
1576                 mgr->pagezero->activepages++;
1577                 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1578
1579                 //  the page is currently free and this
1580                 //  will keep bt_txnpromote out.
1581
1582                 //      contents will replace this bit
1583                 //  and pin will keep bt_txnpromote out
1584
1585                 contents->page_no = page_no;
1586                 set->latch->dirty = 1;
1587
1588                 memcpy (set->page, contents, mgr->page_size);
1589
1590 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1591 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1592
1593                 bt_releasemutex(mgr->lock);
1594                 return 0;
1595         }
1596
1597         page_no = bt_getid(mgr->pagezero->alloc->right);
1598         bt_putid(mgr->pagezero->alloc->right, page_no+1);
1599
1600         // unlock allocation latch and
1601         //      extend file into new page.
1602
1603         mgr->pagezero->activepages++;
1604 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1605 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1606         bt_releasemutex(mgr->lock);
1607
1608         //      keep bt_txnpromote out of this page
1609
1610         contents->free = 1;
1611         contents->page_no = page_no;
1612         pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1613
1614         //      don't load cache from btree page, load it from contents
1615
1616         if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1617                 set->page = bt_mappage (mgr, set->latch);
1618         else
1619                 return mgr->err;
1620
1621         // now pin will keep bt_txnpromote out
1622
1623         set->page->free = 0;
1624         return 0;
1625 }
1626
1627 //  find slot in page for given key at a given level
1628
1629 int bt_findslot (BtPage page, unsigned char *key, uint len)
1630 {
1631 uint diff, higher = page->cnt, low = 1, slot;
1632 uint good = 0;
1633
1634         //        make stopper key an infinite fence value
1635
1636         if( bt_getid (page->right) )
1637                 higher++;
1638         else
1639                 good++;
1640
1641         //      low is the lowest candidate.
1642         //  loop ends when they meet
1643
1644         //  higher is already
1645         //      tested as .ge. the passed key.
1646
1647         while( diff = higher - low ) {
1648                 slot = low + ( diff >> 1 );
1649                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1650                         low = slot + 1;
1651                 else
1652                         higher = slot, good++;
1653         }
1654
1655         //      return zero if key is on right link page
1656
1657         return good ? higher : 0;
1658 }
1659
1660 //  find and load page at given level for given key
1661 //      leave page rd or wr locked as requested
1662
1663 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1664 {
1665 uid page_no = ROOT_page, prevpage_no = 0;
1666 uint drill = 0xff, slot;
1667 BtLatchSet *prevlatch;
1668 uint mode, prevmode;
1669 BtPage prevpage;
1670 BtVal *val;
1671
1672   //  start at root of btree and drill down
1673
1674   do {
1675         // determine lock mode of drill level
1676         mode = (drill == lvl) ? lock : BtLockRead; 
1677
1678         if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1679           return 0;
1680
1681         // obtain access lock using lock chaining with Access mode
1682
1683         if( page_no > ROOT_page )
1684           bt_lockpage(BtLockAccess, set->latch, thread_no);
1685
1686         set->page = bt_mappage (mgr, set->latch);
1687
1688         //      release & unpin parent or left sibling page
1689
1690         if( prevpage_no ) {
1691           bt_unlockpage(prevmode, prevlatch);
1692           bt_unpinlatch (mgr, prevlatch);
1693           prevpage_no = 0;
1694         }
1695
1696         // obtain mode lock using lock chaining through AccessLock
1697
1698         bt_lockpage(mode, set->latch, thread_no);
1699
1700         if( set->page->free )
1701                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1702
1703         if( page_no > ROOT_page )
1704           bt_unlockpage(BtLockAccess, set->latch);
1705
1706         // re-read and re-lock root after determining actual level of root
1707
1708         if( set->page->lvl != drill) {
1709                 if( set->latch->page_no != ROOT_page )
1710                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1711                         
1712                 drill = set->page->lvl;
1713
1714                 if( lock != BtLockRead && drill == lvl ) {
1715                   bt_unlockpage(mode, set->latch);
1716                   bt_unpinlatch (mgr, set->latch);
1717                   continue;
1718                 }
1719         }
1720
1721         prevpage_no = set->latch->page_no;
1722         prevlatch = set->latch;
1723         prevpage = set->page;
1724         prevmode = mode;
1725
1726         //  find key on page at this level
1727         //  and descend to requested level
1728
1729         if( !set->page->kill )
1730          if( slot = bt_findslot (set->page, key, len) ) {
1731           if( drill == lvl )
1732                 return slot;
1733
1734           // find next non-dead slot -- the fence key if nothing else
1735
1736           while( slotptr(set->page, slot)->dead )
1737                 if( slot++ < set->page->cnt )
1738                   continue;
1739                 else
1740                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1741
1742           val = valptr(set->page, slot);
1743
1744           if( val->len == BtId )
1745                 page_no = bt_getid(valptr(set->page, slot)->value);
1746           else
1747                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1748
1749           drill--;
1750           continue;
1751          }
1752
1753         //  slide right into next page
1754
1755         page_no = bt_getid(set->page->right);
1756   } while( page_no );
1757
1758   // return error on end of right chain
1759
1760   mgr->line = __LINE__, mgr->err = BTERR_struct;
1761   return 0;     // return error
1762 }
1763
1764 //      return page to free list
1765 //      page must be delete & write locked
1766 //      and have no keys pointing to it.
1767
1768 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1769 {
1770         //      lock allocation page
1771
1772         bt_mutexlock (mgr->lock);
1773
1774         //      store chain
1775
1776         memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1777         bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1778         set->latch->dirty = 1;
1779         set->page->free = 1;
1780
1781         // decrement active page count
1782
1783         mgr->pagezero->activepages--;
1784
1785 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1786 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1787
1788         // unlock released page
1789         // and unlock allocation page
1790
1791         bt_unlockpage (BtLockDelete, set->latch);
1792         bt_unlockpage (BtLockWrite, set->latch);
1793         bt_unpinlatch (mgr, set->latch);
1794         bt_releasemutex (mgr->lock);
1795 }
1796
1797 //      a fence key was deleted from a page
1798 //      push new fence value upwards
1799
1800 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1801 {
1802 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1803 unsigned char value[BtId];
1804 BtKey* ptr;
1805 uint idx;
1806
1807         //      remove the old fence value
1808
1809         ptr = keyptr(set->page, set->page->cnt);
1810         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1811         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1812         set->latch->dirty = 1;
1813
1814         //  cache new fence value
1815
1816         ptr = keyptr(set->page, set->page->cnt);
1817         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1818
1819         bt_lockpage (BtLockParent, set->latch, thread_no);
1820         bt_unlockpage (BtLockWrite, set->latch);
1821
1822         //      insert new (now smaller) fence key
1823
1824         bt_putid (value, set->latch->page_no);
1825         ptr = (BtKey*)leftkey;
1826
1827         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1828           return mgr->err;
1829
1830         //      now delete old fence key
1831
1832         ptr = (BtKey*)rightkey;
1833
1834         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1835                 return mgr->err;
1836
1837         bt_unlockpage (BtLockParent, set->latch);
1838         bt_unpinlatch(mgr, set->latch);
1839         return 0;
1840 }
1841
1842 //      root has a single child
1843 //      collapse a level from the tree
1844
1845 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1846 {
1847 BtPageSet child[1];
1848 uid page_no;
1849 BtVal *val;
1850 uint idx;
1851
1852   // find the child entry and promote as new root contents
1853
1854   do {
1855         for( idx = 0; idx++ < root->page->cnt; )
1856           if( !slotptr(root->page, idx)->dead )
1857                 break;
1858
1859         val = valptr(root->page, idx);
1860
1861         if( val->len == BtId )
1862                 page_no = bt_getid (valptr(root->page, idx)->value);
1863         else
1864                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1865
1866         if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1867                 child->page = bt_mappage (mgr, child->latch);
1868         else
1869                 return mgr->err;
1870
1871         bt_lockpage (BtLockDelete, child->latch, thread_no);
1872         bt_lockpage (BtLockWrite, child->latch, thread_no);
1873
1874         memcpy (root->page, child->page, mgr->page_size);
1875         root->latch->dirty = 1;
1876
1877         bt_freepage (mgr, child);
1878
1879   } while( root->page->lvl > 1 && root->page->act == 1 );
1880
1881   bt_unlockpage (BtLockWrite, root->latch);
1882   bt_unpinlatch (mgr, root->latch);
1883   return 0;
1884 }
1885
1886 //  delete a page and manage keys
1887 //  call with page writelocked
1888
1889 //      returns the right page pool entry for freeing
1890 //      or zero on error.
1891
1892 uint bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, BtLock mode)
1893 {
1894 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1895 unsigned char value[BtId];
1896 uint lvl = set->page->lvl;
1897 BtPageSet right[1];
1898 uid page_no;
1899 BtKey *ptr;
1900
1901         //      cache copy of fence key
1902         //      to remove in parent
1903
1904         ptr = keyptr(set->page, set->page->cnt);
1905         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1906
1907         //      obtain lock on right page
1908
1909         page_no = bt_getid(set->page->right);
1910
1911         if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1912                 right->page = bt_mappage (mgr, right->latch);
1913         else
1914                 return 0;
1915
1916         bt_lockpage (mode, right->latch, thread_no);
1917
1918         // cache copy of key to update
1919
1920         ptr = keyptr(right->page, right->page->cnt);
1921         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1922
1923         if( right->page->kill )
1924                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1925
1926         // pull contents of right peer into our empty page
1927
1928         bt_lockpage (BtLockLink, set->latch, thread_no);
1929         memcpy (right->page->left, set->page->left, BtId);
1930         memcpy (set->page, right->page, mgr->page_size);
1931         set->page->page_no = set->latch->page_no;
1932         set->latch->dirty = 1;
1933         bt_unlockpage (BtLockLink, set->latch);
1934
1935         // mark right page deleted and point it to left page
1936         //      until we can post parent updates that remove access
1937         //      to the deleted page.
1938
1939         bt_putid (right->page->right, set->latch->page_no);
1940         right->latch->dirty = 1;
1941         right->page->kill = 1;
1942
1943         bt_lockpage (BtLockParent, right->latch, thread_no);
1944         bt_unlockpage (mode, right->latch);
1945
1946         bt_lockpage (BtLockParent, set->latch, thread_no);
1947         bt_unlockpage (BtLockWrite, set->latch);
1948
1949         // redirect higher key directly to our new node contents
1950
1951         bt_putid (value, set->latch->page_no);
1952         ptr = (BtKey*)higherfence;
1953
1954         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1955           return 0;
1956
1957         //      delete old lower key to our node
1958
1959         ptr = (BtKey*)lowerfence;
1960
1961         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1962           return 0;
1963
1964         bt_unlockpage (BtLockParent, set->latch);
1965         return right->latch - mgr->latchsets;
1966 }
1967
1968 //  find and delete key on page by marking delete flag bit
1969 //  if page becomes empty, delete it from the btree
1970
1971 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1972 {
1973 uint slot, idx, found, fence, entry;
1974 BtPageSet set[1], right[1];
1975 BtSlot *node;
1976 BtKey *ptr;
1977 BtVal *val;
1978
1979         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1980                 node = slotptr(set->page, slot);
1981                 ptr = keyptr(set->page, slot);
1982         } else
1983                 return mgr->err;
1984
1985         // if librarian slot, advance to real slot
1986
1987         if( node->type == Librarian ) {
1988                 ptr = keyptr(set->page, ++slot);
1989                 node = slotptr(set->page, slot);
1990         }
1991
1992         fence = slot == set->page->cnt;
1993
1994         // delete the key, ignore request if already dead
1995
1996         if( found = !keycmp (ptr, key, len) )
1997           if( found = node->dead == 0 ) {
1998                 val = valptr(set->page,slot);
1999                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2000                 set->page->act--;
2001
2002                 //  mark node type as delete
2003
2004                 node->type = Delete;
2005                 node->dead = 1;
2006
2007                 // collapse empty slots beneath the fence
2008                 // on interiour nodes
2009
2010                 if( lvl )
2011                  while( idx = set->page->cnt - 1 )
2012                   if( slotptr(set->page, idx)->dead ) {
2013                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2014                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2015                   } else
2016                         break;
2017           }
2018
2019         if( !found )
2020                 return 0;
2021
2022         //      did we delete a fence key in an upper level?
2023
2024         if( lvl && set->page->act && fence )
2025           if( bt_fixfence (mgr, set, lvl, thread_no) )
2026                 return mgr->err;
2027           else
2028                 return 0;
2029
2030         //      do we need to collapse root?
2031
2032         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2033           if( bt_collapseroot (mgr, set, thread_no) )
2034                 return mgr->err;
2035           else
2036                 return 0;
2037
2038         //      delete empty page
2039
2040         if( !set->page->act ) {
2041           if( entry = bt_deletepage (mgr, set, thread_no, BtLockWrite) )
2042                 right->latch = mgr->latchsets + entry;
2043           else
2044                  return mgr->err;
2045
2046           //    obtain delete and write locks to right node
2047
2048           bt_unlockpage (BtLockParent, right->latch);
2049           right->page = bt_mappage (mgr, right->latch);
2050           bt_lockpage (BtLockDelete, right->latch, thread_no);
2051           bt_lockpage (BtLockWrite, right->latch, thread_no);
2052           bt_freepage (mgr, right);
2053
2054           bt_unpinlatch (mgr, set->latch);
2055           return 0;
2056         }
2057
2058         set->latch->dirty = 1;
2059         bt_unlockpage(BtLockWrite, set->latch);
2060         bt_unpinlatch (mgr, set->latch);
2061         return 0;
2062 }
2063
2064 //      advance to next slot
2065
2066 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2067 {
2068 BtLatchSet *prevlatch;
2069 uid page_no;
2070
2071         if( slot < set->page->cnt )
2072                 return slot + 1;
2073
2074         prevlatch = set->latch;
2075
2076         if( page_no = bt_getid(set->page->right) )
2077           if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2078                 set->page = bt_mappage (bt->mgr, set->latch);
2079           else
2080                 return 0;
2081         else
2082           return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2083
2084         // obtain access lock using lock chaining with Access mode
2085
2086         bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2087
2088         bt_unlockpage(BtLockRead, prevlatch);
2089         bt_unpinlatch (bt->mgr, prevlatch);
2090
2091         bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2092         bt_unlockpage(BtLockAccess, set->latch);
2093         return 1;
2094 }
2095
2096 //      find unique key == given key, or first duplicate key in
2097 //      leaf level and return number of value bytes
2098 //      or (-1) if not found.
2099
2100 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2101 {
2102 BtPageSet set[1];
2103 uint len, slot;
2104 int ret = -1;
2105 BtKey *ptr;
2106 BtVal *val;
2107
2108   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2109    do {
2110         ptr = keyptr(set->page, slot);
2111
2112         //      skip librarian slot place holder
2113
2114         if( slotptr(set->page, slot)->type == Librarian )
2115                 ptr = keyptr(set->page, ++slot);
2116
2117         //      return actual key found
2118
2119         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2120         len = ptr->len;
2121
2122         if( slotptr(set->page, slot)->type == Duplicate )
2123                 len -= BtId;
2124
2125         //      not there if we reach the stopper key
2126
2127         if( slot == set->page->cnt )
2128           if( !bt_getid (set->page->right) )
2129                 break;
2130
2131         // if key exists, return >= 0 value bytes copied
2132         //      otherwise return (-1)
2133
2134         if( slotptr(set->page, slot)->dead )
2135                 continue;
2136
2137         if( keylen == len )
2138           if( !memcmp (ptr->key, key, len) ) {
2139                 val = valptr (set->page,slot);
2140                 if( valmax > val->len )
2141                         valmax = val->len;
2142                 memcpy (value, val->value, valmax);
2143                 ret = valmax;
2144           }
2145
2146         break;
2147
2148    } while( slot = bt_findnext (bt, set, slot) );
2149
2150   bt_unlockpage (BtLockRead, set->latch);
2151   bt_unpinlatch (bt->mgr, set->latch);
2152   return ret;
2153 }
2154
2155 //      check page for space available,
2156 //      clean if necessary and return
2157 //      0 - page needs splitting
2158 //      >0  new slot value
2159
2160 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2161 {
2162 BtPage page = set->page, frame;
2163 uint nxt = mgr->page_size;
2164 uint cnt = 0, idx = 0;
2165 uint max = page->cnt;
2166 uint newslot = max;
2167 BtKey *key;
2168 BtVal *val;
2169
2170         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2171                 return slot;
2172
2173         //      skip cleanup and proceed to split
2174         //      if there's not enough garbage
2175         //      to bother with.
2176
2177         if( page->garbage < nxt / 5 )
2178                 return 0;
2179
2180         frame = malloc (mgr->page_size);
2181         memcpy (frame, page, mgr->page_size);
2182
2183         // skip page info and set rest of page to zero
2184
2185         memset (page+1, 0, mgr->page_size - sizeof(*page));
2186         set->latch->dirty = 1;
2187
2188         page->garbage = 0;
2189         page->act = 0;
2190
2191         // clean up page first by
2192         // removing deleted keys
2193
2194         while( cnt++ < max ) {
2195                 if( cnt == slot )
2196                         newslot = idx + 2;
2197
2198                 if( cnt < max || frame->lvl )
2199                   if( slotptr(frame,cnt)->dead )
2200                         continue;
2201
2202                 // copy the value across
2203
2204                 val = valptr(frame, cnt);
2205                 nxt -= val->len + sizeof(BtVal);
2206                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2207
2208                 // copy the key across
2209
2210                 key = keyptr(frame, cnt);
2211                 nxt -= key->len + sizeof(BtKey);
2212                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2213
2214                 // make a librarian slot
2215
2216                 slotptr(page, ++idx)->off = nxt;
2217                 slotptr(page, idx)->type = Librarian;
2218                 slotptr(page, idx)->dead = 1;
2219
2220                 // set up the slot
2221
2222                 slotptr(page, ++idx)->off = nxt;
2223                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2224
2225                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2226                         page->act++;
2227         }
2228
2229         page->min = nxt;
2230         page->cnt = idx;
2231         free (frame);
2232
2233         //      see if page has enough space now, or does it need splitting?
2234
2235         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2236                 return newslot;
2237
2238         return 0;
2239 }
2240
2241 // split the root and raise the height of the btree
2242
2243 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2244 {  
2245 unsigned char leftkey[BT_keyarray];
2246 uint nxt = mgr->page_size;
2247 unsigned char value[BtId];
2248 BtPageSet left[1];
2249 uid left_page_no;
2250 BtPage frame;
2251 BtKey *ptr;
2252 BtVal *val;
2253
2254         frame = malloc (mgr->page_size);
2255         memcpy (frame, root->page, mgr->page_size);
2256
2257         //      save left page fence key for new root
2258
2259         ptr = keyptr(root->page, root->page->cnt);
2260         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2261
2262         //  Obtain an empty page to use, and copy the current
2263         //  root contents into it, e.g. lower keys
2264
2265         if( bt_newpage(mgr, left, frame, page_no) )
2266                 return mgr->err;
2267
2268         left_page_no = left->latch->page_no;
2269         bt_unpinlatch (mgr, left->latch);
2270         free (frame);
2271
2272         // preserve the page info at the bottom
2273         // of higher keys and set rest to zero
2274
2275         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2276
2277         // insert stopper key at top of newroot page
2278         // and increase the root height
2279
2280         nxt -= BtId + sizeof(BtVal);
2281         bt_putid (value, right->page_no);
2282         val = (BtVal *)((unsigned char *)root->page + nxt);
2283         memcpy (val->value, value, BtId);
2284         val->len = BtId;
2285
2286         nxt -= 2 + sizeof(BtKey);
2287         slotptr(root->page, 2)->off = nxt;
2288         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2289         ptr->len = 2;
2290         ptr->key[0] = 0xff;
2291         ptr->key[1] = 0xff;
2292
2293         // insert lower keys page fence key on newroot page as first key
2294
2295         nxt -= BtId + sizeof(BtVal);
2296         bt_putid (value, left_page_no);
2297         val = (BtVal *)((unsigned char *)root->page + nxt);
2298         memcpy (val->value, value, BtId);
2299         val->len = BtId;
2300
2301         ptr = (BtKey *)leftkey;
2302         nxt -= ptr->len + sizeof(BtKey);
2303         slotptr(root->page, 1)->off = nxt;
2304         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2305         
2306         bt_putid(root->page->right, 0);
2307         root->page->min = nxt;          // reset lowest used offset and key count
2308         root->page->cnt = 2;
2309         root->page->act = 2;
2310         root->page->lvl++;
2311
2312         mgr->pagezero->alloc->lvl = root->page->lvl;
2313
2314         // release and unpin root pages
2315
2316         bt_unlockpage(BtLockWrite, root->latch);
2317         bt_unpinlatch (mgr, root->latch);
2318
2319         bt_unpinlatch (mgr, right);
2320         return 0;
2321 }
2322
2323 //  split already locked full node
2324 //      leave it locked.
2325 //      return pool entry for new right
2326 //      page, pinned & unlocked
2327
2328 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2329 {
2330 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2331 BtPage frame = malloc (mgr->page_size);
2332 uint lvl = set->page->lvl;
2333 BtPageSet right[1];
2334 BtKey *key, *ptr;
2335 BtVal *val, *src;
2336 uid right2;
2337 uint prev;
2338
2339         //  split higher half of keys to frame
2340
2341         memset (frame, 0, mgr->page_size);
2342         max = set->page->cnt;
2343         cnt = max / 2;
2344         idx = 0;
2345
2346         while( cnt++ < max ) {
2347                 if( cnt < max || set->page->lvl )
2348                   if( slotptr(set->page, cnt)->dead )
2349                         continue;
2350
2351                 src = valptr(set->page, cnt);
2352                 nxt -= src->len + sizeof(BtVal);
2353                 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2354
2355                 key = keyptr(set->page, cnt);
2356                 nxt -= key->len + sizeof(BtKey);
2357                 ptr = (BtKey*)((unsigned char *)frame + nxt);
2358                 memcpy (ptr, key, key->len + sizeof(BtKey));
2359
2360                 //      add librarian slot
2361
2362                 slotptr(frame, ++idx)->off = nxt;
2363                 slotptr(frame, idx)->type = Librarian;
2364                 slotptr(frame, idx)->dead = 1;
2365
2366                 //  add actual slot
2367
2368                 slotptr(frame, ++idx)->off = nxt;
2369                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2370
2371                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2372                         frame->act++;
2373         }
2374
2375         frame->bits = mgr->page_bits;
2376         frame->min = nxt;
2377         frame->cnt = idx;
2378         frame->lvl = lvl;
2379
2380         // link right node
2381
2382         if( set->latch->page_no > ROOT_page )
2383                 bt_putid (frame->right, bt_getid (set->page->right));
2384
2385         //      get new free page and write higher keys to it.
2386
2387         if( bt_newpage(mgr, right, frame, thread_no) )
2388                 return 0;
2389
2390         // process lower keys
2391
2392         memcpy (frame, set->page, mgr->page_size);
2393         memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2394         set->latch->dirty = 1;
2395
2396         nxt = mgr->page_size;
2397         set->page->garbage = 0;
2398         set->page->act = 0;
2399         max /= 2;
2400         cnt = 0;
2401         idx = 0;
2402
2403         if( slotptr(frame, max)->type == Librarian )
2404                 max--;
2405
2406         //  assemble page of smaller keys
2407
2408         while( cnt++ < max ) {
2409                 if( slotptr(frame, cnt)->dead )
2410                         continue;
2411                 val = valptr(frame, cnt);
2412                 nxt -= val->len + sizeof(BtVal);
2413                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2414
2415                 key = keyptr(frame, cnt);
2416                 nxt -= key->len + sizeof(BtKey);
2417                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2418
2419                 //      add librarian slot
2420
2421                 slotptr(set->page, ++idx)->off = nxt;
2422                 slotptr(set->page, idx)->type = Librarian;
2423                 slotptr(set->page, idx)->dead = 1;
2424
2425                 //      add actual slot
2426
2427                 slotptr(set->page, ++idx)->off = nxt;
2428                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2429                 set->page->act++;
2430         }
2431
2432         bt_putid(set->page->right, right->latch->page_no);
2433         set->page->min = nxt;
2434         set->page->cnt = idx;
2435         free(frame);
2436
2437         return right->latch - mgr->latchsets;
2438 }
2439
2440 //      fix keys for newly split page
2441 //      call with both pages pinned & locked
2442 //  return unlocked and unpinned
2443
2444 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2445 {
2446 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2447 unsigned char value[BtId];
2448 uint lvl = set->page->lvl;
2449 BtPage page;
2450 BtKey *ptr;
2451
2452         // if current page is the root page, split it
2453
2454         if( set->latch->page_no == ROOT_page )
2455                 return bt_splitroot (mgr, set, right, thread_no);
2456
2457         ptr = keyptr(set->page, set->page->cnt);
2458         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2459
2460         page = bt_mappage (mgr, right);
2461
2462         ptr = keyptr(page, page->cnt);
2463         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2464
2465         // insert new fences in their parent pages
2466
2467         bt_lockpage (BtLockParent, right, thread_no);
2468
2469         bt_lockpage (BtLockParent, set->latch, thread_no);
2470         bt_unlockpage (BtLockWrite, set->latch);
2471
2472         // insert new fence for reformulated left block of smaller keys
2473
2474         bt_putid (value, set->latch->page_no);
2475         ptr = (BtKey *)leftkey;
2476
2477         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2478                 return mgr->err;
2479
2480         // switch fence for right block of larger keys to new right page
2481
2482         bt_putid (value, right->page_no);
2483         ptr = (BtKey *)rightkey;
2484
2485         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2486                 return mgr->err;
2487
2488         bt_unlockpage (BtLockParent, set->latch);
2489         bt_unpinlatch (mgr, set->latch);
2490
2491         bt_unlockpage (BtLockParent, right);
2492         bt_unpinlatch (mgr, right);
2493         return 0;
2494 }
2495
2496 //      install new key and value onto page
2497 //      page must already be checked for
2498 //      adequate space
2499
2500 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2501 {
2502 uint idx, librarian;
2503 BtSlot *node;
2504 BtKey *ptr;
2505 BtVal *val;
2506 int rate;
2507
2508         //      if found slot > desired slot and previous slot
2509         //      is a librarian slot, use it
2510
2511         if( slot > 1 )
2512           if( slotptr(set->page, slot-1)->type == Librarian )
2513                 slot--;
2514
2515         // copy value onto page
2516
2517         set->page->min -= vallen + sizeof(BtVal);
2518         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2519         memcpy (val->value, value, vallen);
2520         val->len = vallen;
2521
2522         // copy key onto page
2523
2524         set->page->min -= keylen + sizeof(BtKey);
2525         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2526         memcpy (ptr->key, key, keylen);
2527         ptr->len = keylen;
2528         
2529         //      find first empty slot
2530
2531         for( idx = slot; idx < set->page->cnt; idx++ )
2532           if( slotptr(set->page, idx)->dead )
2533                 break;
2534
2535         // now insert key into array before slot,
2536         //      adding as many librarian slots as
2537         //      makes sense.
2538
2539         if( idx == set->page->cnt ) {
2540         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2541
2542                 librarian = ++idx - slot;
2543                 avail /= sizeof(BtSlot);
2544
2545                 if( avail < 0 )
2546                         avail = 0;
2547
2548                 if( librarian > avail )
2549                         librarian = avail;
2550
2551                 if( librarian ) {
2552                         rate = (idx - slot) / librarian;
2553                         set->page->cnt += librarian;
2554                         idx += librarian;
2555                 } else
2556                         rate = 0;
2557         } else
2558                 librarian = 0, rate = 0;
2559
2560         while( idx > slot ) {
2561                 //  transfer slot
2562                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2563                 idx--;
2564
2565                 //      add librarian slot per rate
2566
2567                 if( librarian )
2568                  if( (idx - slot + 1)/2 <= librarian * rate ) {
2569                         node = slotptr(set->page, idx--);
2570                         node->off = node[1].off;
2571                         node->type = Librarian;
2572                         node->dead = 1;
2573                         librarian--;
2574                   }
2575         }
2576
2577         set->latch->dirty = 1;
2578         set->page->act++;
2579
2580         //      fill in new slot
2581
2582         node = slotptr(set->page, slot);
2583         node->off = set->page->min;
2584         node->type = type;
2585         node->dead = 0;
2586
2587         if( release ) {
2588                 bt_unlockpage (BtLockWrite, set->latch);
2589                 bt_unpinlatch (mgr, set->latch);
2590         }
2591
2592         return 0;
2593 }
2594
2595 //  Insert new key into the btree at given level.
2596 //      either add a new key or update/add an existing one
2597
2598 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2599 {
2600 uint slot, idx, len, entry;
2601 BtPageSet set[1];
2602 BtSlot *node;
2603 BtKey *ptr;
2604 BtVal *val;
2605
2606   while( 1 ) { // find the page and slot for the current key
2607         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2608                 node = slotptr(set->page, slot);
2609                 ptr = keyptr(set->page, slot);
2610         } else {
2611                 if( !mgr->err )
2612                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2613                 return mgr->err;
2614         }
2615
2616         // if librarian slot == found slot, advance to real slot
2617
2618         if( node->type == Librarian )
2619           if( !keycmp (ptr, key, keylen) ) {
2620                 ptr = keyptr(set->page, ++slot);
2621                 node = slotptr(set->page, slot);
2622           }
2623
2624         //  if inserting a duplicate key or unique
2625         //      key that doesn't exist on the page,
2626         //      check for adequate space on the page
2627         //      and insert the new key before slot.
2628
2629         switch( type ) {
2630         case Unique:
2631         case Duplicate:
2632           if( keycmp (ptr, key, keylen) )
2633            if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2634             return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2635            else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2636                 return mgr->err;
2637            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2638                 return mgr->err;
2639            else
2640                 continue;
2641
2642           // if key already exists, update value and return
2643
2644           val = valptr(set->page, slot);
2645
2646           if( val->len >= vallen ) {
2647                 if( slotptr(set->page, slot)->dead )
2648                         set->page->act++;
2649                 node->type = type;
2650                 node->dead = 0;
2651
2652                 set->page->garbage += val->len - vallen;
2653                 set->latch->dirty = 1;
2654                 val->len = vallen;
2655                 memcpy (val->value, value, vallen);
2656                 bt_unlockpage(BtLockWrite, set->latch);
2657                 bt_unpinlatch (mgr, set->latch);
2658                 return 0;
2659           }
2660
2661           //  new update value doesn't fit in existing value area
2662           //    make sure page has room
2663
2664           if( !node->dead )
2665                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2666           else
2667                 set->page->act++;
2668
2669           node->type = type;
2670           node->dead = 0;
2671
2672           if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2673            if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2674                 return mgr->err;
2675            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2676                 return mgr->err;
2677            else
2678                 continue;
2679
2680           //  copy key and value onto page and update slot
2681
2682           set->page->min -= vallen + sizeof(BtVal);
2683           val = (BtVal*)((unsigned char *)set->page + set->page->min);
2684           memcpy (val->value, value, vallen);
2685           val->len = vallen;
2686
2687           set->latch->dirty = 1;
2688           set->page->min -= keylen + sizeof(BtKey);
2689           ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2690           memcpy (ptr->key, key, keylen);
2691           ptr->len = keylen;
2692         
2693           node->off = set->page->min;
2694           bt_unlockpage(BtLockWrite, set->latch);
2695           bt_unpinlatch (mgr, set->latch);
2696           return 0;
2697     }
2698   }
2699   return 0;
2700 }
2701
2702 //      determine actual page where key is located
2703 //  return slot number
2704
2705 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2706 {
2707 BtKey *key = keyptr(source,src);
2708 uint slot = locks[src].slot;
2709 uint entry;
2710
2711         if( src > 1 && locks[src].reuse )
2712           entry = locks[src-1].entry, slot = 0;
2713         else
2714           entry = locks[src].entry;
2715
2716         if( slot ) {
2717                 set->latch = mgr->latchsets + entry;
2718                 set->page = bt_mappage (mgr, set->latch);
2719                 return slot;
2720         }
2721
2722         //      is locks->reuse set? or was slot zeroed?
2723         //      if so, find where our key is located 
2724         //      on current page or pages split on
2725         //      same page txn operations.
2726
2727         do {
2728                 set->latch = mgr->latchsets + entry;
2729                 set->page = bt_mappage (mgr, set->latch);
2730
2731                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2732                   if( slotptr(set->page, slot)->type == Librarian )
2733                         slot++;
2734                   if( locks[src].reuse )
2735                         locks[src].entry = entry;
2736                   return slot;
2737                 }
2738         } while( entry = set->latch->split );
2739
2740         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2741         return 0;
2742 }
2743
2744 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2745 {
2746 BtKey *key = keyptr(source, src);
2747 BtVal *val = valptr(source, src);
2748 BtLatchSet *latch;
2749 BtPageSet set[1];
2750 uint entry, slot;
2751
2752   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2753         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2754           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2755                 return mgr->err;
2756           set->page->lsn = lsn;
2757           return 0;
2758         }
2759
2760         //  split page
2761
2762         if( entry = bt_splitpage (mgr, set, thread_no) )
2763           latch = mgr->latchsets + entry;
2764         else
2765           return mgr->err;
2766
2767         //      splice right page into split chain
2768         //      and WriteLock it
2769
2770         bt_lockpage(BtLockWrite, latch, thread_no);
2771         latch->split = set->latch->split;
2772         set->latch->split = entry;
2773         locks[src].slot = 0;
2774   }
2775
2776   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2777 }
2778
2779 //      perform delete from smaller btree
2780 //  insert a delete slot if not found there
2781
2782 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2783 {
2784 BtKey *key = keyptr(source, src);
2785 BtPageSet set[1];
2786 uint idx, slot;
2787 BtSlot *node;
2788 BtKey *ptr;
2789 BtVal *val;
2790
2791         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2792           node = slotptr(set->page, slot);
2793           ptr = keyptr(set->page, slot);
2794           val = valptr(set->page, slot);
2795         } else
2796           return mgr->line = __LINE__, mgr->err = BTERR_struct;
2797
2798         //  if slot is not found, insert a delete slot
2799
2800         if( keycmp (ptr, key->key, key->len) )
2801           return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2802
2803         //      if node is already dead,
2804         //      ignore the request.
2805
2806         if( node->dead )
2807                 return 0;
2808
2809         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2810         set->latch->dirty = 1;
2811         set->page->lsn = lsn;
2812         set->page->act--;
2813
2814         node->dead = 0;
2815         __sync_fetch_and_add(&mgr->found, 1);
2816         return 0;
2817 }
2818
2819 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2820 {
2821 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2822 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2823
2824         return keycmp (key1, key2->key, key2->len);
2825 }
2826 //      atomic modification of a batch of keys.
2827
2828 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2829 {
2830 uint src, idx, slot, samepage, entry, que = 0;
2831 BtKey *key, *ptr, *key2;
2832 int result = 0;
2833 BtSlot temp[1];
2834 logseqno lsn;
2835 int type;
2836
2837   // stable sort the list of keys into order to
2838   //    prevent deadlocks between threads.
2839 /*
2840   for( src = 1; src++ < source->cnt; ) {
2841         *temp = *slotptr(source,src);
2842         key = keyptr (source,src);
2843
2844         for( idx = src; --idx; ) {
2845           key2 = keyptr (source,idx);
2846           if( keycmp (key, key2->key, key2->len) < 0 ) {
2847                 *slotptr(source,idx+1) = *slotptr(source,idx);
2848                 *slotptr(source,idx) = *temp;
2849           } else
2850                 break;
2851         }
2852   }
2853 */
2854         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2855   //  add entries to redo log
2856
2857   if( bt->mgr->pagezero->redopages )
2858         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2859   else
2860         lsn = 0;
2861
2862   //  perform the individual actions in the transaction
2863
2864   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2865         return bt->mgr->err;
2866
2867   // if number of active pages
2868   // is greater than the buffer pool
2869   // promote page into larger btree
2870
2871   if( bt->main )
2872    while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
2873         if( bt_txnpromote (bt) )
2874           return bt->mgr->err;
2875
2876   // return success
2877
2878   return 0;
2879 }
2880
2881 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2882 {
2883 uint src, idx, slot, samepage, entry, que = 0;
2884 BtPageSet set[1], prev[1], right[1];
2885 unsigned char value[BtId];
2886 uid right_page_no;
2887 BtLatchSet *latch;
2888 AtomicTxn *locks;
2889 BtKey *key, *ptr;
2890 BtPage page;
2891 BtVal *val;
2892
2893   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2894
2895   // Load the leaf page for each key
2896   // group same page references with reuse bit
2897   // and determine any constraint violations
2898
2899   for( src = 0; src++ < source->cnt; ) {
2900         key = keyptr(source, src);
2901         slot = 0;
2902
2903         // first determine if this modification falls
2904         // on the same page as the previous modification
2905         //      note that the far right leaf page is a special case
2906
2907         if( samepage = src > 1 )
2908           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2909                 slot = bt_findslot(set->page, key->key, key->len);
2910
2911         if( !slot )
2912           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic + BtLockWrite, thread_no) )
2913                 set->latch->split = 0;
2914           else
2915                 return mgr->err;
2916
2917         if( slotptr(set->page, slot)->type == Librarian )
2918           slot++;
2919
2920         if( !samepage ) {
2921           locks[src].entry = set->latch - mgr->latchsets;
2922           locks[src].slot = slot;
2923           locks[src].reuse = 0;
2924         } else {
2925           locks[src].entry = 0;
2926           locks[src].slot = 0;
2927           locks[src].reuse = 1;
2928         }
2929
2930         //      capture current lsn for master page
2931
2932         locks[src].reqlsn = set->page->lsn;
2933   }
2934
2935   // insert or delete each key
2936   // process any splits or merges
2937   // run through txn list backwards
2938
2939   samepage = source->cnt + 1;
2940
2941   for( src = source->cnt; src; src-- ) {
2942         if( locks[src].reuse )
2943           continue;
2944
2945         //  perform the txn operations
2946         //      from smaller to larger on
2947         //  the same page
2948
2949         for( idx = src; idx < samepage; idx++ )
2950          switch( slotptr(source,idx)->type ) {
2951          case Delete:
2952           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2953                 return mgr->err;
2954           break;
2955
2956         case Duplicate:
2957         case Unique:
2958           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2959                 return mgr->err;
2960           break;
2961
2962         default:
2963           bt_atomicpage (mgr, source, locks, idx, set);
2964           continue;
2965         }
2966
2967         //      after the same page operations have finished,
2968         //  process master page for splits or deletion.
2969
2970         latch = prev->latch = mgr->latchsets + locks[src].entry;
2971         prev->page = bt_mappage (mgr, prev->latch);
2972         samepage = src;
2973
2974         //  pick-up all splits from master page
2975         //      each one is already pinned & WriteLocked.
2976
2977         if( entry = latch->split ) do {
2978           set->latch = mgr->latchsets + entry;
2979           set->page = bt_mappage (mgr, set->latch);
2980
2981           // delete empty master page by undoing its split
2982           //  (this is potentially another empty page)
2983
2984           if( !prev->page->act ) {
2985                 memcpy (set->page->left, prev->page->left, BtId);
2986                 memcpy (prev->page, set->page, mgr->page_size);
2987                 bt_lockpage (BtLockDelete, set->latch, thread_no);
2988                 prev->latch->split = set->latch->split;
2989                 prev->latch->dirty = 1;
2990                 bt_freepage (mgr, set);
2991                 continue;
2992           }
2993
2994           // remove empty split page from the split chain
2995           // and return it to the free list. No other
2996           // thread has its page number yet.
2997
2998           if( !set->page->act ) {
2999                 memcpy (prev->page->right, set->page->right, BtId);
3000                 prev->latch->split = set->latch->split;
3001
3002                 bt_lockpage (BtLockDelete, set->latch, thread_no);
3003                 bt_freepage (mgr, set);
3004                 continue;
3005           }
3006
3007           //  update prev's fence key
3008
3009           ptr = keyptr(prev->page,prev->page->cnt);
3010           bt_putid (value, prev->latch->page_no);
3011
3012           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3013                 return mgr->err;
3014
3015           // splice in the left link into the split page
3016
3017           bt_putid (set->page->left, prev->latch->page_no);
3018
3019           if( lsm )
3020                 bt_syncpage (mgr, prev->page, prev->latch);
3021
3022           // page is unlocked & unpinned below to avoid bt_txnpromote
3023
3024           *prev = *set;
3025         } while( entry = prev->latch->split );
3026
3027         //  update left pointer in next right page from last split page
3028         //      (if all splits were reversed or none occurred, latch->split == 0)
3029
3030         if( latch->split ) {
3031           //  fix left pointer in master's original (now split)
3032           //  far right sibling or set rightmost page in page zero
3033
3034           if( right_page_no = bt_getid (prev->page->right) ) {
3035                 if( set->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
3036                   set->page = bt_mappage (mgr, set->latch);
3037                 else
3038                   return mgr->err;
3039
3040             bt_lockpage (BtLockLink, set->latch, thread_no);
3041             bt_putid (set->page->left, prev->latch->page_no);
3042                 set->latch->dirty = 1;
3043
3044             bt_unlockpage (BtLockLink, set->latch);
3045                 bt_unpinlatch (mgr, set->latch);
3046           } else {      // prev is rightmost page
3047             bt_mutexlock (mgr->lock);
3048                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3049             bt_releasemutex(mgr->lock);
3050           }
3051
3052           //  Process last page split in chain
3053           //  by switching the key from the master
3054           //  page to the last split.
3055
3056           ptr = keyptr(prev->page,prev->page->cnt);
3057           bt_putid (value, prev->latch->page_no);
3058
3059           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3060                 return mgr->err;
3061
3062           if( lsm )
3063                 bt_syncpage (mgr, prev->page, prev->latch);
3064
3065           // unlock and unpin master page
3066
3067           bt_unlockpage(BtLockAtomic, latch);
3068           bt_unlockpage(BtLockWrite, latch);
3069           bt_unpinlatch(mgr, latch);
3070
3071           // go through the list of splits and
3072           // release the locks and unpin
3073
3074           while( entry = latch->split ) {
3075                 latch = mgr->latchsets + entry;
3076             bt_unlockpage(BtLockWrite, latch);
3077                 bt_unpinlatch(mgr, latch);
3078           }
3079
3080           continue;
3081         }
3082
3083         //      since there are no splits, we're
3084         //  finished if master page occupied
3085
3086         bt_unlockpage(BtLockAtomic, prev->latch);
3087
3088         if( prev->page->act ) {
3089           bt_unlockpage(BtLockWrite, prev->latch);
3090
3091           if( lsm )
3092                 bt_syncpage (mgr, prev->page, prev->latch);
3093
3094           bt_unpinlatch(mgr, prev->latch);
3095           continue;
3096         }
3097
3098         // any and all splits were reversed, and the
3099         // master page located in prev is empty, delete it
3100
3101         if( entry = bt_deletepage (mgr, prev, thread_no, BtLockWrite) )
3102                 right->latch = mgr->latchsets + entry;
3103         else
3104                 return mgr->err;
3105
3106         //      obtain delete and write locks to right node
3107
3108         bt_unlockpage (BtLockParent, right->latch);
3109         right->page = bt_mappage (mgr, right->latch);
3110         bt_lockpage (BtLockDelete, right->latch, thread_no);
3111         bt_lockpage (BtLockWrite, right->latch, thread_no);
3112         bt_freepage (mgr, right);
3113
3114         bt_unpinlatch (mgr, prev->latch);
3115   }
3116
3117   free (locks);
3118   return 0;
3119 }
3120
3121 //  promote a page into the larger btree
3122
3123 BTERR bt_txnpromote (BtDb *bt)
3124 {
3125 BtPageSet set[1], right[1];
3126 uint entry, slot, idx;
3127 BtSlot *node;
3128 BtKey *ptr;
3129 BtVal *val;
3130
3131   while( 1 ) {
3132 #ifdef unix
3133         entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3134 #else
3135         entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3136 #endif
3137         entry %= bt->mgr->latchtotal;
3138
3139         if( !entry )
3140                 continue;
3141
3142         set->latch = bt->mgr->latchsets + entry;
3143
3144         if( !bt_mutextry(set->latch->modify) )
3145                 continue;
3146
3147         //  skip this entry if it is pinned
3148
3149         if( set->latch->pin & ~CLOCK_bit ) {
3150           bt_releasemutex(set->latch->modify);
3151           continue;
3152         }
3153
3154         set->page = bt_mappage (bt->mgr, set->latch);
3155
3156         // entry never used or has no right sibling
3157
3158         if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3159           bt_releasemutex(set->latch->modify);
3160           continue;
3161         }
3162
3163         // entry interiour node or being killed
3164
3165         if( set->page->lvl || set->page->free || set->page->kill ) {
3166           bt_releasemutex(set->latch->modify);
3167           continue;
3168         }
3169
3170         //  pin the page for our useage
3171
3172         set->latch->pin++;
3173         bt_releasemutex(set->latch->modify);
3174         bt_lockpage (BtLockAtomic | BtLockWrite, set->latch, bt->thread_no);
3175         memcpy (bt->frame, set->page, bt->mgr->page_size);
3176
3177 if( !(set->latch->page_no % 100) )
3178 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
3179
3180         if( entry = bt_deletepage (bt->mgr, set, bt->thread_no, BtLockAtomic | BtLockWrite) )
3181                 right->latch = bt->mgr->latchsets + entry;
3182         else
3183                 return bt->mgr->err;
3184
3185         //      obtain delete and write locks to right node
3186
3187         bt_unlockpage (BtLockParent, right->latch);
3188         right->page = bt_mappage (bt->mgr, right->latch);
3189
3190         //  release page with its new contents
3191
3192         bt_unlockpage (BtLockAtomic, set->latch);
3193         bt_unpinlatch (bt->mgr, set->latch);
3194
3195         // transfer slots in our selected page to larger btree
3196
3197         if( bt_atomicexec (bt->main, bt->frame, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3198                 return bt->main->err;
3199
3200         //  free the page we took over
3201
3202         bt_lockpage (BtLockDelete, right->latch, bt->thread_no);
3203         bt_lockpage (BtLockWrite, right->latch, bt->thread_no);
3204         bt_freepage (bt->mgr, right);
3205         return 0;
3206   }
3207 }
3208
3209 //      set cursor to highest slot on highest page
3210
3211 uint bt_lastkey (BtDb *bt)
3212 {
3213 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3214 BtPageSet set[1];
3215
3216         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3217                 set->page = bt_mappage (bt->mgr, set->latch);
3218         else
3219                 return 0;
3220
3221     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3222         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3223     bt_unlockpage(BtLockRead, set->latch);
3224         bt_unpinlatch (bt->mgr, set->latch);
3225         return bt->cursor->cnt;
3226 }
3227
3228 //      return previous slot on cursor page
3229
3230 uint bt_prevkey (BtDb *bt, uint slot)
3231 {
3232 uid cursor_page = bt->cursor->page_no;
3233 uid ourright, next, us = cursor_page;
3234 BtPageSet set[1];
3235
3236         if( --slot )
3237                 return slot;
3238
3239         ourright = bt_getid(bt->cursor->right);
3240
3241 goleft:
3242         if( !(next = bt_getid(bt->cursor->left)) )
3243                 return 0;
3244
3245 findourself:
3246         cursor_page = next;
3247
3248         if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3249                 set->page = bt_mappage (bt->mgr, set->latch);
3250         else
3251                 return 0;
3252
3253     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3254         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3255         bt_unlockpage(BtLockRead, set->latch);
3256         bt_unpinlatch (bt->mgr, set->latch);
3257         
3258         next = bt_getid (bt->cursor->right);
3259
3260         if( bt->cursor->kill )
3261                 goto findourself;
3262
3263         if( next != us )
3264           if( next == ourright )
3265                 goto goleft;
3266           else
3267                 goto findourself;
3268
3269         return bt->cursor->cnt;
3270 }
3271
3272 //  return next slot on cursor page
3273 //  or slide cursor right into next page
3274
3275 uint bt_nextkey (BtDb *bt, uint slot)
3276 {
3277 BtPageSet set[1];
3278 uid right;
3279
3280   do {
3281         right = bt_getid(bt->cursor->right);
3282
3283         while( slot++ < bt->cursor->cnt )
3284           if( slotptr(bt->cursor,slot)->dead )
3285                 continue;
3286           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3287                 return slot;
3288           else
3289                 break;
3290
3291         if( !right )
3292                 break;
3293
3294         if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3295                 set->page = bt_mappage (bt->mgr, set->latch);
3296         else
3297                 return 0;
3298
3299     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3300         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3301         bt_unlockpage(BtLockRead, set->latch);
3302         bt_unpinlatch (bt->mgr, set->latch);
3303         slot = 0;
3304
3305   } while( 1 );
3306
3307   return bt->mgr->err = 0;
3308 }
3309
3310 //  cache page of keys into cursor and return starting slot for given key
3311
3312 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3313 {
3314 BtPageSet set[1];
3315 uint slot;
3316
3317         // cache page for retrieval
3318
3319         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3320           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3321         else
3322           return 0;
3323
3324         bt_unlockpage(BtLockRead, set->latch);
3325         bt_unpinlatch (bt->mgr, set->latch);
3326         return slot;
3327 }
3328
3329 #ifdef STANDALONE
3330
3331 #ifndef unix
3332 double getCpuTime(int type)
3333 {
3334 FILETIME crtime[1];
3335 FILETIME xittime[1];
3336 FILETIME systime[1];
3337 FILETIME usrtime[1];
3338 SYSTEMTIME timeconv[1];
3339 double ans = 0;
3340
3341         memset (timeconv, 0, sizeof(SYSTEMTIME));
3342
3343         switch( type ) {
3344         case 0:
3345                 GetSystemTimeAsFileTime (xittime);
3346                 FileTimeToSystemTime (xittime, timeconv);
3347                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3348                 break;
3349         case 1:
3350                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3351                 FileTimeToSystemTime (usrtime, timeconv);
3352                 break;
3353         case 2:
3354                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3355                 FileTimeToSystemTime (systime, timeconv);
3356                 break;
3357         }
3358
3359         ans += (double)timeconv->wHour * 3600;
3360         ans += (double)timeconv->wMinute * 60;
3361         ans += (double)timeconv->wSecond;
3362         ans += (double)timeconv->wMilliseconds / 1000;
3363         return ans;
3364 }
3365 #else
3366 #include <time.h>
3367 #include <sys/resource.h>
3368
3369 double getCpuTime(int type)
3370 {
3371 struct rusage used[1];
3372 struct timeval tv[1];
3373
3374         switch( type ) {
3375         case 0:
3376                 gettimeofday(tv, NULL);
3377                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3378
3379         case 1:
3380                 getrusage(RUSAGE_SELF, used);
3381                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3382
3383         case 2:
3384                 getrusage(RUSAGE_SELF, used);
3385                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3386         }
3387
3388         return 0;
3389 }
3390 #endif
3391
3392 void bt_poolaudit (BtMgr *mgr)
3393 {
3394 BtLatchSet *latch;
3395 uint entry = 0;
3396
3397         while( ++entry < mgr->latchtotal ) {
3398                 latch = mgr->latchsets + entry;
3399
3400                 if( *latch->readwr->value )
3401                         fprintf(stderr, "latchset %d wrtlocked for page %d\n", entry, latch->page_no);
3402
3403                 if( *latch->access->value )
3404                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3405
3406                 if( *latch->parent->value )
3407                         fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3408
3409                 if( *latch->atomic->value )
3410                         fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3411
3412                 if( *latch->modify->value )
3413                         fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3414
3415                 if( latch->pin & ~CLOCK_bit )
3416                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3417         }
3418 }
3419
3420 typedef struct {
3421         char idx;
3422         char *type;
3423         char *infile;
3424         BtMgr *main;
3425         BtMgr *mgr;
3426         int num;
3427 } ThreadArg;
3428
3429 //  standalone program to index file of keys
3430 //  then list them onto std-out
3431
3432 #ifdef unix
3433 void *index_file (void *arg)
3434 #else
3435 uint __stdcall index_file (void *arg)
3436 #endif
3437 {
3438 int line = 0, found = 0, cnt = 0, idx;
3439 uid next, page_no = LEAF_page;  // start on first page of leaves
3440 int ch, len = 0, slot, type = 0;
3441 unsigned char key[BT_maxkey];
3442 unsigned char txn[65536];
3443 ThreadArg *args = arg;
3444 BtPage page, frame;
3445 BtPageSet set[1];
3446 uint nxt = 65536;
3447 BtKey *ptr;
3448 BtVal *val;
3449 BtDb *bt;
3450 FILE *in;
3451
3452         bt = bt_open (args->mgr, args->main);
3453         page = (BtPage)txn;
3454
3455         if( args->idx < strlen (args->type) )
3456                 ch = args->type[args->idx];
3457         else
3458                 ch = args->type[strlen(args->type) - 1];
3459
3460         switch(ch | 0x20)
3461         {
3462         case 'd':
3463                 type = Delete;
3464
3465         case 'p':
3466                 if( !type )
3467                         type = Unique;
3468
3469                 if( args->num )
3470                  if( type == Delete )
3471                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3472                  else
3473                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3474                 else
3475                  if( type == Delete )
3476                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3477                  else
3478                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3479
3480                 if( in = fopen (args->infile, "rb") )
3481                   while( ch = getc(in), ch != EOF )
3482                         if( ch == '\n' )
3483                         {
3484                           line++;
3485
3486                           if( !args->num ) {
3487                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3488                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3489                             len = 0;
3490                                 continue;
3491                           }
3492
3493                           nxt -= len - 10;
3494                           memcpy (txn + nxt, key + 10, len - 10);
3495                           nxt -= 1;
3496                           txn[nxt] = len - 10;
3497                           nxt -= 10;
3498                           memcpy (txn + nxt, key, 10);
3499                           nxt -= 1;
3500                           txn[nxt] = 10;
3501                           slotptr(page,++cnt)->off  = nxt;
3502                           slotptr(page,cnt)->type = type;
3503                           len = 0;
3504
3505                           if( cnt < args->num )
3506                                 continue;
3507
3508                           page->cnt = cnt;
3509                           page->act = cnt;
3510                           page->min = nxt;
3511
3512                           if( bt_atomictxn (bt, page) )
3513                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3514                           nxt = sizeof(txn);
3515                           cnt = 0;
3516
3517                         }
3518                         else if( len < BT_maxkey )
3519                                 key[len++] = ch;
3520                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3521                 break;
3522
3523         case 'w':
3524                 fprintf(stderr, "started indexing for %s\n", args->infile);
3525                 if( in = fopen (args->infile, "r") )
3526                   while( ch = getc(in), ch != EOF )
3527                         if( ch == '\n' )
3528                         {
3529                           line++;
3530
3531                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3532                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3533                           len = 0;
3534                         }
3535                         else if( len < BT_maxkey )
3536                                 key[len++] = ch;
3537                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3538                 break;
3539
3540         case 'f':
3541                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3542                 if( in = fopen (args->infile, "rb") )
3543                   while( ch = getc(in), ch != EOF )
3544                         if( ch == '\n' )
3545                         {
3546                           line++;
3547                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3548                                 found++;
3549                           else if( bt->mgr->err )
3550                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3551                           len = 0;
3552                         }
3553                         else if( len < BT_maxkey )
3554                                 key[len++] = ch;
3555                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3556                 break;
3557
3558         case 's':
3559                 fprintf(stderr, "started scanning\n");
3560                 
3561                 do {
3562                         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3563                                 set->page = bt_mappage (bt->mgr, set->latch);
3564                         else
3565                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3566
3567                         bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3568                         next = bt_getid (set->page->right);
3569
3570                         for( slot = 0; slot++ < set->page->cnt; )
3571                          if( next || slot < set->page->cnt )
3572                           if( !slotptr(set->page, slot)->dead ) {
3573                                 ptr = keyptr(set->page, slot);
3574                                 len = ptr->len;
3575
3576                             if( slotptr(set->page, slot)->type == Duplicate )
3577                                         len -= BtId;
3578
3579                                 fwrite (ptr->key, len, 1, stdout);
3580                                 val = valptr(set->page, slot);
3581                                 fwrite (val->value, val->len, 1, stdout);
3582                                 fputc ('\n', stdout);
3583                                 cnt++;
3584                            }
3585
3586                         bt_unlockpage (BtLockRead, set->latch);
3587                         bt_unpinlatch (bt->mgr, set->latch);
3588                 } while( page_no = next );
3589
3590                 fprintf(stderr, " Total keys read %d\n", cnt);
3591                 break;
3592
3593         case 'r':
3594                 fprintf(stderr, "started reverse scan\n");
3595                 if( slot = bt_lastkey (bt) )
3596                    while( slot = bt_prevkey (bt, slot) ) {
3597                         if( slotptr(bt->cursor, slot)->dead )
3598                           continue;
3599
3600                         ptr = keyptr(bt->cursor, slot);
3601                         len = ptr->len;
3602
3603                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3604                                 len -= BtId;
3605
3606                         fwrite (ptr->key, len, 1, stdout);
3607                         val = valptr(bt->cursor, slot);
3608                         fwrite (val->value, val->len, 1, stdout);
3609                         fputc ('\n', stdout);
3610                         cnt++;
3611                   }
3612
3613                 fprintf(stderr, " Total keys read %d\n", cnt);
3614                 break;
3615
3616         case 'c':
3617 #ifdef unix
3618                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3619 #endif
3620                 fprintf(stderr, "started counting\n");
3621                 next = REDO_page + bt->mgr->pagezero->redopages;
3622
3623                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3624                         if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3625                                 break;
3626
3627                         if( !bt->cursor->free && !bt->cursor->lvl )
3628                                 cnt += bt->cursor->act;
3629
3630                         bt->mgr->reads++;
3631                         page_no = next++;
3632                 }
3633                 
3634                 cnt--;  // remove stopper key
3635                 fprintf(stderr, " Total keys counted %d\n", cnt);
3636                 break;
3637         }
3638
3639         bt_close (bt);
3640 #ifdef unix
3641         return NULL;
3642 #else
3643         return 0;
3644 #endif
3645 }
3646
3647 typedef struct timeval timer;
3648
3649 int main (int argc, char **argv)
3650 {
3651 int idx, cnt, len, slot, err;
3652 int segsize, bits = 16;
3653 double start, stop;
3654 #ifdef unix
3655 pthread_t *threads;
3656 #else
3657 HANDLE *threads;
3658 #endif
3659 ThreadArg *args;
3660 uint redopages = 0;
3661 uint poolsize = 0;
3662 uint mainpool = 0;
3663 uint mainbits = 0;
3664 float elapsed;
3665 int num = 0;
3666 char key[1];
3667 BtMgr *main;
3668 BtMgr *mgr;
3669 BtKey *ptr;
3670
3671         if( argc < 3 ) {
3672                 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3673                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3674                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3675                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3676                 fprintf (stderr, "  page_bits is the page size in bits for the cache btree\n");
3677                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3678                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3679                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3680                 fprintf (stderr, "  main_bits is the page size of the main btree in bits\n");
3681                 fprintf (stderr, "  main_pool is the number of main pages in the main buffer pool\n");
3682                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3683                 exit(0);
3684         }
3685
3686         start = getCpuTime(0);
3687
3688         if( argc > 4 )
3689                 bits = atoi(argv[4]);
3690
3691         if( argc > 5 )
3692                 poolsize = atoi(argv[5]);
3693
3694         if( !poolsize )
3695                 fprintf (stderr, "Warning: no mapped_pool\n");
3696
3697         if( argc > 6 )
3698                 num = atoi(argv[6]);
3699
3700         if( argc > 7 )
3701                 redopages = atoi(argv[7]);
3702
3703         if( redopages + REDO_page > 65535 )
3704                 fprintf (stderr, "Warning: Recovery buffer too large\n");
3705
3706         if( argc > 8 )
3707                 mainbits = atoi(argv[8]);
3708
3709         if( argc > 9 )
3710                 mainpool = atoi(argv[9]);
3711
3712         cnt = argc - 10;
3713 #ifdef unix
3714         threads = malloc (cnt * sizeof(pthread_t));
3715 #else
3716         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3717 #endif
3718         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3719
3720         mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3721
3722         if( !mgr ) {
3723                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3724                 exit (1);
3725         }
3726
3727         if( mainbits ) {
3728                 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3729
3730                 if( !main ) {
3731                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3732                         exit (1);
3733                 }
3734         } else
3735                 main = NULL;
3736
3737         //      fire off threads
3738
3739         if( cnt )
3740           for( idx = 0; idx < cnt; idx++ ) {
3741                 args[idx].infile = argv[idx + 10];
3742                 args[idx].type = argv[3];
3743                 args[idx].main = main;
3744                 args[idx].mgr = mgr;
3745                 args[idx].num = num;
3746                 args[idx].idx = idx;
3747 #ifdef unix
3748                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3749                         fprintf(stderr, "Error creating thread %d\n", err);
3750 #else
3751                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3752 #endif
3753           }
3754         else {
3755                 args[0].infile = argv[idx + 10];
3756                 args[0].type = argv[3];
3757                 args[0].main = main;
3758                 args[0].mgr = mgr;
3759                 args[0].num = num;
3760                 args[0].idx = 0;
3761                 index_file (args);
3762         }
3763
3764         //      wait for termination
3765
3766 #ifdef unix
3767         for( idx = 0; idx < cnt; idx++ )
3768                 pthread_join (threads[idx], NULL);
3769 #else
3770         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3771
3772         for( idx = 0; idx < cnt; idx++ )
3773                 CloseHandle(threads[idx]);
3774 #endif
3775         bt_poolaudit(mgr);
3776
3777         if( main )
3778                 bt_poolaudit(main);
3779
3780         fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3781
3782         if( main )
3783                 bt_mgrclose (main);
3784         bt_mgrclose (mgr);
3785
3786         elapsed = getCpuTime(0) - start;
3787         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3788         elapsed = getCpuTime(1);
3789         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3790         elapsed = getCpuTime(2);
3791         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3792 }
3793
3794 #endif  //STANDALONE