]> pd.if.org Git - btree/blob - threadskv10e.c
Replace mutex implementation, and fix a few bugs.
[btree] / threadskv10e.c
1 // btree version threadskv10e futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      LSM B-trees for write optimization
11 //      and larger sized leaf pages than non-leaf
12
13 // 24 OCT 2014
14
15 // author: karl malbrain, malbrain@cal.berkeley.edu
16
17 /*
18 This work, including the source code, documentation
19 and related data, is placed into the public domain.
20
21 The orginal author is Karl Malbrain.
22
23 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
24 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
25 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
26 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
27 RESULTING FROM THE USE, MODIFICATION, OR
28 REDISTRIBUTION OF THIS SOFTWARE.
29 */
30
31 // Please see the project home page for documentation
32 // code.google.com/p/high-concurrency-btree
33
34 #define _FILE_OFFSET_BITS 64
35 #define _LARGEFILE64_SOURCE
36
37 #ifdef linux
38 #define _GNU_SOURCE
39 #include <xmmintrin.h>
40 #include <linux/futex.h>
41 #define SYS_futex 202
42 #endif
43
44 #ifdef unix
45 #include <unistd.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <fcntl.h>
49 #include <sys/time.h>
50 #include <sys/mman.h>
51 #include <errno.h>
52 #include <pthread.h>
53 #include <limits.h>
54 #else
55 #define WIN32_LEAN_AND_MEAN
56 #include <windows.h>
57 #include <stdio.h>
58 #include <stdlib.h>
59 #include <time.h>
60 #include <fcntl.h>
61 #include <process.h>
62 #include <intrin.h>
63 #endif
64
65 #include <memory.h>
66 #include <string.h>
67 #include <stddef.h>
68
69 typedef unsigned long long      uid;
70 typedef unsigned long long      logseqno;
71
72 #ifndef unix
73 typedef unsigned long long      off64_t;
74 typedef unsigned short          ushort;
75 typedef unsigned int            uint;
76 #endif
77
78 #define BT_ro 0x6f72    // ro
79 #define BT_rw 0x7772    // rw
80
81 #define BT_maxbits              26                                      // maximum page size in bits
82 #define BT_minbits              9                                       // minimum page size in bits
83 #define BT_minpage              (1 << BT_minbits)       // minimum page size
84 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
85
86 //  BTree page number constants
87 #define ALLOC_page              0       // allocation page
88 #define ROOT_page               1       // root of the btree
89 #define LEAF_page               2       // first page of leaves
90
91 //      Number of levels to create in a new BTree
92
93 #define MIN_lvl                 2
94
95 /*
96 There are six lock types for each node in four independent sets: 
97 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
98 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
99 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
100 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
101 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
102 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification. 
103 */
104
105 typedef enum{
106         BtLockAccess = 1,
107         BtLockDelete = 2,
108         BtLockRead   = 4,
109         BtLockWrite  = 8,
110         BtLockParent = 16,
111         BtLockLink   = 32
112 } BtLock;
113
114 typedef struct {
115   union {
116         struct {
117           volatile unsigned char xcl[1];
118           volatile unsigned char filler;
119           volatile ushort waiters[1];
120         } bits[1];
121         uint value[1];
122   };
123 } MutexLatch;
124
125 //      definition for reader/writer reentrant lock implementation
126
127 typedef struct {
128   MutexLatch xcl[1];
129   MutexLatch wrt[1];
130   uint readers;
131   ushort dup;           // re-entrant locks
132   ushort tid;           // owner thread-no
133   uint line;            // owner line #
134 } RWLock;
135
136 //  hash table entries
137
138 typedef struct {
139         MutexLatch latch[1];
140         uint entry;             // Latch table entry at head of chain
141 } BtHashEntry;
142
143 //      latch manager table structure
144
145 typedef struct {
146         uid page_no;                    // latch set page number
147         MutexLatch modify[1];   // modify entry lite latch
148         RWLock readwr[1];               // read/write page lock
149         RWLock access[1];               // Access Intent/Page delete
150         RWLock parent[1];               // Posting of fence key in parent
151         RWLock link[1];                 // left link update in progress
152         uint split;                             // right split page atomic insert
153         uint next;                              // next entry in hash table chain
154         uint prev;                              // prev entry in hash table chain
155         volatile ushort pin;    // number of accessing threads
156         unsigned char dirty;    // page in cache is dirty
157         unsigned char leaf;             // page in cache is a leaf
158 } BtLatchSet;
159
160 //      Define the length of the page record numbers
161
162 #define BtId 6
163
164 //      Page key slot definition.
165
166 //      Keys are marked dead, but remain on the page until
167 //      it cleanup is called. The fence key (highest key) for
168 //      a leaf page is always present, even after cleanup.
169
170 //      Slot types
171
172 //      In addition to the Unique keys that occupy slots
173 //      there are Librarian and Duplicate key
174 //      slots occupying the key slot array.
175
176 //      The Librarian slots are dead keys that
177 //      serve as filler, available to add new Unique
178 //      or Dup slots that are inserted into the B-tree.
179
180 //      The Duplicate slots have had their key bytes extended
181 //      by 6 bytes to contain a binary duplicate key uniqueifier.
182
183 typedef enum {
184         Unique,
185         Update,
186         Librarian,
187         Duplicate,
188         Delete
189 } BtSlotType;
190
191 typedef struct {
192         uint off:BT_maxbits;    // page offset for key start
193         uint type:3;                    // type of slot
194         uint dead:1;                    // set for deleted slot
195 } BtSlot;
196
197 //      The key structure occupies space at the upper end of
198 //      each page.  It's a length byte followed by the key
199 //      bytes.
200
201 typedef struct {
202         unsigned char len;              // this can be changed to a ushort or uint
203         unsigned char key[0];
204 } BtKey;
205
206 //      the value structure also occupies space at the upper
207 //      end of the page. Each key is immediately followed by a value.
208
209 typedef struct {
210         unsigned char len;              // this can be changed to a ushort or uint
211         unsigned char value[0];
212 } BtVal;
213
214 #define BT_maxkey       255             // maximum number of bytes in a key
215 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
216
217 //      The first part of an index page.
218 //      It is immediately followed
219 //      by the BtSlot array of keys.
220
221 typedef struct BtPage_ {
222         uint cnt;                                       // count of keys in page
223         uint act;                                       // count of active keys
224         uint min;                                       // next key offset
225         uint garbage;                           // page garbage in bytes
226         unsigned char lvl;                      // level of page
227         unsigned char free;                     // page is on free chain
228         unsigned char kill;                     // page is being deleted
229         unsigned char nopromote;        // page is being constructed
230         unsigned char filler1[6];       // padding to multiple of 8 bytes
231         unsigned char right[BtId];      // page number to right
232         unsigned char left[BtId];       // page number to left
233         unsigned char filler2[2];       // padding to multiple of 8 bytes
234         logseqno lsn;                           // log sequence number applied
235         uid page_no;                            // this page number
236 } *BtPage;
237
238 //  The loadpage interface object
239
240 typedef struct {
241         BtPage page;            // current page pointer
242         BtLatchSet *latch;      // current page latch set
243 } BtPageSet;
244
245 //      structure for latch manager on ALLOC_page
246
247 typedef struct {
248         struct BtPage_ alloc[1];                // next page_no in right ptr
249         unsigned char freechain[BtId];  // head of free page_nos chain
250         unsigned char leafchain[BtId];  // head of leaf page_nos chain
251         unsigned long long leafpages;   // number of active leaf pages
252         uint redopages;                                 // number of redo pages in file
253         unsigned char leaf_xtra;                // leaf page size in xtra bits
254         unsigned char page_bits;                // base page size in bits
255 } BtPageZero;
256
257 //      The object structure for Btree access
258
259 typedef struct {
260         uint page_size;                         // base page size       
261         uint page_bits;                         // base page size in bits       
262         uint leaf_xtra;                         // leaf xtra bits       
263 #ifdef unix
264         int idx;
265 #else
266         HANDLE idx;
267 #endif
268         BtPageZero *pagezero;           // mapped allocation page
269         BtHashEntry *hashtable;         // the buffer pool hash table entries
270         BtHashEntry *leaftable;         // the buffer pool hash table entries
271         BtLatchSet *latchsets;          // mapped latch set from buffer pool
272         BtLatchSet *leafsets;           // mapped latch set from buffer pool
273         unsigned char *pagepool;        // mapped to the buffer pool pages
274         unsigned char *leafpool;        // mapped to the leaf pool pages
275         unsigned char *redobuff;        // mapped recovery buffer pointer
276         logseqno lsn, flushlsn;         // current & first lsn flushed
277         MutexLatch redo[1];                     // redo area lite latch
278         MutexLatch lock[1];                     // allocation area lite latch
279         MutexLatch maps[1];                     // mapping segments lite latch
280         ushort thread_no[1];            // highest thread number issued
281         ushort err_thread;                      // error thread number
282         uint nlatchpage;                        // size of buffer pool & latchsets
283         uint latchtotal;                        // number of page latch entries
284         uint latchhash;                         // number of latch hash table slots
285         uint latchvictim;                       // next latch entry to examine
286         uint nleafpage;                         // size of leaf pool & leafsets
287         uint leaftotal;                         // number of leaf latch entries
288         uint leafhash;                          // number of leaf hash table slots
289         uint leafvictim;                        // next leaf entry to examine
290         uint leafpromote;                       // next leaf entry to promote
291         uint redopage;                          // page number of redo buffer
292         uint redolast;                          // last msync size of recovery buff
293         uint redoend;                           // eof/end element in recovery buff
294         int err;                                        // last error
295         int line;                                       // last error line no
296         int found;                                      // number of keys found by delete
297         int reads, writes;                      // number of reads and writes
298 #ifndef unix
299         HANDLE halloc;                          // allocation handle
300         HANDLE hpool;                           // buffer pool handle
301 #endif
302         volatile uint segments;         // number of memory mapped segments
303         unsigned char *pages[64000];// memory mapped segments of b-tree
304 } BtMgr;
305
306 typedef struct {
307         BtMgr *mgr;                                     // buffer manager for entire process
308         BtMgr *main;                            // buffer manager for main btree
309         BtPage cursor;                          // cached page frame for start/next
310         ushort thread_no;                       // thread number
311         unsigned char key[BT_keyarray]; // last found complete key
312 } BtDb;
313
314 //      atomic txn structures
315
316 typedef struct {
317         logseqno reqlsn;        // redo log seq no required
318         uint entry:31;          // latch table entry number
319         uint reuse:1;           // reused previous page
320 } AtomicTxn;
321
322 //      Catastrophic errors
323
324 typedef enum {
325         BTERR_ok = 0,
326         BTERR_struct,
327         BTERR_ovflw,
328         BTERR_lock,
329         BTERR_map,
330         BTERR_read,
331         BTERR_wrt,
332         BTERR_atomic,
333         BTERR_recovery
334 } BTERR;
335
336 #define CLOCK_bit 0x8000
337
338 // recovery manager entry types
339
340 typedef enum {
341         BTRM_eof = 0,   // rest of buffer is emtpy
342         BTRM_add,               // add a unique key-value to btree
343         BTRM_dup,               // add a duplicate key-value to btree
344         BTRM_del,               // delete a key-value from btree
345         BTRM_upd,               // update a key with a new value
346         BTRM_new,               // allocate a new empty page
347         BTRM_old                // reuse an old empty page
348 } BTRM;
349
350 // recovery manager entry
351 //      structure followed by BtKey & BtVal
352
353 typedef struct {
354         logseqno reqlsn;        // log sequence number required
355         logseqno lsn;           // log sequence number for entry
356         uint len;                       // length of entry
357         unsigned char type;     // type of entry
358         unsigned char lvl;      // level of btree entry pertains to
359 } BtLogHdr;
360
361 // B-Tree functions
362
363 extern void bt_close (BtDb *bt);
364 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
365 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
366 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
367 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
368 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
369 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
370 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
371
372 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
373
374 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
375 extern uint bt_nextkey   (BtDb *bt, uint slot);
376 extern uint bt_prevkey (BtDb *db, uint slot);
377 extern uint bt_lastkey (BtDb *db);
378
379 //      manager functions
380 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
381 extern void bt_mgrclose (BtMgr *mgr);
382 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
383 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
384
385 //      atomic transaction functions
386 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
387 BTERR bt_txnpromote (BtDb *bt);
388
389 //  The page is allocated from low and hi ends.
390 //  The key slots are allocated from the bottom,
391 //      while the text and value of the key
392 //  are allocated from the top.  When the two
393 //  areas meet, the page is split into two.
394
395 //  A key consists of a length byte, two bytes of
396 //  index number (0 - 65535), and up to 253 bytes
397 //  of key value.
398
399 //  Associated with each key is a value byte string
400 //      containing any value desired.
401
402 //  The b-tree root is always located at page 1.
403 //      The first leaf page of level zero is always
404 //      located on page 2.
405
406 //      The b-tree pages are linked with next
407 //      pointers to facilitate enumerators,
408 //      and provide for concurrency.
409
410 //      When to root page fills, it is split in two and
411 //      the tree height is raised by a new root at page
412 //      one with two keys.
413
414 //      Deleted keys are marked with a dead bit until
415 //      page cleanup. The fence key for a leaf node is
416 //      always present
417
418 //  To achieve maximum concurrency one page is locked at a time
419 //  as the tree is traversed to find leaf key in question. The right
420 //  page numbers are used in cases where the page is being split,
421 //      or consolidated.
422
423 //  Page 0 is dedicated to lock for new page extensions,
424 //      and chains empty pages together for reuse. It also
425 //      contains the latch manager hash table.
426
427 //      The ParentModification lock on a node is obtained to serialize posting
428 //      or changing the fence key for a node.
429
430 //      Empty pages are chained together through the ALLOC page and reused.
431
432 //      Access macros to address slot and key values from the page
433 //      Page slots use 1 based indexing.
434
435 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
436 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
437 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
438
439 void bt_putid(unsigned char *dest, uid id)
440 {
441 int i = BtId;
442
443         while( i-- )
444                 dest[i] = (unsigned char)id, id >>= 8;
445 }
446
447 uid bt_getid(unsigned char *src)
448 {
449 uid id = 0;
450 int i;
451
452         for( i = 0; i < BtId; i++ )
453                 id <<= 8, id |= *src++; 
454
455         return id;
456 }
457
458 //      lite weight spin lock Latch Manager
459
460 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
461 {
462         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
463 }
464
465 void bt_mutexlock(MutexLatch *latch)
466 {
467 uint idx, waited = 0;
468 MutexLatch prev[1];
469
470  while( 1 ) {
471   for( idx = 0; idx < 100; idx++ ) {
472         *prev->value = __sync_fetch_and_or (latch->value, 1);
473         if( !*prev->bits->xcl ) {
474           if( waited )
475                 __sync_fetch_and_sub (latch->bits->waiters, 1);
476           return;
477         }
478   }
479
480   if( !waited ) {
481         __sync_fetch_and_add (latch->bits->waiters, 1);
482         *prev->bits->waiters += 1;
483         waited++;
484   }
485
486   sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
487  }
488 }
489
490 int bt_mutextry(MutexLatch *latch)
491 {
492         return !__sync_lock_test_and_set (latch->bits->xcl, 1);
493 }
494
495 void bt_releasemutex(MutexLatch *latch)
496 {
497         *latch->bits->xcl = 0;
498
499         if( *latch->bits->waiters )
500                 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
501 }
502
503 //      reader/writer lock implementation
504
505 void WriteLock (RWLock *lock, ushort tid, uint line)
506 {
507         if( lock->tid == tid ) {
508                 lock->dup++;
509                 return;
510         }
511         bt_mutexlock (lock->xcl);
512         bt_mutexlock (lock->wrt);
513         bt_releasemutex (lock->xcl);
514
515         lock->line = line;
516         lock->tid = tid;
517 }
518
519 void WriteRelease (RWLock *lock)
520 {
521         if( lock->dup ) {
522                 lock->dup--;
523                 return;
524         }
525         lock->tid = 0;
526         bt_releasemutex (lock->wrt);
527 }
528
529 void ReadLock (RWLock *lock, ushort tid)
530 {
531         bt_mutexlock (lock->xcl);
532
533         if( !__sync_fetch_and_add (&lock->readers, 1) )
534                 bt_mutexlock (lock->wrt);
535
536         bt_releasemutex (lock->xcl);
537 }
538
539 void ReadRelease (RWLock *lock)
540 {
541         if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
542                 bt_releasemutex (lock->wrt);
543 }
544
545 //      recovery manager -- flush dirty pages
546
547 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
548 {
549 uint cnt3 = 0, cnt2 = 0, cnt = 0;
550 uint entry, segment;
551 BtLatchSet *latch;
552 BtPage page;
553
554   //    flush dirty pool pages to the btree
555
556 fprintf(stderr, "Start flushlsn  ");
557   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
558         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
559         latch = mgr->latchsets + entry;
560         bt_mutexlock (latch->modify);
561         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
562
563         if( latch->dirty ) {
564           bt_writepage(mgr, page, latch->page_no, 0);
565           latch->dirty = 0, cnt++;
566         }
567 if( latch->pin & ~CLOCK_bit )
568 cnt2++;
569         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
570         bt_releasemutex (latch->modify);
571   }
572   for( entry = 1; entry < mgr->leaftotal; entry++ ) {
573         page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
574         latch = mgr->leafsets + entry;
575         bt_mutexlock (latch->modify);
576         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
577
578         if( latch->dirty ) {
579           bt_writepage(mgr, page, latch->page_no, 1);
580           latch->dirty = 0, cnt++;
581         }
582 if( latch->pin & ~CLOCK_bit )
583 cnt2++;
584         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
585         bt_releasemutex (latch->modify);
586   }
587 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
588 fprintf(stderr, "begin sync");
589         for( segment = 0; segment < mgr->segments; segment++ )
590           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
591                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
592 fprintf(stderr, " end sync\n");
593 }
594
595 //      recovery manager -- process current recovery buff on startup
596 //      this won't do much if previous session was properly closed.
597
598 BTERR bt_recoveryredo (BtMgr *mgr)
599 {
600 BtLogHdr *hdr, *eof;
601 uint offset = 0;
602 BtKey *key;
603 BtVal *val;
604
605   hdr = (BtLogHdr *)mgr->redobuff;
606   mgr->flushlsn = hdr->lsn;
607
608   while( 1 ) {
609         hdr = (BtLogHdr *)(mgr->redobuff + offset);
610         switch( hdr->type ) {
611         case BTRM_eof:
612                 mgr->lsn = hdr->lsn;
613                 return 0;
614         case BTRM_add:          // add a unique key-value to btree
615         
616         case BTRM_dup:          // add a duplicate key-value to btree
617         case BTRM_del:          // delete a key-value from btree
618         case BTRM_upd:          // update a key with a new value
619         case BTRM_new:          // allocate a new empty page
620         case BTRM_old:          // reuse an old empty page
621                 return 0;
622         }
623   }
624 }
625
626 //      recovery manager -- append new entry to recovery log
627 //      flush dirty pages to disk when it overflows.
628
629 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
630 {
631 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
632 uint amt = sizeof(BtLogHdr);
633 BtLogHdr *hdr, *eof;
634 uint last, end;
635
636         bt_mutexlock (mgr->redo);
637
638         if( key )
639           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
640
641         //      see if new entry fits in buffer
642         //      flush and reset if it doesn't
643
644         if( amt > size - mgr->redoend ) {
645           mgr->flushlsn = mgr->lsn;
646           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
647                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
648           mgr->redolast = 0;
649           mgr->redoend = 0;
650           bt_flushlsn(mgr, thread_no);
651         }
652
653         //      fill in new entry & either eof or end block
654
655         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
656
657         hdr->len = amt;
658         hdr->type = type;
659         hdr->lvl = lvl;
660         hdr->lsn = ++mgr->lsn;
661
662         mgr->redoend += amt;
663
664         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
665         memset (eof, 0, sizeof(BtLogHdr));
666
667         //  fill in key and value
668
669         if( key ) {
670           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
671           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
672         }
673
674         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
675         memset (eof, 0, sizeof(BtLogHdr));
676         eof->lsn = mgr->lsn;
677
678         last = mgr->redolast & ~0xfff;
679         end = mgr->redoend;
680
681         if( end - last + sizeof(BtLogHdr) >= 32768 )
682           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
683                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
684           else
685                 mgr->redolast = end;
686
687         bt_releasemutex(mgr->redo);
688         return hdr->lsn;
689 }
690
691 //      recovery manager -- append transaction to recovery log
692 //      flush dirty pages to disk when it overflows.
693
694 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
695 {
696 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
697 uint amt = 0, src, type;
698 BtLogHdr *hdr, *eof;
699 uint last, end;
700 logseqno lsn;
701 BtKey *key;
702 BtVal *val;
703
704         //      determine amount of redo recovery log space required
705
706         for( src = 0; src++ < source->cnt; ) {
707           key = keyptr(source,src);
708           val = valptr(source,src);
709           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
710           amt += sizeof(BtLogHdr);
711         }
712
713         bt_mutexlock (mgr->redo);
714
715         //      see if new entry fits in buffer
716         //      flush and reset if it doesn't
717
718         if( amt > size - mgr->redoend ) {
719           mgr->flushlsn = mgr->lsn;
720           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
721                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
722           mgr->redolast = 0;
723           mgr->redoend = 0;
724           bt_flushlsn (mgr, thread_no);
725         }
726
727         //      assign new lsn to transaction
728
729         lsn = ++mgr->lsn;
730
731         //      fill in new entries
732
733         for( src = 0; src++ < source->cnt; ) {
734           key = keyptr(source, src);
735           val = valptr(source, src);
736
737           switch( slotptr(source, src)->type ) {
738           case Unique:
739                 type = BTRM_add;
740                 break;
741           case Duplicate:
742                 type = BTRM_dup;
743                 break;
744           case Delete:
745                 type = BTRM_del;
746                 break;
747           }
748
749           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
750           amt += sizeof(BtLogHdr);
751
752           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
753           hdr->len = amt;
754           hdr->type = type;
755           hdr->lsn = lsn;
756           hdr->lvl = 0;
757
758           //  fill in key and value
759
760           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
761           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
762
763           mgr->redoend += amt;
764         }
765
766         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
767         memset (eof, 0, sizeof(BtLogHdr));
768         eof->lsn = lsn;
769
770         last = mgr->redolast & ~0xfff;
771         end = mgr->redoend;
772
773         if( end - last + sizeof(BtLogHdr) >= 32768 )
774           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
775                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
776           else
777                 mgr->redolast = end;
778
779         bt_releasemutex(mgr->redo);
780         return lsn;
781 }
782
783 //      sync a single btree page to disk
784
785 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
786 {
787 uint segment = latch->page_no >> 16;
788 uint page_size = mgr->page_size;
789 BtPage perm;
790
791         if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
792                 return mgr->err;
793
794         if( !page->lvl )
795                 page_size <<= mgr->leaf_xtra;
796
797         perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
798
799         if( msync (perm, page_size, MS_SYNC) < 0 )
800           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
801
802         latch->dirty = 0;
803         return 0;
804 }
805
806 //      read page into buffer pool from permanent location in Btree file
807
808 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
809 {
810 int flag = PROT_READ | PROT_WRITE;
811 uint page_size = mgr->page_size;
812 uint off = 0, segment, fragment;
813 unsigned char *perm;
814
815   if( leaf )
816         page_size <<= mgr->leaf_xtra;
817
818   fragment = page_no & 0xffff;
819   segment = page_no >> 16;
820   mgr->reads++;
821
822   while( off < page_size ) {
823         if( fragment >> 16 )
824                 segment++, fragment = 0;
825
826         if( segment < mgr->segments ) {
827           perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
828
829           memcpy ((unsigned char *)page + off, perm, mgr->page_size);
830           off += mgr->page_size;
831           fragment++;
832           continue;
833         }
834
835         bt_mutexlock (mgr->maps);
836
837         if( segment < mgr->segments ) {
838                 bt_releasemutex (mgr->maps);
839                 continue;
840         }
841
842         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
843         mgr->segments++;
844
845         bt_releasemutex (mgr->maps);
846   }
847
848   return 0;
849 }
850
851 //      write page to permanent location in Btree file
852
853 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
854 {
855 int flag = PROT_READ | PROT_WRITE;
856 uint page_size = mgr->page_size;
857 uint off = 0, segment, fragment;
858 unsigned char *perm;
859
860   if( leaf )
861         page_size <<= mgr->leaf_xtra;
862
863   fragment = page_no & 0xffff;
864   segment = page_no >> 16;
865   mgr->writes++;
866
867   while( off < page_size ) {
868         if( fragment >> 16 )
869                 segment++, fragment = 0;
870
871         if( segment < mgr->segments ) {
872           perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
873           memcpy (perm, (unsigned char *)page + off, mgr->page_size);
874           off += mgr->page_size;
875           fragment++;
876           continue;
877         }
878
879         bt_mutexlock (mgr->maps);
880
881         if( segment < mgr->segments ) {
882                 bt_releasemutex (mgr->maps);
883                 continue;
884         }
885
886         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
887         mgr->segments++;
888         bt_releasemutex (mgr->maps);
889   }
890
891   return 0;
892 }
893
894 //      set CLOCK bit in latch
895 //      decrement pin count
896
897 int value;
898
899 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
900 {
901         bt_mutexlock(latch->modify);
902         latch->pin |= CLOCK_bit;
903         latch->pin--;
904
905         bt_releasemutex(latch->modify);
906 }
907
908 //  return the btree cached page address
909
910 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
911 {
912 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
913 BtPage page;
914
915   if( latch->leaf )
916         page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
917   else
918         page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
919
920   return page;
921 }
922
923 //  return next available leaf entry
924 //        and with latch entry locked
925
926 uint bt_availleaf (BtMgr *mgr)
927 {
928 BtLatchSet *latch;
929 uint entry;
930
931   while( 1 ) {
932 #ifdef unix
933         entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
934 #else
935         entry = _InterlockedIncrement (&mgr->leafvictim);
936 #endif
937         entry %= mgr->leaftotal;
938
939         if( !entry )
940                 continue;
941
942         latch = mgr->leafsets + entry;
943
944         if( !bt_mutextry(latch->modify) )
945                 continue;
946
947         //  return this entry if it is not pinned
948
949         if( !latch->pin )
950                 return entry;
951
952         //      if the CLOCK bit is set
953         //      reset it to zero.
954
955         latch->pin &= ~CLOCK_bit;
956         bt_releasemutex(latch->modify);
957   }
958 }
959
960 //  return next available latch entry
961 //        and with latch entry locked
962
963 uint bt_availnext (BtMgr *mgr)
964 {
965 BtLatchSet *latch;
966 uint entry;
967
968   while( 1 ) {
969 #ifdef unix
970         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
971 #else
972         entry = _InterlockedIncrement (&mgr->latchvictim);
973 #endif
974         entry %= mgr->latchtotal;
975
976         if( !entry )
977                 continue;
978
979         latch = mgr->latchsets + entry;
980
981         if( !bt_mutextry(latch->modify) )
982                 continue;
983
984         //  return this entry if it is not pinned
985
986         if( !latch->pin )
987                 return entry;
988
989         //      if the CLOCK bit is set
990         //      reset it to zero.
991
992         latch->pin &= ~CLOCK_bit;
993         bt_releasemutex(latch->modify);
994   }
995 }
996
997 //      pin leaf in leaf buffer pool
998 //      return with latchset pinned
999
1000 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
1001 {
1002 uint hashidx = page_no % mgr->leafhash;
1003 BtLatchSet *latch;
1004 uint entry, idx;
1005 BtPage page;
1006
1007   //  try to find our entry
1008
1009   bt_mutexlock(mgr->leaftable[hashidx].latch);
1010
1011   if( entry = mgr->leaftable[hashidx].entry )
1012    do {
1013         latch = mgr->leafsets + entry;
1014
1015         if( page_no == latch->page_no )
1016                 break;
1017    } while( entry = latch->next );
1018
1019   //  found our entry: increment pin
1020
1021   if( entry ) {
1022         latch = mgr->leafsets + entry;
1023         bt_mutexlock(latch->modify);
1024         latch->pin |= CLOCK_bit;
1025         latch->pin++;
1026
1027         bt_releasemutex(latch->modify);
1028         bt_releasemutex(mgr->leaftable[hashidx].latch);
1029         return latch;
1030   }
1031
1032   //  find and reuse unpinned entry
1033
1034 trynext:
1035   entry = bt_availleaf (mgr);
1036   latch = mgr->leafsets + entry;
1037
1038   idx = latch->page_no % mgr->leafhash;
1039
1040   //  if latch is on a different hash chain
1041   //    unlink from the old page_no chain
1042
1043   if( latch->page_no )
1044    if( idx != hashidx ) {
1045
1046         //  skip over this entry if latch not available
1047
1048     if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
1049           bt_releasemutex(latch->modify);
1050           goto trynext;
1051         }
1052
1053         if( latch->prev )
1054           mgr->leafsets[latch->prev].next = latch->next;
1055         else
1056           mgr->leaftable[idx].entry = latch->next;
1057
1058         if( latch->next )
1059           mgr->leafsets[latch->next].prev = latch->prev;
1060
1061     bt_releasemutex (mgr->leaftable[idx].latch);
1062    }
1063
1064   page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1065
1066   //  update permanent page area in btree from buffer pool
1067   //    no read-lock is required since page is not pinned.
1068
1069   if( latch->dirty )
1070         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
1071           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1072         else
1073           latch->dirty = 0;
1074
1075   if( bt_readpage (mgr, page, page_no, 1) )
1076         return mgr->line = __LINE__, NULL;
1077
1078   //  link page as head of hash table chain
1079   //  if this is a never before used entry,
1080   //  or it was previously on a different
1081   //  hash table chain. Otherwise, just
1082   //  leave it in its current hash table
1083   //  chain position.
1084
1085   if( !latch->page_no || hashidx != idx ) {
1086         if( latch->next = mgr->leaftable[hashidx].entry )
1087           mgr->leafsets[latch->next].prev = entry;
1088
1089         mgr->leaftable[hashidx].entry = entry;
1090     latch->prev = 0;
1091   }
1092
1093   //  fill in latch structure
1094
1095   latch->pin = CLOCK_bit | 1;
1096   latch->page_no = page_no;
1097   latch->leaf = 1;
1098
1099   bt_releasemutex (latch->modify);
1100   bt_releasemutex (mgr->leaftable[hashidx].latch);
1101   return latch;
1102 }
1103
1104 //      pin page in non-leaf buffer pool
1105 //      return with latchset pinned
1106
1107 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1108 {
1109 uint hashidx = page_no % mgr->latchhash;
1110 BtLatchSet *latch;
1111 uint entry, idx;
1112 BtPage page;
1113
1114   //  try to find our entry
1115
1116   bt_mutexlock(mgr->hashtable[hashidx].latch);
1117
1118   if( entry = mgr->hashtable[hashidx].entry ) do
1119   {
1120         latch = mgr->latchsets + entry;
1121
1122         if( page_no == latch->page_no )
1123                 break;
1124   } while( entry = latch->next );
1125
1126   //  found our entry: increment pin
1127
1128   if( entry ) {
1129         latch = mgr->latchsets + entry;
1130         bt_mutexlock(latch->modify);
1131         latch->pin |= CLOCK_bit;
1132         latch->pin++;
1133         bt_releasemutex(latch->modify);
1134         bt_releasemutex(mgr->hashtable[hashidx].latch);
1135         return latch;
1136   }
1137
1138   //  find and reuse unpinned entry
1139
1140 trynext:
1141   entry = bt_availnext (mgr);
1142   latch = mgr->latchsets + entry;
1143
1144   idx = latch->page_no % mgr->latchhash;
1145
1146   //  if latch is on a different hash chain
1147   //    unlink from the old page_no chain
1148
1149   if( latch->page_no )
1150    if( idx != hashidx ) {
1151
1152         //  skip over this entry if latch not available
1153
1154     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1155           bt_releasemutex(latch->modify);
1156           goto trynext;
1157         }
1158
1159         if( latch->prev )
1160           mgr->latchsets[latch->prev].next = latch->next;
1161         else
1162           mgr->hashtable[idx].entry = latch->next;
1163
1164         if( latch->next )
1165           mgr->latchsets[latch->next].prev = latch->prev;
1166
1167     bt_releasemutex (mgr->hashtable[idx].latch);
1168    }
1169
1170   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1171
1172   //  update permanent page area in btree from buffer pool
1173   //    no read-lock is required since page is not pinned.
1174
1175   if( latch->dirty )
1176         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1177           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1178         else
1179           latch->dirty = 0;
1180
1181   if( bt_readpage (mgr, page, page_no, 0) )
1182         return mgr->line = __LINE__, NULL;
1183
1184   //  link page as head of hash table chain
1185   //  if this is a never before used entry,
1186   //  or it was previously on a different
1187   //  hash table chain. Otherwise, just
1188   //  leave it in its current hash table
1189   //  chain position.
1190
1191   if( !latch->page_no || hashidx != idx ) {
1192         if( latch->next = mgr->hashtable[hashidx].entry )
1193           mgr->latchsets[latch->next].prev = entry;
1194
1195         mgr->hashtable[hashidx].entry = entry;
1196     latch->prev = 0;
1197   }
1198
1199   //  fill in latch structure
1200
1201   latch->pin = CLOCK_bit | 1;
1202   latch->page_no = page_no;
1203   latch->leaf = 0;
1204
1205   bt_releasemutex (latch->modify);
1206   bt_releasemutex (mgr->hashtable[hashidx].latch);
1207   return latch;
1208 }
1209   
1210 void bt_mgrclose (BtMgr *mgr)
1211 {
1212 BtLatchSet *latch;
1213 BtLogHdr *eof;
1214 uint num = 0;
1215 BtPage page;
1216 uint slot;
1217
1218         //      flush previously written dirty pages
1219         //      and write recovery buffer to disk
1220
1221         fdatasync (mgr->idx);
1222
1223         if( mgr->redoend ) {
1224                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1225                 memset (eof, 0, sizeof(BtLogHdr));
1226         }
1227
1228         //      write remaining dirty pool pages to the btree
1229
1230         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1231                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1232                 latch = mgr->latchsets + slot;
1233
1234                 if( latch->dirty ) {
1235                         bt_writepage(mgr, page, latch->page_no, 0);
1236                         latch->dirty = 0, num++;
1237                 }
1238         }
1239
1240         //      write remaining dirty leaf pages to the btree
1241
1242         for( slot = 1; slot < mgr->leaftotal; slot++ ) {
1243                 page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1244                 latch = mgr->leafsets + slot;
1245
1246                 if( latch->dirty ) {
1247                         bt_writepage(mgr, page, latch->page_no, 1);
1248                         latch->dirty = 0, num++;
1249                 }
1250         }
1251
1252         //      clear redo recovery buffer on disk.
1253
1254         if( mgr->pagezero->redopages ) {
1255                 eof = (BtLogHdr *)mgr->redobuff;
1256                 memset (eof, 0, sizeof(BtLogHdr));
1257                 eof->lsn = mgr->lsn;
1258                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1259                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1260         }
1261
1262         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1263
1264 #ifdef unix
1265         while( mgr->segments )
1266                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1267
1268         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1269         munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1270         munmap (mgr->pagezero, mgr->page_size);
1271 #else
1272         FlushViewOfFile(mgr->pagezero, 0);
1273         UnmapViewOfFile(mgr->pagezero);
1274         UnmapViewOfFile(mgr->pagepool);
1275         CloseHandle(mgr->halloc);
1276         CloseHandle(mgr->hpool);
1277 #endif
1278 #ifdef unix
1279         close (mgr->idx);
1280         free (mgr);
1281 #else
1282         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1283         FlushFileBuffers(mgr->idx);
1284         CloseHandle(mgr->idx);
1285         GlobalFree (mgr);
1286 #endif
1287 }
1288
1289 //      close and release memory
1290
1291 void bt_close (BtDb *bt)
1292 {
1293 #ifdef unix
1294         if( bt->cursor )
1295                 free (bt->cursor);
1296 #else
1297         if( bt->cursor)
1298                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1299 #endif
1300         free (bt);
1301 }
1302
1303 //  open/create new btree buffer manager
1304
1305 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1306 //              size of page pool (e.g. 262144) and leaf pool
1307
1308 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1309 {
1310 uint lvl, attr, last, slot, idx;
1311 uint nlatchpage, latchhash;
1312 uint nleafpage, leafhash;
1313 unsigned char value[BtId];
1314 int flag, initit = 0;
1315 BtPageZero *pagezero;
1316 BtLatchSet *latch;
1317 off64_t size;
1318 uint amt[1];
1319 BtMgr* mgr;
1320 BtKey* key;
1321 BtVal *val;
1322
1323         // determine sanity of page size and buffer pool
1324
1325         if( leafxtra + pagebits > BT_maxbits )
1326                 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1327
1328         if( pagebits < BT_minbits )
1329                 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1330
1331 #ifdef unix
1332         mgr = calloc (1, sizeof(BtMgr));
1333
1334         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1335
1336         if( mgr->idx == -1 ) {
1337                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1338                 return free(mgr), NULL;
1339         }
1340 #else
1341         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1342         attr = FILE_ATTRIBUTE_NORMAL;
1343         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1344
1345         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1346                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1347                 return GlobalFree(mgr), NULL;
1348         }
1349 #endif
1350
1351 #ifdef unix
1352         pagezero = valloc (BT_maxpage);
1353         *amt = 0;
1354
1355         // read minimum page size to get root info
1356         //      to support raw disk partition files
1357         //      check if page_bits == 0 on the disk.
1358
1359         if( size = lseek (mgr->idx, 0L, 2) )
1360                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1361                         if( pagezero->page_bits ) {
1362                                 pagebits = pagezero->page_bits;
1363                                 leafxtra = pagezero->leaf_xtra;
1364                         } else
1365                                 initit = 1;
1366                 else
1367                         return free(mgr), free(pagezero), NULL;
1368         else
1369                 initit = 1;
1370 #else
1371         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1372         size = GetFileSize(mgr->idx, amt);
1373
1374         if( size || *amt ) {
1375                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1376                         return bt_mgrclose (mgr), NULL;
1377                 pagebits = pagezero->page_bits;
1378                 leafxtra = pagezero->leaf_xtra;
1379         } else
1380                 initit = 1;
1381 #endif
1382
1383         mgr->page_size = 1 << pagebits;
1384         mgr->page_bits = pagebits;
1385         mgr->leaf_xtra = leafxtra;
1386
1387         //  calculate number of latch hash table entries
1388
1389         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1390
1391         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1392         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1393         mgr->latchtotal = nodemax;
1394
1395         //  calculate number of leaf hash table entries
1396
1397         mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1398
1399         mgr->nleafpage += leafmax << leafxtra;          // size of the leaf pool in pages
1400         mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1401         mgr->leaftotal = leafmax;
1402
1403         mgr->redopage = LEAF_page + (1 << leafxtra);
1404
1405         if( !initit )
1406                 goto mgrlatch;
1407
1408         // initialize an empty b-tree with latch page, root page, page of leaves
1409         // and page(s) of latches and page pool cache
1410
1411         ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1412         memset (pagezero, 0, 1 << pagebits);
1413         pagezero->alloc->lvl = MIN_lvl - 1;
1414         pagezero->redopages = redopages;
1415         pagezero->page_bits = mgr->page_bits;
1416         pagezero->leaf_xtra = leafxtra;
1417
1418         bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1419         pagezero->leafpages = 1;
1420
1421         //  initialize left-most LEAF page in
1422         //      alloc->left and count of active leaf pages.
1423
1424         bt_putid (pagezero->alloc->left, LEAF_page);
1425
1426         if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1427                 fprintf (stderr, "Unable to create btree page zero\n");
1428                 return bt_mgrclose (mgr), NULL;
1429         }
1430
1431         memset (pagezero, 0, 1 << pagebits);
1432
1433         for( lvl=MIN_lvl; lvl--; ) {
1434         BtSlot *node = slotptr(pagezero->alloc, 1);
1435                 node->off = mgr->page_size;
1436
1437                 if( !lvl )
1438                         node->off <<= mgr->leaf_xtra;
1439
1440                 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1441                 key = keyptr(pagezero->alloc, 1);
1442                 key->len = 2;           // create stopper key
1443                 key->key[0] = 0xff;
1444                 key->key[1] = 0xff;
1445
1446                 bt_putid(value, MIN_lvl - lvl + 1);
1447                 val = valptr(pagezero->alloc, 1);
1448                 val->len = lvl ? BtId : 0;
1449                 memcpy (val->value, value, val->len);
1450
1451                 pagezero->alloc->min = node->off;
1452                 pagezero->alloc->lvl = lvl;
1453                 pagezero->alloc->cnt = 1;
1454                 pagezero->alloc->act = 1;
1455                 pagezero->alloc->page_no = MIN_lvl - lvl;
1456
1457                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1458                         fprintf (stderr, "Unable to create btree page\n");
1459                         return bt_mgrclose (mgr), NULL;
1460                 }
1461         }
1462
1463 mgrlatch:
1464 #ifdef unix
1465         free (pagezero);
1466 #else
1467         VirtualFree (pagezero, 0, MEM_RELEASE);
1468 #endif
1469
1470         // mlock the first segment of 64K pages
1471
1472         flag = PROT_READ | PROT_WRITE;
1473         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1474         mgr->segments = 1;
1475
1476         if( mgr->pages[0] == MAP_FAILED ) {
1477                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1478                 return bt_mgrclose (mgr), NULL;
1479         }
1480
1481         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1482         mlock (mgr->pagezero, mgr->page_size);
1483
1484         mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
1485         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1486
1487         //      allocate pool buffers
1488
1489         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1490         if( mgr->pagepool == MAP_FAILED ) {
1491                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1492                 return bt_mgrclose (mgr), NULL;
1493         }
1494
1495         mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1496         if( mgr->leafpool == MAP_FAILED ) {
1497                 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1498                 return bt_mgrclose (mgr), NULL;
1499         }
1500
1501         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1502         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1503         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1504
1505         mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1506         mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1507         mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1508
1509         return mgr;
1510 }
1511
1512 //      open BTree access method
1513 //      based on buffer manager
1514
1515 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1516 {
1517 BtDb *bt = malloc (sizeof(*bt));
1518
1519         memset (bt, 0, sizeof(*bt));
1520         bt->main = main;
1521         bt->mgr = mgr;
1522 #ifdef unix
1523         bt->cursor = valloc (mgr->page_size << mgr->leaf_xtra);
1524 #else
1525         bt->cursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
1526 #endif
1527 #ifdef unix
1528         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1529 #else
1530         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1531 #endif
1532         return bt;
1533 }
1534
1535 //  compare two keys, return > 0, = 0, or < 0
1536 //  =0: keys are same
1537 //  -1: key2 > key1
1538 //  +1: key2 < key1
1539 //  as the comparison value
1540
1541 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1542 {
1543 uint len1 = key1->len;
1544 int ans;
1545
1546         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1547                 return ans;
1548
1549         if( len1 > len2 )
1550                 return 1;
1551         if( len1 < len2 )
1552                 return -1;
1553
1554         return 0;
1555 }
1556
1557 // place write, read, or parent lock on requested page_no.
1558
1559 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1560 {
1561         switch( mode ) {
1562         case BtLockRead:
1563                 ReadLock (latch->readwr, thread_no);
1564                 break;
1565         case BtLockWrite:
1566 //if(latch->leaf)
1567 //fprintf(stderr, "WrtRqst %d by %d at %d\n", (uint)latch->page_no, thread_no, line); 
1568                 WriteLock (latch->readwr, thread_no, line);
1569 //if(latch->leaf)
1570 //fprintf(stderr, "WrtLock %d by %d at %d\n", (uint)latch->page_no, thread_no, line); 
1571                 break;
1572         case BtLockAccess:
1573                 ReadLock (latch->access, thread_no);
1574                 break;
1575         case BtLockDelete:
1576                 WriteLock (latch->access, thread_no, line);
1577                 break;
1578         case BtLockParent:
1579                 WriteLock (latch->parent, thread_no, line);
1580                 break;
1581         case BtLockLink:
1582                 WriteLock (latch->link, thread_no, line);
1583                 break;
1584         }
1585 }
1586
1587 // remove write, read, or parent lock on requested page
1588
1589 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1590 {
1591         switch( mode ) {
1592         case BtLockRead:
1593                 ReadRelease (latch->readwr);
1594                 break;
1595         case BtLockWrite:
1596 //if(latch->leaf)
1597 //fprintf(stderr, "Un Lock %d by %d at %d\n", (uint)latch->page_no, thread_no, line); 
1598                 WriteRelease (latch->readwr);
1599                 break;
1600         case BtLockAccess:
1601                 ReadRelease (latch->access);
1602                 break;
1603         case BtLockDelete:
1604                 WriteRelease (latch->access);
1605                 break;
1606         case BtLockParent:
1607                 WriteRelease (latch->parent);
1608                 break;
1609         case BtLockLink:
1610                 WriteRelease (latch->link);
1611                 break;
1612         }
1613 }
1614
1615 //      allocate a new page
1616 //      return with page latched, but unlocked.
1617
1618 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1619 {
1620 uint page_size = mgr->page_size, page_xtra = 0;
1621 unsigned char *freechain;
1622 uid page_no;
1623
1624         if( contents->lvl )
1625                 freechain = mgr->pagezero->freechain;
1626         else {
1627                 freechain = mgr->pagezero->leafchain;
1628                 mgr->pagezero->leafpages++;
1629                 page_xtra = mgr->leaf_xtra;
1630                 page_size <<= page_xtra;
1631         }
1632
1633         //      lock allocation page
1634
1635         bt_mutexlock(mgr->lock);
1636
1637         // use empty chain first
1638         // else allocate new page
1639
1640         if( page_no = bt_getid(freechain) ) {
1641                 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1642                         set->page = bt_mappage (mgr, set->latch);
1643                 else
1644                         return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1645
1646                 bt_putid(freechain, bt_getid(set->page->right));
1647
1648                 //  the page is currently nopromote and this
1649                 //  will keep bt_txnpromote out.
1650
1651                 //      contents will replace this bit
1652                 //  and pin will keep bt_txnpromote out
1653
1654                 contents->page_no = page_no;
1655                 contents->nopromote = 0;
1656                 set->latch->dirty = 1;
1657
1658                 memcpy (set->page, contents, page_size);
1659
1660 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1661 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1662
1663                 bt_releasemutex(mgr->lock);
1664                 return 0;
1665         }
1666
1667         page_no = bt_getid(mgr->pagezero->alloc->right);
1668         bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
1669
1670         // unlock allocation latch and
1671         //      extend file into new page.
1672
1673 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1674 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1675         bt_releasemutex(mgr->lock);
1676
1677         //      keep bt_txnpromote out of this page
1678         contents->nopromote = 1;
1679         contents->page_no = page_no;
1680         if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1681                 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1682         if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1683                 set->page = bt_mappage (mgr, set->latch);
1684         else
1685                 return mgr->err_thread = thread_no, mgr->err;
1686
1687         // now pin will keep bt_txnpromote out
1688
1689         set->page->nopromote = 0;
1690         set->latch->dirty = 1;
1691         return 0;
1692 }
1693
1694 //  find slot in page for given key at a given level
1695
1696 int bt_findslot (BtPage page, unsigned char *key, uint len)
1697 {
1698 uint diff, higher = page->cnt, low = 1, slot;
1699 uint good = 0;
1700
1701         //        make stopper key an infinite fence value
1702
1703         if( bt_getid (page->right) )
1704                 higher++;
1705         else
1706                 good++;
1707
1708         //      low is the lowest candidate.
1709         //  loop ends when they meet
1710
1711         //  higher is already
1712         //      tested as .ge. the passed key.
1713
1714         while( diff = higher - low ) {
1715                 slot = low + ( diff >> 1 );
1716                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1717                         low = slot + 1;
1718                 else
1719                         higher = slot, good++;
1720         }
1721
1722         //      return zero if key is on right link page
1723
1724         return good ? higher : 0;
1725 }
1726
1727 //  find and load page at given level for given key
1728 //      leave page rd or wr locked as requested
1729
1730 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1731 {
1732 uid page_no = ROOT_page, prevpage_no = 0;
1733 uint drill = 0xff, slot;
1734 BtLatchSet *prevlatch;
1735 uint mode, prevmode;
1736 BtPage prevpage;
1737 BtVal *val;
1738 BtKey *ptr;
1739
1740   //  start at root of btree and drill down
1741
1742   do {
1743         // determine lock mode of drill level
1744         mode = (drill == lvl) ? lock : BtLockRead; 
1745
1746         if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1747           set->page = bt_mappage (mgr, set->latch);
1748         else
1749           return 0;
1750
1751         // obtain access lock using lock chaining with Access mode
1752
1753         if( page_no > ROOT_page )
1754           bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1755
1756         //      release & unpin parent or left sibling page
1757
1758         if( prevpage_no ) {
1759           bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1760           bt_unpinlatch (prevlatch, thread_no, __LINE__);
1761           prevpage_no = 0;
1762         }
1763
1764         // obtain mode lock using lock coupling through AccessLock
1765
1766         bt_lockpage(mode, set->latch, thread_no, __LINE__);
1767
1768         // grab our fence key
1769
1770         ptr=keyptr(set->page,set->page->cnt);
1771
1772         if( set->page->free )
1773                 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1774
1775         if( page_no > ROOT_page )
1776           bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1777
1778         // re-read and re-lock root after determining actual level of root
1779
1780         if( set->page->lvl != drill) {
1781                 if( set->latch->page_no != ROOT_page )
1782                         return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1783                         
1784                 drill = set->page->lvl;
1785
1786                 if( lock != BtLockRead && drill == lvl ) {
1787                   bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1788                   bt_unpinlatch (set->latch, thread_no, __LINE__);
1789                   continue;
1790                 }
1791         }
1792
1793         prevpage_no = set->latch->page_no;
1794         prevlatch = set->latch;
1795         prevpage = set->page;
1796         prevmode = mode;
1797
1798         //  if requested key is beyond our fence,
1799         //      slide to the right
1800
1801         if( keycmp (ptr, key, len) < 0 )
1802           if( page_no = bt_getid(set->page->right) )
1803                 continue;
1804
1805         //  if page is part of a delete operation,
1806         //      slide to the left;
1807
1808         if( set->page->kill ) {
1809           bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1810           page_no = bt_getid(set->page->left);
1811           bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1812           continue;
1813         }
1814
1815         //  find key on page at this level
1816         //  and descend to requested level
1817
1818         if( slot = bt_findslot (set->page, key, len) ) {
1819           if( drill == lvl )
1820                 return slot;
1821
1822           // find next non-dead slot -- the fence key if nothing else
1823
1824           while( slotptr(set->page, slot)->dead )
1825                 if( slot++ < set->page->cnt )
1826                   continue;
1827                 else
1828                   return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1829
1830           val = valptr(set->page, slot);
1831
1832           if( val->len == BtId )
1833                 page_no = bt_getid(val->value);
1834           else
1835                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1836
1837           drill--;
1838           continue;
1839          }
1840
1841         //  slide right into next page
1842
1843         page_no = bt_getid(set->page->right);
1844
1845   } while( page_no );
1846
1847   // return error on end of right chain
1848
1849   mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1850   return 0;     // return error
1851 }
1852
1853 //      return page to free list
1854 //      page must be delete & write locked
1855 //      and have no keys pointing to it.
1856
1857 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1858 {
1859 unsigned char *freechain;
1860
1861         if( set->page->lvl )
1862                 freechain = mgr->pagezero->freechain;
1863         else {
1864                 freechain = mgr->pagezero->leafchain;
1865                 mgr->pagezero->leafpages--;
1866         }
1867
1868         //      lock allocation page
1869
1870         bt_mutexlock (mgr->lock);
1871
1872         //      store chain
1873
1874         memcpy(set->page->right, freechain, BtId);
1875         bt_putid(freechain, set->latch->page_no);
1876         set->latch->dirty = 1;
1877         set->page->free = 1;
1878
1879 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1880 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1881
1882         // unlock released page
1883         // and unlock allocation page
1884
1885         bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1886         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1887         bt_unpinlatch (set->latch, thread_no, __LINE__);
1888         bt_releasemutex (mgr->lock);
1889 }
1890
1891 //      a fence key was deleted from a page
1892 //      push new fence value upwards
1893
1894 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1895 {
1896 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1897 unsigned char value[BtId];
1898 BtKey* ptr;
1899 uint idx;
1900
1901         //      remove the old fence value
1902
1903         ptr = keyptr(set->page, set->page->cnt);
1904         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1905         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1906         set->latch->dirty = 1;
1907
1908         //  cache new fence value
1909
1910         ptr = keyptr(set->page, set->page->cnt);
1911         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1912
1913         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1914         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1915
1916         //      insert new (now smaller) fence key
1917
1918         bt_putid (value, set->latch->page_no);
1919         ptr = (BtKey*)leftkey;
1920
1921         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1922           return mgr->err_thread = thread_no, mgr->err;
1923
1924         //      now delete old fence key
1925
1926         ptr = (BtKey*)rightkey;
1927
1928         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1929                 return mgr->err_thread = thread_no, mgr->err;
1930
1931         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1932         bt_unpinlatch(set->latch, thread_no, __LINE__);
1933         return 0;
1934 }
1935
1936 //      root has a single child
1937 //      collapse a level from the tree
1938
1939 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1940 {
1941 BtPageSet child[1];
1942 uid page_no;
1943 BtVal *val;
1944 uint idx;
1945
1946   // find the child entry and promote as new root contents
1947
1948   do {
1949         for( idx = 0; idx++ < root->page->cnt; )
1950           if( !slotptr(root->page, idx)->dead )
1951                 break;
1952
1953         val = valptr(root->page, idx);
1954
1955         if( val->len == BtId )
1956                 page_no = bt_getid (valptr(root->page, idx)->value);
1957         else
1958                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1959
1960         if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1961                 child->page = bt_mappage (mgr, child->latch);
1962         else
1963                 return mgr->err_thread = thread_no, mgr->err;
1964
1965         bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1966         bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1967
1968         memcpy (root->page, child->page, mgr->page_size);
1969         root->latch->dirty = 1;
1970
1971         bt_freepage (mgr, child, thread_no);
1972
1973   } while( root->page->lvl > 1 && root->page->act == 1 );
1974
1975   bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1976   bt_unpinlatch (root->latch, thread_no, __LINE__);
1977   return 0;
1978 }
1979
1980 //  delete a page and manage key
1981 //  call with page writelocked
1982
1983 //      returns with page unpinned
1984 //      from the page pool.
1985
1986 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1987 {
1988 unsigned char lowerfence[BT_keyarray];
1989 uint page_size = mgr->page_size;
1990 BtPageSet right[1], temp[1];
1991 unsigned char value[BtId];
1992 uid page_no, right2;
1993 BtKey *ptr;
1994
1995         if( !lvl )
1996                 page_size <<= mgr->leaf_xtra;
1997
1998         //  cache copy of original fence key
1999         //      that is being deleted.
2000
2001         ptr = keyptr(set->page, set->page->cnt);
2002         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2003
2004         //      obtain locks on right page
2005
2006         page_no = bt_getid(set->page->right);
2007
2008         if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
2009                 right->page = bt_mappage (mgr, right->latch);
2010         else
2011                 return 0;
2012
2013         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2014
2015         if( right->page->kill )
2016                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2017
2018         // pull contents of right peer into our empty page
2019         //      preserving our left page number.
2020
2021         bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
2022         page_no = bt_getid(set->page->left);
2023         memcpy (set->page, right->page, page_size);
2024         bt_putid (set->page->left, page_no);
2025         bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
2026
2027         set->page->page_no = set->latch->page_no;
2028         set->latch->dirty = 1;
2029
2030         if( right2 = bt_getid (right->page->right) ) {
2031           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2032                 temp->page = bt_mappage (mgr, temp->latch);
2033           else
2034                 return 0;
2035
2036       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2037           bt_putid (temp->page->left, set->latch->page_no);
2038           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2039           bt_unpinlatch (temp->latch, thread_no, __LINE__);
2040         } else {        // page is rightmost
2041           bt_mutexlock (mgr->lock);
2042           bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
2043           bt_releasemutex(mgr->lock);
2044         }
2045
2046         // mark right page deleted and release lock
2047
2048         right->latch->dirty = 1;
2049         right->page->kill = 1;
2050
2051         // redirect higher key directly to our new node contents
2052
2053         ptr = keyptr(right->page, right->page->cnt);
2054         bt_putid (value, set->latch->page_no);
2055
2056         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
2057           return mgr->err;
2058
2059         bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2060
2061         //      delete orignal fence key in parent
2062         //      and unlock our page.
2063
2064         ptr = (BtKey *)lowerfence;
2065
2066         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2067           return mgr->err;
2068
2069         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2070         bt_unpinlatch (set->latch, thread_no, __LINE__);
2071
2072         //      obtain delete and write locks to right node
2073
2074         bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2075         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2076         bt_freepage (mgr, right, thread_no);
2077         return 0;
2078 }
2079
2080 //  find and delete key on page by marking delete flag bit
2081 //  if page becomes empty, delete it from the btree
2082
2083 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2084 {
2085 uint slot, idx, found, fence;
2086 BtPageSet set[1];
2087 BtSlot *node;
2088 BtKey *ptr;
2089 BtVal *val;
2090
2091         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2092                 node = slotptr(set->page, slot);
2093                 ptr = keyptr(set->page, slot);
2094         } else
2095                 return mgr->err_thread = thread_no, mgr->err;
2096
2097         // if librarian slot, advance to real slot
2098
2099         if( node->type == Librarian ) {
2100                 ptr = keyptr(set->page, ++slot);
2101                 node = slotptr(set->page, slot);
2102         }
2103
2104         fence = slot == set->page->cnt;
2105
2106         // delete the key, ignore request if already dead
2107
2108         if( found = !keycmp (ptr, key, len) )
2109           if( found = node->dead == 0 ) {
2110                 val = valptr(set->page,slot);
2111                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2112                 set->page->act--;
2113
2114                 //  mark node type as delete
2115
2116                 node->type = Delete;
2117                 node->dead = 1;
2118
2119                 // collapse empty slots beneath the fence
2120                 // on interiour nodes
2121
2122                 if( lvl )
2123                  while( idx = set->page->cnt - 1 )
2124                   if( slotptr(set->page, idx)->dead ) {
2125                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2126                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2127                   } else
2128                         break;
2129           }
2130
2131         if( !found )
2132                 return 0;
2133
2134         //      did we delete a fence key in an upper level?
2135
2136         if( lvl && set->page->act && fence )
2137           if( bt_fixfence (mgr, set, lvl, thread_no) )
2138                 return mgr->err_thread = thread_no, mgr->err;
2139           else
2140                 return 0;
2141
2142         //      do we need to collapse root?
2143
2144         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2145           if( bt_collapseroot (mgr, set, thread_no) )
2146                 return mgr->err_thread = thread_no, mgr->err;
2147           else
2148                 return 0;
2149
2150         //      delete empty page
2151
2152         if( !set->page->act )
2153           return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2154
2155         set->latch->dirty = 1;
2156         bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2157         bt_unpinlatch (set->latch, thread_no, __LINE__);
2158         return 0;
2159 }
2160
2161 //      advance to next slot
2162
2163 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2164 {
2165 BtLatchSet *prevlatch;
2166 uid page_no;
2167
2168         if( slot < set->page->cnt )
2169                 return slot + 1;
2170
2171         prevlatch = set->latch;
2172
2173         if( page_no = bt_getid(set->page->right) )
2174           if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
2175                 set->page = bt_mappage (bt->mgr, set->latch);
2176           else
2177                 return 0;
2178         else
2179           return bt->mgr->err = BTERR_struct, bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__, 0;
2180
2181         // obtain access lock using lock chaining with Access mode
2182
2183         bt_lockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
2184
2185         bt_unlockpage(BtLockRead, prevlatch, bt->thread_no, __LINE__);
2186         bt_unpinlatch (prevlatch, bt->thread_no, __LINE__);
2187
2188         bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
2189         bt_unlockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
2190         return 1;
2191 }
2192
2193 //      find unique key == given key, or first duplicate key in
2194 //      leaf level and return number of value bytes
2195 //      or (-1) if not found.
2196
2197 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2198 {
2199 BtPageSet set[1];
2200 uint len, slot;
2201 int ret = -1;
2202 BtKey *ptr;
2203 BtVal *val;
2204
2205   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2206    do {
2207         ptr = keyptr(set->page, slot);
2208
2209         //      skip librarian slot place holder
2210
2211         if( slotptr(set->page, slot)->type == Librarian )
2212                 ptr = keyptr(set->page, ++slot);
2213
2214         //      return actual key found
2215
2216         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2217         len = ptr->len;
2218
2219         if( slotptr(set->page, slot)->type == Duplicate )
2220                 len -= BtId;
2221
2222         //      not there if we reach the stopper key
2223
2224         if( slot == set->page->cnt )
2225           if( !bt_getid (set->page->right) )
2226                 break;
2227
2228         // if key exists, return >= 0 value bytes copied
2229         //      otherwise return (-1)
2230
2231         if( slotptr(set->page, slot)->dead )
2232                 continue;
2233
2234         if( keylen == len )
2235           if( !memcmp (ptr->key, key, len) ) {
2236                 val = valptr (set->page,slot);
2237                 if( valmax > val->len )
2238                         valmax = val->len;
2239                 memcpy (value, val->value, valmax);
2240                 ret = valmax;
2241           }
2242
2243         break;
2244
2245    } while( slot = bt_findnext (bt, set, slot) );
2246
2247   bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
2248   bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
2249   return ret;
2250 }
2251
2252 //      check page for space available,
2253 //      clean if necessary and return
2254 //      0 - page needs splitting
2255 //      >0  new slot value
2256
2257 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2258 {
2259 uint page_size = mgr->page_size;
2260 BtPage page = set->page, frame;
2261 uint cnt = 0, idx = 0;
2262 uint max = page->cnt;
2263 uint newslot = max;
2264 BtKey *key;
2265 BtVal *val;
2266
2267         if( !set->page->lvl )
2268                 page_size <<= mgr->leaf_xtra;
2269
2270         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2271                 return slot;
2272
2273         //      skip cleanup and proceed to split
2274         //      if there's not enough garbage
2275         //      to bother with.
2276
2277         if( page->garbage < page_size / 5 )
2278                 return 0;
2279
2280         frame = malloc (page_size);
2281         memcpy (frame, page, page_size);
2282
2283         // skip page info and set rest of page to zero
2284
2285         memset (page+1, 0, page_size - sizeof(*page));
2286         set->latch->dirty = 1;
2287
2288         page->min = page_size;
2289         page->garbage = 0;
2290         page->act = 0;
2291
2292         // clean up page first by
2293         // removing deleted keys
2294
2295         while( cnt++ < max ) {
2296                 if( cnt == slot )
2297                         newslot = idx + 2;
2298
2299                 if( cnt < max || frame->lvl )
2300                   if( slotptr(frame,cnt)->dead )
2301                         continue;
2302
2303                 // copy the value across
2304
2305                 val = valptr(frame, cnt);
2306                 page->min -= val->len + sizeof(BtVal);
2307                 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2308
2309                 // copy the key across
2310
2311                 key = keyptr(frame, cnt);
2312                 page->min -= key->len + sizeof(BtKey);
2313                 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2314
2315                 // make a librarian slot
2316
2317                 slotptr(page, ++idx)->off = page->min;
2318                 slotptr(page, idx)->type = Librarian;
2319                 slotptr(page, idx)->dead = 1;
2320
2321                 // set up the slot
2322
2323                 slotptr(page, ++idx)->off = page->min;
2324                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2325
2326                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2327                         page->act++;
2328         }
2329
2330         page->cnt = idx;
2331         free (frame);
2332
2333         //      see if page has enough space now, or does it need splitting?
2334
2335         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2336                 return newslot;
2337
2338         return 0;
2339 }
2340
2341 // split the root and raise the height of the btree
2342
2343 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2344 {  
2345 unsigned char leftkey[BT_keyarray];
2346 uint nxt = mgr->page_size;
2347 unsigned char value[BtId];
2348 BtPageSet left[1];
2349 uid left_page_no;
2350 BtPage frame;
2351 BtKey *ptr;
2352 BtVal *val;
2353
2354         frame = malloc (mgr->page_size);
2355         memcpy (frame, root->page, mgr->page_size);
2356
2357         //      save left page fence key for new root
2358
2359         ptr = keyptr(root->page, root->page->cnt);
2360         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2361
2362         //  Obtain an empty page to use, and copy the current
2363         //  root contents into it, e.g. lower keys
2364
2365         if( bt_newpage(mgr, left, frame, thread_no) )
2366                 return mgr->err_thread = thread_no, mgr->err;
2367
2368         left_page_no = left->latch->page_no;
2369         bt_unpinlatch (left->latch, thread_no, __LINE__);
2370         free (frame);
2371
2372         // preserve the page info at the bottom
2373         // of higher keys and set rest to zero
2374
2375         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2376
2377         // insert stopper key at top of newroot page
2378         // and increase the root height
2379
2380         nxt -= BtId + sizeof(BtVal);
2381         bt_putid (value, right->page_no);
2382         val = (BtVal *)((unsigned char *)root->page + nxt);
2383         memcpy (val->value, value, BtId);
2384         val->len = BtId;
2385
2386         nxt -= 2 + sizeof(BtKey);
2387         slotptr(root->page, 2)->off = nxt;
2388         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2389         ptr->len = 2;
2390         ptr->key[0] = 0xff;
2391         ptr->key[1] = 0xff;
2392
2393         // insert lower keys page fence key on newroot page as first key
2394
2395         nxt -= BtId + sizeof(BtVal);
2396         bt_putid (value, left_page_no);
2397         val = (BtVal *)((unsigned char *)root->page + nxt);
2398         memcpy (val->value, value, BtId);
2399         val->len = BtId;
2400
2401         ptr = (BtKey *)leftkey;
2402         nxt -= ptr->len + sizeof(BtKey);
2403         slotptr(root->page, 1)->off = nxt;
2404         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2405         
2406         bt_putid(root->page->right, 0);
2407         root->page->min = nxt;          // reset lowest used offset and key count
2408         root->page->cnt = 2;
2409         root->page->act = 2;
2410         root->page->lvl++;
2411
2412         mgr->pagezero->alloc->lvl = root->page->lvl;
2413
2414         // release and unpin root pages
2415
2416         bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2417         bt_unpinlatch (root->latch, thread_no, __LINE__);
2418
2419         bt_unpinlatch (right, thread_no, __LINE__);
2420         return 0;
2421 }
2422
2423 //  split already locked full node
2424 //      leave it locked.
2425 //      return pool entry for new right
2426 //      page, pinned & unlocked
2427
2428 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2429 {
2430 uint page_size = mgr->page_size;
2431 BtPageSet right[1], temp[1];
2432 uint cnt = 0, idx = 0, max;
2433 uint lvl = set->page->lvl;
2434 BtKey *key, *ptr;
2435 BtVal *val, *src;
2436 BtPage frame;
2437 uid right2;
2438 uint entry;
2439 uint prev;
2440
2441         if( !set->page->lvl )
2442                 page_size <<= mgr->leaf_xtra;
2443
2444         //  split higher half of keys to frame
2445
2446         frame = malloc (page_size);
2447         memset (frame, 0, page_size);
2448         frame->min = page_size;
2449         max = set->page->cnt;
2450         cnt = max / 2;
2451         idx = 0;
2452
2453         while( cnt++ < max ) {
2454                 if( cnt < max || set->page->lvl )
2455                   if( slotptr(set->page, cnt)->dead )
2456                         continue;
2457
2458                 src = valptr(set->page, cnt);
2459                 frame->min -= src->len + sizeof(BtVal);
2460                 memcpy ((unsigned char *)frame + frame->min, src, src->len + sizeof(BtVal));
2461
2462                 key = keyptr(set->page, cnt);
2463                 frame->min -= key->len + sizeof(BtKey);
2464                 ptr = (BtKey*)((unsigned char *)frame + frame->min);
2465                 memcpy (ptr, key, key->len + sizeof(BtKey));
2466
2467                 //      add librarian slot
2468
2469                 slotptr(frame, ++idx)->off = frame->min;
2470                 slotptr(frame, idx)->type = Librarian;
2471                 slotptr(frame, idx)->dead = 1;
2472
2473                 //  add actual slot
2474
2475                 slotptr(frame, ++idx)->off = frame->min;
2476                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2477
2478                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2479                         frame->act++;
2480         }
2481
2482         frame->cnt = idx;
2483         frame->lvl = lvl;
2484
2485         // link right node
2486
2487         if( set->latch->page_no > ROOT_page ) {
2488                 right2 = bt_getid (set->page->right);
2489                 bt_putid (frame->right, right2);
2490                 bt_putid (frame->left, set->latch->page_no);
2491         }
2492
2493         //      get new free page and write higher keys to it.
2494
2495         if( bt_newpage(mgr, right, frame, thread_no) )
2496                 return 0;
2497
2498         //      link far right's left pointer to new page
2499
2500         if( linkleft && set->latch->page_no > ROOT_page )
2501          if( right2 ) {
2502           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2503                 temp->page = bt_mappage (mgr, temp->latch);
2504           else
2505                 return 0;
2506
2507       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2508           bt_putid (temp->page->left, right->latch->page_no);
2509           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2510           bt_unpinlatch (temp->latch, thread_no, __LINE__);
2511          } else {       // page is rightmost
2512           bt_mutexlock (mgr->lock);
2513           bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2514           bt_releasemutex(mgr->lock);
2515          }
2516
2517         // process lower keys
2518
2519         memcpy (frame, set->page, page_size);
2520         memset (set->page+1, 0, page_size - sizeof(*set->page));
2521         set->latch->dirty = 1;
2522
2523         set->page->min = page_size;
2524         set->page->garbage = 0;
2525         set->page->act = 0;
2526         max /= 2;
2527         cnt = 0;
2528         idx = 0;
2529
2530         //  assemble page of smaller keys
2531
2532         while( cnt++ < max ) {
2533                 if( slotptr(frame, cnt)->dead )
2534                         continue;
2535                 val = valptr(frame, cnt);
2536                 set->page->min -= val->len + sizeof(BtVal);
2537                 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2538
2539                 key = keyptr(frame, cnt);
2540                 set->page->min -= key->len + sizeof(BtKey);
2541                 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2542
2543                 //      add librarian slot
2544
2545                 slotptr(set->page, ++idx)->off = set->page->min;
2546                 slotptr(set->page, idx)->type = Librarian;
2547                 slotptr(set->page, idx)->dead = 1;
2548
2549                 //      add actual slot
2550
2551                 slotptr(set->page, ++idx)->off = set->page->min;
2552                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2553                 set->page->act++;
2554         }
2555
2556         bt_putid(set->page->right, right->latch->page_no);
2557         set->page->cnt = idx;
2558         free(frame);
2559
2560         entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2561         return entry;
2562 }
2563
2564 //      fix keys for newly split page
2565 //      call with both pages pinned & locked
2566 //  return unlocked and unpinned
2567
2568 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2569 {
2570 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2571 unsigned char value[BtId];
2572 uint lvl = set->page->lvl;
2573 BtPage page;
2574 BtKey *ptr;
2575
2576         // if current page is the root page, split it
2577
2578         if( set->latch->page_no == ROOT_page )
2579                 return bt_splitroot (mgr, set, right, thread_no);
2580
2581         ptr = keyptr(set->page, set->page->cnt);
2582         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2583
2584         page = bt_mappage (mgr, right);
2585
2586         ptr = keyptr(page, page->cnt);
2587         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2588
2589         // insert new fences in their parent pages
2590
2591         bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2592
2593         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2594         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2595
2596         // insert new fence for reformulated left block of smaller keys
2597
2598         bt_putid (value, set->latch->page_no);
2599         ptr = (BtKey *)leftkey;
2600
2601         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2602                 return mgr->err_thread = thread_no, mgr->err;
2603
2604         // switch fence for right block of larger keys to new right page
2605
2606         bt_putid (value, right->page_no);
2607         ptr = (BtKey *)rightkey;
2608
2609         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2610                 return mgr->err_thread = thread_no, mgr->err;
2611
2612         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2613         bt_unpinlatch (set->latch, thread_no, __LINE__);
2614
2615         bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2616         bt_unpinlatch (right, thread_no, __LINE__);
2617         return 0;
2618 }
2619
2620 //      install new key and value onto page
2621 //      page must already be checked for
2622 //      adequate space
2623
2624 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2625 {
2626 uint idx, librarian;
2627 BtSlot *node;
2628 BtKey *ptr;
2629 BtVal *val;
2630 int rate;
2631
2632         //      if previous slot is a librarian slot, use it
2633
2634         if( slot > 1 )
2635           if( slotptr(set->page, slot-1)->type == Librarian )
2636                 slot--;
2637
2638         // copy value onto page
2639
2640         set->page->min -= vallen + sizeof(BtVal);
2641         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2642         memcpy (val->value, value, vallen);
2643         val->len = vallen;
2644
2645         // copy key onto page
2646
2647         set->page->min -= keylen + sizeof(BtKey);
2648         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2649         memcpy (ptr->key, key, keylen);
2650         ptr->len = keylen;
2651         
2652         //      find first empty slot
2653
2654         for( idx = slot; idx < set->page->cnt; idx++ )
2655           if( slotptr(set->page, idx)->dead )
2656                 break;
2657
2658         // now insert key into array before slot,
2659         //      adding as many librarian slots as
2660         //      makes sense.
2661
2662         if( idx == set->page->cnt ) {
2663         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2664
2665                 librarian = ++idx - slot;
2666                 avail /= sizeof(BtSlot);
2667
2668                 if( avail < 0 )
2669                         avail = 0;
2670
2671                 if( librarian > avail )
2672                         librarian = avail;
2673
2674                 if( librarian ) {
2675                         rate = (idx - slot) / librarian;
2676                         set->page->cnt += librarian;
2677                         idx += librarian;
2678                 } else
2679                         rate = 0;
2680         } else
2681                 librarian = 0, rate = 0;
2682
2683         while( idx > slot ) {
2684                 //  transfer slot
2685                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2686                 idx--;
2687
2688                 //      add librarian slot per rate
2689
2690                 if( librarian )
2691                  if( (idx - slot + 1)/2 <= librarian * rate ) {
2692                         node = slotptr(set->page, idx--);
2693                         node->off = node[1].off;
2694                         node->type = Librarian;
2695                         node->dead = 1;
2696                         librarian--;
2697                   }
2698         }
2699
2700         set->latch->dirty = 1;
2701         set->page->act++;
2702
2703         //      fill in new slot
2704
2705         node = slotptr(set->page, slot);
2706         node->off = set->page->min;
2707         node->type = type;
2708         node->dead = 0;
2709         return 0;
2710 }
2711
2712 //  Insert new key into the btree at given level.
2713 //      either add a new key or update/add an existing one
2714
2715 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2716 {
2717 uint slot, idx, len, entry;
2718 BtPageSet set[1];
2719 BtSlot *node;
2720 BtKey *ptr;
2721 BtVal *val;
2722
2723   while( 1 ) { // find the page and slot for the current key
2724         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2725                 node = slotptr(set->page, slot);
2726                 ptr = keyptr(set->page, slot);
2727         } else {
2728                 if( !mgr->err )
2729                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2730                 return mgr->err_thread = thread_no, mgr->err;
2731         }
2732
2733         // if librarian slot == found slot, advance to real slot
2734
2735         if( node->type == Librarian ) {
2736 //        if( !keycmp (ptr, key, keylen) ) {
2737                 ptr = keyptr(set->page, ++slot);
2738                 node = slotptr(set->page, slot);
2739           }
2740
2741         //  if inserting a duplicate key or unique
2742         //      key that doesn't exist on the page,
2743         //      check for adequate space on the page
2744         //      and insert the new key before slot.
2745
2746         switch( type ) {
2747         case Unique:
2748         case Duplicate:
2749           if( !keycmp (ptr, key, keylen) )
2750                 break;
2751
2752           if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2753             if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2754                   return mgr->err;
2755                 else
2756                   goto insxit;
2757
2758           if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2759             if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2760                   continue;
2761
2762           return mgr->err_thread = thread_no, mgr->err;
2763
2764         case Update:
2765           if( keycmp (ptr, key, keylen) )
2766                 goto insxit;
2767
2768           break;
2769         }
2770
2771         // if key already exists, update value and return
2772
2773         val = valptr(set->page, slot);
2774
2775         if( val->len >= vallen ) {
2776           if( node->dead )
2777                 set->page->act++;
2778           node->type = type;
2779           node->dead = 0;
2780
2781           set->page->garbage += val->len - vallen;
2782           set->latch->dirty = 1;
2783           val->len = vallen;
2784           memcpy (val->value, value, vallen);
2785           goto insxit;
2786         }
2787
2788         //  new update value doesn't fit in existing value area
2789         //      make sure page has room
2790
2791         if( !node->dead )
2792           set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2793         else
2794           set->page->act++;
2795
2796         node->type = type;
2797         node->dead = 0;
2798
2799         if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2800           break;
2801
2802         if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2803           if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2804                 continue;
2805
2806         return mgr->err_thread = thread_no, mgr->err;
2807   }
2808
2809   //  copy key and value onto page and update slot
2810
2811   set->page->min -= vallen + sizeof(BtVal);
2812   val = (BtVal*)((unsigned char *)set->page + set->page->min);
2813   memcpy (val->value, value, vallen);
2814   val->len = vallen;
2815
2816   set->latch->dirty = 1;
2817   set->page->min -= keylen + sizeof(BtKey);
2818   ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2819   memcpy (ptr->key, key, keylen);
2820   ptr->len = keylen;
2821         
2822   slotptr(set->page,slot)->off = set->page->min;
2823
2824 insxit:
2825   bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2826   bt_unpinlatch (set->latch, thread_no, __LINE__);
2827   return 0;
2828 }
2829
2830 //      determine actual page where key is located
2831 //  return slot number
2832
2833 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2834 {
2835 BtKey *key = keyptr(source,src), *ptr;
2836 unsigned char fence[BT_keyarray];
2837 uint entry, slot;
2838
2839         if( locks[src].reuse )
2840           entry = locks[src-1].entry;
2841         else
2842           entry = locks[src].entry;
2843
2844         //      find where our key is located 
2845         //      on current page or pages split on
2846         //      same page txn operations.
2847
2848         do {
2849                 set->latch = mgr->leafsets + entry;
2850                 set->page = bt_mappage (mgr, set->latch);
2851
2852                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2853                   if( slotptr(set->page, slot)->type == Librarian )
2854                         slot++;
2855                   if( locks[src].reuse )
2856                         locks[src].entry = entry;
2857                   return slot;
2858                 }
2859         } while( entry = set->latch->split );
2860
2861         ptr = keyptr (set->page, set->page->cnt);
2862         memcpy (fence, ptr, ptr->len + 1);
2863
2864         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2865         return 0;
2866 }
2867
2868 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2869 {
2870 BtKey *key = keyptr(source, src);
2871 BtVal *val = valptr(source, src);
2872 BtLatchSet *latch;
2873 BtPageSet set[1];
2874 uint entry, slot;
2875
2876   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2877         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2878           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2879                 return mgr->err_thread = thread_no, mgr->err;
2880
2881           set->page->lsn = lsn;
2882           return 0;
2883         }
2884
2885         //  split page
2886
2887         if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2888           latch = mgr->leafsets + entry;
2889         else
2890           return mgr->err_thread = thread_no, mgr->err;
2891
2892         //      splice right page into split chain
2893         //      and WriteLock it
2894
2895         bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2896         latch->split = set->latch->split;
2897         set->latch->split = entry;
2898   }
2899
2900   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2901 }
2902
2903 //      perform delete from smaller btree
2904 //  insert a delete slot if not found there
2905
2906 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2907 {
2908 BtKey *key = keyptr(source, src);
2909 BtPageSet set[1];
2910 uint idx, slot;
2911 BtSlot *node;
2912 BtKey *ptr;
2913 BtVal *val;
2914
2915         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2916           node = slotptr(set->page, slot);
2917           ptr = keyptr(set->page, slot);
2918           val = valptr(set->page, slot);
2919         } else
2920           return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2921
2922         //  if slot is not found, insert a delete slot
2923
2924         if( keycmp (ptr, key->key, key->len) )
2925           if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2926                 return mgr->err;
2927
2928         //      if node is already dead,
2929         //      ignore the request.
2930
2931         if( node->dead )
2932                 return 0;
2933
2934         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2935         set->latch->dirty = 1;
2936         set->page->lsn = lsn;
2937         set->page->act--;
2938
2939         node->dead = 0;
2940         __sync_fetch_and_add(&mgr->found, 1);
2941         return 0;
2942 }
2943
2944 //      release master's splits from right to left
2945
2946 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2947 {
2948 BtLatchSet *latch = mgr->leafsets + entry;
2949
2950         if( latch->split )
2951                 bt_atomicrelease (mgr, latch->split, thread_no);
2952
2953         latch->split = 0;
2954         bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2955         bt_unpinlatch(latch, thread_no, __LINE__);
2956 }
2957
2958 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2959 {
2960 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2961 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2962
2963         return keycmp (key1, key2->key, key2->len);
2964 }
2965 //      atomic modification of a batch of keys.
2966
2967 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2968 {
2969 uint src, idx, slot, samepage, entry, que = 0;
2970 BtKey *key, *ptr, *key2;
2971 int result = 0;
2972 BtSlot temp[1];
2973 logseqno lsn;
2974 int type;
2975
2976   // stable sort the list of keys into order to
2977   //    prevent deadlocks between threads.
2978 /*
2979   for( src = 1; src++ < source->cnt; ) {
2980         *temp = *slotptr(source,src);
2981         key = keyptr (source,src);
2982
2983         for( idx = src; --idx; ) {
2984           key2 = keyptr (source,idx);
2985           if( keycmp (key, key2->key, key2->len) < 0 ) {
2986                 *slotptr(source,idx+1) = *slotptr(source,idx);
2987                 *slotptr(source,idx) = *temp;
2988           } else
2989                 break;
2990         }
2991   }
2992 */
2993         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2994   //  add entries to redo log
2995
2996   if( bt->mgr->pagezero->redopages )
2997         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2998   else
2999         lsn = 0;
3000
3001   //  perform the individual actions in the transaction
3002
3003   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
3004         return bt->mgr->err;
3005
3006   // if number of active pages
3007   // is greater than the buffer pool
3008   // promote page into larger btree
3009
3010   if( bt->main )
3011    while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
3012         if( bt_txnpromote (bt) )
3013           return bt->mgr->err;
3014
3015   // return success
3016
3017   return 0;
3018 }
3019
3020 //      execute the source list of inserts/deletes
3021
3022 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
3023 {
3024 uint src, idx, samepage, entry;
3025 BtPageSet set[1], prev[1];
3026 unsigned char value[BtId];
3027 BtLatchSet *latch;
3028 uid right_page_no;
3029 AtomicTxn *locks;
3030 BtKey *key, *ptr;
3031 BtPage page;
3032 BtVal *val;
3033
3034   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
3035
3036   // Load the leaf page for each key
3037   // group same page references with reuse bit
3038
3039   for( src = 0; src++ < source->cnt; ) {
3040         key = keyptr(source, src);
3041
3042         // first determine if this modification falls
3043         // on the same page as the previous modification
3044         //      note that the far right leaf page is a special case
3045
3046         if( samepage = src > 1 )
3047           samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
3048
3049         if( !samepage )
3050           if( bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
3051                 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
3052           else
3053                 return mgr->err;
3054
3055         entry = set->latch - mgr->leafsets;
3056         locks[src].reuse = samepage;
3057         locks[src].entry = entry;
3058
3059         //      capture current lsn for master page
3060
3061         locks[src].reqlsn = set->page->lsn;
3062   }
3063
3064   // insert or delete each key
3065   // process any splits or merges
3066   // run through txn list backwards
3067
3068   samepage = source->cnt + 1;
3069
3070   for( src = source->cnt; src; src-- ) {
3071         if( locks[src].reuse )
3072           continue;
3073
3074         //  perform the txn operations
3075         //      from smaller to larger on
3076         //  the same page
3077
3078         for( idx = src; idx < samepage; idx++ )
3079          switch( slotptr(source,idx)->type ) {
3080          case Delete:
3081           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3082                 return mgr->err;
3083           break;
3084
3085         case Duplicate:
3086         case Unique:
3087           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
3088                 return mgr->err;
3089           break;
3090
3091         default:
3092           bt_atomicpage (mgr, source, locks, idx, set);
3093           break;
3094         }
3095
3096         //      after the same page operations have finished,
3097         //  process master page for splits or deletion.
3098
3099         latch = prev->latch = mgr->leafsets + locks[src].entry;
3100         prev->page = bt_mappage (mgr, prev->latch);
3101         samepage = src;
3102
3103         //  pick-up all splits from master page
3104         //      each one is already pinned & WriteLocked.
3105
3106         while( entry = prev->latch->split ) {
3107           set->latch = mgr->leafsets + entry;
3108           set->page = bt_mappage (mgr, set->latch);
3109
3110           // delete empty master page by undoing its split
3111           //  (this is potentially another empty page)
3112           //  note that there are no pointers to it yet
3113
3114           if( !prev->page->act ) {
3115                 memcpy (set->page->left, prev->page->left, BtId);
3116                 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3117                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3118                 prev->latch->split = set->latch->split;
3119                 prev->latch->dirty = 1;
3120                 bt_freepage (mgr, set, thread_no);
3121                 continue;
3122           }
3123
3124           // remove empty split page from the split chain
3125           // and return it to the free list. No other
3126           // thread has its page number yet.
3127
3128           if( !set->page->act ) {
3129                 memcpy (prev->page->right, set->page->right, BtId);
3130                 prev->latch->split = set->latch->split;
3131
3132                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3133                 bt_freepage (mgr, set, thread_no);
3134                 continue;
3135           }
3136
3137           //  update prev's fence key
3138
3139           ptr = keyptr(prev->page,prev->page->cnt);
3140           bt_putid (value, prev->latch->page_no);
3141
3142           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3143                 return mgr->err;
3144
3145           // splice in the left link into the split page
3146
3147           bt_putid (set->page->left, prev->latch->page_no);
3148
3149           if( lsm )
3150                 bt_syncpage (mgr, prev->page, prev->latch);
3151
3152           *prev = *set;
3153         }
3154
3155         //  update left pointer in next right page from last split page
3156         //      (if all splits were reversed or none occurred, latch->split == 0)
3157
3158         if( latch->split ) {
3159           //  fix left pointer in master's original (now split)
3160           //  far right sibling or set rightmost page in page zero
3161
3162           if( right_page_no = bt_getid (prev->page->right) ) {
3163                 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3164                   set->page = bt_mappage (mgr, set->latch);
3165                 else
3166                   return mgr->err;
3167
3168             bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3169             bt_putid (set->page->left, prev->latch->page_no);
3170                 set->latch->dirty = 1;
3171
3172             bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3173                 bt_unpinlatch (set->latch, thread_no, __LINE__);
3174           } else {      // prev is rightmost page
3175             bt_mutexlock (mgr->lock);
3176                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3177             bt_releasemutex(mgr->lock);
3178           }
3179
3180           //  switch the original fence key from the
3181           //  master page to the last split page.
3182
3183           ptr = keyptr(prev->page,prev->page->cnt);
3184           bt_putid (value, prev->latch->page_no);
3185
3186           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3187                 return mgr->err;
3188
3189           if( lsm )
3190                 bt_syncpage (mgr, prev->page, prev->latch);
3191
3192           //  unlock and unpin the split pages
3193
3194           bt_atomicrelease (mgr, latch->split, thread_no);
3195
3196           //  unlock and unpin the master page
3197
3198           latch->split = 0;
3199           bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3200           bt_unpinlatch(latch, thread_no, __LINE__);
3201           continue;
3202         }
3203
3204         //      since there are no splits remaining, we're
3205         //  finished if master page occupied
3206
3207         if( prev->page->act ) {
3208           bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3209
3210           if( lsm )
3211                 bt_syncpage (mgr, prev->page, prev->latch);
3212
3213           bt_unpinlatch(prev->latch, thread_no, __LINE__);
3214           continue;
3215         }
3216
3217         // any and all splits were reversed, and the
3218         // master page located in prev is empty, delete it
3219
3220         if( bt_deletepage (mgr, prev, thread_no, 0) )
3221                 return mgr->err;
3222   }
3223
3224   free (locks);
3225   return 0;
3226 }
3227
3228 //  pick & promote a page into the larger btree
3229
3230 BTERR bt_txnpromote (BtDb *bt)
3231 {
3232 uint entry, slot, idx;
3233 BtPageSet set[1];
3234 BtSlot *node;
3235 BtKey *ptr;
3236 BtVal *val;
3237
3238   while( 1 ) {
3239 #ifdef unix
3240         entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3241 #else
3242         entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3243 #endif
3244         entry %= bt->mgr->leaftotal;
3245
3246         if( !entry )
3247                 continue;
3248
3249         set->latch = bt->mgr->leafsets + entry;
3250
3251         //  skip this entry if it has never been used
3252
3253         if( !set->latch->page_no )
3254           continue;
3255
3256         if( !bt_mutextry(set->latch->modify) )
3257                 continue;
3258
3259         //  skip this entry if it is pinned
3260
3261         if( set->latch->pin & ~CLOCK_bit ) {
3262           bt_releasemutex(set->latch->modify);
3263           continue;
3264         }
3265
3266         set->page = bt_mappage (bt->mgr, set->latch);
3267
3268         // entry has no right sibling
3269
3270         if( !bt_getid (set->page->right) ) {
3271           bt_releasemutex(set->latch->modify);
3272           continue;
3273         }
3274
3275         // entry interiour node or being killed or constructed
3276
3277         if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3278           bt_releasemutex(set->latch->modify);
3279           continue;
3280         }
3281
3282         //  pin the page for our access
3283         //      and leave it locked for the
3284         //      duration of the promotion.
3285
3286         set->latch->pin++;
3287         bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3288         bt_releasemutex(set->latch->modify);
3289
3290         // transfer slots in our selected page to the main btree
3291 if( !(entry % 100) )
3292 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3293
3294         if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3295                 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3296                 return bt->main->err;
3297         }
3298
3299         //  now delete the page
3300
3301         if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3302                 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3303
3304         return bt->mgr->err;
3305   }
3306 }
3307
3308 //      set cursor to highest slot on highest page
3309
3310 uint bt_lastkey (BtDb *bt)
3311 {
3312 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3313 BtPageSet set[1];
3314
3315         if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3316                 set->page = bt_mappage (bt->mgr, set->latch);
3317         else
3318                 return 0;
3319
3320     bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3321         memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3322     bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3323         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3324         return bt->cursor->cnt;
3325 }
3326
3327 //      return previous slot on cursor page
3328
3329 uint bt_prevkey (BtDb *bt, uint slot)
3330 {
3331 uid cursor_page = bt->cursor->page_no;
3332 uid ourright, next, us = cursor_page;
3333 BtPageSet set[1];
3334
3335         if( --slot )
3336                 return slot;
3337
3338         ourright = bt_getid(bt->cursor->right);
3339
3340 goleft:
3341         if( !(next = bt_getid(bt->cursor->left)) )
3342                 return 0;
3343
3344 findourself:
3345         cursor_page = next;
3346
3347         if( set->latch = bt_pinleaf (bt->mgr, next, bt->thread_no) )
3348                 set->page = bt_mappage (bt->mgr, set->latch);
3349         else
3350                 return 0;
3351
3352     bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3353         memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3354         bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3355         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3356         
3357         next = bt_getid (bt->cursor->right);
3358
3359         if( bt->cursor->kill )
3360                 goto findourself;
3361
3362         if( next != us )
3363           if( next == ourright )
3364                 goto goleft;
3365           else
3366                 goto findourself;
3367
3368         return bt->cursor->cnt;
3369 }
3370
3371 //  return next slot on cursor page
3372 //  or slide cursor right into next page
3373
3374 uint bt_nextkey (BtDb *bt, uint slot)
3375 {
3376 BtPageSet set[1];
3377 uid right;
3378
3379   do {
3380         right = bt_getid(bt->cursor->right);
3381
3382         while( slot++ < bt->cursor->cnt )
3383           if( slotptr(bt->cursor,slot)->dead )
3384                 continue;
3385           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3386                 return slot;
3387           else
3388                 break;
3389
3390         if( !right )
3391                 break;
3392
3393         if( set->latch = bt_pinleaf (bt->mgr, right, bt->thread_no) )
3394                 set->page = bt_mappage (bt->mgr, set->latch);
3395         else
3396                 return 0;
3397
3398     bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3399         memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3400         bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3401         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3402         slot = 0;
3403
3404   } while( 1 );
3405
3406   return bt->mgr->err = 0;
3407 }
3408
3409 //  cache page of keys into cursor and return starting slot for given key
3410
3411 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3412 {
3413 BtPageSet set[1];
3414 uint slot;
3415
3416         // cache page for retrieval
3417
3418         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3419           memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3420         else
3421           return 0;
3422
3423         bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3424         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3425         return slot;
3426 }
3427
3428 #ifdef STANDALONE
3429
3430 #ifndef unix
3431 double getCpuTime(int type)
3432 {
3433 FILETIME crtime[1];
3434 FILETIME xittime[1];
3435 FILETIME systime[1];
3436 FILETIME usrtime[1];
3437 SYSTEMTIME timeconv[1];
3438 double ans = 0;
3439
3440         memset (timeconv, 0, sizeof(SYSTEMTIME));
3441
3442         switch( type ) {
3443         case 0:
3444                 GetSystemTimeAsFileTime (xittime);
3445                 FileTimeToSystemTime (xittime, timeconv);
3446                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3447                 break;
3448         case 1:
3449                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3450                 FileTimeToSystemTime (usrtime, timeconv);
3451                 break;
3452         case 2:
3453                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3454                 FileTimeToSystemTime (systime, timeconv);
3455                 break;
3456         }
3457
3458         ans += (double)timeconv->wHour * 3600;
3459         ans += (double)timeconv->wMinute * 60;
3460         ans += (double)timeconv->wSecond;
3461         ans += (double)timeconv->wMilliseconds / 1000;
3462         return ans;
3463 }
3464 #else
3465 #include <time.h>
3466 #include <sys/resource.h>
3467
3468 double getCpuTime(int type)
3469 {
3470 struct rusage used[1];
3471 struct timeval tv[1];
3472
3473         switch( type ) {
3474         case 0:
3475                 gettimeofday(tv, NULL);
3476                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3477
3478         case 1:
3479                 getrusage(RUSAGE_SELF, used);
3480                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3481
3482         case 2:
3483                 getrusage(RUSAGE_SELF, used);
3484                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3485         }
3486
3487         return 0;
3488 }
3489 #endif
3490
3491 void bt_poolaudit (BtMgr *mgr, char *type)
3492 {
3493 BtLatchSet *latch, test[1];
3494 uint entry;
3495
3496         memset (test, 0, sizeof(test));
3497
3498         if( memcmp (test, mgr->latchsets, sizeof(test)) )
3499                 fprintf(stderr, "%s latchset zero overwritten\n", type);
3500
3501         if( memcmp (test, mgr->leafsets, sizeof(test)) )
3502                 fprintf(stderr, "%s leafset zero overwritten\n", type);
3503
3504         for( entry = 0; ++entry < mgr->latchtotal; ) {
3505                 latch = mgr->latchsets + entry;
3506
3507                 if( *latch->modify->value )
3508                         fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3509
3510                 if( latch->pin & ~CLOCK_bit )
3511                         fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3512         }
3513
3514         for( entry = 0; ++entry < mgr->leaftotal; ) {
3515                 latch = mgr->leafsets + entry;
3516
3517                 if( *latch->modify->value )
3518                         fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3519
3520                 if( latch->pin & ~CLOCK_bit )
3521                         fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3522         }
3523 }
3524
3525 typedef struct {
3526         char idx;
3527         char *type;
3528         char *infile;
3529         BtMgr *main;
3530         BtMgr *mgr;
3531         int num;
3532 } ThreadArg;
3533
3534 //  standalone program to index file of keys
3535 //  then list them onto std-out
3536
3537 #ifdef unix
3538 void *index_file (void *arg)
3539 #else
3540 uint __stdcall index_file (void *arg)
3541 #endif
3542 {
3543 int line = 0, found = 0, cnt = 0, idx;
3544 uid next, page_no = LEAF_page;  // start on first page of leaves
3545 int ch, len = 0, slot, type = 0;
3546 unsigned char key[BT_maxkey];
3547 unsigned char txn[65536];
3548 ThreadArg *args = arg;
3549 BtPage page, frame;
3550 BtPageSet set[1];
3551 uint nxt = 65536;
3552 BtKey *ptr;
3553 BtVal *val;
3554 BtDb *bt;
3555 FILE *in;
3556
3557         bt = bt_open (args->mgr, args->main);
3558         page = (BtPage)txn;
3559
3560         if( args->idx < strlen (args->type) )
3561                 ch = args->type[args->idx];
3562         else
3563                 ch = args->type[strlen(args->type) - 1];
3564
3565         switch(ch | 0x20)
3566         {
3567         case 'd':
3568                 type = Delete;
3569
3570         case 'p':
3571                 if( !type )
3572                         type = Unique;
3573
3574                 if( args->num )
3575                  if( type == Delete )
3576                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3577                  else
3578                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3579                 else
3580                  if( type == Delete )
3581                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3582                  else
3583                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3584
3585                 if( in = fopen (args->infile, "rb") )
3586                   while( ch = getc(in), ch != EOF )
3587                         if( ch == '\n' )
3588                         {
3589                           line++;
3590
3591                           if( !args->num ) {
3592                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3593                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3594                             len = 0;
3595                                 continue;
3596                           }
3597
3598                           nxt -= len - 10;
3599                           memcpy (txn + nxt, key + 10, len - 10);
3600                           nxt -= 1;
3601                           txn[nxt] = len - 10;
3602                           nxt -= 10;
3603                           memcpy (txn + nxt, key, 10);
3604                           nxt -= 1;
3605                           txn[nxt] = 10;
3606                           slotptr(page,++cnt)->off  = nxt;
3607                           slotptr(page,cnt)->type = type;
3608                           len = 0;
3609
3610                           if( cnt < args->num )
3611                                 continue;
3612
3613                           page->cnt = cnt;
3614                           page->act = cnt;
3615                           page->min = nxt;
3616
3617                           if( bt_atomictxn (bt, page) )
3618                                 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3619                           nxt = sizeof(txn);
3620                           cnt = 0;
3621
3622                         }
3623                         else if( len < BT_maxkey )
3624                                 key[len++] = ch;
3625                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3626                 break;
3627
3628         case 'w':
3629                 fprintf(stderr, "started indexing for %s\n", args->infile);
3630                 if( in = fopen (args->infile, "r") )
3631                   while( ch = getc(in), ch != EOF )
3632                         if( ch == '\n' )
3633                         {
3634                           line++;
3635
3636                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3637                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3638                           len = 0;
3639                         }
3640                         else if( len < BT_maxkey )
3641                                 key[len++] = ch;
3642                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3643                 break;
3644
3645         case 'f':
3646                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3647                 if( in = fopen (args->infile, "rb") )
3648                   while( ch = getc(in), ch != EOF )
3649                         if( ch == '\n' )
3650                         {
3651                           line++;
3652                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3653                                 found++;
3654                           else if( bt->mgr->err )
3655                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3656                           len = 0;
3657                         }
3658                         else if( len < BT_maxkey )
3659                                 key[len++] = ch;
3660                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3661                 break;
3662
3663         case 's':
3664                 fprintf(stderr, "started scanning\n");
3665                 
3666                 do {
3667                         if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3668                                 set->page = bt_mappage (bt->mgr, set->latch);
3669                         else
3670                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3671
3672                         bt_lockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3673                         next = bt_getid (set->page->right);
3674
3675                         for( slot = 0; slot++ < set->page->cnt; )
3676                          if( next || slot < set->page->cnt )
3677                           if( !slotptr(set->page, slot)->dead ) {
3678                                 ptr = keyptr(set->page, slot);
3679                                 len = ptr->len;
3680
3681                             if( slotptr(set->page, slot)->type == Duplicate )
3682                                         len -= BtId;
3683
3684                                 fwrite (ptr->key, len, 1, stdout);
3685                                 val = valptr(set->page, slot);
3686                                 fwrite (val->value, val->len, 1, stdout);
3687                                 fputc ('\n', stdout);
3688                                 cnt++;
3689                            }
3690
3691                         bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3692                         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3693                 } while( page_no = next );
3694
3695                 fprintf(stderr, " Total keys read %d\n", cnt);
3696                 break;
3697
3698         case 'r':
3699                 fprintf(stderr, "started reverse scan\n");
3700                 if( slot = bt_lastkey (bt) )
3701                    while( slot = bt_prevkey (bt, slot) ) {
3702                         if( slotptr(bt->cursor, slot)->dead )
3703                           continue;
3704
3705                         ptr = keyptr(bt->cursor, slot);
3706                         len = ptr->len;
3707
3708                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3709                                 len -= BtId;
3710
3711                         fwrite (ptr->key, len, 1, stdout);
3712                         val = valptr(bt->cursor, slot);
3713                         fwrite (val->value, val->len, 1, stdout);
3714                         fputc ('\n', stdout);
3715                         cnt++;
3716                   }
3717
3718                 fprintf(stderr, " Total keys read %d\n", cnt);
3719                 break;
3720
3721         case 'c':
3722 #ifdef unix
3723                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3724 #endif
3725                 fprintf(stderr, "started counting\n");
3726                 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3727
3728                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3729                         if( bt_readpage (bt->mgr, bt->cursor, page_no, 1) )
3730                                 break;
3731
3732                         if( !bt->cursor->free && !bt->cursor->lvl )
3733                                 cnt += bt->cursor->act;
3734
3735                         bt->mgr->reads++;
3736                         page_no = next++;
3737                 }
3738                 
3739                 cnt--;  // remove stopper key
3740                 fprintf(stderr, " Total keys counted %d\n", cnt);
3741                 break;
3742         }
3743
3744         bt_close (bt);
3745 #ifdef unix
3746         return NULL;
3747 #else
3748         return 0;
3749 #endif
3750 }
3751
3752 typedef struct timeval timer;
3753
3754 int main (int argc, char **argv)
3755 {
3756 int idx, cnt, len, slot, err;
3757 double start, stop;
3758 #ifdef unix
3759 pthread_t *threads;
3760 #else
3761 HANDLE *threads;
3762 #endif
3763 ThreadArg *args;
3764 uint mainleafpool = 0;
3765 uint mainleafxtra = 0;
3766 uint redopages = 0;
3767 uint poolsize = 0;
3768 uint leafpool = 0;
3769 uint leafxtra = 0;
3770 uint mainpool = 0;
3771 uint mainbits = 0;
3772 int bits = 16;
3773 float elapsed;
3774 int num = 0;
3775 char key[1];
3776 BtMgr *main;
3777 BtMgr *mgr;
3778 BtKey *ptr;
3779
3780         if( argc < 3 ) {
3781                 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3782                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3783                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3784                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3785                 fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
3786                 fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
3787                 fprintf (stderr, "  poolsize is the number of pages in buffer pool for the cache btree\n");
3788                 fprintf (stderr, "  leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3789                 fprintf (stderr, "  txnsize = n to block transactions into n units, or zero for no transactions\n");
3790                 fprintf (stderr, "  redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3791                 fprintf (stderr, "  mainbits is the page size of the main btree in bits\n");
3792                 fprintf (stderr, "  mainpool is the number of main pages in the main buffer pool\n");
3793                 fprintf (stderr, "  mainleaf is the number of main pages in the main leaf pool\n");
3794                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3795                 exit(0);
3796         }
3797
3798         start = getCpuTime(0);
3799
3800         if( argc > 4 )
3801                 bits = atoi(argv[4]);
3802
3803         if( argc > 5 )
3804                 leafxtra = atoi(argv[5]);
3805
3806         if( argc > 6 )
3807                 poolsize = atoi(argv[6]);
3808
3809         if( !poolsize )
3810                 fprintf (stderr, "no page pool\n"), exit(1);
3811
3812         if( argc > 7 )
3813                 leafpool = atoi(argv[7]);
3814
3815         if( argc > 8 )
3816                 num = atoi(argv[8]);
3817
3818         if( argc > 9 )
3819                 redopages = atoi(argv[9]);
3820
3821         if( redopages > 65535 )
3822                 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3823
3824         if( argc > 10 )
3825                 mainbits = atoi(argv[10]);
3826
3827         if( argc > 11 )
3828                 mainleafxtra = atoi(argv[11]);
3829
3830         if( argc > 12 )
3831                 mainpool = atoi(argv[12]);
3832
3833         if( argc > 13 )
3834                 mainleafpool = atoi(argv[13]);
3835
3836         cnt = argc - 14;
3837 #ifdef unix
3838         threads = malloc (cnt * sizeof(pthread_t));
3839 #else
3840         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3841 #endif
3842         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3843
3844         mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3845
3846         if( !mgr ) {
3847                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3848                 exit (1);
3849         }
3850
3851         if( mainbits ) {
3852                 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3853
3854                 if( !main ) {
3855                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3856                         exit (1);
3857                 }
3858         } else
3859                 main = NULL;
3860
3861         //      fire off threads
3862
3863         if( cnt )
3864           for( idx = 0; idx < cnt; idx++ ) {
3865                 args[idx].infile = argv[idx + 14];
3866                 args[idx].type = argv[3];
3867                 args[idx].main = main;
3868                 args[idx].mgr = mgr;
3869                 args[idx].num = num;
3870                 args[idx].idx = idx;
3871 #ifdef unix
3872                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3873                         fprintf(stderr, "Error creating thread %d\n", err);
3874 #else
3875                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3876 #endif
3877           }
3878         else {
3879                 args[0].infile = argv[idx + 10];
3880                 args[0].type = argv[3];
3881                 args[0].main = main;
3882                 args[0].mgr = mgr;
3883                 args[0].num = num;
3884                 args[0].idx = 0;
3885                 index_file (args);
3886         }
3887
3888         //      wait for termination
3889
3890 #ifdef unix
3891         for( idx = 0; idx < cnt; idx++ )
3892                 pthread_join (threads[idx], NULL);
3893 #else
3894         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3895
3896         for( idx = 0; idx < cnt; idx++ )
3897                 CloseHandle(threads[idx]);
3898 #endif
3899         bt_poolaudit(mgr, "cache");
3900
3901         if( main )
3902                 bt_poolaudit(main, "main");
3903
3904         fprintf(stderr, "cache %d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3905         if( main )
3906                 fprintf(stderr, "main %d reads %d writes %d found\n", main->reads, main->writes, main->found);
3907
3908         if( main )
3909                 bt_mgrclose (main);
3910         bt_mgrclose (mgr);
3911
3912         elapsed = getCpuTime(0) - start;
3913         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3914         elapsed = getCpuTime(1);
3915         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3916         elapsed = getCpuTime(2);
3917         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3918 }
3919
3920 #endif  //STANDALONE