]> pd.if.org Git - btree/blob - threadskv10b.c
Merge branch 'master' of https://github.com/malbrain/Btree-source-code
[btree] / threadskv10b.c
1 // btree version threadskv10b futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer locks,
4 //      librarian page split code,
5 //      bi-directional cursors
6 //      traditional buffer pool manager
7 //      ACID batched key-value updates
8 //      redo log for failure recovery
9 //      and LSM B-trees for write optimization
10
11 // 17 OCT 2014
12
13 // author: karl malbrain, malbrain@cal.berkeley.edu
14
15 /*
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
18
19 The orginal author is Karl Malbrain.
20
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
27 */
28
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
31
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
34
35 #ifdef linux
36 #define _GNU_SOURCE
37 #include <linux/futex.h>
38 #define SYS_futex 202
39 #endif
40
41 #ifdef unix
42 #include <unistd.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/mman.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <limits.h>
51 #else
52 #define WIN32_LEAN_AND_MEAN
53 #include <windows.h>
54 #include <stdio.h>
55 #include <stdlib.h>
56 #include <time.h>
57 #include <fcntl.h>
58 #include <process.h>
59 #include <intrin.h>
60 #endif
61
62 #include <memory.h>
63 #include <string.h>
64 #include <stddef.h>
65
66 typedef unsigned long long      uid;
67 typedef unsigned long long      logseqno;
68
69 #ifndef unix
70 typedef unsigned long long      off64_t;
71 typedef unsigned short          ushort;
72 typedef unsigned int            uint;
73 #endif
74
75 #define BT_ro 0x6f72    // ro
76 #define BT_rw 0x7772    // rw
77
78 #define BT_maxbits              26                                      // maximum page size in bits
79 #define BT_minbits              9                                       // minimum page size in bits
80 #define BT_minpage              (1 << BT_minbits)       // minimum page size
81 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
82
83 //  BTree page number constants
84 #define ALLOC_page              0       // allocation page
85 #define ROOT_page               1       // root of the btree
86 #define LEAF_page               2       // first page of leaves
87 #define REDO_page               3       // first page of redo buffer
88
89 //      Number of levels to create in a new BTree
90
91 #define MIN_lvl                 2
92
93 /*
94 There are six lock types for each node in four independent sets: 
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
101 */
102
103 typedef enum{
104         BtLockAccess = 1,
105         BtLockDelete = 2,
106         BtLockRead   = 4,
107         BtLockWrite  = 8,
108         BtLockParent = 16,
109         BtLockAtomic = 32,
110         BtLockLink   = 64
111 } BtLock;
112
113 typedef struct {
114   union {
115         struct {
116           volatile ushort xlock[1];             // one writer has exclusive lock
117           volatile ushort wrt[1];               // count of other writers waiting
118         } bits[1];
119         uint value[1];
120   };
121 } BtMutexLatch;
122
123 #define XCL 1
124 #define WRT 65536
125
126 //      definition for reader/writer reentrant lock implementation
127
128 typedef struct {
129   BtMutexLatch xcl[1];
130   union {
131         struct {
132                 volatile ushort tid[1];
133                 volatile ushort readers[1];
134         } bits[1];
135         uint value[1];
136   };
137   volatile ushort waitwrite[1];
138   volatile ushort waitread[1];
139   volatile ushort phase[1];             // phase == 1 for reading after write
140   volatile ushort dup[1];               // reentrant counter
141 } RWLock;
142
143 //      write only reentrant lock
144
145 typedef struct {
146   BtMutexLatch xcl[1];
147   union {
148         struct {
149                 volatile ushort tid[1];
150                 volatile ushort dup[1];
151         } bits[1];
152         uint value[1];
153   };
154   volatile uint waiters[1];
155 } WOLock;
156
157 //      mode & definition for lite latch implementation
158
159 enum {
160         QueRd = 1,              // reader queue
161         QueWr = 2               // writer queue
162 } RWQueue;
163
164 //  hash table entries
165
166 typedef struct {
167         uint entry;             // Latch table entry at head of chain
168         BtMutexLatch latch[1];
169 } BtHashEntry;
170
171 //      latch manager table structure
172
173 typedef struct {
174         uid page_no;                    // latch set page number
175         RWLock readwr[1];               // read/write page lock
176         RWLock access[1];               // Access Intent/Page delete
177         RWLock parent[1];               // Posting of fence key in parent
178         RWLock atomic[1];               // Atomic update in progress
179         RWLock link[1];                 // left link being updated
180         uint split;                             // right split page atomic insert
181         uint next;                              // next entry in hash table chain
182         uint prev;                              // prev entry in hash table chain
183         ushort pin;                             // number of accessing threads
184         unsigned char dirty;    // page in cache is dirty (atomic setable)
185         BtMutexLatch modify[1]; // modify entry lite latch
186 } BtLatchSet;
187
188 //      Define the length of the page record numbers
189
190 #define BtId 6
191
192 //      Page key slot definition.
193
194 //      Keys are marked dead, but remain on the page until
195 //      it cleanup is called. The fence key (highest key) for
196 //      a leaf page is always present, even after cleanup.
197
198 //      Slot types
199
200 //      In addition to the Unique keys that occupy slots
201 //      there are Librarian and Duplicate key
202 //      slots occupying the key slot array.
203
204 //      The Librarian slots are dead keys that
205 //      serve as filler, available to add new Unique
206 //      or Dup slots that are inserted into the B-tree.
207
208 //      The Duplicate slots have had their key bytes extended
209 //      by 6 bytes to contain a binary duplicate key uniqueifier.
210
211 typedef enum {
212         Unique,
213         Librarian,
214         Duplicate,
215         Delete
216 } BtSlotType;
217
218 typedef struct {
219         uint off:BT_maxbits;    // page offset for key start
220         uint type:3;                    // type of slot
221         uint dead:1;                    // set for deleted slot
222 } BtSlot;
223
224 //      The key structure occupies space at the upper end of
225 //      each page.  It's a length byte followed by the key
226 //      bytes.
227
228 typedef struct {
229         unsigned char len;              // this can be changed to a ushort or uint
230         unsigned char key[0];
231 } BtKey;
232
233 //      the value structure also occupies space at the upper
234 //      end of the page. Each key is immediately followed by a value.
235
236 typedef struct {
237         unsigned char len;              // this can be changed to a ushort or uint
238         unsigned char value[0];
239 } BtVal;
240
241 #define BT_maxkey       255             // maximum number of bytes in a key
242 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
243
244 //      The first part of an index page.
245 //      It is immediately followed
246 //      by the BtSlot array of keys.
247
248 //      note that this structure size
249 //      must be a multiple of 8 bytes
250 //      in order to place PageZero correctly.
251
252 typedef struct BtPage_ {
253         uint cnt;                                       // count of keys in page
254         uint act;                                       // count of active keys
255         uint min;                                       // next key offset
256         uint garbage;                           // page garbage in bytes
257         unsigned char bits:7;           // page size in bits
258         unsigned char free:1;           // page is on free chain
259         unsigned char lvl:7;            // level of page
260         unsigned char kill:1;           // page is being deleted
261         unsigned char right[BtId];      // page number to right
262         unsigned char left[BtId];       // page number to left
263         unsigned char filler[2];        // padding to multiple of 8
264         logseqno lsn;                           // log sequence number applied
265         uid page_no;                            // this page number
266 } *BtPage;
267
268 //  The loadpage interface object
269
270 typedef struct {
271         BtPage page;            // current page pointer
272         BtLatchSet *latch;      // current page latch set
273 } BtPageSet;
274
275 //      structure for latch manager on ALLOC_page
276
277 typedef struct {
278         struct BtPage_ alloc[1];                // next page_no in right ptr
279         unsigned char freechain[BtId];  // head of free page_nos chain
280         unsigned long long activepages; // number of active pages
281         uint redopages;                                 // number of redo pages in file
282 } BtPageZero;
283
284 //      The object structure for Btree access
285
286 typedef struct {
287         uint page_size;                         // page size    
288         uint page_bits;                         // page size in bits    
289 #ifdef unix
290         int idx;
291 #else
292         HANDLE idx;
293 #endif
294         BtPageZero *pagezero;           // mapped allocation page
295         BtHashEntry *hashtable;         // the buffer pool hash table entries
296         BtLatchSet *latchsets;          // mapped latch set from buffer pool
297         unsigned char *pagepool;        // mapped to the buffer pool pages
298         unsigned char *redobuff;        // mapped recovery buffer pointer
299         logseqno lsn, flushlsn;         // current & first lsn flushed
300         BtMutexLatch redo[1];           // redo area lite latch
301         BtMutexLatch lock[1];           // allocation area lite latch
302         BtMutexLatch maps[1];           // mapping segments lite latch
303         ushort thread_no[1];            // next thread number
304         uint nlatchpage;                        // number of latch pages at BT_latch
305         uint latchtotal;                        // number of page latch entries
306         uint latchhash;                         // number of latch hash table slots
307         uint latchvictim;                       // next latch entry to examine
308         uint latchpromote;                      // next latch entry to promote
309         uint redolast;                          // last msync size of recovery buff
310         uint redoend;                           // eof/end element in recovery buff
311         int err;                                        // last error
312         int line;                                       // last error line no
313         int found;                                      // number of keys found by delete
314         int reads, writes;                      // number of reads and writes
315 #ifndef unix
316         HANDLE halloc;                          // allocation handle
317         HANDLE hpool;                           // buffer pool handle
318 #endif
319         uint segments;                          // number of memory mapped segments
320         unsigned char *pages[64000];// memory mapped segments of b-tree
321 } BtMgr;
322
323 typedef struct {
324         BtMgr *mgr;                                     // buffer manager for entire process
325         BtMgr *main;                            // buffer manager for main btree
326         BtPage frame;                           // cached page frame for promote
327         BtPage cursor;                          // cached page frame for start/next
328         ushort thread_no;                       // thread number
329         unsigned char key[BT_keyarray]; // last found complete key
330 } BtDb;
331
332 //      atomic txn structures
333
334 typedef struct {
335         logseqno reqlsn;        // redo log seq no required
336         uint entry;                     // latch table entry number
337         uint slot:31;           // page slot number
338         uint reuse:1;           // reused previous page
339 } AtomicTxn;
340
341 //      Catastrophic errors
342
343 typedef enum {
344         BTERR_ok = 0,
345         BTERR_struct,
346         BTERR_ovflw,
347         BTERR_lock,
348         BTERR_map,
349         BTERR_read,
350         BTERR_wrt,
351         BTERR_atomic,
352         BTERR_recovery
353 } BTERR;
354
355 #define CLOCK_bit 0x8000
356
357 // recovery manager entry types
358
359 typedef enum {
360         BTRM_eof = 0,   // rest of buffer is emtpy
361         BTRM_add,               // add a unique key-value to btree
362         BTRM_dup,               // add a duplicate key-value to btree
363         BTRM_del,               // delete a key-value from btree
364         BTRM_upd,               // update a key with a new value
365         BTRM_new,               // allocate a new empty page
366         BTRM_old                // reuse an old empty page
367 } BTRM;
368
369 // recovery manager entry
370 //      structure followed by BtKey & BtVal
371
372 typedef struct {
373         logseqno reqlsn;        // log sequence number required
374         logseqno lsn;           // log sequence number for entry
375         uint len;                       // length of entry
376         unsigned char type;     // type of entry
377         unsigned char lvl;      // level of btree entry pertains to
378 } BtLogHdr;
379
380 // B-Tree functions
381
382 extern void bt_close (BtDb *bt);
383 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
384 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
385 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
386 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
387 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
388 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
389 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
390
391 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
392
393 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
394 extern uint bt_nextkey   (BtDb *bt, uint slot);
395 extern uint bt_prevkey (BtDb *db, uint slot);
396 extern uint bt_lastkey (BtDb *db);
397
398 //      manager functions
399 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
400 extern void bt_mgrclose (BtMgr *mgr);
401 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
402 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
403
404 //      atomic transaction functions
405 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
406 BTERR bt_txnpromote (BtDb *bt);
407
408 //  The page is allocated from low and hi ends.
409 //  The key slots are allocated from the bottom,
410 //      while the text and value of the key
411 //  are allocated from the top.  When the two
412 //  areas meet, the page is split into two.
413
414 //  A key consists of a length byte, two bytes of
415 //  index number (0 - 65535), and up to 253 bytes
416 //  of key value.
417
418 //  Associated with each key is a value byte string
419 //      containing any value desired.
420
421 //  The b-tree root is always located at page 1.
422 //      The first leaf page of level zero is always
423 //      located on page 2.
424
425 //      The b-tree pages are linked with next
426 //      pointers to facilitate enumerators,
427 //      and provide for concurrency.
428
429 //      When to root page fills, it is split in two and
430 //      the tree height is raised by a new root at page
431 //      one with two keys.
432
433 //      Deleted keys are marked with a dead bit until
434 //      page cleanup. The fence key for a leaf node is
435 //      always present
436
437 //  To achieve maximum concurrency one page is locked at a time
438 //  as the tree is traversed to find leaf key in question. The right
439 //  page numbers are used in cases where the page is being split,
440 //      or consolidated.
441
442 //  Page 0 is dedicated to lock for new page extensions,
443 //      and chains empty pages together for reuse. It also
444 //      contains the latch manager hash table.
445
446 //      The ParentModification lock on a node is obtained to serialize posting
447 //      or changing the fence key for a node.
448
449 //      Empty pages are chained together through the ALLOC page and reused.
450
451 //      Access macros to address slot and key values from the page
452 //      Page slots use 1 based indexing.
453
454 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
455 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
456 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
457
458 void bt_putid(unsigned char *dest, uid id)
459 {
460 int i = BtId;
461
462         while( i-- )
463                 dest[i] = (unsigned char)id, id >>= 8;
464 }
465
466 uid bt_getid(unsigned char *src)
467 {
468 uid id = 0;
469 int i;
470
471         for( i = 0; i < BtId; i++ )
472                 id <<= 8, id |= *src++; 
473
474         return id;
475 }
476
477 //      lite weight spin lock Latch Manager
478
479 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
480 {
481         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
482 }
483
484 void bt_mutexlock(BtMutexLatch *latch)
485 {
486 BtMutexLatch prev[1];
487 uint slept = 0;
488
489   while( 1 ) {
490         *prev->value = __sync_fetch_and_or(latch->value, XCL);
491
492         if( !*prev->bits->xlock ) {                     // did we set XCL?
493             if( slept )
494                   __sync_fetch_and_sub(latch->value, WRT);
495                 return;
496         }
497
498         if( !slept ) {
499                 *prev->bits->wrt += 1;
500                 __sync_fetch_and_add(latch->value, WRT);
501         }
502
503         sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
504         slept = 1;
505   }
506 }
507
508 //      try to obtain write lock
509
510 //      return 1 if obtained,
511 //              0 otherwise
512
513 int bt_mutextry(BtMutexLatch *latch)
514 {
515 BtMutexLatch prev[1];
516
517         *prev->value = __sync_fetch_and_or(latch->value, XCL);
518
519         //      take write access if exclusive bit was clear
520
521         return !*prev->bits->xlock;
522 }
523
524 //      clear write mode
525
526 void bt_releasemutex(BtMutexLatch *latch)
527 {
528 BtMutexLatch prev[1];
529
530         *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
531
532         if( *prev->bits->wrt )
533           sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
534 }
535
536 //      reentrant reader/writer lock implementation
537
538 void WriteLock (RWLock *lock, ushort tid)
539 {
540 uint waited = 0;
541 RWLock prev[1];
542
543   while( 1 ) {
544         bt_mutexlock(lock->xcl);
545         *prev = *lock;
546
547         //      is this a re-entrant request?
548
549         if( *prev->bits->tid == tid )
550           *prev->dup += 1;
551
552         //      wait if write already taken, or there are readers
553
554         else if( *prev->bits->tid || *prev->bits->readers ) {
555           if( !waited )
556                 waited++, *lock->waitwrite += 1;
557
558         //      otherwise, we can take the lock
559
560         } else {
561           if( waited )
562                 *lock->waitwrite -= 1;
563
564           *lock->bits->tid = tid;
565           *lock->phase = 0;                     // set writing phase
566         }
567
568         bt_releasemutex(lock->xcl);
569
570         if( *lock->bits->tid == tid )
571           return;
572
573         sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr );
574   }
575 }
576
577 void WriteRelease (RWLock *lock)
578 {
579         bt_mutexlock(lock->xcl);
580
581         //      were we reentrant?
582
583         if( *lock->dup ) {
584           *lock->dup -= 1;
585           bt_releasemutex(lock->xcl);
586           return;
587         }
588
589         //      release write lock and
590         //      set reading after write phase
591
592         *lock->bits->tid = 0;
593
594         //      were readers waiting for a write cycle?
595
596         if( *lock->waitread ) {
597           *lock->phase = 1;
598           sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueRd );
599
600         //  otherwise were writers waiting
601
602         } else if( *lock->waitwrite ) {
603           *lock->phase = 0;
604           sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
605         }
606
607         bt_releasemutex(lock->xcl);
608 }
609
610 void ReadLock (RWLock *lock, ushort tid)
611 {
612 uint xit, waited = 0;
613 RWLock prev[1];
614
615   while( 1 ) {
616         bt_mutexlock(lock->xcl);
617         *prev = *lock;
618         xit = 0;
619
620         //      wait if a write lock is currenty active
621         //      or we are not in a new read cycle and
622         //      writers are waiting.
623
624         if( *prev->bits->tid || !*prev->phase && *prev->waitwrite ) {
625           if( !waited )
626                 waited++, *lock->waitread += 1;
627
628         //      else we can take the lock
629
630         } else {
631           if( waited )
632                 *lock->waitread -= 1;
633
634           *lock->bits->readers += 1;
635           xit = 1;
636         }
637
638         bt_releasemutex(lock->xcl);
639
640         //  did we increment readers?
641
642         if( xit )
643           return;
644
645         sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueRd );
646   }
647 }
648
649 void ReadRelease (RWLock *lock)
650 {
651 RWLock prev[1];
652
653         bt_mutexlock(lock->xcl);
654         *prev = *lock;
655
656         *prev->bits->readers = *lock->bits->readers -= 1;
657
658         if( !*lock->waitread && *lock->waitwrite )
659                 *prev->phase = *lock->phase = 0;        // stop accepting new readers
660
661         bt_releasemutex(lock->xcl);
662
663         //      were writers waiting for a read cycle to finish?
664
665         if( !*prev->phase && !*prev->bits->readers )
666          if( *prev->waitwrite )
667           sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
668 }
669
670 //      recovery manager -- flush dirty pages
671
672 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
673 {
674 uint cnt3 = 0, cnt2 = 0, cnt = 0;
675 uint entry, segment;
676 BtLatchSet *latch;
677 BtPage page;
678
679   //    flush dirty pool pages to the btree
680
681 fprintf(stderr, "Start flushlsn  ");
682   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
683         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
684         latch = mgr->latchsets + entry;
685         bt_mutexlock (latch->modify);
686         bt_lockpage(BtLockRead, latch, thread_no);
687
688         if( latch->dirty ) {
689           bt_writepage(mgr, page, latch->page_no);
690           latch->dirty = 0, cnt++;
691         }
692 if( latch->pin & ~CLOCK_bit )
693 cnt2++;
694         bt_unlockpage(BtLockRead, latch);
695         bt_releasemutex (latch->modify);
696   }
697 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
698 fprintf(stderr, "begin sync");
699         for( segment = 0; segment < mgr->segments; segment++ )
700           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
701                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
702 fprintf(stderr, " end sync\n");
703 }
704
705 //      recovery manager -- process current recovery buff on startup
706 //      this won't do much if previous session was properly closed.
707
708 BTERR bt_recoveryredo (BtMgr *mgr)
709 {
710 BtLogHdr *hdr, *eof;
711 uint offset = 0;
712 BtKey *key;
713 BtVal *val;
714
715   hdr = (BtLogHdr *)mgr->redobuff;
716   mgr->flushlsn = hdr->lsn;
717
718   while( 1 ) {
719         hdr = (BtLogHdr *)(mgr->redobuff + offset);
720         switch( hdr->type ) {
721         case BTRM_eof:
722                 mgr->lsn = hdr->lsn;
723                 return 0;
724         case BTRM_add:          // add a unique key-value to btree
725         
726         case BTRM_dup:          // add a duplicate key-value to btree
727         case BTRM_del:          // delete a key-value from btree
728         case BTRM_upd:          // update a key with a new value
729         case BTRM_new:          // allocate a new empty page
730         case BTRM_old:          // reuse an old empty page
731                 return 0;
732         }
733   }
734 }
735
736 //      recovery manager -- append new entry to recovery log
737 //      flush dirty pages to disk when it overflows.
738
739 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
740 {
741 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
742 uint amt = sizeof(BtLogHdr);
743 BtLogHdr *hdr, *eof;
744 uint last, end;
745
746         bt_mutexlock (mgr->redo);
747
748         if( key )
749           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
750
751         //      see if new entry fits in buffer
752         //      flush and reset if it doesn't
753
754         if( amt > size - mgr->redoend ) {
755           mgr->flushlsn = mgr->lsn;
756           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
757                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
758           mgr->redolast = 0;
759           mgr->redoend = 0;
760           bt_flushlsn(mgr, thread_no);
761         }
762
763         //      fill in new entry & either eof or end block
764
765         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
766
767         hdr->len = amt;
768         hdr->type = type;
769         hdr->lvl = lvl;
770         hdr->lsn = ++mgr->lsn;
771
772         mgr->redoend += amt;
773
774         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
775         memset (eof, 0, sizeof(BtLogHdr));
776
777         //  fill in key and value
778
779         if( key ) {
780           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
781           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
782         }
783
784         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
785         memset (eof, 0, sizeof(BtLogHdr));
786         eof->lsn = mgr->lsn;
787
788         last = mgr->redolast & ~0xfff;
789         end = mgr->redoend;
790
791         if( end - last + sizeof(BtLogHdr) >= 32768 )
792           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
793                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
794           else
795                 mgr->redolast = end;
796
797         bt_releasemutex(mgr->redo);
798         return hdr->lsn;
799 }
800
801 //      recovery manager -- append transaction to recovery log
802 //      flush dirty pages to disk when it overflows.
803
804 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
805 {
806 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
807 uint amt = 0, src, type;
808 BtLogHdr *hdr, *eof;
809 uint last, end;
810 logseqno lsn;
811 BtKey *key;
812 BtVal *val;
813
814         //      determine amount of redo recovery log space required
815
816         for( src = 0; src++ < source->cnt; ) {
817           key = keyptr(source,src);
818           val = valptr(source,src);
819           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
820           amt += sizeof(BtLogHdr);
821         }
822
823         bt_mutexlock (mgr->redo);
824
825         //      see if new entry fits in buffer
826         //      flush and reset if it doesn't
827
828         if( amt > size - mgr->redoend ) {
829           mgr->flushlsn = mgr->lsn;
830           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
831                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
832           mgr->redolast = 0;
833           mgr->redoend = 0;
834           bt_flushlsn (mgr, thread_no);
835         }
836
837         //      assign new lsn to transaction
838
839         lsn = ++mgr->lsn;
840
841         //      fill in new entries
842
843         for( src = 0; src++ < source->cnt; ) {
844           key = keyptr(source, src);
845           val = valptr(source, src);
846
847           switch( slotptr(source, src)->type ) {
848           case Unique:
849                 type = BTRM_add;
850                 break;
851           case Duplicate:
852                 type = BTRM_dup;
853                 break;
854           case Delete:
855                 type = BTRM_del;
856                 break;
857           }
858
859           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
860           amt += sizeof(BtLogHdr);
861
862           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
863           hdr->len = amt;
864           hdr->type = type;
865           hdr->lsn = lsn;
866           hdr->lvl = 0;
867
868           //  fill in key and value
869
870           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
871           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
872
873           mgr->redoend += amt;
874         }
875
876         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
877         memset (eof, 0, sizeof(BtLogHdr));
878         eof->lsn = lsn;
879
880         last = mgr->redolast & ~0xfff;
881         end = mgr->redoend;
882
883         if( end - last + sizeof(BtLogHdr) >= 32768 )
884           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
885                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
886           else
887                 mgr->redolast = end;
888
889         bt_releasemutex(mgr->redo);
890         return lsn;
891 }
892
893 //      sync a single btree page to disk
894
895 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
896 {
897 uint segment = latch->page_no >> 16;
898 BtPage perm;
899
900         if( bt_writepage (mgr, page, latch->page_no) )
901                 return mgr->err;
902
903         perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
904
905         if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
906           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
907
908         latch->dirty = 0;
909         return 0;
910 }
911
912 //      read page into buffer pool from permanent location in Btree file
913
914 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
915 {
916 int flag = PROT_READ | PROT_WRITE;
917 uint segment = page_no >> 16;
918 BtPage perm;
919
920   while( 1 ) {
921         if( segment < mgr->segments ) {
922           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
923
924           memcpy (page, perm, mgr->page_size);
925           mgr->reads++;
926           return 0;
927         }
928
929         bt_mutexlock (mgr->maps);
930
931         if( segment < mgr->segments ) {
932           bt_releasemutex (mgr->maps);
933           continue;
934         }
935
936         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
937         mgr->segments++;
938
939         bt_releasemutex (mgr->maps);
940   }
941 }
942
943 //      write page to permanent location in Btree file
944 //      clear the dirty bit
945
946 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
947 {
948 int flag = PROT_READ | PROT_WRITE;
949 uint segment = page_no >> 16;
950 BtPage perm;
951
952   while( 1 ) {
953         if( segment < mgr->segments ) {
954           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
955
956           memcpy (perm, page, mgr->page_size);
957           mgr->writes++;
958           return 0;
959         }
960
961         bt_mutexlock (mgr->maps);
962
963         if( segment < mgr->segments ) {
964           bt_releasemutex (mgr->maps);
965           continue;
966         }
967
968         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
969         bt_releasemutex (mgr->maps);
970         mgr->segments++;
971   }
972 }
973
974 //      set CLOCK bit in latch
975 //      decrement pin count
976
977 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
978 {
979         bt_mutexlock(latch->modify);
980         latch->pin |= CLOCK_bit;
981         latch->pin--;
982
983         bt_releasemutex(latch->modify);
984 }
985
986 //  return the btree cached page address
987
988 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
989 {
990 uid entry = latch - mgr->latchsets;
991 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
992
993   return page;
994 }
995
996 //  return next available latch entry
997 //        and with latch entry locked
998
999 uint bt_availnext (BtMgr *mgr)
1000 {
1001 BtLatchSet *latch;
1002 uint entry;
1003
1004   while( 1 ) {
1005 #ifdef unix
1006         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1007 #else
1008         entry = _InterlockedIncrement (&mgr->latchvictim);
1009 #endif
1010         entry %= mgr->latchtotal;
1011
1012         if( !entry )
1013                 continue;
1014
1015         latch = mgr->latchsets + entry;
1016
1017         if( !bt_mutextry(latch->modify) )
1018                 continue;
1019
1020         //  return this entry if it is not pinned
1021
1022         if( !latch->pin )
1023                 return entry;
1024
1025         //      if the CLOCK bit is set
1026         //      reset it to zero.
1027
1028         latch->pin &= ~CLOCK_bit;
1029         bt_releasemutex(latch->modify);
1030   }
1031 }
1032
1033 //      pin page in buffer pool
1034 //      return with latchset pinned
1035
1036 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1037 {
1038 uint hashidx = page_no % mgr->latchhash;
1039 BtLatchSet *latch;
1040 uint entry, idx;
1041 BtPage page;
1042
1043   //  try to find our entry
1044
1045   bt_mutexlock(mgr->hashtable[hashidx].latch);
1046
1047   if( entry = mgr->hashtable[hashidx].entry ) do
1048   {
1049         latch = mgr->latchsets + entry;
1050         if( page_no == latch->page_no )
1051                 break;
1052   } while( entry = latch->next );
1053
1054   //  found our entry: increment pin
1055
1056   if( entry ) {
1057         latch = mgr->latchsets + entry;
1058         bt_mutexlock(latch->modify);
1059         latch->pin |= CLOCK_bit;
1060         latch->pin++;
1061
1062         bt_releasemutex(latch->modify);
1063         bt_releasemutex(mgr->hashtable[hashidx].latch);
1064         return latch;
1065   }
1066
1067   //  find and reuse unpinned entry
1068
1069 trynext:
1070
1071   entry = bt_availnext (mgr);
1072   latch = mgr->latchsets + entry;
1073
1074   idx = latch->page_no % mgr->latchhash;
1075
1076   //  if latch is on a different hash chain
1077   //    unlink from the old page_no chain
1078
1079   if( latch->page_no )
1080    if( idx != hashidx ) {
1081
1082         //  skip over this entry if latch not available
1083
1084     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1085           bt_releasemutex(latch->modify);
1086           goto trynext;
1087         }
1088
1089         if( latch->prev )
1090           mgr->latchsets[latch->prev].next = latch->next;
1091         else
1092           mgr->hashtable[idx].entry = latch->next;
1093
1094         if( latch->next )
1095           mgr->latchsets[latch->next].prev = latch->prev;
1096
1097     bt_releasemutex (mgr->hashtable[idx].latch);
1098    }
1099
1100   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1101
1102   //  update permanent page area in btree from buffer pool
1103   //    no read-lock is required since page is not pinned.
1104
1105   if( latch->dirty )
1106         if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1107           return mgr->line = __LINE__, NULL;
1108         else
1109           latch->dirty = 0;
1110
1111   if( contents ) {
1112         memcpy (page, contents, mgr->page_size);
1113         latch->dirty = 1;
1114   } else if( bt_readpage (mgr, page, page_no) )
1115         return mgr->line = __LINE__, NULL;
1116
1117   //  link page as head of hash table chain
1118   //  if this is a never before used entry,
1119   //  or it was previously on a different
1120   //  hash table chain. Otherwise, just
1121   //  leave it in its current hash table
1122   //  chain position.
1123
1124   if( !latch->page_no || hashidx != idx ) {
1125         if( latch->next = mgr->hashtable[hashidx].entry )
1126           mgr->latchsets[latch->next].prev = entry;
1127
1128         mgr->hashtable[hashidx].entry = entry;
1129     latch->prev = 0;
1130   }
1131
1132   //  fill in latch structure
1133
1134   latch->pin = CLOCK_bit | 1;
1135   latch->page_no = page_no;
1136   latch->split = 0;
1137
1138   bt_releasemutex (latch->modify);
1139   bt_releasemutex (mgr->hashtable[hashidx].latch);
1140   return latch;
1141 }
1142   
1143 void bt_mgrclose (BtMgr *mgr)
1144 {
1145 BtLatchSet *latch;
1146 BtLogHdr *eof;
1147 uint num = 0;
1148 BtPage page;
1149 uint slot;
1150
1151         //      flush previously written dirty pages
1152         //      and write recovery buffer to disk
1153
1154         fdatasync (mgr->idx);
1155
1156         if( mgr->redoend ) {
1157                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1158                 memset (eof, 0, sizeof(BtLogHdr));
1159         }
1160
1161         //      write remaining dirty pool pages to the btree
1162
1163         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1164                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1165                 latch = mgr->latchsets + slot;
1166
1167                 if( latch->dirty ) {
1168                         bt_writepage(mgr, page, latch->page_no);
1169                         latch->dirty = 0, num++;
1170                 }
1171         }
1172
1173         //      clear redo recovery buffer on disk.
1174
1175         if( mgr->pagezero->redopages ) {
1176                 eof = (BtLogHdr *)mgr->redobuff;
1177                 memset (eof, 0, sizeof(BtLogHdr));
1178                 eof->lsn = mgr->lsn;
1179                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1180                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1181         }
1182
1183         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1184
1185 #ifdef unix
1186         while( mgr->segments )
1187                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1188
1189         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1190         munmap (mgr->pagezero, mgr->page_size);
1191 #else
1192         FlushViewOfFile(mgr->pagezero, 0);
1193         UnmapViewOfFile(mgr->pagezero);
1194         UnmapViewOfFile(mgr->pagepool);
1195         CloseHandle(mgr->halloc);
1196         CloseHandle(mgr->hpool);
1197 #endif
1198 #ifdef unix
1199         close (mgr->idx);
1200         free (mgr);
1201 #else
1202         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1203         FlushFileBuffers(mgr->idx);
1204         CloseHandle(mgr->idx);
1205         GlobalFree (mgr);
1206 #endif
1207 }
1208
1209 //      close and release memory
1210
1211 void bt_close (BtDb *bt)
1212 {
1213 #ifdef unix
1214         if( bt->frame )
1215                 free (bt->frame);
1216         if( bt->cursor )
1217                 free (bt->cursor);
1218 #else
1219         if( bt->frame)
1220                 VirtualFree (bt->frame, 0, MEM_RELEASE);
1221         if( bt->cursor)
1222                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1223 #endif
1224         free (bt);
1225 }
1226
1227 //  open/create new btree buffer manager
1228
1229 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1230 //              size of page pool (e.g. 262144)
1231
1232 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1233 {
1234 uint lvl, attr, last, slot, idx;
1235 uint nlatchpage, latchhash;
1236 unsigned char value[BtId];
1237 int flag, initit = 0;
1238 BtPageZero *pagezero;
1239 BtLatchSet *latch;
1240 off64_t size;
1241 uint amt[1];
1242 BtMgr* mgr;
1243 BtKey* key;
1244 BtVal *val;
1245
1246         // determine sanity of page size and buffer pool
1247
1248         if( bits > BT_maxbits )
1249                 bits = BT_maxbits;
1250         else if( bits < BT_minbits )
1251                 bits = BT_minbits;
1252 #ifdef unix
1253         mgr = calloc (1, sizeof(BtMgr));
1254
1255         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1256
1257         if( mgr->idx == -1 ) {
1258                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1259                 return free(mgr), NULL;
1260         }
1261 #else
1262         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1263         attr = FILE_ATTRIBUTE_NORMAL;
1264         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1265
1266         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1267                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1268                 return GlobalFree(mgr), NULL;
1269         }
1270 #endif
1271
1272 #ifdef unix
1273         pagezero = valloc (BT_maxpage);
1274         *amt = 0;
1275
1276         // read minimum page size to get root info
1277         //      to support raw disk partition files
1278         //      check if bits == 0 on the disk.
1279
1280         if( size = lseek (mgr->idx, 0L, 2) )
1281                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1282                         if( pagezero->alloc->bits )
1283                                 bits = pagezero->alloc->bits;
1284                         else
1285                                 initit = 1;
1286                 else
1287                         return free(mgr), free(pagezero), NULL;
1288         else
1289                 initit = 1;
1290 #else
1291         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1292         size = GetFileSize(mgr->idx, amt);
1293
1294         if( size || *amt ) {
1295                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1296                         return bt_mgrclose (mgr), NULL;
1297                 bits = pagezero->alloc->bits;
1298         } else
1299                 initit = 1;
1300 #endif
1301
1302         mgr->page_size = 1 << bits;
1303         mgr->page_bits = bits;
1304
1305         //  calculate number of latch hash table entries
1306
1307         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1308
1309         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1310         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1311         mgr->latchtotal = nodemax;
1312
1313         if( !initit )
1314                 goto mgrlatch;
1315
1316         // initialize an empty b-tree with latch page, root page, page of leaves
1317         // and page(s) of latches and page pool cache
1318
1319         memset (pagezero, 0, 1 << bits);
1320         pagezero->alloc->lvl = MIN_lvl - 1;
1321         pagezero->alloc->bits = mgr->page_bits;
1322         pagezero->redopages = redopages;
1323
1324         bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1325         pagezero->activepages = 2;
1326
1327         //  initialize left-most LEAF page in
1328         //      alloc->left and count of active leaf pages.
1329
1330         bt_putid (pagezero->alloc->left, LEAF_page);
1331         ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1332
1333         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1334                 fprintf (stderr, "Unable to create btree page zero\n");
1335                 return bt_mgrclose (mgr), NULL;
1336         }
1337
1338         memset (pagezero, 0, 1 << bits);
1339         pagezero->alloc->bits = mgr->page_bits;
1340
1341         for( lvl=MIN_lvl; lvl--; ) {
1342         BtSlot *node = slotptr(pagezero->alloc, 1);
1343                 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1344                 key = keyptr(pagezero->alloc, 1);
1345                 key->len = 2;           // create stopper key
1346                 key->key[0] = 0xff;
1347                 key->key[1] = 0xff;
1348
1349                 bt_putid(value, MIN_lvl - lvl + 1);
1350                 val = valptr(pagezero->alloc, 1);
1351                 val->len = lvl ? BtId : 0;
1352                 memcpy (val->value, value, val->len);
1353
1354                 pagezero->alloc->min = node->off;
1355                 pagezero->alloc->lvl = lvl;
1356                 pagezero->alloc->cnt = 1;
1357                 pagezero->alloc->act = 1;
1358                 pagezero->alloc->page_no = MIN_lvl - lvl;
1359
1360                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1361                         fprintf (stderr, "Unable to create btree page\n");
1362                         return bt_mgrclose (mgr), NULL;
1363                 }
1364         }
1365
1366 mgrlatch:
1367 #ifdef unix
1368         free (pagezero);
1369 #else
1370         VirtualFree (pagezero, 0, MEM_RELEASE);
1371 #endif
1372 #ifdef unix
1373         // mlock the first segment of 64K pages
1374
1375         flag = PROT_READ | PROT_WRITE;
1376         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1377         mgr->segments = 1;
1378
1379         if( mgr->pages[0] == MAP_FAILED ) {
1380                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1381                 return bt_mgrclose (mgr), NULL;
1382         }
1383
1384         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1385         mlock (mgr->pagezero, mgr->page_size);
1386
1387         mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1388         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1389
1390         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1391         if( mgr->pagepool == MAP_FAILED ) {
1392                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1393                 return bt_mgrclose (mgr), NULL;
1394         }
1395 #else
1396         flag = PAGE_READWRITE;
1397         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1398         if( !mgr->halloc ) {
1399                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1400                 return bt_mgrclose (mgr), NULL;
1401         }
1402
1403         flag = FILE_MAP_WRITE;
1404         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1405         if( !mgr->pagezero ) {
1406                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1407                 return bt_mgrclose (mgr), NULL;
1408         }
1409
1410         flag = PAGE_READWRITE;
1411         size = (uid)mgr->nlatchpage << mgr->page_bits;
1412         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1413         if( !mgr->hpool ) {
1414                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1415                 return bt_mgrclose (mgr), NULL;
1416         }
1417
1418         flag = FILE_MAP_WRITE;
1419         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1420         if( !mgr->pagepool ) {
1421                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1422                 return bt_mgrclose (mgr), NULL;
1423         }
1424 #endif
1425
1426         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1427         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1428         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1429
1430         return mgr;
1431 }
1432
1433 //      open BTree access method
1434 //      based on buffer manager
1435
1436 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1437 {
1438 BtDb *bt = malloc (sizeof(*bt));
1439
1440         memset (bt, 0, sizeof(*bt));
1441         bt->main = main;
1442         bt->mgr = mgr;
1443 #ifdef unix
1444         bt->cursor = valloc (mgr->page_size);
1445         bt->frame = valloc (mgr->page_size);
1446 #else
1447         bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1448         bt->frame = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1449 #endif
1450 #ifdef unix
1451         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1452 #else
1453         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1454 #endif
1455         return bt;
1456 }
1457
1458 //  compare two keys, return > 0, = 0, or < 0
1459 //  =0: keys are same
1460 //  -1: key2 > key1
1461 //  +1: key2 < key1
1462 //  as the comparison value
1463
1464 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1465 {
1466 uint len1 = key1->len;
1467 int ans;
1468
1469         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1470                 return ans;
1471
1472         if( len1 > len2 )
1473                 return 1;
1474         if( len1 < len2 )
1475                 return -1;
1476
1477         return 0;
1478 }
1479
1480 // place write, read, or parent lock on requested page_no.
1481
1482 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1483 {
1484         switch( mode ) {
1485         case BtLockRead:
1486                 ReadLock (latch->readwr, thread_no);
1487                 break;
1488         case BtLockWrite:
1489                 WriteLock (latch->readwr, thread_no);
1490                 break;
1491         case BtLockAccess:
1492                 ReadLock (latch->access, thread_no);
1493                 break;
1494         case BtLockDelete:
1495                 WriteLock (latch->access, thread_no);
1496                 break;
1497         case BtLockParent:
1498                 WriteLock (latch->parent, thread_no);
1499                 break;
1500         case BtLockAtomic:
1501                 WriteLock (latch->atomic, thread_no);
1502                 break;
1503         case BtLockAtomic | BtLockRead:
1504                 WriteLock (latch->atomic, thread_no);
1505                 ReadLock (latch->readwr, thread_no);
1506                 break;
1507         case BtLockAtomic | BtLockWrite:
1508                 WriteLock (latch->atomic, thread_no);
1509                 WriteLock (latch->readwr, thread_no);
1510                 break;
1511         case BtLockLink:
1512                 WriteLock (latch->link, thread_no);
1513                 break;
1514         }
1515 }
1516
1517 // remove write, read, or parent lock on requested page
1518
1519 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1520 {
1521         switch( mode ) {
1522         case BtLockRead:
1523                 ReadRelease (latch->readwr);
1524                 break;
1525         case BtLockWrite:
1526                 WriteRelease (latch->readwr);
1527                 break;
1528         case BtLockAccess:
1529                 ReadRelease (latch->access);
1530                 break;
1531         case BtLockDelete:
1532                 WriteRelease (latch->access);
1533                 break;
1534         case BtLockParent:
1535                 WriteRelease (latch->parent);
1536                 break;
1537         case BtLockAtomic:
1538                 WriteRelease (latch->atomic);
1539                 break;
1540         case BtLockAtomic | BtLockRead:
1541                 WriteRelease (latch->atomic);
1542                 ReadRelease (latch->readwr);
1543                 break;
1544         case BtLockAtomic | BtLockWrite:
1545                 WriteRelease (latch->atomic);
1546                 WriteRelease (latch->readwr);
1547                 break;
1548         case BtLockLink:
1549                 WriteRelease (latch->link);
1550                 break;
1551         }
1552 }
1553
1554 //      allocate a new page
1555 //      return with page latched, but unlocked.
1556
1557 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1558 {
1559 uid page_no;
1560 int blk;
1561
1562         //      lock allocation page
1563
1564         bt_mutexlock(mgr->lock);
1565
1566         // use empty chain first
1567         // else allocate new page
1568
1569         if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1570                 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1571                         set->page = bt_mappage (mgr, set->latch);
1572                 else
1573                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1574
1575                 mgr->pagezero->activepages++;
1576                 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1577
1578                 //  the page is currently free and this
1579                 //  will keep bt_txnpromote out.
1580
1581                 //      contents will replace this bit
1582                 //  and pin will keep bt_txnpromote out
1583
1584                 contents->page_no = page_no;
1585                 set->latch->dirty = 1;
1586
1587                 memcpy (set->page, contents, mgr->page_size);
1588
1589 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1590 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1591
1592                 bt_releasemutex(mgr->lock);
1593                 return 0;
1594         }
1595
1596         page_no = bt_getid(mgr->pagezero->alloc->right);
1597         bt_putid(mgr->pagezero->alloc->right, page_no+1);
1598
1599         // unlock allocation latch and
1600         //      extend file into new page.
1601
1602         mgr->pagezero->activepages++;
1603 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1604 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1605         bt_releasemutex(mgr->lock);
1606
1607         //      keep bt_txnpromote out of this page
1608
1609         contents->free = 1;
1610         contents->page_no = page_no;
1611         pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1612
1613         //      don't load cache from btree page, load it from contents
1614
1615         if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1616                 set->page = bt_mappage (mgr, set->latch);
1617         else
1618                 return mgr->err;
1619
1620         // now pin will keep bt_txnpromote out
1621
1622         set->page->free = 0;
1623         return 0;
1624 }
1625
1626 //  find slot in page for given key at a given level
1627
1628 int bt_findslot (BtPage page, unsigned char *key, uint len)
1629 {
1630 uint diff, higher = page->cnt, low = 1, slot;
1631 uint good = 0;
1632
1633         //        make stopper key an infinite fence value
1634
1635         if( bt_getid (page->right) )
1636                 higher++;
1637         else
1638                 good++;
1639
1640         //      low is the lowest candidate.
1641         //  loop ends when they meet
1642
1643         //  higher is already
1644         //      tested as .ge. the passed key.
1645
1646         while( diff = higher - low ) {
1647                 slot = low + ( diff >> 1 );
1648                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1649                         low = slot + 1;
1650                 else
1651                         higher = slot, good++;
1652         }
1653
1654         //      return zero if key is on right link page
1655
1656         return good ? higher : 0;
1657 }
1658
1659 //  find and load page at given level for given key
1660 //      leave page rd or wr locked as requested
1661
1662 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1663 {
1664 uid page_no = ROOT_page, prevpage_no = 0;
1665 uint drill = 0xff, slot;
1666 BtLatchSet *prevlatch;
1667 uint mode, prevmode;
1668 BtPage prevpage;
1669 BtVal *val;
1670
1671   //  start at root of btree and drill down
1672
1673   do {
1674         // determine lock mode of drill level
1675         mode = (drill == lvl) ? lock : BtLockRead; 
1676
1677         if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1678           return 0;
1679
1680         // obtain access lock using lock chaining with Access mode
1681
1682         if( page_no > ROOT_page )
1683           bt_lockpage(BtLockAccess, set->latch, thread_no);
1684
1685         set->page = bt_mappage (mgr, set->latch);
1686
1687         //      release & unpin parent or left sibling page
1688
1689         if( prevpage_no ) {
1690           bt_unlockpage(prevmode, prevlatch);
1691           bt_unpinlatch (mgr, prevlatch);
1692           prevpage_no = 0;
1693         }
1694
1695         // obtain mode lock using lock chaining through AccessLock
1696
1697         bt_lockpage(mode, set->latch, thread_no);
1698
1699         if( set->page->free )
1700                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1701
1702         if( page_no > ROOT_page )
1703           bt_unlockpage(BtLockAccess, set->latch);
1704
1705         // re-read and re-lock root after determining actual level of root
1706
1707         if( set->page->lvl != drill) {
1708                 if( set->latch->page_no != ROOT_page )
1709                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1710                         
1711                 drill = set->page->lvl;
1712
1713                 if( lock != BtLockRead && drill == lvl ) {
1714                   bt_unlockpage(mode, set->latch);
1715                   bt_unpinlatch (mgr, set->latch);
1716                   continue;
1717                 }
1718         }
1719
1720         prevpage_no = set->latch->page_no;
1721         prevlatch = set->latch;
1722         prevpage = set->page;
1723         prevmode = mode;
1724
1725         //  find key on page at this level
1726         //  and descend to requested level
1727
1728         if( !set->page->kill )
1729          if( slot = bt_findslot (set->page, key, len) ) {
1730           if( drill == lvl )
1731                 return slot;
1732
1733           // find next non-dead slot -- the fence key if nothing else
1734
1735           while( slotptr(set->page, slot)->dead )
1736                 if( slot++ < set->page->cnt )
1737                   continue;
1738                 else
1739                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1740
1741           val = valptr(set->page, slot);
1742
1743           if( val->len == BtId )
1744                 page_no = bt_getid(valptr(set->page, slot)->value);
1745           else
1746                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1747
1748           drill--;
1749           continue;
1750          }
1751
1752         //  slide right into next page
1753
1754         page_no = bt_getid(set->page->right);
1755   } while( page_no );
1756
1757   // return error on end of right chain
1758
1759   mgr->line = __LINE__, mgr->err = BTERR_struct;
1760   return 0;     // return error
1761 }
1762
1763 //      return page to free list
1764 //      page must be delete & write locked
1765 //      and have no keys pointing to it.
1766
1767 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1768 {
1769         //      lock allocation page
1770
1771         bt_mutexlock (mgr->lock);
1772
1773         //      store chain
1774
1775         memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1776         bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1777         set->latch->dirty = 1;
1778         set->page->free = 1;
1779
1780         // decrement active page count
1781
1782         mgr->pagezero->activepages--;
1783
1784 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1785 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1786
1787         // unlock released page
1788         // and unlock allocation page
1789
1790         bt_unlockpage (BtLockDelete, set->latch);
1791         bt_unlockpage (BtLockWrite, set->latch);
1792         bt_unpinlatch (mgr, set->latch);
1793         bt_releasemutex (mgr->lock);
1794 }
1795
1796 //      a fence key was deleted from a page
1797 //      push new fence value upwards
1798
1799 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1800 {
1801 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1802 unsigned char value[BtId];
1803 BtKey* ptr;
1804 uint idx;
1805
1806         //      remove the old fence value
1807
1808         ptr = keyptr(set->page, set->page->cnt);
1809         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1810         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1811         set->latch->dirty = 1;
1812
1813         //  cache new fence value
1814
1815         ptr = keyptr(set->page, set->page->cnt);
1816         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1817
1818         bt_lockpage (BtLockParent, set->latch, thread_no);
1819         bt_unlockpage (BtLockWrite, set->latch);
1820
1821         //      insert new (now smaller) fence key
1822
1823         bt_putid (value, set->latch->page_no);
1824         ptr = (BtKey*)leftkey;
1825
1826         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1827           return mgr->err;
1828
1829         //      now delete old fence key
1830
1831         ptr = (BtKey*)rightkey;
1832
1833         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1834                 return mgr->err;
1835
1836         bt_unlockpage (BtLockParent, set->latch);
1837         bt_unpinlatch(mgr, set->latch);
1838         return 0;
1839 }
1840
1841 //      root has a single child
1842 //      collapse a level from the tree
1843
1844 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1845 {
1846 BtPageSet child[1];
1847 uid page_no;
1848 BtVal *val;
1849 uint idx;
1850
1851   // find the child entry and promote as new root contents
1852
1853   do {
1854         for( idx = 0; idx++ < root->page->cnt; )
1855           if( !slotptr(root->page, idx)->dead )
1856                 break;
1857
1858         val = valptr(root->page, idx);
1859
1860         if( val->len == BtId )
1861                 page_no = bt_getid (valptr(root->page, idx)->value);
1862         else
1863                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1864
1865         if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1866                 child->page = bt_mappage (mgr, child->latch);
1867         else
1868                 return mgr->err;
1869
1870         bt_lockpage (BtLockDelete, child->latch, thread_no);
1871         bt_lockpage (BtLockWrite, child->latch, thread_no);
1872
1873         memcpy (root->page, child->page, mgr->page_size);
1874         root->latch->dirty = 1;
1875
1876         bt_freepage (mgr, child);
1877
1878   } while( root->page->lvl > 1 && root->page->act == 1 );
1879
1880   bt_unlockpage (BtLockWrite, root->latch);
1881   bt_unpinlatch (mgr, root->latch);
1882   return 0;
1883 }
1884
1885 //  delete a page and manage keys
1886 //  call with page writelocked
1887
1888 //      returns the right page pool entry for freeing
1889 //      or zero on error.
1890
1891 uint bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, BtLock mode)
1892 {
1893 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1894 unsigned char value[BtId];
1895 uint lvl = set->page->lvl;
1896 BtPageSet right[1];
1897 uid page_no;
1898 BtKey *ptr;
1899
1900         //      cache copy of fence key
1901         //      to remove in parent
1902
1903         ptr = keyptr(set->page, set->page->cnt);
1904         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1905
1906         //      obtain lock on right page
1907
1908         page_no = bt_getid(set->page->right);
1909
1910         if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1911                 right->page = bt_mappage (mgr, right->latch);
1912         else
1913                 return 0;
1914
1915         bt_lockpage (mode, right->latch, thread_no);
1916
1917         // cache copy of key to update
1918
1919         ptr = keyptr(right->page, right->page->cnt);
1920         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1921
1922         if( right->page->kill )
1923                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1924
1925         // pull contents of right peer into our empty page
1926
1927         bt_lockpage (BtLockLink, set->latch, thread_no);
1928         memcpy (right->page->left, set->page->left, BtId);
1929         memcpy (set->page, right->page, mgr->page_size);
1930         set->page->page_no = set->latch->page_no;
1931         set->latch->dirty = 1;
1932         bt_unlockpage (BtLockLink, set->latch);
1933
1934         // mark right page deleted and point it to left page
1935         //      until we can post parent updates that remove access
1936         //      to the deleted page.
1937
1938         bt_putid (right->page->right, set->latch->page_no);
1939         right->latch->dirty = 1;
1940         right->page->kill = 1;
1941
1942         bt_lockpage (BtLockParent, right->latch, thread_no);
1943         bt_unlockpage (mode, right->latch);
1944
1945         bt_lockpage (BtLockParent, set->latch, thread_no);
1946         bt_unlockpage (BtLockWrite, set->latch);
1947
1948         // redirect higher key directly to our new node contents
1949
1950         bt_putid (value, set->latch->page_no);
1951         ptr = (BtKey*)higherfence;
1952
1953         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1954           return 0;
1955
1956         //      delete old lower key to our node
1957
1958         ptr = (BtKey*)lowerfence;
1959
1960         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1961           return 0;
1962
1963         bt_unlockpage (BtLockParent, set->latch);
1964         return right->latch - mgr->latchsets;
1965 }
1966
1967 //  find and delete key on page by marking delete flag bit
1968 //  if page becomes empty, delete it from the btree
1969
1970 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1971 {
1972 uint slot, idx, found, fence, entry;
1973 BtPageSet set[1], right[1];
1974 BtSlot *node;
1975 BtKey *ptr;
1976 BtVal *val;
1977
1978         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1979                 node = slotptr(set->page, slot);
1980                 ptr = keyptr(set->page, slot);
1981         } else
1982                 return mgr->err;
1983
1984         // if librarian slot, advance to real slot
1985
1986         if( node->type == Librarian ) {
1987                 ptr = keyptr(set->page, ++slot);
1988                 node = slotptr(set->page, slot);
1989         }
1990
1991         fence = slot == set->page->cnt;
1992
1993         // delete the key, ignore request if already dead
1994
1995         if( found = !keycmp (ptr, key, len) )
1996           if( found = node->dead == 0 ) {
1997                 val = valptr(set->page,slot);
1998                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1999                 set->page->act--;
2000
2001                 //  mark node type as delete
2002
2003                 node->type = Delete;
2004                 node->dead = 1;
2005
2006                 // collapse empty slots beneath the fence
2007                 // on interiour nodes
2008
2009                 if( lvl )
2010                  while( idx = set->page->cnt - 1 )
2011                   if( slotptr(set->page, idx)->dead ) {
2012                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2013                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2014                   } else
2015                         break;
2016           }
2017
2018         if( !found )
2019                 return 0;
2020
2021         //      did we delete a fence key in an upper level?
2022
2023         if( lvl && set->page->act && fence )
2024           if( bt_fixfence (mgr, set, lvl, thread_no) )
2025                 return mgr->err;
2026           else
2027                 return 0;
2028
2029         //      do we need to collapse root?
2030
2031         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2032           if( bt_collapseroot (mgr, set, thread_no) )
2033                 return mgr->err;
2034           else
2035                 return 0;
2036
2037         //      delete empty page
2038
2039         if( !set->page->act ) {
2040           if( entry = bt_deletepage (mgr, set, thread_no, BtLockWrite) )
2041                 right->latch = mgr->latchsets + entry;
2042           else
2043                  return mgr->err;
2044
2045           //    obtain delete and write locks to right node
2046
2047           bt_unlockpage (BtLockParent, right->latch);
2048           right->page = bt_mappage (mgr, right->latch);
2049           bt_lockpage (BtLockDelete, right->latch, thread_no);
2050           bt_lockpage (BtLockWrite, right->latch, thread_no);
2051           bt_freepage (mgr, right);
2052
2053           bt_unpinlatch (mgr, set->latch);
2054           return 0;
2055         }
2056
2057         set->latch->dirty = 1;
2058         bt_unlockpage(BtLockWrite, set->latch);
2059         bt_unpinlatch (mgr, set->latch);
2060         return 0;
2061 }
2062
2063 //      advance to next slot
2064
2065 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2066 {
2067 BtLatchSet *prevlatch;
2068 uid page_no;
2069
2070         if( slot < set->page->cnt )
2071                 return slot + 1;
2072
2073         prevlatch = set->latch;
2074
2075         if( page_no = bt_getid(set->page->right) )
2076           if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2077                 set->page = bt_mappage (bt->mgr, set->latch);
2078           else
2079                 return 0;
2080         else
2081           return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2082
2083         // obtain access lock using lock chaining with Access mode
2084
2085         bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2086
2087         bt_unlockpage(BtLockRead, prevlatch);
2088         bt_unpinlatch (bt->mgr, prevlatch);
2089
2090         bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2091         bt_unlockpage(BtLockAccess, set->latch);
2092         return 1;
2093 }
2094
2095 //      find unique key == given key, or first duplicate key in
2096 //      leaf level and return number of value bytes
2097 //      or (-1) if not found.
2098
2099 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2100 {
2101 BtPageSet set[1];
2102 uint len, slot;
2103 int ret = -1;
2104 BtKey *ptr;
2105 BtVal *val;
2106
2107   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2108    do {
2109         ptr = keyptr(set->page, slot);
2110
2111         //      skip librarian slot place holder
2112
2113         if( slotptr(set->page, slot)->type == Librarian )
2114                 ptr = keyptr(set->page, ++slot);
2115
2116         //      return actual key found
2117
2118         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2119         len = ptr->len;
2120
2121         if( slotptr(set->page, slot)->type == Duplicate )
2122                 len -= BtId;
2123
2124         //      not there if we reach the stopper key
2125
2126         if( slot == set->page->cnt )
2127           if( !bt_getid (set->page->right) )
2128                 break;
2129
2130         // if key exists, return >= 0 value bytes copied
2131         //      otherwise return (-1)
2132
2133         if( slotptr(set->page, slot)->dead )
2134                 continue;
2135
2136         if( keylen == len )
2137           if( !memcmp (ptr->key, key, len) ) {
2138                 val = valptr (set->page,slot);
2139                 if( valmax > val->len )
2140                         valmax = val->len;
2141                 memcpy (value, val->value, valmax);
2142                 ret = valmax;
2143           }
2144
2145         break;
2146
2147    } while( slot = bt_findnext (bt, set, slot) );
2148
2149   bt_unlockpage (BtLockRead, set->latch);
2150   bt_unpinlatch (bt->mgr, set->latch);
2151   return ret;
2152 }
2153
2154 //      check page for space available,
2155 //      clean if necessary and return
2156 //      0 - page needs splitting
2157 //      >0  new slot value
2158
2159 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2160 {
2161 BtPage page = set->page, frame;
2162 uint nxt = mgr->page_size;
2163 uint cnt = 0, idx = 0;
2164 uint max = page->cnt;
2165 uint newslot = max;
2166 BtKey *key;
2167 BtVal *val;
2168
2169         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2170                 return slot;
2171
2172         //      skip cleanup and proceed to split
2173         //      if there's not enough garbage
2174         //      to bother with.
2175
2176         if( page->garbage < nxt / 5 )
2177                 return 0;
2178
2179         frame = malloc (mgr->page_size);
2180         memcpy (frame, page, mgr->page_size);
2181
2182         // skip page info and set rest of page to zero
2183
2184         memset (page+1, 0, mgr->page_size - sizeof(*page));
2185         set->latch->dirty = 1;
2186
2187         page->garbage = 0;
2188         page->act = 0;
2189
2190         // clean up page first by
2191         // removing deleted keys
2192
2193         while( cnt++ < max ) {
2194                 if( cnt == slot )
2195                         newslot = idx + 2;
2196
2197                 if( cnt < max || frame->lvl )
2198                   if( slotptr(frame,cnt)->dead )
2199                         continue;
2200
2201                 // copy the value across
2202
2203                 val = valptr(frame, cnt);
2204                 nxt -= val->len + sizeof(BtVal);
2205                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2206
2207                 // copy the key across
2208
2209                 key = keyptr(frame, cnt);
2210                 nxt -= key->len + sizeof(BtKey);
2211                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2212
2213                 // make a librarian slot
2214
2215                 slotptr(page, ++idx)->off = nxt;
2216                 slotptr(page, idx)->type = Librarian;
2217                 slotptr(page, idx)->dead = 1;
2218
2219                 // set up the slot
2220
2221                 slotptr(page, ++idx)->off = nxt;
2222                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2223
2224                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2225                         page->act++;
2226         }
2227
2228         page->min = nxt;
2229         page->cnt = idx;
2230         free (frame);
2231
2232         //      see if page has enough space now, or does it need splitting?
2233
2234         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2235                 return newslot;
2236
2237         return 0;
2238 }
2239
2240 // split the root and raise the height of the btree
2241
2242 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2243 {  
2244 unsigned char leftkey[BT_keyarray];
2245 uint nxt = mgr->page_size;
2246 unsigned char value[BtId];
2247 BtPageSet left[1];
2248 uid left_page_no;
2249 BtPage frame;
2250 BtKey *ptr;
2251 BtVal *val;
2252
2253         frame = malloc (mgr->page_size);
2254         memcpy (frame, root->page, mgr->page_size);
2255
2256         //      save left page fence key for new root
2257
2258         ptr = keyptr(root->page, root->page->cnt);
2259         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2260
2261         //  Obtain an empty page to use, and copy the current
2262         //  root contents into it, e.g. lower keys
2263
2264         if( bt_newpage(mgr, left, frame, page_no) )
2265                 return mgr->err;
2266
2267         left_page_no = left->latch->page_no;
2268         bt_unpinlatch (mgr, left->latch);
2269         free (frame);
2270
2271         // preserve the page info at the bottom
2272         // of higher keys and set rest to zero
2273
2274         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2275
2276         // insert stopper key at top of newroot page
2277         // and increase the root height
2278
2279         nxt -= BtId + sizeof(BtVal);
2280         bt_putid (value, right->page_no);
2281         val = (BtVal *)((unsigned char *)root->page + nxt);
2282         memcpy (val->value, value, BtId);
2283         val->len = BtId;
2284
2285         nxt -= 2 + sizeof(BtKey);
2286         slotptr(root->page, 2)->off = nxt;
2287         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2288         ptr->len = 2;
2289         ptr->key[0] = 0xff;
2290         ptr->key[1] = 0xff;
2291
2292         // insert lower keys page fence key on newroot page as first key
2293
2294         nxt -= BtId + sizeof(BtVal);
2295         bt_putid (value, left_page_no);
2296         val = (BtVal *)((unsigned char *)root->page + nxt);
2297         memcpy (val->value, value, BtId);
2298         val->len = BtId;
2299
2300         ptr = (BtKey *)leftkey;
2301         nxt -= ptr->len + sizeof(BtKey);
2302         slotptr(root->page, 1)->off = nxt;
2303         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2304         
2305         bt_putid(root->page->right, 0);
2306         root->page->min = nxt;          // reset lowest used offset and key count
2307         root->page->cnt = 2;
2308         root->page->act = 2;
2309         root->page->lvl++;
2310
2311         mgr->pagezero->alloc->lvl = root->page->lvl;
2312
2313         // release and unpin root pages
2314
2315         bt_unlockpage(BtLockWrite, root->latch);
2316         bt_unpinlatch (mgr, root->latch);
2317
2318         bt_unpinlatch (mgr, right);
2319         return 0;
2320 }
2321
2322 //  split already locked full node
2323 //      leave it locked.
2324 //      return pool entry for new right
2325 //      page, pinned & unlocked
2326
2327 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2328 {
2329 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2330 BtPage frame = malloc (mgr->page_size);
2331 uint lvl = set->page->lvl;
2332 BtPageSet right[1];
2333 BtKey *key, *ptr;
2334 BtVal *val, *src;
2335 uid right2;
2336 uint prev;
2337
2338         //  split higher half of keys to frame
2339
2340         memset (frame, 0, mgr->page_size);
2341         max = set->page->cnt;
2342         cnt = max / 2;
2343         idx = 0;
2344
2345         while( cnt++ < max ) {
2346                 if( cnt < max || set->page->lvl )
2347                   if( slotptr(set->page, cnt)->dead )
2348                         continue;
2349
2350                 src = valptr(set->page, cnt);
2351                 nxt -= src->len + sizeof(BtVal);
2352                 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2353
2354                 key = keyptr(set->page, cnt);
2355                 nxt -= key->len + sizeof(BtKey);
2356                 ptr = (BtKey*)((unsigned char *)frame + nxt);
2357                 memcpy (ptr, key, key->len + sizeof(BtKey));
2358
2359                 //      add librarian slot
2360
2361                 slotptr(frame, ++idx)->off = nxt;
2362                 slotptr(frame, idx)->type = Librarian;
2363                 slotptr(frame, idx)->dead = 1;
2364
2365                 //  add actual slot
2366
2367                 slotptr(frame, ++idx)->off = nxt;
2368                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2369
2370                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2371                         frame->act++;
2372         }
2373
2374         frame->bits = mgr->page_bits;
2375         frame->min = nxt;
2376         frame->cnt = idx;
2377         frame->lvl = lvl;
2378
2379         // link right node
2380
2381         if( set->latch->page_no > ROOT_page )
2382                 bt_putid (frame->right, bt_getid (set->page->right));
2383
2384         //      get new free page and write higher keys to it.
2385
2386         if( bt_newpage(mgr, right, frame, thread_no) )
2387                 return 0;
2388
2389         // process lower keys
2390
2391         memcpy (frame, set->page, mgr->page_size);
2392         memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2393         set->latch->dirty = 1;
2394
2395         nxt = mgr->page_size;
2396         set->page->garbage = 0;
2397         set->page->act = 0;
2398         max /= 2;
2399         cnt = 0;
2400         idx = 0;
2401
2402         if( slotptr(frame, max)->type == Librarian )
2403                 max--;
2404
2405         //  assemble page of smaller keys
2406
2407         while( cnt++ < max ) {
2408                 if( slotptr(frame, cnt)->dead )
2409                         continue;
2410                 val = valptr(frame, cnt);
2411                 nxt -= val->len + sizeof(BtVal);
2412                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2413
2414                 key = keyptr(frame, cnt);
2415                 nxt -= key->len + sizeof(BtKey);
2416                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2417
2418                 //      add librarian slot
2419
2420                 slotptr(set->page, ++idx)->off = nxt;
2421                 slotptr(set->page, idx)->type = Librarian;
2422                 slotptr(set->page, idx)->dead = 1;
2423
2424                 //      add actual slot
2425
2426                 slotptr(set->page, ++idx)->off = nxt;
2427                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2428                 set->page->act++;
2429         }
2430
2431         bt_putid(set->page->right, right->latch->page_no);
2432         set->page->min = nxt;
2433         set->page->cnt = idx;
2434         free(frame);
2435
2436         return right->latch - mgr->latchsets;
2437 }
2438
2439 //      fix keys for newly split page
2440 //      call with both pages pinned & locked
2441 //  return unlocked and unpinned
2442
2443 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2444 {
2445 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2446 unsigned char value[BtId];
2447 uint lvl = set->page->lvl;
2448 BtPage page;
2449 BtKey *ptr;
2450
2451         // if current page is the root page, split it
2452
2453         if( set->latch->page_no == ROOT_page )
2454                 return bt_splitroot (mgr, set, right, thread_no);
2455
2456         ptr = keyptr(set->page, set->page->cnt);
2457         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2458
2459         page = bt_mappage (mgr, right);
2460
2461         ptr = keyptr(page, page->cnt);
2462         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2463
2464         // insert new fences in their parent pages
2465
2466         bt_lockpage (BtLockParent, right, thread_no);
2467
2468         bt_lockpage (BtLockParent, set->latch, thread_no);
2469         bt_unlockpage (BtLockWrite, set->latch);
2470
2471         // insert new fence for reformulated left block of smaller keys
2472
2473         bt_putid (value, set->latch->page_no);
2474         ptr = (BtKey *)leftkey;
2475
2476         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2477                 return mgr->err;
2478
2479         // switch fence for right block of larger keys to new right page
2480
2481         bt_putid (value, right->page_no);
2482         ptr = (BtKey *)rightkey;
2483
2484         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2485                 return mgr->err;
2486
2487         bt_unlockpage (BtLockParent, set->latch);
2488         bt_unpinlatch (mgr, set->latch);
2489
2490         bt_unlockpage (BtLockParent, right);
2491         bt_unpinlatch (mgr, right);
2492         return 0;
2493 }
2494
2495 //      install new key and value onto page
2496 //      page must already be checked for
2497 //      adequate space
2498
2499 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2500 {
2501 uint idx, librarian;
2502 BtSlot *node;
2503 BtKey *ptr;
2504 BtVal *val;
2505 int rate;
2506
2507         //      if found slot > desired slot and previous slot
2508         //      is a librarian slot, use it
2509
2510         if( slot > 1 )
2511           if( slotptr(set->page, slot-1)->type == Librarian )
2512                 slot--;
2513
2514         // copy value onto page
2515
2516         set->page->min -= vallen + sizeof(BtVal);
2517         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2518         memcpy (val->value, value, vallen);
2519         val->len = vallen;
2520
2521         // copy key onto page
2522
2523         set->page->min -= keylen + sizeof(BtKey);
2524         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2525         memcpy (ptr->key, key, keylen);
2526         ptr->len = keylen;
2527         
2528         //      find first empty slot
2529
2530         for( idx = slot; idx < set->page->cnt; idx++ )
2531           if( slotptr(set->page, idx)->dead )
2532                 break;
2533
2534         // now insert key into array before slot,
2535         //      adding as many librarian slots as
2536         //      makes sense.
2537
2538         if( idx == set->page->cnt ) {
2539         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2540
2541                 librarian = ++idx - slot;
2542                 avail /= sizeof(BtSlot);
2543
2544                 if( avail < 0 )
2545                         avail = 0;
2546
2547                 if( librarian > avail )
2548                         librarian = avail;
2549
2550                 if( librarian ) {
2551                         rate = (idx - slot) / librarian;
2552                         set->page->cnt += librarian;
2553                         idx += librarian;
2554                 } else
2555                         rate = 0;
2556         } else
2557                 librarian = 0, rate = 0;
2558
2559         while( idx > slot ) {
2560                 //  transfer slot
2561                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2562                 idx--;
2563
2564                 //      add librarian slot per rate
2565
2566                 if( librarian )
2567                  if( (idx - slot + 1)/2 <= librarian * rate ) {
2568                         node = slotptr(set->page, idx--);
2569                         node->off = node[1].off;
2570                         node->type = Librarian;
2571                         node->dead = 1;
2572                         librarian--;
2573                   }
2574         }
2575
2576         set->latch->dirty = 1;
2577         set->page->act++;
2578
2579         //      fill in new slot
2580
2581         node = slotptr(set->page, slot);
2582         node->off = set->page->min;
2583         node->type = type;
2584         node->dead = 0;
2585
2586         if( release ) {
2587                 bt_unlockpage (BtLockWrite, set->latch);
2588                 bt_unpinlatch (mgr, set->latch);
2589         }
2590
2591         return 0;
2592 }
2593
2594 //  Insert new key into the btree at given level.
2595 //      either add a new key or update/add an existing one
2596
2597 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2598 {
2599 uint slot, idx, len, entry;
2600 BtPageSet set[1];
2601 BtSlot *node;
2602 BtKey *ptr;
2603 BtVal *val;
2604
2605   while( 1 ) { // find the page and slot for the current key
2606         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2607                 node = slotptr(set->page, slot);
2608                 ptr = keyptr(set->page, slot);
2609         } else {
2610                 if( !mgr->err )
2611                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2612                 return mgr->err;
2613         }
2614
2615         // if librarian slot == found slot, advance to real slot
2616
2617         if( node->type == Librarian )
2618           if( !keycmp (ptr, key, keylen) ) {
2619                 ptr = keyptr(set->page, ++slot);
2620                 node = slotptr(set->page, slot);
2621           }
2622
2623         //  if inserting a duplicate key or unique
2624         //      key that doesn't exist on the page,
2625         //      check for adequate space on the page
2626         //      and insert the new key before slot.
2627
2628         switch( type ) {
2629         case Unique:
2630         case Duplicate:
2631           if( keycmp (ptr, key, keylen) )
2632            if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2633             return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2634            else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2635                 return mgr->err;
2636            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2637                 return mgr->err;
2638            else
2639                 continue;
2640
2641           // if key already exists, update value and return
2642
2643           val = valptr(set->page, slot);
2644
2645           if( val->len >= vallen ) {
2646                 if( slotptr(set->page, slot)->dead )
2647                         set->page->act++;
2648                 node->type = type;
2649                 node->dead = 0;
2650
2651                 set->page->garbage += val->len - vallen;
2652                 set->latch->dirty = 1;
2653                 val->len = vallen;
2654                 memcpy (val->value, value, vallen);
2655                 bt_unlockpage(BtLockWrite, set->latch);
2656                 bt_unpinlatch (mgr, set->latch);
2657                 return 0;
2658           }
2659
2660           //  new update value doesn't fit in existing value area
2661           //    make sure page has room
2662
2663           if( !node->dead )
2664                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2665           else
2666                 set->page->act++;
2667
2668           node->type = type;
2669           node->dead = 0;
2670
2671           if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2672            if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2673                 return mgr->err;
2674            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2675                 return mgr->err;
2676            else
2677                 continue;
2678
2679           //  copy key and value onto page and update slot
2680
2681           set->page->min -= vallen + sizeof(BtVal);
2682           val = (BtVal*)((unsigned char *)set->page + set->page->min);
2683           memcpy (val->value, value, vallen);
2684           val->len = vallen;
2685
2686           set->latch->dirty = 1;
2687           set->page->min -= keylen + sizeof(BtKey);
2688           ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2689           memcpy (ptr->key, key, keylen);
2690           ptr->len = keylen;
2691         
2692           node->off = set->page->min;
2693           bt_unlockpage(BtLockWrite, set->latch);
2694           bt_unpinlatch (mgr, set->latch);
2695           return 0;
2696     }
2697   }
2698   return 0;
2699 }
2700
2701 //      determine actual page where key is located
2702 //  return slot number
2703
2704 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2705 {
2706 BtKey *key = keyptr(source,src);
2707 uint slot = locks[src].slot;
2708 uint entry;
2709
2710         if( src > 1 && locks[src].reuse )
2711           entry = locks[src-1].entry, slot = 0;
2712         else
2713           entry = locks[src].entry;
2714
2715         if( slot ) {
2716                 set->latch = mgr->latchsets + entry;
2717                 set->page = bt_mappage (mgr, set->latch);
2718                 return slot;
2719         }
2720
2721         //      is locks->reuse set? or was slot zeroed?
2722         //      if so, find where our key is located 
2723         //      on current page or pages split on
2724         //      same page txn operations.
2725
2726         do {
2727                 set->latch = mgr->latchsets + entry;
2728                 set->page = bt_mappage (mgr, set->latch);
2729
2730                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2731                   if( slotptr(set->page, slot)->type == Librarian )
2732                         slot++;
2733                   if( locks[src].reuse )
2734                         locks[src].entry = entry;
2735                   return slot;
2736                 }
2737         } while( entry = set->latch->split );
2738
2739         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2740         return 0;
2741 }
2742
2743 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2744 {
2745 BtKey *key = keyptr(source, src);
2746 BtVal *val = valptr(source, src);
2747 BtLatchSet *latch;
2748 BtPageSet set[1];
2749 uint entry, slot;
2750
2751   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2752         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2753           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2754                 return mgr->err;
2755           set->page->lsn = lsn;
2756           return 0;
2757         }
2758
2759         //  split page
2760
2761         if( entry = bt_splitpage (mgr, set, thread_no) )
2762           latch = mgr->latchsets + entry;
2763         else
2764           return mgr->err;
2765
2766         //      splice right page into split chain
2767         //      and WriteLock it
2768
2769         bt_lockpage(BtLockWrite, latch, thread_no);
2770         latch->split = set->latch->split;
2771         set->latch->split = entry;
2772         locks[src].slot = 0;
2773   }
2774
2775   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2776 }
2777
2778 //      perform delete from smaller btree
2779 //  insert a delete slot if not found there
2780
2781 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2782 {
2783 BtKey *key = keyptr(source, src);
2784 BtPageSet set[1];
2785 uint idx, slot;
2786 BtSlot *node;
2787 BtKey *ptr;
2788 BtVal *val;
2789
2790         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2791           node = slotptr(set->page, slot);
2792           ptr = keyptr(set->page, slot);
2793           val = valptr(set->page, slot);
2794         } else
2795           return mgr->line = __LINE__, mgr->err = BTERR_struct;
2796
2797         //  if slot is not found, insert a delete slot
2798
2799         if( keycmp (ptr, key->key, key->len) )
2800           return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2801
2802         //      if node is already dead,
2803         //      ignore the request.
2804
2805         if( node->dead )
2806                 return 0;
2807
2808         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2809         set->latch->dirty = 1;
2810         set->page->lsn = lsn;
2811         set->page->act--;
2812
2813         node->dead = 0;
2814         __sync_fetch_and_add(&mgr->found, 1);
2815         return 0;
2816 }
2817
2818 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2819 {
2820 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2821 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2822
2823         return keycmp (key1, key2->key, key2->len);
2824 }
2825 //      atomic modification of a batch of keys.
2826
2827 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2828 {
2829 uint src, idx, slot, samepage, entry, que = 0;
2830 BtKey *key, *ptr, *key2;
2831 int result = 0;
2832 BtSlot temp[1];
2833 logseqno lsn;
2834 int type;
2835
2836   // stable sort the list of keys into order to
2837   //    prevent deadlocks between threads.
2838 /*
2839   for( src = 1; src++ < source->cnt; ) {
2840         *temp = *slotptr(source,src);
2841         key = keyptr (source,src);
2842
2843         for( idx = src; --idx; ) {
2844           key2 = keyptr (source,idx);
2845           if( keycmp (key, key2->key, key2->len) < 0 ) {
2846                 *slotptr(source,idx+1) = *slotptr(source,idx);
2847                 *slotptr(source,idx) = *temp;
2848           } else
2849                 break;
2850         }
2851   }
2852 */
2853         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2854   //  add entries to redo log
2855
2856   if( bt->mgr->pagezero->redopages )
2857         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2858   else
2859         lsn = 0;
2860
2861   //  perform the individual actions in the transaction
2862
2863   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2864         return bt->mgr->err;
2865
2866   // if number of active pages
2867   // is greater than the buffer pool
2868   // promote page into larger btree
2869
2870   if( bt->main )
2871    while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
2872         if( bt_txnpromote (bt) )
2873           return bt->mgr->err;
2874
2875   // return success
2876
2877   return 0;
2878 }
2879
2880 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2881 {
2882 uint src, idx, slot, samepage, entry, que = 0;
2883 BtPageSet set[1], prev[1], right[1];
2884 unsigned char value[BtId];
2885 uid right_page_no;
2886 BtLatchSet *latch;
2887 AtomicTxn *locks;
2888 BtKey *key, *ptr;
2889 BtPage page;
2890 BtVal *val;
2891
2892   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2893
2894   // Load the leaf page for each key
2895   // group same page references with reuse bit
2896   // and determine any constraint violations
2897
2898   for( src = 0; src++ < source->cnt; ) {
2899         key = keyptr(source, src);
2900         slot = 0;
2901
2902         // first determine if this modification falls
2903         // on the same page as the previous modification
2904         //      note that the far right leaf page is a special case
2905
2906         if( samepage = src > 1 )
2907           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2908                 slot = bt_findslot(set->page, key->key, key->len);
2909
2910         if( !slot )
2911           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic + BtLockWrite, thread_no) )
2912                 set->latch->split = 0;
2913           else
2914                 return mgr->err;
2915
2916         if( slotptr(set->page, slot)->type == Librarian )
2917           slot++;
2918
2919         if( !samepage ) {
2920           locks[src].entry = set->latch - mgr->latchsets;
2921           locks[src].slot = slot;
2922           locks[src].reuse = 0;
2923         } else {
2924           locks[src].entry = 0;
2925           locks[src].slot = 0;
2926           locks[src].reuse = 1;
2927         }
2928
2929         //      capture current lsn for master page
2930
2931         locks[src].reqlsn = set->page->lsn;
2932   }
2933
2934   // insert or delete each key
2935   // process any splits or merges
2936   // run through txn list backwards
2937
2938   samepage = source->cnt + 1;
2939
2940   for( src = source->cnt; src; src-- ) {
2941         if( locks[src].reuse )
2942           continue;
2943
2944         //  perform the txn operations
2945         //      from smaller to larger on
2946         //  the same page
2947
2948         for( idx = src; idx < samepage; idx++ )
2949          switch( slotptr(source,idx)->type ) {
2950          case Delete:
2951           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2952                 return mgr->err;
2953           break;
2954
2955         case Duplicate:
2956         case Unique:
2957           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2958                 return mgr->err;
2959           break;
2960
2961         default:
2962           bt_atomicpage (mgr, source, locks, idx, set);
2963           continue;
2964         }
2965
2966         //      after the same page operations have finished,
2967         //  process master page for splits or deletion.
2968
2969         latch = prev->latch = mgr->latchsets + locks[src].entry;
2970         prev->page = bt_mappage (mgr, prev->latch);
2971         samepage = src;
2972
2973         //  pick-up all splits from master page
2974         //      each one is already pinned & WriteLocked.
2975
2976         if( entry = latch->split ) do {
2977           set->latch = mgr->latchsets + entry;
2978           set->page = bt_mappage (mgr, set->latch);
2979
2980           // delete empty master page by undoing its split
2981           //  (this is potentially another empty page)
2982
2983           if( !prev->page->act ) {
2984                 memcpy (set->page->left, prev->page->left, BtId);
2985                 memcpy (prev->page, set->page, mgr->page_size);
2986                 bt_lockpage (BtLockDelete, set->latch, thread_no);
2987                 prev->latch->split = set->latch->split;
2988                 prev->latch->dirty = 1;
2989                 bt_freepage (mgr, set);
2990                 continue;
2991           }
2992
2993           // remove empty split page from the split chain
2994           // and return it to the free list. No other
2995           // thread has its page number yet.
2996
2997           if( !set->page->act ) {
2998                 memcpy (prev->page->right, set->page->right, BtId);
2999                 prev->latch->split = set->latch->split;
3000
3001                 bt_lockpage (BtLockDelete, set->latch, thread_no);
3002                 bt_freepage (mgr, set);
3003                 continue;
3004           }
3005
3006           //  update prev's fence key
3007
3008           ptr = keyptr(prev->page,prev->page->cnt);
3009           bt_putid (value, prev->latch->page_no);
3010
3011           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3012                 return mgr->err;
3013
3014           // splice in the left link into the split page
3015
3016           bt_putid (set->page->left, prev->latch->page_no);
3017
3018           if( lsm )
3019                 bt_syncpage (mgr, prev->page, prev->latch);
3020
3021           // page is unlocked & unpinned below to avoid bt_txnpromote
3022
3023           *prev = *set;
3024         } while( entry = prev->latch->split );
3025
3026         //  update left pointer in next right page from last split page
3027         //      (if all splits were reversed or none occurred, latch->split == 0)
3028
3029         if( latch->split ) {
3030           //  fix left pointer in master's original (now split)
3031           //  far right sibling or set rightmost page in page zero
3032
3033           if( right_page_no = bt_getid (prev->page->right) ) {
3034                 if( set->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
3035                   set->page = bt_mappage (mgr, set->latch);
3036                 else
3037                   return mgr->err;
3038
3039             bt_lockpage (BtLockLink, set->latch, thread_no);
3040             bt_putid (set->page->left, prev->latch->page_no);
3041                 set->latch->dirty = 1;
3042
3043             bt_unlockpage (BtLockLink, set->latch);
3044                 bt_unpinlatch (mgr, set->latch);
3045           } else {      // prev is rightmost page
3046             bt_mutexlock (mgr->lock);
3047                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3048             bt_releasemutex(mgr->lock);
3049           }
3050
3051           //  Process last page split in chain
3052           //  by switching the key from the master
3053           //  page to the last split.
3054
3055           ptr = keyptr(prev->page,prev->page->cnt);
3056           bt_putid (value, prev->latch->page_no);
3057
3058           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3059                 return mgr->err;
3060
3061           if( lsm )
3062                 bt_syncpage (mgr, prev->page, prev->latch);
3063
3064           // unlock and unpin master page
3065
3066           bt_unlockpage(BtLockAtomic, latch);
3067           bt_unlockpage(BtLockWrite, latch);
3068           bt_unpinlatch(mgr, latch);
3069
3070           // go through the list of splits and
3071           // release the locks and unpin
3072
3073           while( entry = latch->split ) {
3074                 latch = mgr->latchsets + entry;
3075             bt_unlockpage(BtLockWrite, latch);
3076                 bt_unpinlatch(mgr, latch);
3077           }
3078
3079           continue;
3080         }
3081
3082         //      since there are no splits, we're
3083         //  finished if master page occupied
3084
3085         bt_unlockpage(BtLockAtomic, prev->latch);
3086
3087         if( prev->page->act ) {
3088           bt_unlockpage(BtLockWrite, prev->latch);
3089
3090           if( lsm )
3091                 bt_syncpage (mgr, prev->page, prev->latch);
3092
3093           bt_unpinlatch(mgr, prev->latch);
3094           continue;
3095         }
3096
3097         // any and all splits were reversed, and the
3098         // master page located in prev is empty, delete it
3099
3100         if( entry = bt_deletepage (mgr, prev, thread_no, BtLockWrite) )
3101                 right->latch = mgr->latchsets + entry;
3102         else
3103                 return mgr->err;
3104
3105         //      obtain delete and write locks to right node
3106
3107         bt_unlockpage (BtLockParent, right->latch);
3108         right->page = bt_mappage (mgr, right->latch);
3109         bt_lockpage (BtLockDelete, right->latch, thread_no);
3110         bt_lockpage (BtLockWrite, right->latch, thread_no);
3111         bt_freepage (mgr, right);
3112
3113         bt_unpinlatch (mgr, prev->latch);
3114   }
3115
3116   free (locks);
3117   return 0;
3118 }
3119
3120 //  promote a page into the larger btree
3121
3122 BTERR bt_txnpromote (BtDb *bt)
3123 {
3124 BtPageSet set[1], right[1];
3125 uint entry, slot, idx;
3126 BtSlot *node;
3127 BtKey *ptr;
3128 BtVal *val;
3129
3130   while( 1 ) {
3131 #ifdef unix
3132         entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3133 #else
3134         entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3135 #endif
3136         entry %= bt->mgr->latchtotal;
3137
3138         if( !entry )
3139                 continue;
3140
3141         set->latch = bt->mgr->latchsets + entry;
3142
3143         if( !bt_mutextry(set->latch->modify) )
3144                 continue;
3145
3146         //  skip this entry if it is pinned
3147
3148         if( set->latch->pin & ~CLOCK_bit ) {
3149           bt_releasemutex(set->latch->modify);
3150           continue;
3151         }
3152
3153         set->page = bt_mappage (bt->mgr, set->latch);
3154
3155         // entry never used or has no right sibling
3156
3157         if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3158           bt_releasemutex(set->latch->modify);
3159           continue;
3160         }
3161
3162         // entry interiour node or being killed
3163
3164         if( set->page->lvl || set->page->free || set->page->kill ) {
3165           bt_releasemutex(set->latch->modify);
3166           continue;
3167         }
3168
3169         //  pin the page for our useage
3170
3171         set->latch->pin++;
3172         bt_releasemutex(set->latch->modify);
3173         bt_lockpage (BtLockAtomic | BtLockWrite, set->latch, bt->thread_no);
3174         memcpy (bt->frame, set->page, bt->mgr->page_size);
3175
3176 if( !(set->latch->page_no % 100) )
3177 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
3178
3179         if( entry = bt_deletepage (bt->mgr, set, bt->thread_no, BtLockAtomic | BtLockWrite) )
3180                 right->latch = bt->mgr->latchsets + entry;
3181         else
3182                 return bt->mgr->err;
3183
3184         //      obtain delete and write locks to right node
3185
3186         bt_unlockpage (BtLockParent, right->latch);
3187         right->page = bt_mappage (bt->mgr, right->latch);
3188
3189         //  release page with its new contents
3190
3191         bt_unlockpage (BtLockAtomic, set->latch);
3192         bt_unpinlatch (bt->mgr, set->latch);
3193
3194         // transfer slots in our selected page to larger btree
3195
3196         if( bt_atomicexec (bt->main, bt->frame, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3197                 return bt->main->err;
3198
3199         //  free the page we took over
3200
3201         bt_lockpage (BtLockDelete, right->latch, bt->thread_no);
3202         bt_lockpage (BtLockWrite, right->latch, bt->thread_no);
3203         bt_freepage (bt->mgr, right);
3204         return 0;
3205   }
3206 }
3207
3208 //      set cursor to highest slot on highest page
3209
3210 uint bt_lastkey (BtDb *bt)
3211 {
3212 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3213 BtPageSet set[1];
3214
3215         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3216                 set->page = bt_mappage (bt->mgr, set->latch);
3217         else
3218                 return 0;
3219
3220     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3221         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3222     bt_unlockpage(BtLockRead, set->latch);
3223         bt_unpinlatch (bt->mgr, set->latch);
3224         return bt->cursor->cnt;
3225 }
3226
3227 //      return previous slot on cursor page
3228
3229 uint bt_prevkey (BtDb *bt, uint slot)
3230 {
3231 uid cursor_page = bt->cursor->page_no;
3232 uid ourright, next, us = cursor_page;
3233 BtPageSet set[1];
3234
3235         if( --slot )
3236                 return slot;
3237
3238         ourright = bt_getid(bt->cursor->right);
3239
3240 goleft:
3241         if( !(next = bt_getid(bt->cursor->left)) )
3242                 return 0;
3243
3244 findourself:
3245         cursor_page = next;
3246
3247         if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3248                 set->page = bt_mappage (bt->mgr, set->latch);
3249         else
3250                 return 0;
3251
3252     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3253         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3254         bt_unlockpage(BtLockRead, set->latch);
3255         bt_unpinlatch (bt->mgr, set->latch);
3256         
3257         next = bt_getid (bt->cursor->right);
3258
3259         if( bt->cursor->kill )
3260                 goto findourself;
3261
3262         if( next != us )
3263           if( next == ourright )
3264                 goto goleft;
3265           else
3266                 goto findourself;
3267
3268         return bt->cursor->cnt;
3269 }
3270
3271 //  return next slot on cursor page
3272 //  or slide cursor right into next page
3273
3274 uint bt_nextkey (BtDb *bt, uint slot)
3275 {
3276 BtPageSet set[1];
3277 uid right;
3278
3279   do {
3280         right = bt_getid(bt->cursor->right);
3281
3282         while( slot++ < bt->cursor->cnt )
3283           if( slotptr(bt->cursor,slot)->dead )
3284                 continue;
3285           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3286                 return slot;
3287           else
3288                 break;
3289
3290         if( !right )
3291                 break;
3292
3293         if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3294                 set->page = bt_mappage (bt->mgr, set->latch);
3295         else
3296                 return 0;
3297
3298     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3299         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3300         bt_unlockpage(BtLockRead, set->latch);
3301         bt_unpinlatch (bt->mgr, set->latch);
3302         slot = 0;
3303
3304   } while( 1 );
3305
3306   return bt->mgr->err = 0;
3307 }
3308
3309 //  cache page of keys into cursor and return starting slot for given key
3310
3311 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3312 {
3313 BtPageSet set[1];
3314 uint slot;
3315
3316         // cache page for retrieval
3317
3318         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3319           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3320         else
3321           return 0;
3322
3323         bt_unlockpage(BtLockRead, set->latch);
3324         bt_unpinlatch (bt->mgr, set->latch);
3325         return slot;
3326 }
3327
3328 #ifdef STANDALONE
3329
3330 #ifndef unix
3331 double getCpuTime(int type)
3332 {
3333 FILETIME crtime[1];
3334 FILETIME xittime[1];
3335 FILETIME systime[1];
3336 FILETIME usrtime[1];
3337 SYSTEMTIME timeconv[1];
3338 double ans = 0;
3339
3340         memset (timeconv, 0, sizeof(SYSTEMTIME));
3341
3342         switch( type ) {
3343         case 0:
3344                 GetSystemTimeAsFileTime (xittime);
3345                 FileTimeToSystemTime (xittime, timeconv);
3346                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3347                 break;
3348         case 1:
3349                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3350                 FileTimeToSystemTime (usrtime, timeconv);
3351                 break;
3352         case 2:
3353                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3354                 FileTimeToSystemTime (systime, timeconv);
3355                 break;
3356         }
3357
3358         ans += (double)timeconv->wHour * 3600;
3359         ans += (double)timeconv->wMinute * 60;
3360         ans += (double)timeconv->wSecond;
3361         ans += (double)timeconv->wMilliseconds / 1000;
3362         return ans;
3363 }
3364 #else
3365 #include <time.h>
3366 #include <sys/resource.h>
3367
3368 double getCpuTime(int type)
3369 {
3370 struct rusage used[1];
3371 struct timeval tv[1];
3372
3373         switch( type ) {
3374         case 0:
3375                 gettimeofday(tv, NULL);
3376                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3377
3378         case 1:
3379                 getrusage(RUSAGE_SELF, used);
3380                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3381
3382         case 2:
3383                 getrusage(RUSAGE_SELF, used);
3384                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3385         }
3386
3387         return 0;
3388 }
3389 #endif
3390
3391 void bt_poolaudit (BtMgr *mgr)
3392 {
3393 BtLatchSet *latch;
3394 uint entry = 0;
3395
3396         while( ++entry < mgr->latchtotal ) {
3397                 latch = mgr->latchsets + entry;
3398
3399                 if( *latch->readwr->value )
3400                         fprintf(stderr, "latchset %d wrtlocked for page %d\n", entry, latch->page_no);
3401
3402                 if( *latch->access->value )
3403                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3404
3405                 if( *latch->parent->value )
3406                         fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3407
3408                 if( *latch->atomic->value )
3409                         fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3410
3411                 if( *latch->modify->value )
3412                         fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3413
3414                 if( latch->pin & ~CLOCK_bit )
3415                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3416         }
3417 }
3418
3419 typedef struct {
3420         char idx;
3421         char *type;
3422         char *infile;
3423         BtMgr *main;
3424         BtMgr *mgr;
3425         int num;
3426 } ThreadArg;
3427
3428 //  standalone program to index file of keys
3429 //  then list them onto std-out
3430
3431 #ifdef unix
3432 void *index_file (void *arg)
3433 #else
3434 uint __stdcall index_file (void *arg)
3435 #endif
3436 {
3437 int line = 0, found = 0, cnt = 0, idx;
3438 uid next, page_no = LEAF_page;  // start on first page of leaves
3439 int ch, len = 0, slot, type = 0;
3440 unsigned char key[BT_maxkey];
3441 unsigned char txn[65536];
3442 ThreadArg *args = arg;
3443 BtPage page, frame;
3444 BtPageSet set[1];
3445 uint nxt = 65536;
3446 BtKey *ptr;
3447 BtVal *val;
3448 BtDb *bt;
3449 FILE *in;
3450
3451         bt = bt_open (args->mgr, args->main);
3452         page = (BtPage)txn;
3453
3454         if( args->idx < strlen (args->type) )
3455                 ch = args->type[args->idx];
3456         else
3457                 ch = args->type[strlen(args->type) - 1];
3458
3459         switch(ch | 0x20)
3460         {
3461         case 'd':
3462                 type = Delete;
3463
3464         case 'p':
3465                 if( !type )
3466                         type = Unique;
3467
3468                 if( args->num )
3469                  if( type == Delete )
3470                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3471                  else
3472                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3473                 else
3474                  if( type == Delete )
3475                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3476                  else
3477                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3478
3479                 if( in = fopen (args->infile, "rb") )
3480                   while( ch = getc(in), ch != EOF )
3481                         if( ch == '\n' )
3482                         {
3483                           line++;
3484
3485                           if( !args->num ) {
3486                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3487                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3488                             len = 0;
3489                                 continue;
3490                           }
3491
3492                           nxt -= len - 10;
3493                           memcpy (txn + nxt, key + 10, len - 10);
3494                           nxt -= 1;
3495                           txn[nxt] = len - 10;
3496                           nxt -= 10;
3497                           memcpy (txn + nxt, key, 10);
3498                           nxt -= 1;
3499                           txn[nxt] = 10;
3500                           slotptr(page,++cnt)->off  = nxt;
3501                           slotptr(page,cnt)->type = type;
3502                           len = 0;
3503
3504                           if( cnt < args->num )
3505                                 continue;
3506
3507                           page->cnt = cnt;
3508                           page->act = cnt;
3509                           page->min = nxt;
3510
3511                           if( bt_atomictxn (bt, page) )
3512                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3513                           nxt = sizeof(txn);
3514                           cnt = 0;
3515
3516                         }
3517                         else if( len < BT_maxkey )
3518                                 key[len++] = ch;
3519                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3520                 break;
3521
3522         case 'w':
3523                 fprintf(stderr, "started indexing for %s\n", args->infile);
3524                 if( in = fopen (args->infile, "r") )
3525                   while( ch = getc(in), ch != EOF )
3526                         if( ch == '\n' )
3527                         {
3528                           line++;
3529
3530                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3531                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3532                           len = 0;
3533                         }
3534                         else if( len < BT_maxkey )
3535                                 key[len++] = ch;
3536                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3537                 break;
3538
3539         case 'f':
3540                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3541                 if( in = fopen (args->infile, "rb") )
3542                   while( ch = getc(in), ch != EOF )
3543                         if( ch == '\n' )
3544                         {
3545                           line++;
3546                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3547                                 found++;
3548                           else if( bt->mgr->err )
3549                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3550                           len = 0;
3551                         }
3552                         else if( len < BT_maxkey )
3553                                 key[len++] = ch;
3554                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3555                 break;
3556
3557         case 's':
3558                 fprintf(stderr, "started scanning\n");
3559                 
3560                 do {
3561                         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3562                                 set->page = bt_mappage (bt->mgr, set->latch);
3563                         else
3564                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3565
3566                         bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3567                         next = bt_getid (set->page->right);
3568
3569                         for( slot = 0; slot++ < set->page->cnt; )
3570                          if( next || slot < set->page->cnt )
3571                           if( !slotptr(set->page, slot)->dead ) {
3572                                 ptr = keyptr(set->page, slot);
3573                                 len = ptr->len;
3574
3575                             if( slotptr(set->page, slot)->type == Duplicate )
3576                                         len -= BtId;
3577
3578                                 fwrite (ptr->key, len, 1, stdout);
3579                                 val = valptr(set->page, slot);
3580                                 fwrite (val->value, val->len, 1, stdout);
3581                                 fputc ('\n', stdout);
3582                                 cnt++;
3583                            }
3584
3585                         bt_unlockpage (BtLockRead, set->latch);
3586                         bt_unpinlatch (bt->mgr, set->latch);
3587                 } while( page_no = next );
3588
3589                 fprintf(stderr, " Total keys read %d\n", cnt);
3590                 break;
3591
3592         case 'r':
3593                 fprintf(stderr, "started reverse scan\n");
3594                 if( slot = bt_lastkey (bt) )
3595                    while( slot = bt_prevkey (bt, slot) ) {
3596                         if( slotptr(bt->cursor, slot)->dead )
3597                           continue;
3598
3599                         ptr = keyptr(bt->cursor, slot);
3600                         len = ptr->len;
3601
3602                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3603                                 len -= BtId;
3604
3605                         fwrite (ptr->key, len, 1, stdout);
3606                         val = valptr(bt->cursor, slot);
3607                         fwrite (val->value, val->len, 1, stdout);
3608                         fputc ('\n', stdout);
3609                         cnt++;
3610                   }
3611
3612                 fprintf(stderr, " Total keys read %d\n", cnt);
3613                 break;
3614
3615         case 'c':
3616 #ifdef unix
3617                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3618 #endif
3619                 fprintf(stderr, "started counting\n");
3620                 next = REDO_page + bt->mgr->pagezero->redopages;
3621
3622                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3623                         if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3624                                 break;
3625
3626                         if( !bt->cursor->free && !bt->cursor->lvl )
3627                                 cnt += bt->cursor->act;
3628
3629                         bt->mgr->reads++;
3630                         page_no = next++;
3631                 }
3632                 
3633                 cnt--;  // remove stopper key
3634                 fprintf(stderr, " Total keys counted %d\n", cnt);
3635                 break;
3636         }
3637
3638         bt_close (bt);
3639 #ifdef unix
3640         return NULL;
3641 #else
3642         return 0;
3643 #endif
3644 }
3645
3646 typedef struct timeval timer;
3647
3648 int main (int argc, char **argv)
3649 {
3650 int idx, cnt, len, slot, err;
3651 int segsize, bits = 16;
3652 double start, stop;
3653 #ifdef unix
3654 pthread_t *threads;
3655 #else
3656 HANDLE *threads;
3657 #endif
3658 ThreadArg *args;
3659 uint redopages = 0;
3660 uint poolsize = 0;
3661 uint mainpool = 0;
3662 uint mainbits = 0;
3663 float elapsed;
3664 int num = 0;
3665 char key[1];
3666 BtMgr *main;
3667 BtMgr *mgr;
3668 BtKey *ptr;
3669
3670         if( argc < 3 ) {
3671                 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3672                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3673                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3674                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3675                 fprintf (stderr, "  page_bits is the page size in bits for the cache btree\n");
3676                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3677                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3678                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3679                 fprintf (stderr, "  main_bits is the page size of the main btree in bits\n");
3680                 fprintf (stderr, "  main_pool is the number of main pages in the main buffer pool\n");
3681                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3682                 exit(0);
3683         }
3684
3685         start = getCpuTime(0);
3686
3687         if( argc > 4 )
3688                 bits = atoi(argv[4]);
3689
3690         if( argc > 5 )
3691                 poolsize = atoi(argv[5]);
3692
3693         if( !poolsize )
3694                 fprintf (stderr, "Warning: no mapped_pool\n");
3695
3696         if( argc > 6 )
3697                 num = atoi(argv[6]);
3698
3699         if( argc > 7 )
3700                 redopages = atoi(argv[7]);
3701
3702         if( redopages + REDO_page > 65535 )
3703                 fprintf (stderr, "Warning: Recovery buffer too large\n");
3704
3705         if( argc > 8 )
3706                 mainbits = atoi(argv[8]);
3707
3708         if( argc > 9 )
3709                 mainpool = atoi(argv[9]);
3710
3711         cnt = argc - 10;
3712 #ifdef unix
3713         threads = malloc (cnt * sizeof(pthread_t));
3714 #else
3715         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3716 #endif
3717         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3718
3719         mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3720
3721         if( !mgr ) {
3722                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3723                 exit (1);
3724         }
3725
3726         if( mainbits ) {
3727                 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3728
3729                 if( !main ) {
3730                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3731                         exit (1);
3732                 }
3733         } else
3734                 main = NULL;
3735
3736         //      fire off threads
3737
3738         if( cnt )
3739           for( idx = 0; idx < cnt; idx++ ) {
3740                 args[idx].infile = argv[idx + 10];
3741                 args[idx].type = argv[3];
3742                 args[idx].main = main;
3743                 args[idx].mgr = mgr;
3744                 args[idx].num = num;
3745                 args[idx].idx = idx;
3746 #ifdef unix
3747                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3748                         fprintf(stderr, "Error creating thread %d\n", err);
3749 #else
3750                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3751 #endif
3752           }
3753         else {
3754                 args[0].infile = argv[idx + 10];
3755                 args[0].type = argv[3];
3756                 args[0].main = main;
3757                 args[0].mgr = mgr;
3758                 args[0].num = num;
3759                 args[0].idx = 0;
3760                 index_file (args);
3761         }
3762
3763         //      wait for termination
3764
3765 #ifdef unix
3766         for( idx = 0; idx < cnt; idx++ )
3767                 pthread_join (threads[idx], NULL);
3768 #else
3769         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3770
3771         for( idx = 0; idx < cnt; idx++ )
3772                 CloseHandle(threads[idx]);
3773 #endif
3774         bt_poolaudit(mgr);
3775
3776         if( main )
3777                 bt_poolaudit(main);
3778
3779         fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3780
3781         if( main )
3782                 bt_mgrclose (main);
3783         bt_mgrclose (mgr);
3784
3785         elapsed = getCpuTime(0) - start;
3786         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3787         elapsed = getCpuTime(1);
3788         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3789         elapsed = getCpuTime(2);
3790         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3791 }
3792
3793 #endif  //STANDALONE