]> pd.if.org Git - btree/blob - threadskv10.c
Merge branch 'master' of https://github.com/malbrain/Btree-source-code
[btree] / threadskv10.c
1 // btree version threadskv10 futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      and LSM B-trees for write optimization
11
12 // 15 OCT 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <linux/futex.h>
39 #define SYS_futex 202
40 #endif
41
42 #ifdef unix
43 #include <unistd.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <fcntl.h>
47 #include <sys/time.h>
48 #include <sys/mman.h>
49 #include <errno.h>
50 #include <pthread.h>
51 #include <limits.h>
52 #else
53 #define WIN32_LEAN_AND_MEAN
54 #include <windows.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <time.h>
58 #include <fcntl.h>
59 #include <process.h>
60 #include <intrin.h>
61 #endif
62
63 #include <memory.h>
64 #include <string.h>
65 #include <stddef.h>
66
67 typedef unsigned long long      uid;
68 typedef unsigned long long      logseqno;
69
70 #ifndef unix
71 typedef unsigned long long      off64_t;
72 typedef unsigned short          ushort;
73 typedef unsigned int            uint;
74 #endif
75
76 #define BT_ro 0x6f72    // ro
77 #define BT_rw 0x7772    // rw
78
79 #define BT_maxbits              26                                      // maximum page size in bits
80 #define BT_minbits              9                                       // minimum page size in bits
81 #define BT_minpage              (1 << BT_minbits)       // minimum page size
82 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
83
84 //  BTree page number constants
85 #define ALLOC_page              0       // allocation page
86 #define ROOT_page               1       // root of the btree
87 #define LEAF_page               2       // first page of leaves
88 #define REDO_page               3       // first page of redo buffer
89
90 //      Number of levels to create in a new BTree
91
92 #define MIN_lvl                 2
93
94 /*
95 There are six lock types for each node in four independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
102 */
103
104 typedef enum{
105         BtLockAccess = 1,
106         BtLockDelete = 2,
107         BtLockRead   = 4,
108         BtLockWrite  = 8,
109         BtLockParent = 16,
110         BtLockAtomic = 32
111 } BtLock;
112
113 typedef struct {
114   union {
115         struct {
116           volatile ushort xlock[1];             // one writer has exclusive lock
117           volatile ushort wrt[1];               // count of other writers waiting
118         } bits[1];
119         uint value[1];
120   };
121 } BtMutexLatch;
122
123 #define XCL 1
124 #define WRT 65536
125
126 //      definition for phase-fair reader/writer lock implementation
127
128 typedef struct {
129         volatile ushort rin[1];
130         volatile ushort rout[1];
131         volatile ushort ticket[1];
132         volatile ushort serving[1];
133 } RWLock;
134
135 //      write only reentrant lock
136
137 typedef struct {
138   BtMutexLatch xcl[1];
139   union {
140         struct {
141                 volatile ushort tid[1];
142                 volatile ushort dup[1];
143         } bits[1];
144         uint value[1];
145   };
146   volatile uint waiters[1];
147 } WOLock;
148
149 #define PHID 0x1
150 #define PRES 0x2
151 #define MASK 0x3
152 #define RINC 0x4
153
154 //      mode & definition for lite latch implementation
155
156 enum {
157         QueRd = 1,              // reader queue
158         QueWr = 2               // writer queue
159 } RWQueue;
160
161 //  hash table entries
162
163 typedef struct {
164         uint entry;             // Latch table entry at head of chain
165         BtMutexLatch latch[1];
166 } BtHashEntry;
167
168 //      latch manager table structure
169
170 typedef struct {
171         uid page_no;                    // latch set page number
172         RWLock readwr[1];               // read/write page lock
173         RWLock access[1];               // Access Intent/Page delete
174         WOLock parent[1];               // Posting of fence key in parent
175         WOLock atomic[1];               // Atomic update in progress
176         uint split;                             // right split page atomic insert
177         uint next;                              // next entry in hash table chain
178         uint prev;                              // prev entry in hash table chain
179         ushort pin;                             // number of accessing threads
180         unsigned char dirty;    // page in cache is dirty (atomic setable)
181         unsigned char promote;  // page in cache is dirty (atomic setable)
182         BtMutexLatch modify[1]; // modify entry lite latch
183 } BtLatchSet;
184
185 //      Define the length of the page record numbers
186
187 #define BtId 6
188
189 //      Page key slot definition.
190
191 //      Keys are marked dead, but remain on the page until
192 //      it cleanup is called. The fence key (highest key) for
193 //      a leaf page is always present, even after cleanup.
194
195 //      Slot types
196
197 //      In addition to the Unique keys that occupy slots
198 //      there are Librarian and Duplicate key
199 //      slots occupying the key slot array.
200
201 //      The Librarian slots are dead keys that
202 //      serve as filler, available to add new Unique
203 //      or Dup slots that are inserted into the B-tree.
204
205 //      The Duplicate slots have had their key bytes extended
206 //      by 6 bytes to contain a binary duplicate key uniqueifier.
207
208 typedef enum {
209         Unique,
210         Librarian,
211         Duplicate,
212         Delete
213 } BtSlotType;
214
215 typedef struct {
216         uint off:BT_maxbits;    // page offset for key start
217         uint type:3;                    // type of slot
218         uint dead:1;                    // set for deleted slot
219 } BtSlot;
220
221 //      The key structure occupies space at the upper end of
222 //      each page.  It's a length byte followed by the key
223 //      bytes.
224
225 typedef struct {
226         unsigned char len;              // this can be changed to a ushort or uint
227         unsigned char key[0];
228 } BtKey;
229
230 //      the value structure also occupies space at the upper
231 //      end of the page. Each key is immediately followed by a value.
232
233 typedef struct {
234         unsigned char len;              // this can be changed to a ushort or uint
235         unsigned char value[0];
236 } BtVal;
237
238 #define BT_maxkey       255             // maximum number of bytes in a key
239 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
240
241 //      The first part of an index page.
242 //      It is immediately followed
243 //      by the BtSlot array of keys.
244
245 //      note that this structure size
246 //      must be a multiple of 8 bytes
247 //      in order to place PageZero correctly.
248
249 typedef struct BtPage_ {
250         uint cnt;                                       // count of keys in page
251         uint act;                                       // count of active keys
252         uint min;                                       // next key offset
253         uint garbage;                           // page garbage in bytes
254         unsigned char bits:7;           // page size in bits
255         unsigned char free:1;           // page is on free chain
256         unsigned char lvl:7;            // level of page
257         unsigned char kill:1;           // page is being deleted
258         unsigned char right[BtId];      // page number to right
259         unsigned char left[BtId];       // page number to left
260         unsigned char filler[2];        // padding to multiple of 8
261         logseqno lsn;                           // log sequence number applied
262         uid page_no;                            // this page number
263 } *BtPage;
264
265 //  The loadpage interface object
266
267 typedef struct {
268         BtPage page;            // current page pointer
269         BtLatchSet *latch;      // current page latch set
270 } BtPageSet;
271
272 //      structure for latch manager on ALLOC_page
273
274 typedef struct {
275         struct BtPage_ alloc[1];                // next page_no in right ptr
276         unsigned char freechain[BtId];  // head of free page_nos chain
277         unsigned long long activepages; // number of active pages
278         uint redopages;                                 // number of redo pages in file
279 } BtPageZero;
280
281 //      The object structure for Btree access
282
283 typedef struct {
284         uint page_size;                         // page size    
285         uint page_bits;                         // page size in bits    
286 #ifdef unix
287         int idx;
288 #else
289         HANDLE idx;
290 #endif
291         BtPageZero *pagezero;           // mapped allocation page
292         BtHashEntry *hashtable;         // the buffer pool hash table entries
293         BtLatchSet *latchsets;          // mapped latch set from buffer pool
294         unsigned char *pagepool;        // mapped to the buffer pool pages
295         unsigned char *redobuff;        // mapped recovery buffer pointer
296         logseqno lsn, flushlsn;         // current & first lsn flushed
297         BtMutexLatch redo[1];           // redo area lite latch
298         BtMutexLatch lock[1];           // allocation area lite latch
299         BtMutexLatch maps[1];           // mapping segments lite latch
300         ushort thread_no[1];            // next thread number
301         uint nlatchpage;                        // number of latch pages at BT_latch
302         uint latchtotal;                        // number of page latch entries
303         uint latchhash;                         // number of latch hash table slots
304         uint latchvictim;                       // next latch entry to examine
305         uint latchpromote;                      // next latch entry to promote
306         uint redolast;                          // last msync size of recovery buff
307         uint redoend;                           // eof/end element in recovery buff
308         int err;                                        // last error
309         int line;                                       // last error line no
310         int found;                                      // number of keys found by delete
311         int reads, writes;                      // number of reads and writes
312 #ifndef unix
313         HANDLE halloc;                          // allocation handle
314         HANDLE hpool;                           // buffer pool handle
315 #endif
316         uint segments;                          // number of memory mapped segments
317         unsigned char *pages[64000];// memory mapped segments of b-tree
318 } BtMgr;
319
320 typedef struct {
321         BtMgr *mgr;                                     // buffer manager for entire process
322         BtMgr *main;                            // buffer manager for main btree
323         BtPage cursor;                          // cached page frame for start/next
324         ushort thread_no;                       // thread number
325         unsigned char key[BT_keyarray]; // last found complete key
326 } BtDb;
327
328 //      atomic txn structures
329
330 typedef struct {
331         logseqno reqlsn;        // redo log seq no required
332         uint entry;                     // latch table entry number
333         uint slot:31;           // page slot number
334         uint reuse:1;           // reused previous page
335 } AtomicTxn;
336
337 //      Catastrophic errors
338
339 typedef enum {
340         BTERR_ok = 0,
341         BTERR_struct,
342         BTERR_ovflw,
343         BTERR_lock,
344         BTERR_map,
345         BTERR_read,
346         BTERR_wrt,
347         BTERR_atomic,
348         BTERR_recovery
349 } BTERR;
350
351 #define CLOCK_bit 0x8000
352
353 // recovery manager entry types
354
355 typedef enum {
356         BTRM_eof = 0,   // rest of buffer is emtpy
357         BTRM_add,               // add a unique key-value to btree
358         BTRM_dup,               // add a duplicate key-value to btree
359         BTRM_del,               // delete a key-value from btree
360         BTRM_upd,               // update a key with a new value
361         BTRM_new,               // allocate a new empty page
362         BTRM_old                // reuse an old empty page
363 } BTRM;
364
365 // recovery manager entry
366 //      structure followed by BtKey & BtVal
367
368 typedef struct {
369         logseqno reqlsn;        // log sequence number required
370         logseqno lsn;           // log sequence number for entry
371         uint len;                       // length of entry
372         unsigned char type;     // type of entry
373         unsigned char lvl;      // level of btree entry pertains to
374 } BtLogHdr;
375
376 // B-Tree functions
377
378 extern void bt_close (BtDb *bt);
379 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
380 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
381 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
382 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
383 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
384 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
385 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
386
387 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
388
389 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
390 extern uint bt_nextkey   (BtDb *bt, uint slot);
391 extern uint bt_prevkey (BtDb *db, uint slot);
392 extern uint bt_lastkey (BtDb *db);
393
394 //      manager functions
395 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
396 extern void bt_mgrclose (BtMgr *mgr);
397 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
398 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
399
400 //      atomic transaction functions
401 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
402 BTERR bt_txnpromote (BtDb *bt);
403
404 //  The page is allocated from low and hi ends.
405 //  The key slots are allocated from the bottom,
406 //      while the text and value of the key
407 //  are allocated from the top.  When the two
408 //  areas meet, the page is split into two.
409
410 //  A key consists of a length byte, two bytes of
411 //  index number (0 - 65535), and up to 253 bytes
412 //  of key value.
413
414 //  Associated with each key is a value byte string
415 //      containing any value desired.
416
417 //  The b-tree root is always located at page 1.
418 //      The first leaf page of level zero is always
419 //      located on page 2.
420
421 //      The b-tree pages are linked with next
422 //      pointers to facilitate enumerators,
423 //      and provide for concurrency.
424
425 //      When to root page fills, it is split in two and
426 //      the tree height is raised by a new root at page
427 //      one with two keys.
428
429 //      Deleted keys are marked with a dead bit until
430 //      page cleanup. The fence key for a leaf node is
431 //      always present
432
433 //  To achieve maximum concurrency one page is locked at a time
434 //  as the tree is traversed to find leaf key in question. The right
435 //  page numbers are used in cases where the page is being split,
436 //      or consolidated.
437
438 //  Page 0 is dedicated to lock for new page extensions,
439 //      and chains empty pages together for reuse. It also
440 //      contains the latch manager hash table.
441
442 //      The ParentModification lock on a node is obtained to serialize posting
443 //      or changing the fence key for a node.
444
445 //      Empty pages are chained together through the ALLOC page and reused.
446
447 //      Access macros to address slot and key values from the page
448 //      Page slots use 1 based indexing.
449
450 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
451 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
452 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
453
454 void bt_putid(unsigned char *dest, uid id)
455 {
456 int i = BtId;
457
458         while( i-- )
459                 dest[i] = (unsigned char)id, id >>= 8;
460 }
461
462 uid bt_getid(unsigned char *src)
463 {
464 uid id = 0;
465 int i;
466
467         for( i = 0; i < BtId; i++ )
468                 id <<= 8, id |= *src++; 
469
470         return id;
471 }
472
473 //      lite weight spin lock Latch Manager
474
475 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
476 {
477         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
478 }
479
480 void bt_mutexlock(BtMutexLatch *latch)
481 {
482 BtMutexLatch prev[1];
483 uint slept = 0;
484
485   while( 1 ) {
486         *prev->value = __sync_fetch_and_or(latch->value, XCL);
487
488         if( !*prev->bits->xlock ) {                     // did we set XCL?
489             if( slept )
490                   __sync_fetch_and_sub(latch->value, WRT);
491                 return;
492         }
493
494         if( !slept ) {
495                 *prev->bits->wrt += 1;
496                 __sync_fetch_and_add(latch->value, WRT);
497         }
498
499         sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
500         slept = 1;
501   }
502 }
503
504 //      try to obtain write lock
505
506 //      return 1 if obtained,
507 //              0 otherwise
508
509 int bt_mutextry(BtMutexLatch *latch)
510 {
511 BtMutexLatch prev[1];
512
513         *prev->value = __sync_fetch_and_or(latch->value, XCL);
514
515         //      take write access if exclusive bit was clear
516
517         return !*prev->bits->xlock;
518 }
519
520 //      clear write mode
521
522 void bt_releasemutex(BtMutexLatch *latch)
523 {
524 BtMutexLatch prev[1];
525
526         *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
527
528         if( *prev->bits->wrt )
529           sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
530 }
531
532 //      Write-Only Reentrant Lock
533
534 void WriteOLock (WOLock *lock, ushort tid)
535 {
536 uint prev, waited = 0;
537
538   while( 1 ) {
539         bt_mutexlock(lock->xcl);
540
541         if( waited )
542                 *lock->waiters -= 1;
543
544         if( *lock->bits->tid == tid ) {
545                 *lock->bits->dup += 1;
546                 bt_releasemutex(lock->xcl);
547                 return;
548         }
549         if( !*lock->bits->tid ) {
550                 *lock->bits->tid = tid;
551                 bt_releasemutex(lock->xcl);
552                 return;
553         }
554
555         waited = 1;
556         *lock->waiters += 1;
557         prev = *lock->value;
558
559         bt_releasemutex(lock->xcl);
560         
561         sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, prev, NULL, NULL, QueWr );
562   }
563 }
564
565 void WriteORelease (WOLock *lock)
566 {
567   bt_mutexlock(lock->xcl);
568
569   if( *lock->bits->dup ) {
570         *lock->bits->dup -= 1;
571     bt_releasemutex(lock->xcl);
572         return;
573   }
574
575   *lock->bits->tid = 0;
576
577   if( *lock->waiters )
578         sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueWr );
579   bt_releasemutex(lock->xcl);
580 }
581
582 //      clear lock of holders and waiters
583
584 ClearWOLock (WOLock *lock)
585 {
586   while( 1 ) {
587         bt_mutexlock(lock->xcl);
588
589         if( *lock->waiters ) {
590                 bt_releasemutex(lock->xcl);
591                 sched_yield();
592                 continue;
593         }
594
595         if( *lock->bits->tid ) {
596                 bt_releasemutex(lock->xcl);
597                 sched_yield();
598                 continue;
599         }
600
601         bt_releasemutex(lock->xcl);
602         return;
603   }
604 }
605
606 //      Phase-Fair reader/writer lock implementation
607
608 void WriteLock (RWLock *lock, ushort tid)
609 {
610 ushort w, r, tix;
611
612 #ifdef unix
613         tix = __sync_fetch_and_add (lock->ticket, 1);
614 #else
615         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
616 #endif
617         // wait for our ticket to come up
618
619         while( tix != lock->serving[0] )
620 #ifdef unix
621                 sched_yield();
622 #else
623                 SwitchToThread ();
624 #endif
625
626         w = PRES | (tix & PHID);
627 #ifdef  unix
628         r = __sync_fetch_and_add (lock->rin, w);
629 #else
630         r = _InterlockedExchangeAdd16 (lock->rin, w);
631 #endif
632         while( r != *lock->rout )
633 #ifdef unix
634                 sched_yield();
635 #else
636                 SwitchToThread();
637 #endif
638 }
639
640 void WriteRelease (RWLock *lock)
641 {
642 #ifdef unix
643         __sync_fetch_and_and (lock->rin, ~MASK);
644 #else
645         _InterlockedAnd16 (lock->rin, ~MASK);
646 #endif
647         lock->serving[0]++;
648 }
649
650 //      try to obtain read lock
651 //  return 1 if successful
652
653 int ReadTry (RWLock *lock, ushort tid)
654 {
655 ushort w;
656
657 #ifdef unix
658         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
659 #else
660         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
661 #endif
662         if( w )
663           if( w == (*lock->rin & MASK) ) {
664 #ifdef unix
665                 __sync_fetch_and_add (lock->rin, -RINC);
666 #else
667                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
668 #endif
669                 return 0;
670           }
671
672         return 1;
673 }
674
675 void ReadLock (RWLock *lock, ushort tid)
676 {
677 ushort w;
678
679 #ifdef unix
680         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
681 #else
682         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
683 #endif
684         if( w )
685           while( w == (*lock->rin & MASK) )
686 #ifdef unix
687                 sched_yield ();
688 #else
689                 SwitchToThread ();
690 #endif
691 }
692
693 void ReadRelease (RWLock *lock)
694 {
695 #ifdef unix
696         __sync_fetch_and_add (lock->rout, RINC);
697 #else
698         _InterlockedExchangeAdd16 (lock->rout, RINC);
699 #endif
700 }
701
702 //      recovery manager -- flush dirty pages
703
704 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
705 {
706 uint cnt3 = 0, cnt2 = 0, cnt = 0;
707 uint entry, segment;
708 BtLatchSet *latch;
709 BtPage page;
710
711   //    flush dirty pool pages to the btree
712
713 fprintf(stderr, "Start flushlsn  ");
714   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
715         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
716         latch = mgr->latchsets + entry;
717         bt_mutexlock (latch->modify);
718         bt_lockpage(BtLockRead, latch, thread_no);
719
720         if( latch->dirty ) {
721           bt_writepage(mgr, page, latch->page_no);
722           latch->dirty = 0, cnt++;
723         }
724 if( latch->pin & ~CLOCK_bit )
725 cnt2++;
726         bt_unlockpage(BtLockRead, latch);
727         bt_releasemutex (latch->modify);
728   }
729 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
730 fprintf(stderr, "begin sync");
731         for( segment = 0; segment < mgr->segments; segment++ )
732           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
733                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
734 fprintf(stderr, " end sync\n");
735 }
736
737 //      recovery manager -- process current recovery buff on startup
738 //      this won't do much if previous session was properly closed.
739
740 BTERR bt_recoveryredo (BtMgr *mgr)
741 {
742 BtLogHdr *hdr, *eof;
743 uint offset = 0;
744 BtKey *key;
745 BtVal *val;
746
747   hdr = (BtLogHdr *)mgr->redobuff;
748   mgr->flushlsn = hdr->lsn;
749
750   while( 1 ) {
751         hdr = (BtLogHdr *)(mgr->redobuff + offset);
752         switch( hdr->type ) {
753         case BTRM_eof:
754                 mgr->lsn = hdr->lsn;
755                 return 0;
756         case BTRM_add:          // add a unique key-value to btree
757         
758         case BTRM_dup:          // add a duplicate key-value to btree
759         case BTRM_del:          // delete a key-value from btree
760         case BTRM_upd:          // update a key with a new value
761         case BTRM_new:          // allocate a new empty page
762         case BTRM_old:          // reuse an old empty page
763                 return 0;
764         }
765   }
766 }
767
768 //      recovery manager -- append new entry to recovery log
769 //      flush dirty pages to disk when it overflows.
770
771 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
772 {
773 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
774 uint amt = sizeof(BtLogHdr);
775 BtLogHdr *hdr, *eof;
776 uint last, end;
777
778         bt_mutexlock (mgr->redo);
779
780         if( key )
781           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
782
783         //      see if new entry fits in buffer
784         //      flush and reset if it doesn't
785
786         if( amt > size - mgr->redoend ) {
787           mgr->flushlsn = mgr->lsn;
788           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
789                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
790           mgr->redolast = 0;
791           mgr->redoend = 0;
792           bt_flushlsn(mgr, thread_no);
793         }
794
795         //      fill in new entry & either eof or end block
796
797         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
798
799         hdr->len = amt;
800         hdr->type = type;
801         hdr->lvl = lvl;
802         hdr->lsn = ++mgr->lsn;
803
804         mgr->redoend += amt;
805
806         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
807         memset (eof, 0, sizeof(BtLogHdr));
808
809         //  fill in key and value
810
811         if( key ) {
812           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
813           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
814         }
815
816         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
817         memset (eof, 0, sizeof(BtLogHdr));
818         eof->lsn = mgr->lsn;
819
820         last = mgr->redolast & ~0xfff;
821         end = mgr->redoend;
822
823         if( end - last + sizeof(BtLogHdr) >= 32768 )
824           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
825                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
826           else
827                 mgr->redolast = end;
828
829         bt_releasemutex(mgr->redo);
830         return hdr->lsn;
831 }
832
833 //      recovery manager -- append transaction to recovery log
834 //      flush dirty pages to disk when it overflows.
835
836 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
837 {
838 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
839 uint amt = 0, src, type;
840 BtLogHdr *hdr, *eof;
841 uint last, end;
842 logseqno lsn;
843 BtKey *key;
844 BtVal *val;
845
846         //      determine amount of redo recovery log space required
847
848         for( src = 0; src++ < source->cnt; ) {
849           key = keyptr(source,src);
850           val = valptr(source,src);
851           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
852           amt += sizeof(BtLogHdr);
853         }
854
855         bt_mutexlock (mgr->redo);
856
857         //      see if new entry fits in buffer
858         //      flush and reset if it doesn't
859
860         if( amt > size - mgr->redoend ) {
861           mgr->flushlsn = mgr->lsn;
862           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
863                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
864           mgr->redolast = 0;
865           mgr->redoend = 0;
866           bt_flushlsn (mgr, thread_no);
867         }
868
869         //      assign new lsn to transaction
870
871         lsn = ++mgr->lsn;
872
873         //      fill in new entries
874
875         for( src = 0; src++ < source->cnt; ) {
876           key = keyptr(source, src);
877           val = valptr(source, src);
878
879           switch( slotptr(source, src)->type ) {
880           case Unique:
881                 type = BTRM_add;
882                 break;
883           case Duplicate:
884                 type = BTRM_dup;
885                 break;
886           case Delete:
887                 type = BTRM_del;
888                 break;
889           }
890
891           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
892           amt += sizeof(BtLogHdr);
893
894           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
895           hdr->len = amt;
896           hdr->type = type;
897           hdr->lsn = lsn;
898           hdr->lvl = 0;
899
900           //  fill in key and value
901
902           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
903           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
904
905           mgr->redoend += amt;
906         }
907
908         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
909         memset (eof, 0, sizeof(BtLogHdr));
910         eof->lsn = lsn;
911
912         last = mgr->redolast & ~0xfff;
913         end = mgr->redoend;
914
915         if( end - last + sizeof(BtLogHdr) >= 32768 )
916           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
917                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
918           else
919                 mgr->redolast = end;
920
921         bt_releasemutex(mgr->redo);
922         return lsn;
923 }
924
925 //      sync a single btree page to disk
926
927 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
928 {
929 uint segment = latch->page_no >> 16;
930 BtPage perm;
931
932         if( bt_writepage (mgr, page, latch->page_no) )
933                 return mgr->err;
934
935         perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
936
937         if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
938           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
939
940         latch->dirty = 0;
941         return 0;
942 }
943
944 //      read page into buffer pool from permanent location in Btree file
945
946 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
947 {
948 int flag = PROT_READ | PROT_WRITE;
949 uint segment = page_no >> 16;
950 BtPage perm;
951
952   while( 1 ) {
953         if( segment < mgr->segments ) {
954           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
955 if( perm->page_no != page_no )
956 abort();
957                 memcpy (page, perm, mgr->page_size);
958                 mgr->reads++;
959                 return 0;
960         }
961
962         bt_mutexlock (mgr->maps);
963
964         if( segment < mgr->segments ) {
965                 bt_releasemutex (mgr->maps);
966                 continue;
967         }
968
969         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
970         mgr->segments++;
971
972         bt_releasemutex (mgr->maps);
973   }
974 }
975
976 //      write page to permanent location in Btree file
977 //      clear the dirty bit
978
979 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
980 {
981 int flag = PROT_READ | PROT_WRITE;
982 uint segment = page_no >> 16;
983 BtPage perm;
984
985   while( 1 ) {
986         if( segment < mgr->segments ) {
987           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
988 if( page_no > LEAF_page && perm->page_no != page_no)
989 abort();
990                 memcpy (perm, page, mgr->page_size);
991                 mgr->writes++;
992                 return 0;
993         }
994
995         bt_mutexlock (mgr->maps);
996
997         if( segment < mgr->segments ) {
998                 bt_releasemutex (mgr->maps);
999                 continue;
1000         }
1001
1002         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
1003         bt_releasemutex (mgr->maps);
1004         mgr->segments++;
1005   }
1006 }
1007
1008 //      set CLOCK bit in latch
1009 //      decrement pin count
1010
1011 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
1012 {
1013         bt_mutexlock(latch->modify);
1014         latch->pin |= CLOCK_bit;
1015         latch->pin--;
1016
1017         bt_releasemutex(latch->modify);
1018 }
1019
1020 //  return the btree cached page address
1021
1022 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
1023 {
1024 uid entry = latch - mgr->latchsets;
1025 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
1026
1027   return page;
1028 }
1029
1030 //  return next available latch entry
1031 //        and with latch entry locked
1032
1033 uint bt_availnext (BtMgr *mgr)
1034 {
1035 BtLatchSet *latch;
1036 uint entry;
1037
1038   while( 1 ) {
1039 #ifdef unix
1040         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1041 #else
1042         entry = _InterlockedIncrement (&mgr->latchvictim);
1043 #endif
1044         entry %= mgr->latchtotal;
1045
1046         if( !entry )
1047                 continue;
1048
1049         latch = mgr->latchsets + entry;
1050
1051         if( !bt_mutextry(latch->modify) )
1052                 continue;
1053
1054         //  return this entry if it is not pinned
1055
1056         if( !latch->pin )
1057                 return entry;
1058
1059         //      if the CLOCK bit is set
1060         //      reset it to zero.
1061
1062         latch->pin &= ~CLOCK_bit;
1063         bt_releasemutex(latch->modify);
1064   }
1065 }
1066
1067 //      pin page in buffer pool
1068 //      return with latchset pinned
1069
1070 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1071 {
1072 uint hashidx = page_no % mgr->latchhash;
1073 BtLatchSet *latch;
1074 uint entry, idx;
1075 BtPage page;
1076
1077   //  try to find our entry
1078
1079   bt_mutexlock(mgr->hashtable[hashidx].latch);
1080
1081   if( entry = mgr->hashtable[hashidx].entry ) do
1082   {
1083         latch = mgr->latchsets + entry;
1084         if( page_no == latch->page_no )
1085                 break;
1086   } while( entry = latch->next );
1087
1088   //  found our entry: increment pin
1089
1090   if( entry ) {
1091         latch = mgr->latchsets + entry;
1092         bt_mutexlock(latch->modify);
1093         latch->pin |= CLOCK_bit;
1094         latch->pin++;
1095 if(contents)
1096 abort();
1097         bt_releasemutex(latch->modify);
1098         bt_releasemutex(mgr->hashtable[hashidx].latch);
1099         return latch;
1100   }
1101
1102   //  find and reuse unpinned entry
1103
1104 trynext:
1105
1106   entry = bt_availnext (mgr);
1107   latch = mgr->latchsets + entry;
1108
1109   idx = latch->page_no % mgr->latchhash;
1110
1111   //  if latch is on a different hash chain
1112   //    unlink from the old page_no chain
1113
1114   if( latch->page_no )
1115    if( idx != hashidx ) {
1116
1117         //  skip over this entry if latch not available
1118
1119     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1120           bt_releasemutex(latch->modify);
1121           goto trynext;
1122         }
1123
1124         if( latch->prev )
1125           mgr->latchsets[latch->prev].next = latch->next;
1126         else
1127           mgr->hashtable[idx].entry = latch->next;
1128
1129         if( latch->next )
1130           mgr->latchsets[latch->next].prev = latch->prev;
1131
1132     bt_releasemutex (mgr->hashtable[idx].latch);
1133    }
1134
1135   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1136
1137   //  update permanent page area in btree from buffer pool
1138   //    no read-lock is required since page is not pinned.
1139
1140   if( latch->dirty )
1141         if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1142           return mgr->line = __LINE__, NULL;
1143         else
1144           latch->dirty = 0;
1145
1146   if( contents ) {
1147         memcpy (page, contents, mgr->page_size);
1148         latch->dirty = 1;
1149   } else if( bt_readpage (mgr, page, page_no) )
1150         return mgr->line = __LINE__, NULL;
1151
1152   //  link page as head of hash table chain
1153   //  if this is a never before used entry,
1154   //  or it was previously on a different
1155   //  hash table chain. Otherwise, just
1156   //  leave it in its current hash table
1157   //  chain position.
1158
1159   if( !latch->page_no || hashidx != idx ) {
1160         if( latch->next = mgr->hashtable[hashidx].entry )
1161           mgr->latchsets[latch->next].prev = entry;
1162
1163         mgr->hashtable[hashidx].entry = entry;
1164     latch->prev = 0;
1165   }
1166
1167   //  fill in latch structure
1168
1169   latch->pin = CLOCK_bit | 1;
1170   latch->page_no = page_no;
1171   latch->split = 0;
1172
1173   bt_releasemutex (latch->modify);
1174   bt_releasemutex (mgr->hashtable[hashidx].latch);
1175   return latch;
1176 }
1177   
1178 void bt_mgrclose (BtMgr *mgr)
1179 {
1180 BtLatchSet *latch;
1181 BtLogHdr *eof;
1182 uint num = 0;
1183 BtPage page;
1184 uint slot;
1185
1186         //      flush previously written dirty pages
1187         //      and write recovery buffer to disk
1188
1189         fdatasync (mgr->idx);
1190
1191         if( mgr->redoend ) {
1192                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1193                 memset (eof, 0, sizeof(BtLogHdr));
1194         }
1195
1196         //      write remaining dirty pool pages to the btree
1197
1198         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1199                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1200                 latch = mgr->latchsets + slot;
1201
1202                 if( latch->dirty ) {
1203                         bt_writepage(mgr, page, latch->page_no);
1204                         latch->dirty = 0, num++;
1205                 }
1206         }
1207
1208         //      clear redo recovery buffer on disk.
1209
1210         if( mgr->pagezero->redopages ) {
1211                 eof = (BtLogHdr *)mgr->redobuff;
1212                 memset (eof, 0, sizeof(BtLogHdr));
1213                 eof->lsn = mgr->lsn;
1214                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1215                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1216         }
1217
1218         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1219
1220 #ifdef unix
1221         while( mgr->segments )
1222                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1223
1224         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1225         munmap (mgr->pagezero, mgr->page_size);
1226 #else
1227         FlushViewOfFile(mgr->pagezero, 0);
1228         UnmapViewOfFile(mgr->pagezero);
1229         UnmapViewOfFile(mgr->pagepool);
1230         CloseHandle(mgr->halloc);
1231         CloseHandle(mgr->hpool);
1232 #endif
1233 #ifdef unix
1234         close (mgr->idx);
1235         free (mgr);
1236 #else
1237         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1238         FlushFileBuffers(mgr->idx);
1239         CloseHandle(mgr->idx);
1240         GlobalFree (mgr);
1241 #endif
1242 }
1243
1244 //      close and release memory
1245
1246 void bt_close (BtDb *bt)
1247 {
1248 #ifdef unix
1249         if( bt->cursor )
1250                 free (bt->cursor);
1251 #else
1252         if( bt->cursor)
1253                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1254 #endif
1255         free (bt);
1256 }
1257
1258 //  open/create new btree buffer manager
1259
1260 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1261 //              size of page pool (e.g. 262144)
1262
1263 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1264 {
1265 uint lvl, attr, last, slot, idx;
1266 uint nlatchpage, latchhash;
1267 unsigned char value[BtId];
1268 int flag, initit = 0;
1269 BtPageZero *pagezero;
1270 BtLatchSet *latch;
1271 off64_t size;
1272 uint amt[1];
1273 BtMgr* mgr;
1274 BtKey* key;
1275 BtVal *val;
1276
1277         // determine sanity of page size and buffer pool
1278
1279         if( bits > BT_maxbits )
1280                 bits = BT_maxbits;
1281         else if( bits < BT_minbits )
1282                 bits = BT_minbits;
1283 #ifdef unix
1284         mgr = calloc (1, sizeof(BtMgr));
1285
1286         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1287
1288         if( mgr->idx == -1 ) {
1289                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1290                 return free(mgr), NULL;
1291         }
1292 #else
1293         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1294         attr = FILE_ATTRIBUTE_NORMAL;
1295         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1296
1297         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1298                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1299                 return GlobalFree(mgr), NULL;
1300         }
1301 #endif
1302
1303 #ifdef unix
1304         pagezero = valloc (BT_maxpage);
1305         *amt = 0;
1306
1307         // read minimum page size to get root info
1308         //      to support raw disk partition files
1309         //      check if bits == 0 on the disk.
1310
1311         if( size = lseek (mgr->idx, 0L, 2) )
1312                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1313                         if( pagezero->alloc->bits )
1314                                 bits = pagezero->alloc->bits;
1315                         else
1316                                 initit = 1;
1317                 else
1318                         return free(mgr), free(pagezero), NULL;
1319         else
1320                 initit = 1;
1321 #else
1322         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1323         size = GetFileSize(mgr->idx, amt);
1324
1325         if( size || *amt ) {
1326                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1327                         return bt_mgrclose (mgr), NULL;
1328                 bits = pagezero->alloc->bits;
1329         } else
1330                 initit = 1;
1331 #endif
1332
1333         mgr->page_size = 1 << bits;
1334         mgr->page_bits = bits;
1335
1336         //  calculate number of latch hash table entries
1337
1338         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1339
1340         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1341         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1342         mgr->latchtotal = nodemax;
1343
1344         if( !initit )
1345                 goto mgrlatch;
1346
1347         // initialize an empty b-tree with latch page, root page, page of leaves
1348         // and page(s) of latches and page pool cache
1349
1350         memset (pagezero, 0, 1 << bits);
1351         pagezero->alloc->lvl = MIN_lvl - 1;
1352         pagezero->alloc->bits = mgr->page_bits;
1353         pagezero->redopages = redopages;
1354
1355         bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1356         pagezero->activepages = 2;
1357
1358         //  initialize left-most LEAF page in
1359         //      alloc->left and count of active leaf pages.
1360
1361         bt_putid (pagezero->alloc->left, LEAF_page);
1362         ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1363
1364         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1365                 fprintf (stderr, "Unable to create btree page zero\n");
1366                 return bt_mgrclose (mgr), NULL;
1367         }
1368
1369         memset (pagezero, 0, 1 << bits);
1370         pagezero->alloc->bits = mgr->page_bits;
1371
1372         for( lvl=MIN_lvl; lvl--; ) {
1373         BtSlot *node = slotptr(pagezero->alloc, 1);
1374                 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1375                 key = keyptr(pagezero->alloc, 1);
1376                 key->len = 2;           // create stopper key
1377                 key->key[0] = 0xff;
1378                 key->key[1] = 0xff;
1379
1380                 bt_putid(value, MIN_lvl - lvl + 1);
1381                 val = valptr(pagezero->alloc, 1);
1382                 val->len = lvl ? BtId : 0;
1383                 memcpy (val->value, value, val->len);
1384
1385                 pagezero->alloc->min = node->off;
1386                 pagezero->alloc->lvl = lvl;
1387                 pagezero->alloc->cnt = 1;
1388                 pagezero->alloc->act = 1;
1389                 pagezero->alloc->page_no = MIN_lvl - lvl;
1390
1391                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1392                         fprintf (stderr, "Unable to create btree page\n");
1393                         return bt_mgrclose (mgr), NULL;
1394                 }
1395         }
1396
1397 mgrlatch:
1398 #ifdef unix
1399         free (pagezero);
1400 #else
1401         VirtualFree (pagezero, 0, MEM_RELEASE);
1402 #endif
1403 #ifdef unix
1404         // mlock the first segment of 64K pages
1405
1406         flag = PROT_READ | PROT_WRITE;
1407         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1408         mgr->segments = 1;
1409
1410         if( mgr->pages[0] == MAP_FAILED ) {
1411                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1412                 return bt_mgrclose (mgr), NULL;
1413         }
1414
1415         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1416         mlock (mgr->pagezero, mgr->page_size);
1417
1418         mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1419         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1420
1421         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1422         if( mgr->pagepool == MAP_FAILED ) {
1423                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1424                 return bt_mgrclose (mgr), NULL;
1425         }
1426 #else
1427         flag = PAGE_READWRITE;
1428         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1429         if( !mgr->halloc ) {
1430                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1431                 return bt_mgrclose (mgr), NULL;
1432         }
1433
1434         flag = FILE_MAP_WRITE;
1435         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1436         if( !mgr->pagezero ) {
1437                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1438                 return bt_mgrclose (mgr), NULL;
1439         }
1440
1441         flag = PAGE_READWRITE;
1442         size = (uid)mgr->nlatchpage << mgr->page_bits;
1443         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1444         if( !mgr->hpool ) {
1445                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1446                 return bt_mgrclose (mgr), NULL;
1447         }
1448
1449         flag = FILE_MAP_WRITE;
1450         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1451         if( !mgr->pagepool ) {
1452                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1453                 return bt_mgrclose (mgr), NULL;
1454         }
1455 #endif
1456
1457         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1458         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1459         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1460
1461         return mgr;
1462 }
1463
1464 //      open BTree access method
1465 //      based on buffer manager
1466
1467 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1468 {
1469 BtDb *bt = malloc (sizeof(*bt));
1470
1471         memset (bt, 0, sizeof(*bt));
1472         bt->main = main;
1473         bt->mgr = mgr;
1474 #ifdef unix
1475         bt->cursor = valloc (mgr->page_size);
1476 #else
1477         bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1478 #endif
1479 #ifdef unix
1480         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1481 #else
1482         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1483 #endif
1484         return bt;
1485 }
1486
1487 //  compare two keys, return > 0, = 0, or < 0
1488 //  =0: keys are same
1489 //  -1: key2 > key1
1490 //  +1: key2 < key1
1491 //  as the comparison value
1492
1493 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1494 {
1495 uint len1 = key1->len;
1496 int ans;
1497
1498         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1499                 return ans;
1500
1501         if( len1 > len2 )
1502                 return 1;
1503         if( len1 < len2 )
1504                 return -1;
1505
1506         return 0;
1507 }
1508
1509 // place write, read, or parent lock on requested page_no.
1510
1511 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1512 {
1513         switch( mode ) {
1514         case BtLockRead:
1515                 ReadLock (latch->readwr, thread_no);
1516                 break;
1517         case BtLockWrite:
1518                 WriteLock (latch->readwr, thread_no);
1519                 break;
1520         case BtLockAccess:
1521                 ReadLock (latch->access, thread_no);
1522                 break;
1523         case BtLockDelete:
1524                 WriteLock (latch->access, thread_no);
1525                 break;
1526         case BtLockParent:
1527                 WriteOLock (latch->parent, thread_no);
1528                 break;
1529         case BtLockAtomic:
1530                 WriteOLock (latch->atomic, thread_no);
1531                 break;
1532         case BtLockAtomic | BtLockRead:
1533                 WriteOLock (latch->atomic, thread_no);
1534                 ReadLock (latch->readwr, thread_no);
1535                 break;
1536         case BtLockAtomic | BtLockWrite:
1537                 WriteOLock (latch->atomic, thread_no);
1538                 WriteLock (latch->readwr, thread_no);
1539                 break;
1540         }
1541 }
1542
1543 // remove write, read, or parent lock on requested page
1544
1545 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1546 {
1547         switch( mode ) {
1548         case BtLockRead:
1549                 ReadRelease (latch->readwr);
1550                 break;
1551         case BtLockWrite:
1552                 WriteRelease (latch->readwr);
1553                 break;
1554         case BtLockAccess:
1555                 ReadRelease (latch->access);
1556                 break;
1557         case BtLockDelete:
1558                 WriteRelease (latch->access);
1559                 break;
1560         case BtLockParent:
1561                 WriteORelease (latch->parent);
1562                 break;
1563         case BtLockAtomic:
1564                 WriteORelease (latch->atomic);
1565                 break;
1566         case BtLockAtomic | BtLockRead:
1567                 WriteORelease (latch->atomic);
1568                 ReadRelease (latch->readwr);
1569                 break;
1570         case BtLockAtomic | BtLockWrite:
1571                 WriteORelease (latch->atomic);
1572                 WriteRelease (latch->readwr);
1573                 break;
1574         }
1575 }
1576
1577 //      allocate a new page
1578 //      return with page latched, but unlocked.
1579
1580 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1581 {
1582 uid page_no;
1583 int blk;
1584
1585         //      lock allocation page
1586
1587         bt_mutexlock(mgr->lock);
1588
1589         // use empty chain first
1590         // else allocate new page
1591
1592         if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1593                 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1594                         set->page = bt_mappage (mgr, set->latch);
1595                 else
1596                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1597
1598                 mgr->pagezero->activepages++;
1599                 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1600
1601                 //  the page is currently free and this
1602                 //  will keep bt_txnpromote out.
1603
1604                 //      contents will replace this bit
1605                 //  and pin will keep bt_txnpromote out
1606
1607                 contents->page_no = page_no;
1608                 set->latch->dirty = 1;
1609
1610                 memcpy (set->page, contents, mgr->page_size);
1611
1612                 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1613                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1614
1615                 bt_releasemutex(mgr->lock);
1616                 return 0;
1617         }
1618
1619         page_no = bt_getid(mgr->pagezero->alloc->right);
1620         bt_putid(mgr->pagezero->alloc->right, page_no+1);
1621
1622         // unlock allocation latch and
1623         //      extend file into new page.
1624
1625         mgr->pagezero->activepages++;
1626         if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1627           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1628         bt_releasemutex(mgr->lock);
1629
1630         //      keep bt_txnpromote out of this page
1631
1632         contents->free = 1;
1633         contents->page_no = page_no;
1634         pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1635
1636         //      don't load cache from btree page, load it from contents
1637
1638         if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1639                 set->page = bt_mappage (mgr, set->latch);
1640         else
1641                 return mgr->err;
1642
1643         // now pin will keep bt_txnpromote out
1644
1645         set->page->free = 0;
1646         return 0;
1647 }
1648
1649 //  find slot in page for given key at a given level
1650
1651 int bt_findslot (BtPage page, unsigned char *key, uint len)
1652 {
1653 uint diff, higher = page->cnt, low = 1, slot;
1654 uint good = 0;
1655
1656         //        make stopper key an infinite fence value
1657
1658         if( bt_getid (page->right) )
1659                 higher++;
1660         else
1661                 good++;
1662
1663         //      low is the lowest candidate.
1664         //  loop ends when they meet
1665
1666         //  higher is already
1667         //      tested as .ge. the passed key.
1668
1669         while( diff = higher - low ) {
1670                 slot = low + ( diff >> 1 );
1671                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1672                         low = slot + 1;
1673                 else
1674                         higher = slot, good++;
1675         }
1676
1677         //      return zero if key is on right link page
1678
1679         return good ? higher : 0;
1680 }
1681
1682 //  find and load page at given level for given key
1683 //      leave page rd or wr locked as requested
1684
1685 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1686 {
1687 uid page_no = ROOT_page, prevpage_no = 0;
1688 uint drill = 0xff, slot;
1689 BtLatchSet *prevlatch;
1690 uint mode, prevmode;
1691 BtPage prevpage;
1692 BtVal *val;
1693
1694   //  start at root of btree and drill down
1695
1696   do {
1697         // determine lock mode of drill level
1698         mode = (drill == lvl) ? lock : BtLockRead; 
1699
1700         if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1701           return 0;
1702
1703         // obtain access lock using lock chaining with Access mode
1704
1705         if( page_no > ROOT_page )
1706           bt_lockpage(BtLockAccess, set->latch, thread_no);
1707
1708         set->page = bt_mappage (mgr, set->latch);
1709 if( set->latch->promote )
1710 abort();
1711
1712         //      release & unpin parent or left sibling page
1713
1714         if( prevpage_no ) {
1715           bt_unlockpage(prevmode, prevlatch);
1716           bt_unpinlatch (mgr, prevlatch);
1717           prevpage_no = 0;
1718         }
1719
1720         // obtain mode lock using lock chaining through AccessLock
1721
1722         bt_lockpage(mode, set->latch, thread_no);
1723
1724         if( set->page->free )
1725                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1726
1727         if( page_no > ROOT_page )
1728           bt_unlockpage(BtLockAccess, set->latch);
1729
1730         // re-read and re-lock root after determining actual level of root
1731
1732         if( set->page->lvl != drill) {
1733                 if( set->latch->page_no != ROOT_page )
1734                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1735                         
1736                 drill = set->page->lvl;
1737
1738                 if( lock != BtLockRead && drill == lvl ) {
1739                   bt_unlockpage(mode, set->latch);
1740                   bt_unpinlatch (mgr, set->latch);
1741                   continue;
1742                 }
1743         }
1744
1745         prevpage_no = set->latch->page_no;
1746         prevlatch = set->latch;
1747         prevpage = set->page;
1748         prevmode = mode;
1749
1750         //  find key on page at this level
1751         //  and descend to requested level
1752
1753         if( !set->page->kill )
1754          if( slot = bt_findslot (set->page, key, len) ) {
1755           if( drill == lvl )
1756                 return slot;
1757
1758           // find next non-dead slot -- the fence key if nothing else
1759
1760           while( slotptr(set->page, slot)->dead )
1761                 if( slot++ < set->page->cnt )
1762                   continue;
1763                 else
1764                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1765
1766           val = valptr(set->page, slot);
1767
1768           if( val->len == BtId )
1769                 page_no = bt_getid(valptr(set->page, slot)->value);
1770           else
1771                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1772
1773           drill--;
1774           continue;
1775          }
1776
1777         //  slide right into next page
1778
1779         page_no = bt_getid(set->page->right);
1780   } while( page_no );
1781
1782   // return error on end of right chain
1783
1784   mgr->line = __LINE__, mgr->err = BTERR_struct;
1785   return 0;     // return error
1786 }
1787
1788 //      return page to free list
1789 //      page must be delete & write locked
1790 //      and have no keys pointing to it.
1791
1792 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1793 {
1794         //      lock allocation page
1795
1796         bt_mutexlock (mgr->lock);
1797
1798         //      store chain
1799
1800         memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1801         bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1802         set->latch->promote = 0;
1803         set->latch->dirty = 1;
1804         set->page->free = 1;
1805
1806         // decrement active page count
1807
1808         mgr->pagezero->activepages--;
1809
1810         if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1811           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1812
1813         // unlock released page
1814         // and unlock allocation page
1815
1816         bt_unlockpage (BtLockDelete, set->latch);
1817         bt_unlockpage (BtLockWrite, set->latch);
1818         bt_unpinlatch (mgr, set->latch);
1819         bt_releasemutex (mgr->lock);
1820 }
1821
1822 //      a fence key was deleted from a page
1823 //      push new fence value upwards
1824
1825 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1826 {
1827 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1828 unsigned char value[BtId];
1829 BtKey* ptr;
1830 uint idx;
1831
1832         //      remove the old fence value
1833
1834         ptr = keyptr(set->page, set->page->cnt);
1835         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1836         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1837         set->latch->dirty = 1;
1838
1839         //  cache new fence value
1840
1841         ptr = keyptr(set->page, set->page->cnt);
1842         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1843
1844         bt_lockpage (BtLockParent, set->latch, thread_no);
1845         bt_unlockpage (BtLockWrite, set->latch);
1846
1847         //      insert new (now smaller) fence key
1848
1849         bt_putid (value, set->latch->page_no);
1850         ptr = (BtKey*)leftkey;
1851
1852         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1853           return mgr->err;
1854
1855         //      now delete old fence key
1856
1857         ptr = (BtKey*)rightkey;
1858
1859         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1860                 return mgr->err;
1861
1862         bt_unlockpage (BtLockParent, set->latch);
1863         bt_unpinlatch(mgr, set->latch);
1864         return 0;
1865 }
1866
1867 //      root has a single child
1868 //      collapse a level from the tree
1869
1870 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1871 {
1872 BtPageSet child[1];
1873 uid page_no;
1874 BtVal *val;
1875 uint idx;
1876
1877   // find the child entry and promote as new root contents
1878
1879   do {
1880         for( idx = 0; idx++ < root->page->cnt; )
1881           if( !slotptr(root->page, idx)->dead )
1882                 break;
1883
1884         val = valptr(root->page, idx);
1885
1886         if( val->len == BtId )
1887                 page_no = bt_getid (valptr(root->page, idx)->value);
1888         else
1889                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1890
1891         if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1892                 child->page = bt_mappage (mgr, child->latch);
1893         else
1894                 return mgr->err;
1895
1896         bt_lockpage (BtLockDelete, child->latch, thread_no);
1897         bt_lockpage (BtLockWrite, child->latch, thread_no);
1898
1899         memcpy (root->page, child->page, mgr->page_size);
1900         root->latch->dirty = 1;
1901
1902         bt_freepage (mgr, child);
1903
1904   } while( root->page->lvl > 1 && root->page->act == 1 );
1905
1906   bt_unlockpage (BtLockWrite, root->latch);
1907   bt_unpinlatch (mgr, root->latch);
1908   return 0;
1909 }
1910
1911 //  delete a page and manage keys
1912 //  call with page writelocked
1913
1914 //      returns with page removed
1915 //      from the page pool.
1916
1917 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, int delkey)
1918 {
1919 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1920 unsigned char value[BtId];
1921 uint lvl = set->page->lvl;
1922 BtPageSet right[1];
1923 uid page_no;
1924 BtKey *ptr;
1925
1926         //      cache copy of fence key
1927         //      to remove in parent
1928
1929         ptr = keyptr(set->page, set->page->cnt);
1930         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1931
1932         //      obtain lock on right page
1933
1934         page_no = bt_getid(set->page->right);
1935
1936         if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1937                 right->page = bt_mappage (mgr, right->latch);
1938         else
1939                 return 0;
1940
1941         bt_lockpage (BtLockWrite, right->latch, thread_no);
1942
1943         // cache copy of key to update
1944
1945         ptr = keyptr(right->page, right->page->cnt);
1946         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1947
1948         if( right->page->kill )
1949                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1950
1951         // pull contents of right peer into our empty page
1952
1953         memcpy (set->page, right->page, mgr->page_size);
1954         set->page->page_no = set->latch->page_no;
1955         set->latch->dirty = 1;
1956
1957         // mark right page deleted and point it to left page
1958         //      until we can post parent updates that remove access
1959         //      to the deleted page.
1960
1961         bt_putid (right->page->right, set->latch->page_no);
1962         right->latch->dirty = 1;
1963         right->page->kill = 1;
1964
1965         bt_lockpage (BtLockParent, right->latch, thread_no);
1966         bt_unlockpage (BtLockWrite, right->latch);
1967
1968         bt_lockpage (BtLockParent, set->latch, thread_no);
1969         bt_unlockpage (BtLockWrite, set->latch);
1970
1971         // redirect higher key directly to our new node contents
1972
1973         bt_putid (value, set->latch->page_no);
1974         ptr = (BtKey*)higherfence;
1975
1976         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1977           return mgr->err;
1978
1979         //      delete old lower key to our node
1980
1981         ptr = (BtKey*)lowerfence;
1982
1983         if( delkey )
1984          if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1985           return mgr->err;
1986
1987         //      obtain delete and write locks to right node
1988
1989         bt_unlockpage (BtLockParent, right->latch);
1990         bt_lockpage (BtLockDelete, right->latch, thread_no);
1991         bt_lockpage (BtLockWrite, right->latch, thread_no);
1992         bt_freepage (mgr, right);
1993
1994         bt_unlockpage (BtLockParent, set->latch);
1995         bt_unpinlatch (mgr, set->latch);
1996         return 0;
1997 }
1998
1999 //  find and delete key on page by marking delete flag bit
2000 //  if page becomes empty, delete it from the btree
2001
2002 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2003 {
2004 uint slot, idx, found, fence;
2005 BtPageSet set[1];
2006 BtSlot *node;
2007 BtKey *ptr;
2008 BtVal *val;
2009
2010         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2011                 node = slotptr(set->page, slot);
2012                 ptr = keyptr(set->page, slot);
2013         } else
2014                 return mgr->err;
2015
2016         // if librarian slot, advance to real slot
2017
2018         if( node->type == Librarian ) {
2019                 ptr = keyptr(set->page, ++slot);
2020                 node = slotptr(set->page, slot);
2021         }
2022
2023         fence = slot == set->page->cnt;
2024
2025         // delete the key, ignore request if already dead
2026
2027         if( found = !keycmp (ptr, key, len) )
2028           if( found = node->dead == 0 ) {
2029                 val = valptr(set->page,slot);
2030                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2031                 set->page->act--;
2032
2033                 //  mark node type as delete
2034
2035                 node->type = Delete;
2036                 node->dead = 1;
2037
2038                 // collapse empty slots beneath the fence
2039                 // on interiour nodes
2040
2041                 if( lvl )
2042                  while( idx = set->page->cnt - 1 )
2043                   if( slotptr(set->page, idx)->dead ) {
2044                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2045                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2046                   } else
2047                         break;
2048           }
2049
2050         if( !found )
2051                 return 0;
2052
2053         //      did we delete a fence key in an upper level?
2054
2055         if( lvl && set->page->act && fence )
2056           if( bt_fixfence (mgr, set, lvl, thread_no) )
2057                 return mgr->err;
2058           else
2059                 return 0;
2060
2061         //      do we need to collapse root?
2062
2063         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2064           if( bt_collapseroot (mgr, set, thread_no) )
2065                 return mgr->err;
2066           else
2067                 return 0;
2068
2069         //      delete empty page
2070
2071         if( !set->page->act )
2072           return bt_deletepage (mgr, set, thread_no, 1);
2073
2074         set->latch->dirty = 1;
2075         bt_unlockpage(BtLockWrite, set->latch);
2076         bt_unpinlatch (mgr, set->latch);
2077         return 0;
2078 }
2079
2080 //      advance to next slot
2081
2082 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2083 {
2084 BtLatchSet *prevlatch;
2085 uid page_no;
2086
2087         if( slot < set->page->cnt )
2088                 return slot + 1;
2089
2090         prevlatch = set->latch;
2091
2092         if( page_no = bt_getid(set->page->right) )
2093           if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2094                 set->page = bt_mappage (bt->mgr, set->latch);
2095           else
2096                 return 0;
2097         else
2098           return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2099
2100         // obtain access lock using lock chaining with Access mode
2101
2102         bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2103
2104         bt_unlockpage(BtLockRead, prevlatch);
2105         bt_unpinlatch (bt->mgr, prevlatch);
2106
2107         bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2108         bt_unlockpage(BtLockAccess, set->latch);
2109         return 1;
2110 }
2111
2112 //      find unique key == given key, or first duplicate key in
2113 //      leaf level and return number of value bytes
2114 //      or (-1) if not found.
2115
2116 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2117 {
2118 BtPageSet set[1];
2119 uint len, slot;
2120 int ret = -1;
2121 BtKey *ptr;
2122 BtVal *val;
2123
2124   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2125    do {
2126         ptr = keyptr(set->page, slot);
2127
2128         //      skip librarian slot place holder
2129
2130         if( slotptr(set->page, slot)->type == Librarian )
2131                 ptr = keyptr(set->page, ++slot);
2132
2133         //      return actual key found
2134
2135         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2136         len = ptr->len;
2137
2138         if( slotptr(set->page, slot)->type == Duplicate )
2139                 len -= BtId;
2140
2141         //      not there if we reach the stopper key
2142
2143         if( slot == set->page->cnt )
2144           if( !bt_getid (set->page->right) )
2145                 break;
2146
2147         // if key exists, return >= 0 value bytes copied
2148         //      otherwise return (-1)
2149
2150         if( slotptr(set->page, slot)->dead )
2151                 continue;
2152
2153         if( keylen == len )
2154           if( !memcmp (ptr->key, key, len) ) {
2155                 val = valptr (set->page,slot);
2156                 if( valmax > val->len )
2157                         valmax = val->len;
2158                 memcpy (value, val->value, valmax);
2159                 ret = valmax;
2160           }
2161
2162         break;
2163
2164    } while( slot = bt_findnext (bt, set, slot) );
2165
2166   bt_unlockpage (BtLockRead, set->latch);
2167   bt_unpinlatch (bt->mgr, set->latch);
2168   return ret;
2169 }
2170
2171 //      check page for space available,
2172 //      clean if necessary and return
2173 //      0 - page needs splitting
2174 //      >0  new slot value
2175
2176 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2177 {
2178 BtPage page = set->page, frame;
2179 uint nxt = mgr->page_size;
2180 uint cnt = 0, idx = 0;
2181 uint max = page->cnt;
2182 uint newslot = max;
2183 BtKey *key;
2184 BtVal *val;
2185
2186         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2187                 return slot;
2188
2189         //      skip cleanup and proceed to split
2190         //      if there's not enough garbage
2191         //      to bother with.
2192
2193         if( page->garbage < nxt / 5 )
2194                 return 0;
2195
2196         frame = malloc (mgr->page_size);
2197         memcpy (frame, page, mgr->page_size);
2198
2199         // skip page info and set rest of page to zero
2200
2201         memset (page+1, 0, mgr->page_size - sizeof(*page));
2202         set->latch->dirty = 1;
2203
2204         page->garbage = 0;
2205         page->act = 0;
2206
2207         // clean up page first by
2208         // removing deleted keys
2209
2210         while( cnt++ < max ) {
2211                 if( cnt == slot )
2212                         newslot = idx + 2;
2213
2214                 if( cnt < max || frame->lvl )
2215                   if( slotptr(frame,cnt)->dead )
2216                         continue;
2217
2218                 // copy the value across
2219
2220                 val = valptr(frame, cnt);
2221                 nxt -= val->len + sizeof(BtVal);
2222                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2223
2224                 // copy the key across
2225
2226                 key = keyptr(frame, cnt);
2227                 nxt -= key->len + sizeof(BtKey);
2228                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2229
2230                 // make a librarian slot
2231
2232                 slotptr(page, ++idx)->off = nxt;
2233                 slotptr(page, idx)->type = Librarian;
2234                 slotptr(page, idx)->dead = 1;
2235
2236                 // set up the slot
2237
2238                 slotptr(page, ++idx)->off = nxt;
2239                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2240
2241                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2242                         page->act++;
2243         }
2244
2245         page->min = nxt;
2246         page->cnt = idx;
2247         free (frame);
2248
2249         //      see if page has enough space now, or does it need splitting?
2250
2251         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2252                 return newslot;
2253
2254         return 0;
2255 }
2256
2257 // split the root and raise the height of the btree
2258
2259 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2260 {  
2261 unsigned char leftkey[BT_keyarray];
2262 uint nxt = mgr->page_size;
2263 unsigned char value[BtId];
2264 BtPageSet left[1];
2265 uid left_page_no;
2266 BtPage frame;
2267 BtKey *ptr;
2268 BtVal *val;
2269
2270         frame = malloc (mgr->page_size);
2271         memcpy (frame, root->page, mgr->page_size);
2272
2273         //      save left page fence key for new root
2274
2275         ptr = keyptr(root->page, root->page->cnt);
2276         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2277
2278         //  Obtain an empty page to use, and copy the current
2279         //  root contents into it, e.g. lower keys
2280
2281         if( bt_newpage(mgr, left, frame, page_no) )
2282                 return mgr->err;
2283
2284         left_page_no = left->latch->page_no;
2285         bt_unpinlatch (mgr, left->latch);
2286         free (frame);
2287
2288         // preserve the page info at the bottom
2289         // of higher keys and set rest to zero
2290
2291         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2292
2293         // insert stopper key at top of newroot page
2294         // and increase the root height
2295
2296         nxt -= BtId + sizeof(BtVal);
2297         bt_putid (value, right->page_no);
2298         val = (BtVal *)((unsigned char *)root->page + nxt);
2299         memcpy (val->value, value, BtId);
2300         val->len = BtId;
2301
2302         nxt -= 2 + sizeof(BtKey);
2303         slotptr(root->page, 2)->off = nxt;
2304         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2305         ptr->len = 2;
2306         ptr->key[0] = 0xff;
2307         ptr->key[1] = 0xff;
2308
2309         // insert lower keys page fence key on newroot page as first key
2310
2311         nxt -= BtId + sizeof(BtVal);
2312         bt_putid (value, left_page_no);
2313         val = (BtVal *)((unsigned char *)root->page + nxt);
2314         memcpy (val->value, value, BtId);
2315         val->len = BtId;
2316
2317         ptr = (BtKey *)leftkey;
2318         nxt -= ptr->len + sizeof(BtKey);
2319         slotptr(root->page, 1)->off = nxt;
2320         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2321         
2322         bt_putid(root->page->right, 0);
2323         root->page->min = nxt;          // reset lowest used offset and key count
2324         root->page->cnt = 2;
2325         root->page->act = 2;
2326         root->page->lvl++;
2327
2328         mgr->pagezero->alloc->lvl = root->page->lvl;
2329
2330         // release and unpin root pages
2331
2332         bt_unlockpage(BtLockWrite, root->latch);
2333         bt_unpinlatch (mgr, root->latch);
2334
2335         bt_unpinlatch (mgr, right);
2336         return 0;
2337 }
2338
2339 //  split already locked full node
2340 //      leave it locked.
2341 //      return pool entry for new right
2342 //      page, pinned & unlocked
2343
2344 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2345 {
2346 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2347 BtPage frame = malloc (mgr->page_size);
2348 uint lvl = set->page->lvl;
2349 BtPageSet right[1];
2350 BtKey *key, *ptr;
2351 BtVal *val, *src;
2352 uid right2;
2353 uint prev;
2354
2355         //  split higher half of keys to frame
2356
2357         memset (frame, 0, mgr->page_size);
2358         max = set->page->cnt;
2359         cnt = max / 2;
2360         idx = 0;
2361
2362         while( cnt++ < max ) {
2363                 if( cnt < max || set->page->lvl )
2364                   if( slotptr(set->page, cnt)->dead )
2365                         continue;
2366
2367                 src = valptr(set->page, cnt);
2368                 nxt -= src->len + sizeof(BtVal);
2369                 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2370
2371                 key = keyptr(set->page, cnt);
2372                 nxt -= key->len + sizeof(BtKey);
2373                 ptr = (BtKey*)((unsigned char *)frame + nxt);
2374                 memcpy (ptr, key, key->len + sizeof(BtKey));
2375
2376                 //      add librarian slot
2377
2378                 slotptr(frame, ++idx)->off = nxt;
2379                 slotptr(frame, idx)->type = Librarian;
2380                 slotptr(frame, idx)->dead = 1;
2381
2382                 //  add actual slot
2383
2384                 slotptr(frame, ++idx)->off = nxt;
2385                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2386
2387                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2388                         frame->act++;
2389         }
2390
2391         frame->bits = mgr->page_bits;
2392         frame->min = nxt;
2393         frame->cnt = idx;
2394         frame->lvl = lvl;
2395
2396         // link right node
2397
2398         if( set->latch->page_no > ROOT_page )
2399                 bt_putid (frame->right, bt_getid (set->page->right));
2400
2401         //      get new free page and write higher keys to it.
2402
2403         if( bt_newpage(mgr, right, frame, thread_no) )
2404                 return 0;
2405
2406         // process lower keys
2407
2408         memcpy (frame, set->page, mgr->page_size);
2409         memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2410         set->latch->dirty = 1;
2411
2412         nxt = mgr->page_size;
2413         set->page->garbage = 0;
2414         set->page->act = 0;
2415         max /= 2;
2416         cnt = 0;
2417         idx = 0;
2418
2419         if( slotptr(frame, max)->type == Librarian )
2420                 max--;
2421
2422         //  assemble page of smaller keys
2423
2424         while( cnt++ < max ) {
2425                 if( slotptr(frame, cnt)->dead )
2426                         continue;
2427                 val = valptr(frame, cnt);
2428                 nxt -= val->len + sizeof(BtVal);
2429                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2430
2431                 key = keyptr(frame, cnt);
2432                 nxt -= key->len + sizeof(BtKey);
2433                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2434
2435                 //      add librarian slot
2436
2437                 slotptr(set->page, ++idx)->off = nxt;
2438                 slotptr(set->page, idx)->type = Librarian;
2439                 slotptr(set->page, idx)->dead = 1;
2440
2441                 //      add actual slot
2442
2443                 slotptr(set->page, ++idx)->off = nxt;
2444                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2445                 set->page->act++;
2446         }
2447
2448         bt_putid(set->page->right, right->latch->page_no);
2449         set->page->min = nxt;
2450         set->page->cnt = idx;
2451         free(frame);
2452
2453         return right->latch - mgr->latchsets;
2454 }
2455
2456 //      fix keys for newly split page
2457 //      call with both pages pinned & locked
2458 //  return unlocked and unpinned
2459
2460 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2461 {
2462 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2463 unsigned char value[BtId];
2464 uint lvl = set->page->lvl;
2465 BtPage page;
2466 BtKey *ptr;
2467
2468         // if current page is the root page, split it
2469
2470         if( set->latch->page_no == ROOT_page )
2471                 return bt_splitroot (mgr, set, right, thread_no);
2472
2473         ptr = keyptr(set->page, set->page->cnt);
2474         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2475
2476         page = bt_mappage (mgr, right);
2477
2478         ptr = keyptr(page, page->cnt);
2479         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2480
2481         // insert new fences in their parent pages
2482
2483         bt_lockpage (BtLockParent, right, thread_no);
2484
2485         bt_lockpage (BtLockParent, set->latch, thread_no);
2486         bt_unlockpage (BtLockWrite, set->latch);
2487
2488         // insert new fence for reformulated left block of smaller keys
2489
2490         bt_putid (value, set->latch->page_no);
2491         ptr = (BtKey *)leftkey;
2492
2493         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2494                 return mgr->err;
2495
2496         // switch fence for right block of larger keys to new right page
2497
2498         bt_putid (value, right->page_no);
2499         ptr = (BtKey *)rightkey;
2500
2501         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2502                 return mgr->err;
2503
2504         bt_unlockpage (BtLockParent, set->latch);
2505         bt_unpinlatch (mgr, set->latch);
2506
2507         bt_unlockpage (BtLockParent, right);
2508         bt_unpinlatch (mgr, right);
2509         return 0;
2510 }
2511
2512 //      install new key and value onto page
2513 //      page must already be checked for
2514 //      adequate space
2515
2516 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2517 {
2518 uint idx, librarian;
2519 BtSlot *node;
2520 BtKey *ptr;
2521 BtVal *val;
2522 int rate;
2523
2524         //      if found slot > desired slot and previous slot
2525         //      is a librarian slot, use it
2526
2527         if( slot > 1 )
2528           if( slotptr(set->page, slot-1)->type == Librarian )
2529                 slot--;
2530
2531         // copy value onto page
2532
2533         set->page->min -= vallen + sizeof(BtVal);
2534         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2535         memcpy (val->value, value, vallen);
2536         val->len = vallen;
2537
2538         // copy key onto page
2539
2540         set->page->min -= keylen + sizeof(BtKey);
2541         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2542         memcpy (ptr->key, key, keylen);
2543         ptr->len = keylen;
2544         
2545         //      find first empty slot
2546
2547         for( idx = slot; idx < set->page->cnt; idx++ )
2548           if( slotptr(set->page, idx)->dead )
2549                 break;
2550
2551         // now insert key into array before slot,
2552         //      adding as many librarian slots as
2553         //      makes sense.
2554
2555         if( idx == set->page->cnt ) {
2556         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2557
2558                 librarian = ++idx - slot;
2559                 avail /= sizeof(BtSlot);
2560
2561                 if( avail < 0 )
2562                         avail = 0;
2563
2564                 if( librarian > avail )
2565                         librarian = avail;
2566
2567                 if( librarian ) {
2568                         rate = (idx - slot) / librarian;
2569                         set->page->cnt += librarian;
2570                         idx += librarian;
2571                 } else
2572                         rate = 0;
2573         } else
2574                 librarian = 0, rate = 0;
2575
2576         while( idx > slot ) {
2577                 //  transfer slot
2578                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2579                 idx--;
2580
2581                 //      add librarian slot per rate
2582
2583                 if( librarian )
2584                  if( (idx - slot + 1)/2 <= librarian * rate ) {
2585 //                if( rate && !(idx % rate) ) {
2586                         node = slotptr(set->page, idx--);
2587                         node->off = node[1].off;
2588                         node->type = Librarian;
2589                         node->dead = 1;
2590                         librarian--;
2591                   }
2592         }
2593 if(librarian)
2594 abort();
2595         set->latch->dirty = 1;
2596         set->page->act++;
2597
2598         //      fill in new slot
2599
2600         node = slotptr(set->page, slot);
2601         node->off = set->page->min;
2602         node->type = type;
2603         node->dead = 0;
2604
2605         if( release ) {
2606                 bt_unlockpage (BtLockWrite, set->latch);
2607                 bt_unpinlatch (mgr, set->latch);
2608         }
2609
2610         return 0;
2611 }
2612
2613 //  Insert new key into the btree at given level.
2614 //      either add a new key or update/add an existing one
2615
2616 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2617 {
2618 uint slot, idx, len, entry;
2619 BtPageSet set[1];
2620 BtSlot *node;
2621 BtKey *ptr;
2622 BtVal *val;
2623
2624   while( 1 ) { // find the page and slot for the current key
2625         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2626                 node = slotptr(set->page, slot);
2627                 ptr = keyptr(set->page, slot);
2628         } else {
2629                 if( !mgr->err )
2630                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2631                 return mgr->err;
2632         }
2633
2634         // if librarian slot == found slot, advance to real slot
2635
2636         if( node->type == Librarian )
2637           if( !keycmp (ptr, key, keylen) ) {
2638                 ptr = keyptr(set->page, ++slot);
2639                 node = slotptr(set->page, slot);
2640           }
2641
2642         //  if inserting a duplicate key or unique
2643         //      key that doesn't exist on the page,
2644         //      check for adequate space on the page
2645         //      and insert the new key before slot.
2646
2647         switch( type ) {
2648         case Unique:
2649         case Duplicate:
2650           if( keycmp (ptr, key, keylen) )
2651            if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2652             return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2653            else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2654                 return mgr->err;
2655            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2656                 return mgr->err;
2657            else
2658                 continue;
2659
2660           // if key already exists, update value and return
2661
2662           val = valptr(set->page, slot);
2663
2664           if( val->len >= vallen ) {
2665                 if( slotptr(set->page, slot)->dead )
2666                         set->page->act++;
2667                 node->type = type;
2668                 node->dead = 0;
2669
2670                 set->page->garbage += val->len - vallen;
2671                 set->latch->dirty = 1;
2672                 val->len = vallen;
2673                 memcpy (val->value, value, vallen);
2674                 bt_unlockpage(BtLockWrite, set->latch);
2675                 bt_unpinlatch (mgr, set->latch);
2676                 return 0;
2677           }
2678
2679           //  new update value doesn't fit in existing value area
2680           //    make sure page has room
2681
2682           if( !node->dead )
2683                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2684           else
2685                 set->page->act++;
2686
2687           node->type = type;
2688           node->dead = 0;
2689
2690           if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2691            if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2692                 return mgr->err;
2693            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2694                 return mgr->err;
2695            else
2696                 continue;
2697
2698           //  copy key and value onto page and update slot
2699
2700           set->page->min -= vallen + sizeof(BtVal);
2701           val = (BtVal*)((unsigned char *)set->page + set->page->min);
2702           memcpy (val->value, value, vallen);
2703           val->len = vallen;
2704
2705           set->latch->dirty = 1;
2706           set->page->min -= keylen + sizeof(BtKey);
2707           ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2708           memcpy (ptr->key, key, keylen);
2709           ptr->len = keylen;
2710         
2711           node->off = set->page->min;
2712           bt_unlockpage(BtLockWrite, set->latch);
2713           bt_unpinlatch (mgr, set->latch);
2714           return 0;
2715     }
2716   }
2717   return 0;
2718 }
2719
2720 //      determine actual page where key is located
2721 //  return slot number
2722
2723 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2724 {
2725 BtKey *key = keyptr(source,src);
2726 uint slot = locks[src].slot;
2727 uint entry;
2728
2729         if( src > 1 && locks[src].reuse )
2730           entry = locks[src-1].entry, slot = 0;
2731         else
2732           entry = locks[src].entry;
2733
2734         if( slot ) {
2735                 set->latch = mgr->latchsets + entry;
2736                 set->page = bt_mappage (mgr, set->latch);
2737                 return slot;
2738         }
2739
2740         //      is locks->reuse set? or was slot zeroed?
2741         //      if so, find where our key is located 
2742         //      on current page or pages split on
2743         //      same page txn operations.
2744
2745         do {
2746                 set->latch = mgr->latchsets + entry;
2747                 set->page = bt_mappage (mgr, set->latch);
2748
2749                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2750                   if( slotptr(set->page, slot)->type == Librarian )
2751                         slot++;
2752                   if( locks[src].reuse )
2753                         locks[src].entry = entry;
2754                   return slot;
2755                 }
2756         } while( entry = set->latch->split );
2757
2758         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2759         return 0;
2760 }
2761
2762 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2763 {
2764 BtKey *key = keyptr(source, src);
2765 BtVal *val = valptr(source, src);
2766 BtLatchSet *latch;
2767 BtPageSet set[1];
2768 uint entry, slot;
2769
2770   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2771         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2772           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2773                 return mgr->err;
2774           set->page->lsn = lsn;
2775           return 0;
2776         }
2777
2778         //  split page
2779
2780         if( entry = bt_splitpage (mgr, set, thread_no) )
2781           latch = mgr->latchsets + entry;
2782         else
2783           return mgr->err;
2784
2785         //      splice right page into split chain
2786         //      and WriteLock it
2787
2788         bt_lockpage(BtLockWrite, latch, thread_no);
2789         latch->split = set->latch->split;
2790         set->latch->split = entry;
2791         locks[src].slot = 0;
2792   }
2793
2794   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2795 }
2796
2797 //      perform delete from smaller btree
2798 //  insert a delete slot if not found there
2799
2800 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2801 {
2802 BtKey *key = keyptr(source, src);
2803 BtPageSet set[1];
2804 uint idx, slot;
2805 BtSlot *node;
2806 BtKey *ptr;
2807 BtVal *val;
2808
2809         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2810           node = slotptr(set->page, slot);
2811           ptr = keyptr(set->page, slot);
2812           val = valptr(set->page, slot);
2813         } else
2814           return mgr->line = __LINE__, mgr->err = BTERR_struct;
2815
2816         //  if slot is not found, insert a delete slot
2817
2818         if( keycmp (ptr, key->key, key->len) )
2819           return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2820
2821         //      if node is already dead,
2822         //      ignore the request.
2823
2824         if( node->dead )
2825                 return 0;
2826
2827         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2828         set->latch->dirty = 1;
2829         set->page->lsn = lsn;
2830         set->page->act--;
2831
2832         node->dead = 0;
2833         __sync_fetch_and_add(&mgr->found, 1);
2834         return 0;
2835 }
2836
2837 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2838 {
2839 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2840 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2841
2842         return keycmp (key1, key2->key, key2->len);
2843 }
2844 //      atomic modification of a batch of keys.
2845
2846 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2847 {
2848 uint src, idx, slot, samepage, entry, que = 0;
2849 BtKey *key, *ptr, *key2;
2850 int result = 0;
2851 BtSlot temp[1];
2852 logseqno lsn;
2853 int type;
2854
2855   // stable sort the list of keys into order to
2856   //    prevent deadlocks between threads.
2857 /*
2858   for( src = 1; src++ < source->cnt; ) {
2859         *temp = *slotptr(source,src);
2860         key = keyptr (source,src);
2861
2862         for( idx = src; --idx; ) {
2863           key2 = keyptr (source,idx);
2864           if( keycmp (key, key2->key, key2->len) < 0 ) {
2865                 *slotptr(source,idx+1) = *slotptr(source,idx);
2866                 *slotptr(source,idx) = *temp;
2867           } else
2868                 break;
2869         }
2870   }
2871 */
2872         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2873   //  add entries to redo log
2874
2875   if( bt->mgr->pagezero->redopages )
2876         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2877   else
2878         lsn = 0;
2879
2880   //  perform the individual actions in the transaction
2881
2882   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2883         return bt->mgr->err;
2884
2885   // if number of active pages
2886   // is greater than the buffer pool
2887   // promote page into larger btree
2888
2889   if( bt->main )
2890    while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
2891         if( bt_txnpromote (bt) )
2892           return bt->mgr->err;
2893
2894   // return success
2895
2896   return 0;
2897 }
2898
2899 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2900 {
2901 uint src, idx, slot, samepage, entry, que = 0;
2902 BtPageSet set[1], prev[1];
2903 unsigned char value[BtId];
2904 BtLatchSet *latch;
2905 AtomicTxn *locks;
2906 BtKey *key, *ptr;
2907 BtPage page;
2908 BtVal *val;
2909 uid right;
2910
2911   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2912
2913   // Load the leaf page for each key
2914   // group same page references with reuse bit
2915   // and determine any constraint violations
2916
2917   for( src = 0; src++ < source->cnt; ) {
2918         key = keyptr(source, src);
2919         slot = 0;
2920
2921         // first determine if this modification falls
2922         // on the same page as the previous modification
2923         //      note that the far right leaf page is a special case
2924
2925         if( samepage = src > 1 )
2926           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2927                 slot = bt_findslot(set->page, key->key, key->len);
2928
2929         if( !slot )
2930           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic, thread_no) )
2931                 set->latch->split = 0;
2932           else
2933                 return mgr->err;
2934
2935         if( slotptr(set->page, slot)->type == Librarian )
2936           ptr = keyptr(set->page, ++slot);
2937         else
2938           ptr = keyptr(set->page, slot);
2939
2940         if( !samepage ) {
2941           locks[src].entry = set->latch - mgr->latchsets;
2942           locks[src].slot = slot;
2943           locks[src].reuse = 0;
2944         } else {
2945           locks[src].entry = 0;
2946           locks[src].slot = 0;
2947           locks[src].reuse = 1;
2948         }
2949
2950         //      capture current lsn for master page
2951
2952         locks[src].reqlsn = set->page->lsn;
2953   }
2954
2955   //  obtain write lock for each master page
2956
2957   for( src = 0; src++ < source->cnt; ) {
2958         if( locks[src].reuse )
2959           continue;
2960
2961         set->latch = mgr->latchsets + locks[src].entry;
2962         bt_lockpage (BtLockWrite, set->latch, thread_no);
2963   }
2964
2965   // insert or delete each key
2966   // process any splits or merges
2967   // run through txn list backwards
2968
2969   samepage = source->cnt + 1;
2970
2971   for( src = source->cnt; src; src-- ) {
2972         if( locks[src].reuse )
2973           continue;
2974
2975         //  perform the txn operations
2976         //      from smaller to larger on
2977         //  the same page
2978
2979         for( idx = src; idx < samepage; idx++ )
2980          switch( slotptr(source,idx)->type ) {
2981          case Delete:
2982           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2983                 return mgr->err;
2984           break;
2985
2986         case Duplicate:
2987         case Unique:
2988           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2989                 return mgr->err;
2990           break;
2991
2992         default:
2993           bt_atomicpage (mgr, source, locks, idx, set);
2994           break;
2995         }
2996
2997         //      after the same page operations have finished,
2998         //  process master page for splits or deletion.
2999
3000         latch = prev->latch = mgr->latchsets + locks[src].entry;
3001         prev->page = bt_mappage (mgr, prev->latch);
3002         samepage = src;
3003
3004         //  pick-up all splits from master page
3005         //      each one is already pinned & WriteLocked.
3006
3007         if( entry = prev->latch->split ) do {
3008           set->latch = mgr->latchsets + entry;
3009           set->page = bt_mappage (mgr, set->latch);
3010
3011           // delete empty master page by undoing its split
3012           //  (this is potentially another empty page)
3013           //  note that there are no pointers to it yet
3014
3015           if( !prev->page->act ) {
3016                 memcpy (set->page->left, prev->page->left, BtId);
3017                 memcpy (prev->page, set->page, mgr->page_size);
3018                 bt_lockpage (BtLockDelete, set->latch, thread_no);
3019                 prev->latch->split = set->latch->split;
3020                 prev->latch->dirty = 1;
3021                 bt_freepage (mgr, set);
3022                 continue;
3023           }
3024
3025           // remove empty split page from the split chain
3026           // and return it to the free list. No other
3027           // thread has its page number yet.
3028
3029           if( !set->page->act ) {
3030                 memcpy (prev->page->right, set->page->right, BtId);
3031                 prev->latch->split = set->latch->split;
3032
3033                 bt_lockpage (BtLockDelete, set->latch, thread_no);
3034                 bt_freepage (mgr, set);
3035                 continue;
3036           }
3037
3038           //  update prev's fence key
3039
3040           ptr = keyptr(prev->page,prev->page->cnt);
3041           bt_putid (value, prev->latch->page_no);
3042
3043           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3044                 return mgr->err;
3045
3046           // splice in the left link into the split page
3047
3048           bt_putid (set->page->left, prev->latch->page_no);
3049
3050           if( lsm )
3051                 bt_syncpage (mgr, prev->page, prev->latch);
3052
3053           // page is unpinned below to avoid bt_txnpromote
3054
3055           bt_unlockpage(BtLockWrite, prev->latch);
3056           *prev = *set;
3057         } while( entry = prev->latch->split );
3058
3059         //  update left pointer in next right page from last split page
3060         //      (if all splits were reversed or none occurred, latch->split == 0)
3061
3062         if( latch->split ) {
3063           //  fix left pointer in master's original (now split)
3064           //  far right sibling or set rightmost page in page zero
3065
3066           if( right = bt_getid (prev->page->right) ) {
3067                 if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) )
3068                   set->page = bt_mappage (mgr, set->latch);
3069                 else
3070                   return mgr->err;
3071
3072             bt_lockpage (BtLockWrite, set->latch, thread_no);
3073             bt_putid (set->page->left, prev->latch->page_no);
3074                 set->latch->dirty = 1;
3075
3076             bt_unlockpage (BtLockWrite, set->latch);
3077                 bt_unpinlatch (mgr, set->latch);
3078           } else {      // prev is rightmost page
3079             bt_mutexlock (mgr->lock);
3080                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3081             bt_releasemutex(mgr->lock);
3082           }
3083
3084           //  Process last page split in chain
3085           //  by switching the key from the master
3086           //  page to the last split.
3087
3088           ptr = keyptr(prev->page,prev->page->cnt);
3089           bt_putid (value, prev->latch->page_no);
3090
3091           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3092                 return mgr->err;
3093
3094           bt_unlockpage(BtLockWrite, prev->latch);
3095
3096           if( lsm )
3097                 bt_syncpage (mgr, prev->page, prev->latch);
3098
3099           bt_unlockpage(BtLockAtomic, latch);
3100           bt_unpinlatch(mgr, latch);
3101
3102           // go through the list of splits and
3103           // release the latch pins
3104
3105           while( entry = latch->split ) {
3106                 latch = mgr->latchsets + entry;
3107                 bt_unpinlatch(mgr, latch);
3108           }
3109
3110           continue;
3111         }
3112
3113         //      since there are no splits, we're
3114         //  finished if master page occupied
3115
3116         if( prev->page->act ) {
3117           bt_unlockpage(BtLockAtomic, prev->latch);
3118           bt_unlockpage(BtLockWrite, prev->latch);
3119
3120           if( lsm )
3121                 bt_syncpage (mgr, prev->page, prev->latch);
3122
3123           bt_unpinlatch(mgr, prev->latch);
3124           continue;
3125         }
3126
3127         // any and all splits were reversed, and the
3128         // master page located in prev is empty, delete it
3129
3130         if( bt_deletepage (mgr, prev, thread_no, 1) )
3131                 return mgr->err;
3132   }
3133
3134   free (locks);
3135   return 0;
3136 }
3137
3138 //  promote a page into the larger btree
3139
3140 BTERR bt_txnpromote (BtDb *bt)
3141 {
3142 uint entry, slot, idx;
3143 BtPageSet set[1];
3144 BtSlot *node;
3145 BtKey *ptr;
3146 BtVal *val;
3147
3148   while( 1 ) {
3149 #ifdef unix
3150         entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3151 #else
3152         entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3153 #endif
3154         entry %= bt->mgr->latchtotal;
3155
3156         if( !entry )
3157                 continue;
3158
3159         set->latch = bt->mgr->latchsets + entry;
3160
3161         if( !bt_mutextry(set->latch->modify) )
3162                 continue;
3163
3164         //  skip this entry if it is pinned
3165
3166         if( set->latch->pin & ~CLOCK_bit ) {
3167           bt_releasemutex(set->latch->modify);
3168           continue;
3169         }
3170
3171         set->page = bt_mappage (bt->mgr, set->latch);
3172
3173         // entry never used or has no right sibling
3174
3175         if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3176           bt_releasemutex(set->latch->modify);
3177           continue;
3178         }
3179
3180         // entry interiour node or being killed or promoted
3181
3182         if( set->page->lvl || set->page->free || set->page->kill ) {
3183           bt_releasemutex(set->latch->modify);
3184           continue;
3185         }
3186
3187         //  pin the page for our useage
3188
3189         set->latch->pin++;
3190         set->latch->promote = 1;
3191         bt_releasemutex(set->latch->modify);
3192
3193         bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3194
3195         //  remove the key for the page
3196         //      and wait for other threads to fade away
3197
3198         ptr = keyptr(set->page, set->page->cnt);
3199
3200         if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3201                 return bt->mgr->err;
3202
3203         bt_unlockpage (BtLockWrite, set->latch);
3204 while( (set->latch->pin & ~CLOCK_bit) > 1 )
3205 sched_yield();
3206         bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3207         bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
3208         bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3209
3210         // transfer slots in our selected page to larger btree
3211 if( !(set->latch->page_no % 100) )
3212 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
3213
3214         if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3215                 return bt->main->err;
3216
3217         //  now delete the page
3218
3219         bt_unlockpage (BtLockDelete, set->latch);
3220         bt_unlockpage (BtLockAtomic, set->latch);
3221         return bt_deletepage (bt->mgr, set, bt->thread_no, 0);
3222   }
3223 }
3224
3225 //      set cursor to highest slot on highest page
3226
3227 uint bt_lastkey (BtDb *bt)
3228 {
3229 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3230 BtPageSet set[1];
3231
3232         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3233                 set->page = bt_mappage (bt->mgr, set->latch);
3234         else
3235                 return 0;
3236
3237     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3238         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3239     bt_unlockpage(BtLockRead, set->latch);
3240         bt_unpinlatch (bt->mgr, set->latch);
3241         return bt->cursor->cnt;
3242 }
3243
3244 //      return previous slot on cursor page
3245
3246 uint bt_prevkey (BtDb *bt, uint slot)
3247 {
3248 uid cursor_page = bt->cursor->page_no;
3249 uid ourright, next, us = cursor_page;
3250 BtPageSet set[1];
3251
3252         if( --slot )
3253                 return slot;
3254
3255         ourright = bt_getid(bt->cursor->right);
3256
3257 goleft:
3258         if( !(next = bt_getid(bt->cursor->left)) )
3259                 return 0;
3260
3261 findourself:
3262         cursor_page = next;
3263
3264         if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3265                 set->page = bt_mappage (bt->mgr, set->latch);
3266         else
3267                 return 0;
3268
3269     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3270         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3271         bt_unlockpage(BtLockRead, set->latch);
3272         bt_unpinlatch (bt->mgr, set->latch);
3273         
3274         next = bt_getid (bt->cursor->right);
3275
3276         if( bt->cursor->kill )
3277                 goto findourself;
3278
3279         if( next != us )
3280           if( next == ourright )
3281                 goto goleft;
3282           else
3283                 goto findourself;
3284
3285         return bt->cursor->cnt;
3286 }
3287
3288 //  return next slot on cursor page
3289 //  or slide cursor right into next page
3290
3291 uint bt_nextkey (BtDb *bt, uint slot)
3292 {
3293 BtPageSet set[1];
3294 uid right;
3295
3296   do {
3297         right = bt_getid(bt->cursor->right);
3298
3299         while( slot++ < bt->cursor->cnt )
3300           if( slotptr(bt->cursor,slot)->dead )
3301                 continue;
3302           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3303                 return slot;
3304           else
3305                 break;
3306
3307         if( !right )
3308                 break;
3309
3310         if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3311                 set->page = bt_mappage (bt->mgr, set->latch);
3312         else
3313                 return 0;
3314
3315     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3316         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3317         bt_unlockpage(BtLockRead, set->latch);
3318         bt_unpinlatch (bt->mgr, set->latch);
3319         slot = 0;
3320
3321   } while( 1 );
3322
3323   return bt->mgr->err = 0;
3324 }
3325
3326 //  cache page of keys into cursor and return starting slot for given key
3327
3328 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3329 {
3330 BtPageSet set[1];
3331 uint slot;
3332
3333         // cache page for retrieval
3334
3335         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3336           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3337         else
3338           return 0;
3339
3340         bt_unlockpage(BtLockRead, set->latch);
3341         bt_unpinlatch (bt->mgr, set->latch);
3342         return slot;
3343 }
3344
3345 #ifdef STANDALONE
3346
3347 #ifndef unix
3348 double getCpuTime(int type)
3349 {
3350 FILETIME crtime[1];
3351 FILETIME xittime[1];
3352 FILETIME systime[1];
3353 FILETIME usrtime[1];
3354 SYSTEMTIME timeconv[1];
3355 double ans = 0;
3356
3357         memset (timeconv, 0, sizeof(SYSTEMTIME));
3358
3359         switch( type ) {
3360         case 0:
3361                 GetSystemTimeAsFileTime (xittime);
3362                 FileTimeToSystemTime (xittime, timeconv);
3363                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3364                 break;
3365         case 1:
3366                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3367                 FileTimeToSystemTime (usrtime, timeconv);
3368                 break;
3369         case 2:
3370                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3371                 FileTimeToSystemTime (systime, timeconv);
3372                 break;
3373         }
3374
3375         ans += (double)timeconv->wHour * 3600;
3376         ans += (double)timeconv->wMinute * 60;
3377         ans += (double)timeconv->wSecond;
3378         ans += (double)timeconv->wMilliseconds / 1000;
3379         return ans;
3380 }
3381 #else
3382 #include <time.h>
3383 #include <sys/resource.h>
3384
3385 double getCpuTime(int type)
3386 {
3387 struct rusage used[1];
3388 struct timeval tv[1];
3389
3390         switch( type ) {
3391         case 0:
3392                 gettimeofday(tv, NULL);
3393                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3394
3395         case 1:
3396                 getrusage(RUSAGE_SELF, used);
3397                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3398
3399         case 2:
3400                 getrusage(RUSAGE_SELF, used);
3401                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3402         }
3403
3404         return 0;
3405 }
3406 #endif
3407
3408 void bt_poolaudit (BtMgr *mgr)
3409 {
3410 BtLatchSet *latch;
3411 uint entry = 0;
3412
3413         while( ++entry < mgr->latchtotal ) {
3414                 latch = mgr->latchsets + entry;
3415
3416                 if( *latch->readwr->rin & MASK )
3417                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3418
3419                 if( *latch->access->rin & MASK )
3420                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3421
3422 //              if( *latch->parent->xcl->value )
3423 //                      fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3424
3425 //              if( *latch->atomic->xcl->value )
3426 //                      fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3427
3428 //              if( *latch->modify->value )
3429 //                      fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3430
3431                 if( latch->pin & ~CLOCK_bit )
3432                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3433         }
3434 }
3435
3436 typedef struct {
3437         char idx;
3438         char *type;
3439         char *infile;
3440         BtMgr *main;
3441         BtMgr *mgr;
3442         int num;
3443 } ThreadArg;
3444
3445 //  standalone program to index file of keys
3446 //  then list them onto std-out
3447
3448 #ifdef unix
3449 void *index_file (void *arg)
3450 #else
3451 uint __stdcall index_file (void *arg)
3452 #endif
3453 {
3454 int line = 0, found = 0, cnt = 0, idx;
3455 uid next, page_no = LEAF_page;  // start on first page of leaves
3456 int ch, len = 0, slot, type = 0;
3457 unsigned char key[BT_maxkey];
3458 unsigned char txn[65536];
3459 ThreadArg *args = arg;
3460 BtPage page, frame;
3461 BtPageSet set[1];
3462 uint nxt = 65536;
3463 BtKey *ptr;
3464 BtVal *val;
3465 BtDb *bt;
3466 FILE *in;
3467
3468         bt = bt_open (args->mgr, args->main);
3469         page = (BtPage)txn;
3470
3471         if( args->idx < strlen (args->type) )
3472                 ch = args->type[args->idx];
3473         else
3474                 ch = args->type[strlen(args->type) - 1];
3475
3476         switch(ch | 0x20)
3477         {
3478         case 'd':
3479                 type = Delete;
3480
3481         case 'p':
3482                 if( !type )
3483                         type = Unique;
3484
3485                 if( args->num )
3486                  if( type == Delete )
3487                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3488                  else
3489                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3490                 else
3491                  if( type == Delete )
3492                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3493                  else
3494                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3495
3496                 if( in = fopen (args->infile, "rb") )
3497                   while( ch = getc(in), ch != EOF )
3498                         if( ch == '\n' )
3499                         {
3500                           line++;
3501
3502                           if( !args->num ) {
3503                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3504                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3505                             len = 0;
3506                                 continue;
3507                           }
3508
3509                           nxt -= len - 10;
3510                           memcpy (txn + nxt, key + 10, len - 10);
3511                           nxt -= 1;
3512                           txn[nxt] = len - 10;
3513                           nxt -= 10;
3514                           memcpy (txn + nxt, key, 10);
3515                           nxt -= 1;
3516                           txn[nxt] = 10;
3517                           slotptr(page,++cnt)->off  = nxt;
3518                           slotptr(page,cnt)->type = type;
3519                           len = 0;
3520
3521                           if( cnt < args->num )
3522                                 continue;
3523
3524                           page->cnt = cnt;
3525                           page->act = cnt;
3526                           page->min = nxt;
3527
3528                           if( bt_atomictxn (bt, page) )
3529                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3530                           nxt = sizeof(txn);
3531                           cnt = 0;
3532
3533                         }
3534                         else if( len < BT_maxkey )
3535                                 key[len++] = ch;
3536                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3537                 break;
3538
3539         case 'w':
3540                 fprintf(stderr, "started indexing for %s\n", args->infile);
3541                 if( in = fopen (args->infile, "r") )
3542                   while( ch = getc(in), ch != EOF )
3543                         if( ch == '\n' )
3544                         {
3545                           line++;
3546
3547                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3548                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3549                           len = 0;
3550                         }
3551                         else if( len < BT_maxkey )
3552                                 key[len++] = ch;
3553                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3554                 break;
3555
3556         case 'f':
3557                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3558                 if( in = fopen (args->infile, "rb") )
3559                   while( ch = getc(in), ch != EOF )
3560                         if( ch == '\n' )
3561                         {
3562                           line++;
3563                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3564                                 found++;
3565                           else if( bt->mgr->err )
3566                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3567                           len = 0;
3568                         }
3569                         else if( len < BT_maxkey )
3570                                 key[len++] = ch;
3571                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3572                 break;
3573
3574         case 's':
3575                 fprintf(stderr, "started scanning\n");
3576                 
3577                 do {
3578                         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3579                                 set->page = bt_mappage (bt->mgr, set->latch);
3580                         else
3581                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3582
3583                         bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3584                         next = bt_getid (set->page->right);
3585
3586                         for( slot = 0; slot++ < set->page->cnt; )
3587                          if( next || slot < set->page->cnt )
3588                           if( !slotptr(set->page, slot)->dead ) {
3589                                 ptr = keyptr(set->page, slot);
3590                                 len = ptr->len;
3591
3592                             if( slotptr(set->page, slot)->type == Duplicate )
3593                                         len -= BtId;
3594
3595                                 fwrite (ptr->key, len, 1, stdout);
3596                                 val = valptr(set->page, slot);
3597                                 fwrite (val->value, val->len, 1, stdout);
3598                                 fputc ('\n', stdout);
3599                                 cnt++;
3600                            }
3601
3602                         bt_unlockpage (BtLockRead, set->latch);
3603                         bt_unpinlatch (bt->mgr, set->latch);
3604                 } while( page_no = next );
3605
3606                 fprintf(stderr, " Total keys read %d\n", cnt);
3607                 break;
3608
3609         case 'r':
3610                 fprintf(stderr, "started reverse scan\n");
3611                 if( slot = bt_lastkey (bt) )
3612                    while( slot = bt_prevkey (bt, slot) ) {
3613                         if( slotptr(bt->cursor, slot)->dead )
3614                           continue;
3615
3616                         ptr = keyptr(bt->cursor, slot);
3617                         len = ptr->len;
3618
3619                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3620                                 len -= BtId;
3621
3622                         fwrite (ptr->key, len, 1, stdout);
3623                         val = valptr(bt->cursor, slot);
3624                         fwrite (val->value, val->len, 1, stdout);
3625                         fputc ('\n', stdout);
3626                         cnt++;
3627                   }
3628
3629                 fprintf(stderr, " Total keys read %d\n", cnt);
3630                 break;
3631
3632         case 'c':
3633 #ifdef unix
3634                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3635 #endif
3636                 fprintf(stderr, "started counting\n");
3637                 next = REDO_page + bt->mgr->pagezero->redopages;
3638
3639                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3640                         if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3641                                 break;
3642
3643                         if( !bt->cursor->free && !bt->cursor->lvl )
3644                                 cnt += bt->cursor->act;
3645
3646                         bt->mgr->reads++;
3647                         page_no = next++;
3648                 }
3649                 
3650                 cnt--;  // remove stopper key
3651                 fprintf(stderr, " Total keys counted %d\n", cnt);
3652                 break;
3653         }
3654
3655         bt_close (bt);
3656 #ifdef unix
3657         return NULL;
3658 #else
3659         return 0;
3660 #endif
3661 }
3662
3663 typedef struct timeval timer;
3664
3665 int main (int argc, char **argv)
3666 {
3667 int idx, cnt, len, slot, err;
3668 int segsize, bits = 16;
3669 double start, stop;
3670 #ifdef unix
3671 pthread_t *threads;
3672 #else
3673 HANDLE *threads;
3674 #endif
3675 ThreadArg *args;
3676 uint redopages = 0;
3677 uint poolsize = 0;
3678 uint mainpool = 0;
3679 uint mainbits = 0;
3680 float elapsed;
3681 int num = 0;
3682 char key[1];
3683 BtMgr *main;
3684 BtMgr *mgr;
3685 BtKey *ptr;
3686
3687         if( argc < 3 ) {
3688                 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3689                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3690                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3691                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3692                 fprintf (stderr, "  page_bits is the page size in bits for the cache btree\n");
3693                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3694                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3695                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3696                 fprintf (stderr, "  main_bits is the page size of the main btree in bits\n");
3697                 fprintf (stderr, "  main_pool is the number of main pages in the main buffer pool\n");
3698                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3699                 exit(0);
3700         }
3701
3702         start = getCpuTime(0);
3703
3704         if( argc > 4 )
3705                 bits = atoi(argv[4]);
3706
3707         if( argc > 5 )
3708                 poolsize = atoi(argv[5]);
3709
3710         if( !poolsize )
3711                 fprintf (stderr, "Warning: no mapped_pool\n");
3712
3713         if( argc > 6 )
3714                 num = atoi(argv[6]);
3715
3716         if( argc > 7 )
3717                 redopages = atoi(argv[7]);
3718
3719         if( redopages + REDO_page > 65535 )
3720                 fprintf (stderr, "Warning: Recovery buffer too large\n");
3721
3722         if( argc > 8 )
3723                 mainbits = atoi(argv[8]);
3724
3725         if( argc > 9 )
3726                 mainpool = atoi(argv[9]);
3727
3728         cnt = argc - 10;
3729 #ifdef unix
3730         threads = malloc (cnt * sizeof(pthread_t));
3731 #else
3732         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3733 #endif
3734         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3735
3736         mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3737
3738         if( !mgr ) {
3739                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3740                 exit (1);
3741         }
3742
3743         if( mainbits ) {
3744                 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3745
3746                 if( !main ) {
3747                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3748                         exit (1);
3749                 }
3750         } else
3751                 main = NULL;
3752
3753         //      fire off threads
3754
3755         if( cnt )
3756           for( idx = 0; idx < cnt; idx++ ) {
3757                 args[idx].infile = argv[idx + 10];
3758                 args[idx].type = argv[3];
3759                 args[idx].main = main;
3760                 args[idx].mgr = mgr;
3761                 args[idx].num = num;
3762                 args[idx].idx = idx;
3763 #ifdef unix
3764                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3765                         fprintf(stderr, "Error creating thread %d\n", err);
3766 #else
3767                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3768 #endif
3769           }
3770         else {
3771                 args[0].infile = argv[idx + 10];
3772                 args[0].type = argv[3];
3773                 args[0].main = main;
3774                 args[0].mgr = mgr;
3775                 args[0].num = num;
3776                 args[0].idx = 0;
3777                 index_file (args);
3778         }
3779
3780         //      wait for termination
3781
3782 #ifdef unix
3783         for( idx = 0; idx < cnt; idx++ )
3784                 pthread_join (threads[idx], NULL);
3785 #else
3786         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3787
3788         for( idx = 0; idx < cnt; idx++ )
3789                 CloseHandle(threads[idx]);
3790 #endif
3791         bt_poolaudit(mgr);
3792
3793         if( main )
3794                 bt_poolaudit(main);
3795
3796         fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3797
3798         if( main )
3799                 bt_mgrclose (main);
3800         bt_mgrclose (mgr);
3801
3802         elapsed = getCpuTime(0) - start;
3803         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3804         elapsed = getCpuTime(1);
3805         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3806         elapsed = getCpuTime(2);
3807         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3808 }
3809
3810 #endif  //STANDALONE