]> pd.if.org Git - btree/blob - threadskv10e.c
62285128eecc0c0d8951ab1be3c83fdcd2320d14
[btree] / threadskv10e.c
1 // btree version threadskv10e futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      LSM B-trees for write optimization
11 //      and larger sized leaf pages than non-leaf
12
13 // 24 OCT 2014
14
15 // author: karl malbrain, malbrain@cal.berkeley.edu
16
17 /*
18 This work, including the source code, documentation
19 and related data, is placed into the public domain.
20
21 The orginal author is Karl Malbrain.
22
23 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
24 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
25 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
26 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
27 RESULTING FROM THE USE, MODIFICATION, OR
28 REDISTRIBUTION OF THIS SOFTWARE.
29 */
30
31 // Please see the project home page for documentation
32 // code.google.com/p/high-concurrency-btree
33
34 #define _FILE_OFFSET_BITS 64
35 #define _LARGEFILE64_SOURCE
36
37 #ifdef linux
38 #define _GNU_SOURCE
39 #include <xmmintrin.h>
40 #include <linux/futex.h>
41 #define SYS_futex 202
42 #endif
43
44 #ifdef unix
45 #include <unistd.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <fcntl.h>
49 #include <sys/time.h>
50 #include <sys/mman.h>
51 #include <errno.h>
52 #include <pthread.h>
53 #include <limits.h>
54 #else
55 #define WIN32_LEAN_AND_MEAN
56 #include <windows.h>
57 #include <stdio.h>
58 #include <stdlib.h>
59 #include <time.h>
60 #include <fcntl.h>
61 #include <process.h>
62 #include <intrin.h>
63 #endif
64
65 #include <memory.h>
66 #include <string.h>
67 #include <stddef.h>
68
69 typedef unsigned long long      uid;
70 typedef unsigned long long      logseqno;
71
72 #ifndef unix
73 typedef unsigned long long      off64_t;
74 typedef unsigned short          ushort;
75 typedef unsigned int            uint;
76 #endif
77
78 #define BT_ro 0x6f72    // ro
79 #define BT_rw 0x7772    // rw
80
81 #define BT_maxbits              26                                      // maximum page size in bits
82 #define BT_minbits              9                                       // minimum page size in bits
83 #define BT_minpage              (1 << BT_minbits)       // minimum page size
84 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
85
86 //  BTree page number constants
87 #define ALLOC_page              0       // allocation page
88 #define ROOT_page               1       // root of the btree
89 #define LEAF_page               2       // first page of leaves
90
91 //      Number of levels to create in a new BTree
92
93 #define MIN_lvl                 2
94
95 /*
96 There are six lock types for each node in four independent sets: 
97 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
98 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
99 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
100 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
101 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
102 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification. 
103 */
104
105 typedef enum{
106         BtLockAccess = 1,
107         BtLockDelete = 2,
108         BtLockRead   = 4,
109         BtLockWrite  = 8,
110         BtLockParent = 16,
111         BtLockLink   = 32
112 } BtLock;
113
114 typedef struct {
115         volatile uint value[1];
116 } BtMutexLatch;
117
118 //      definition for reader/writer reentrant lock implementation
119
120 typedef struct {
121   BtMutexLatch xcl[1];
122   BtMutexLatch wrt[1];
123   uint readers;
124   ushort dup;           // re-entrant locks
125   ushort tid;           // owner thread-no
126   uint line;            // owner line #
127 } RWLock;
128
129 //  hash table entries
130
131 typedef struct {
132         BtMutexLatch latch[1];
133         uint entry;             // Latch table entry at head of chain
134 } BtHashEntry;
135
136 //      latch manager table structure
137
138 typedef struct {
139         uid page_no;                    // latch set page number
140         BtMutexLatch modify[1]; // modify entry lite latch
141         RWLock readwr[1];               // read/write page lock
142         RWLock access[1];               // Access Intent/Page delete
143         RWLock parent[1];               // Posting of fence key in parent
144         RWLock link[1];                 // left link update in progress
145         uint split;                             // right split page atomic insert
146         uint next;                              // next entry in hash table chain
147         uint prev;                              // prev entry in hash table chain
148         ushort pin;                             // number of accessing threads
149         unsigned char dirty;    // page in cache is dirty
150         unsigned char leaf;             // page in cache is a leaf
151 } BtLatchSet;
152
153 //      Define the length of the page record numbers
154
155 #define BtId 6
156
157 //      Page key slot definition.
158
159 //      Keys are marked dead, but remain on the page until
160 //      it cleanup is called. The fence key (highest key) for
161 //      a leaf page is always present, even after cleanup.
162
163 //      Slot types
164
165 //      In addition to the Unique keys that occupy slots
166 //      there are Librarian and Duplicate key
167 //      slots occupying the key slot array.
168
169 //      The Librarian slots are dead keys that
170 //      serve as filler, available to add new Unique
171 //      or Dup slots that are inserted into the B-tree.
172
173 //      The Duplicate slots have had their key bytes extended
174 //      by 6 bytes to contain a binary duplicate key uniqueifier.
175
176 typedef enum {
177         Unique,
178         Librarian,
179         Duplicate,
180         Delete
181 } BtSlotType;
182
183 typedef struct {
184         uint off:BT_maxbits;    // page offset for key start
185         uint type:3;                    // type of slot
186         uint dead:1;                    // set for deleted slot
187 } BtSlot;
188
189 //      The key structure occupies space at the upper end of
190 //      each page.  It's a length byte followed by the key
191 //      bytes.
192
193 typedef struct {
194         unsigned char len;              // this can be changed to a ushort or uint
195         unsigned char key[0];
196 } BtKey;
197
198 //      the value structure also occupies space at the upper
199 //      end of the page. Each key is immediately followed by a value.
200
201 typedef struct {
202         unsigned char len;              // this can be changed to a ushort or uint
203         unsigned char value[0];
204 } BtVal;
205
206 #define BT_maxkey       255             // maximum number of bytes in a key
207 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
208
209 //      The first part of an index page.
210 //      It is immediately followed
211 //      by the BtSlot array of keys.
212
213 typedef struct BtPage_ {
214         uint cnt;                                       // count of keys in page
215         uint act;                                       // count of active keys
216         uint min;                                       // next key offset
217         uint garbage;                           // page garbage in bytes
218         unsigned char lvl;                      // level of page
219         unsigned char free;                     // page is on free chain
220         unsigned char kill;                     // page is being deleted
221         unsigned char nopromote;        // page is being constructed
222         unsigned char filler1[6];       // padding to multiple of 8 bytes
223         unsigned char right[BtId];      // page number to right
224         unsigned char left[BtId];       // page number to left
225         unsigned char filler2[2];       // padding to multiple of 8 bytes
226         logseqno lsn;                           // log sequence number applied
227         uid page_no;                            // this page number
228 } *BtPage;
229
230 //  The loadpage interface object
231
232 typedef struct {
233         BtPage page;            // current page pointer
234         BtLatchSet *latch;      // current page latch set
235 } BtPageSet;
236
237 //      structure for latch manager on ALLOC_page
238
239 typedef struct {
240         struct BtPage_ alloc[1];                // next page_no in right ptr
241         unsigned char freechain[BtId];  // head of free page_nos chain
242         unsigned char leafchain[BtId];  // head of leaf page_nos chain
243         unsigned long long leafpages;   // number of active leaf pages
244         uint redopages;                                 // number of redo pages in file
245         unsigned char leaf_xtra;                // leaf page size in xtra bits
246         unsigned char page_bits;                // base page size in bits
247 } BtPageZero;
248
249 //      The object structure for Btree access
250
251 typedef struct {
252         uint page_size;                         // base page size       
253         uint page_bits;                         // base page size in bits       
254         uint leaf_xtra;                         // leaf xtra bits       
255 #ifdef unix
256         int idx;
257 #else
258         HANDLE idx;
259 #endif
260         BtPageZero *pagezero;           // mapped allocation page
261         BtHashEntry *hashtable;         // the buffer pool hash table entries
262         BtHashEntry *leaftable;         // the buffer pool hash table entries
263         BtLatchSet *latchsets;          // mapped latch set from buffer pool
264         BtLatchSet *leafsets;           // mapped latch set from buffer pool
265         unsigned char *pagepool;        // mapped to the buffer pool pages
266         unsigned char *leafpool;        // mapped to the leaf pool pages
267         unsigned char *redobuff;        // mapped recovery buffer pointer
268         logseqno lsn, flushlsn;         // current & first lsn flushed
269         BtMutexLatch redo[1];           // redo area lite latch
270         BtMutexLatch lock[1];           // allocation area lite latch
271         BtMutexLatch maps[1];           // mapping segments lite latch
272         ushort thread_no[1];            // highest thread number issued
273         ushort err_thread;                      // error thread number
274         uint nlatchpage;                        // size of buffer pool & latchsets
275         uint latchtotal;                        // number of page latch entries
276         uint latchhash;                         // number of latch hash table slots
277         uint latchvictim;                       // next latch entry to examine
278         uint nleafpage;                         // size of leaf pool & leafsets
279         uint leaftotal;                         // number of leaf latch entries
280         uint leafhash;                          // number of leaf hash table slots
281         uint leafvictim;                        // next leaf entry to examine
282         uint leafpromote;                       // next leaf entry to promote
283         uint redopage;                          // page number of redo buffer
284         uint redolast;                          // last msync size of recovery buff
285         uint redoend;                           // eof/end element in recovery buff
286         int err;                                        // last error
287         int line;                                       // last error line no
288         int found;                                      // number of keys found by delete
289         int reads, writes;                      // number of reads and writes
290 #ifndef unix
291         HANDLE halloc;                          // allocation handle
292         HANDLE hpool;                           // buffer pool handle
293 #endif
294         uint segments;                          // number of memory mapped segments
295         unsigned char *pages[64000];// memory mapped segments of b-tree
296 } BtMgr;
297
298 typedef struct {
299         BtMgr *mgr;                                     // buffer manager for entire process
300         BtMgr *main;                            // buffer manager for main btree
301         BtPage cursor;                          // cached page frame for start/next
302         ushort thread_no;                       // thread number
303         unsigned char key[BT_keyarray]; // last found complete key
304 } BtDb;
305
306 //      atomic txn structures
307
308 typedef struct {
309         logseqno reqlsn;        // redo log seq no required
310         uint entry:31;          // latch table entry number
311         uint reuse:1;           // reused previous page
312 } AtomicTxn;
313
314 //      Catastrophic errors
315
316 typedef enum {
317         BTERR_ok = 0,
318         BTERR_struct,
319         BTERR_ovflw,
320         BTERR_lock,
321         BTERR_map,
322         BTERR_read,
323         BTERR_wrt,
324         BTERR_atomic,
325         BTERR_recovery
326 } BTERR;
327
328 #define CLOCK_bit 0x8000
329
330 // recovery manager entry types
331
332 typedef enum {
333         BTRM_eof = 0,   // rest of buffer is emtpy
334         BTRM_add,               // add a unique key-value to btree
335         BTRM_dup,               // add a duplicate key-value to btree
336         BTRM_del,               // delete a key-value from btree
337         BTRM_upd,               // update a key with a new value
338         BTRM_new,               // allocate a new empty page
339         BTRM_old                // reuse an old empty page
340 } BTRM;
341
342 // recovery manager entry
343 //      structure followed by BtKey & BtVal
344
345 typedef struct {
346         logseqno reqlsn;        // log sequence number required
347         logseqno lsn;           // log sequence number for entry
348         uint len;                       // length of entry
349         unsigned char type;     // type of entry
350         unsigned char lvl;      // level of btree entry pertains to
351 } BtLogHdr;
352
353 // B-Tree functions
354
355 extern void bt_close (BtDb *bt);
356 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
357 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
358 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
359 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
360 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
361 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
362 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
363
364 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
365
366 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
367 extern uint bt_nextkey   (BtDb *bt, uint slot);
368 extern uint bt_prevkey (BtDb *db, uint slot);
369 extern uint bt_lastkey (BtDb *db);
370
371 //      manager functions
372 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
373 extern void bt_mgrclose (BtMgr *mgr);
374 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
375 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
376
377 //      atomic transaction functions
378 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
379 BTERR bt_txnpromote (BtDb *bt);
380
381 //  The page is allocated from low and hi ends.
382 //  The key slots are allocated from the bottom,
383 //      while the text and value of the key
384 //  are allocated from the top.  When the two
385 //  areas meet, the page is split into two.
386
387 //  A key consists of a length byte, two bytes of
388 //  index number (0 - 65535), and up to 253 bytes
389 //  of key value.
390
391 //  Associated with each key is a value byte string
392 //      containing any value desired.
393
394 //  The b-tree root is always located at page 1.
395 //      The first leaf page of level zero is always
396 //      located on page 2.
397
398 //      The b-tree pages are linked with next
399 //      pointers to facilitate enumerators,
400 //      and provide for concurrency.
401
402 //      When to root page fills, it is split in two and
403 //      the tree height is raised by a new root at page
404 //      one with two keys.
405
406 //      Deleted keys are marked with a dead bit until
407 //      page cleanup. The fence key for a leaf node is
408 //      always present
409
410 //  To achieve maximum concurrency one page is locked at a time
411 //  as the tree is traversed to find leaf key in question. The right
412 //  page numbers are used in cases where the page is being split,
413 //      or consolidated.
414
415 //  Page 0 is dedicated to lock for new page extensions,
416 //      and chains empty pages together for reuse. It also
417 //      contains the latch manager hash table.
418
419 //      The ParentModification lock on a node is obtained to serialize posting
420 //      or changing the fence key for a node.
421
422 //      Empty pages are chained together through the ALLOC page and reused.
423
424 //      Access macros to address slot and key values from the page
425 //      Page slots use 1 based indexing.
426
427 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
428 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
429 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
430
431 void bt_putid(unsigned char *dest, uid id)
432 {
433 int i = BtId;
434
435         while( i-- )
436                 dest[i] = (unsigned char)id, id >>= 8;
437 }
438
439 uid bt_getid(unsigned char *src)
440 {
441 uid id = 0;
442 int i;
443
444         for( i = 0; i < BtId; i++ )
445                 id <<= 8, id |= *src++; 
446
447         return id;
448 }
449
450 //      lite weight spin lock Latch Manager
451
452 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
453 {
454         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
455 }
456 /*
457 void bt_mutexlock(BtMutexLatch *latch)
458 {
459 uint idx = 200;
460
461  while( __sync_val_compare_and_swap (latch->value, 0, 1) )
462   if( !idx )
463         sched_yield ();
464   else
465         idx--;
466 }
467
468 int bt_mutextry(BtMutexLatch *latch)
469 {
470   return !__sync_val_compare_and_swap (latch->value, 0, 1);
471 }
472
473 void bt_releasemutex(BtMutexLatch *latch)
474 {
475   if( !__sync_lock_test_and_set (latch->value, 0) )
476         abort();
477 }
478 */
479 void bt_mutexlock(BtMutexLatch *latch)
480 {
481 uint idx;
482
483   for( idx = 0; idx < 100; idx++ )
484         if( !__sync_val_compare_and_swap (latch->value, 0, 1) )
485           return;
486
487   while( __sync_lock_test_and_set (latch->value, 2) )
488         sys_futex ((uint *)latch->value, FUTEX_WAIT_PRIVATE, 2, NULL, NULL, 0);
489 }
490
491 int bt_mutextry(BtMutexLatch *latch)
492 {
493   return !__sync_val_compare_and_swap (latch->value, 0, 1);
494 }
495
496 void bt_releasemutex(BtMutexLatch *latch)
497 {
498 uint idx;
499
500   if( *latch->value == 2 )
501         *latch->value = 0;
502   else if( __sync_lock_test_and_set (latch->value, 0) == 1 )
503         return;
504
505   if( latch->value[0] )
506    if( __sync_val_compare_and_swap (latch->value, 1, 2) )
507         return;
508
509 //  for( idx = 0; idx < 200; idx++ )
510 //      if( latch->value[0] )
511 //        if( __sync_val_compare_and_swap (latch->value, 1, 2) )
512 //              return;
513   
514   sys_futex( (uint *)latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
515 }
516
517 //      reader/writer lock implementation
518
519 void WriteLock (RWLock *lock, ushort tid, uint line)
520 {
521         if( lock->tid == tid ) {
522                 lock->dup++;
523                 return;
524         }
525         bt_mutexlock (lock->xcl);
526         bt_mutexlock (lock->wrt);
527         bt_releasemutex (lock->xcl);
528 //if( lock->tid )
529 //abort();
530         lock->line = line;
531         lock->tid = tid;
532 }
533
534 void WriteRelease (RWLock *lock)
535 {
536         if( lock->dup ) {
537                 lock->dup--;
538                 return;
539         }
540         lock->tid = 0;
541         bt_releasemutex (lock->wrt);
542 }
543
544 void ReadLock (RWLock *lock, ushort tid)
545 {
546         bt_mutexlock (lock->xcl);
547
548         if( !__sync_fetch_and_add (&lock->readers, 1) )
549                 bt_mutexlock (lock->wrt);
550
551         bt_releasemutex (lock->xcl);
552 }
553
554 void ReadRelease (RWLock *lock)
555 {
556         if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
557                 bt_releasemutex (lock->wrt);
558 }
559
560 //      recovery manager -- flush dirty pages
561
562 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
563 {
564 uint cnt3 = 0, cnt2 = 0, cnt = 0;
565 uint entry, segment;
566 BtLatchSet *latch;
567 BtPage page;
568
569   //    flush dirty pool pages to the btree
570
571 fprintf(stderr, "Start flushlsn  ");
572   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
573         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
574         latch = mgr->latchsets + entry;
575         bt_mutexlock (latch->modify);
576         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
577
578         if( latch->dirty ) {
579           bt_writepage(mgr, page, latch->page_no, 0);
580           latch->dirty = 0, cnt++;
581         }
582 if( latch->pin & ~CLOCK_bit )
583 cnt2++;
584         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
585         bt_releasemutex (latch->modify);
586   }
587   for( entry = 1; entry < mgr->leaftotal; entry++ ) {
588         page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
589         latch = mgr->leafsets + entry;
590         bt_mutexlock (latch->modify);
591         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
592
593         if( latch->dirty ) {
594           bt_writepage(mgr, page, latch->page_no, 1);
595           latch->dirty = 0, cnt++;
596         }
597 if( latch->pin & ~CLOCK_bit )
598 cnt2++;
599         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
600         bt_releasemutex (latch->modify);
601   }
602 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
603 fprintf(stderr, "begin sync");
604         for( segment = 0; segment < mgr->segments; segment++ )
605           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
606                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
607 fprintf(stderr, " end sync\n");
608 }
609
610 //      recovery manager -- process current recovery buff on startup
611 //      this won't do much if previous session was properly closed.
612
613 BTERR bt_recoveryredo (BtMgr *mgr)
614 {
615 BtLogHdr *hdr, *eof;
616 uint offset = 0;
617 BtKey *key;
618 BtVal *val;
619
620   hdr = (BtLogHdr *)mgr->redobuff;
621   mgr->flushlsn = hdr->lsn;
622
623   while( 1 ) {
624         hdr = (BtLogHdr *)(mgr->redobuff + offset);
625         switch( hdr->type ) {
626         case BTRM_eof:
627                 mgr->lsn = hdr->lsn;
628                 return 0;
629         case BTRM_add:          // add a unique key-value to btree
630         
631         case BTRM_dup:          // add a duplicate key-value to btree
632         case BTRM_del:          // delete a key-value from btree
633         case BTRM_upd:          // update a key with a new value
634         case BTRM_new:          // allocate a new empty page
635         case BTRM_old:          // reuse an old empty page
636                 return 0;
637         }
638   }
639 }
640
641 //      recovery manager -- append new entry to recovery log
642 //      flush dirty pages to disk when it overflows.
643
644 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
645 {
646 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
647 uint amt = sizeof(BtLogHdr);
648 BtLogHdr *hdr, *eof;
649 uint last, end;
650
651         bt_mutexlock (mgr->redo);
652
653         if( key )
654           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
655
656         //      see if new entry fits in buffer
657         //      flush and reset if it doesn't
658
659         if( amt > size - mgr->redoend ) {
660           mgr->flushlsn = mgr->lsn;
661           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
662                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
663           mgr->redolast = 0;
664           mgr->redoend = 0;
665           bt_flushlsn(mgr, thread_no);
666         }
667
668         //      fill in new entry & either eof or end block
669
670         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
671
672         hdr->len = amt;
673         hdr->type = type;
674         hdr->lvl = lvl;
675         hdr->lsn = ++mgr->lsn;
676
677         mgr->redoend += amt;
678
679         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
680         memset (eof, 0, sizeof(BtLogHdr));
681
682         //  fill in key and value
683
684         if( key ) {
685           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
686           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
687         }
688
689         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
690         memset (eof, 0, sizeof(BtLogHdr));
691         eof->lsn = mgr->lsn;
692
693         last = mgr->redolast & ~0xfff;
694         end = mgr->redoend;
695
696         if( end - last + sizeof(BtLogHdr) >= 32768 )
697           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
698                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
699           else
700                 mgr->redolast = end;
701
702         bt_releasemutex(mgr->redo);
703         return hdr->lsn;
704 }
705
706 //      recovery manager -- append transaction to recovery log
707 //      flush dirty pages to disk when it overflows.
708
709 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
710 {
711 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
712 uint amt = 0, src, type;
713 BtLogHdr *hdr, *eof;
714 uint last, end;
715 logseqno lsn;
716 BtKey *key;
717 BtVal *val;
718
719         //      determine amount of redo recovery log space required
720
721         for( src = 0; src++ < source->cnt; ) {
722           key = keyptr(source,src);
723           val = valptr(source,src);
724           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
725           amt += sizeof(BtLogHdr);
726         }
727
728         bt_mutexlock (mgr->redo);
729
730         //      see if new entry fits in buffer
731         //      flush and reset if it doesn't
732
733         if( amt > size - mgr->redoend ) {
734           mgr->flushlsn = mgr->lsn;
735           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
736                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
737           mgr->redolast = 0;
738           mgr->redoend = 0;
739           bt_flushlsn (mgr, thread_no);
740         }
741
742         //      assign new lsn to transaction
743
744         lsn = ++mgr->lsn;
745
746         //      fill in new entries
747
748         for( src = 0; src++ < source->cnt; ) {
749           key = keyptr(source, src);
750           val = valptr(source, src);
751
752           switch( slotptr(source, src)->type ) {
753           case Unique:
754                 type = BTRM_add;
755                 break;
756           case Duplicate:
757                 type = BTRM_dup;
758                 break;
759           case Delete:
760                 type = BTRM_del;
761                 break;
762           }
763
764           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
765           amt += sizeof(BtLogHdr);
766
767           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
768           hdr->len = amt;
769           hdr->type = type;
770           hdr->lsn = lsn;
771           hdr->lvl = 0;
772
773           //  fill in key and value
774
775           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
776           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
777
778           mgr->redoend += amt;
779         }
780
781         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
782         memset (eof, 0, sizeof(BtLogHdr));
783         eof->lsn = lsn;
784
785         last = mgr->redolast & ~0xfff;
786         end = mgr->redoend;
787
788         if( end - last + sizeof(BtLogHdr) >= 32768 )
789           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
790                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
791           else
792                 mgr->redolast = end;
793
794         bt_releasemutex(mgr->redo);
795         return lsn;
796 }
797
798 //      sync a single btree page to disk
799
800 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
801 {
802 uint segment = latch->page_no >> 16;
803 uint page_size = mgr->page_size;
804 BtPage perm;
805
806         if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
807                 return mgr->err;
808
809         if( !page->lvl )
810                 page_size <<= mgr->leaf_xtra;
811
812         perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
813
814         if( msync (perm, page_size, MS_SYNC) < 0 )
815           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
816
817         latch->dirty = 0;
818         return 0;
819 }
820
821 //      read page into buffer pool from permanent location in Btree file
822
823 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
824 {
825 int flag = PROT_READ | PROT_WRITE;
826 uint page_size = mgr->page_size;
827 uint off = 0, segment, fragment;
828 unsigned char *perm;
829
830   if( leaf )
831         page_size <<= mgr->leaf_xtra;
832
833   fragment = page_no & 0xffff;
834   segment = page_no >> 16;
835   mgr->reads++;
836
837   while( off < page_size ) {
838         if( fragment >> 16 )
839                 segment++, fragment = 0;
840
841         if( segment < mgr->segments ) {
842           perm = mgr->pages[segment] + (fragment << mgr->page_bits);
843
844           memcpy ((unsigned char *)page + off, perm, mgr->page_size);
845           off += mgr->page_size;
846           fragment++;
847           continue;
848         }
849
850         bt_mutexlock (mgr->maps);
851
852         if( segment < mgr->segments ) {
853                 bt_releasemutex (mgr->maps);
854                 continue;
855         }
856
857         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
858         mgr->segments++;
859
860         bt_releasemutex (mgr->maps);
861   }
862
863 //if( !leaf && !page->lvl )
864 //abort();
865 //if( leaf && page->lvl )
866 //abort();
867   return 0;
868 }
869
870 //      write page to permanent location in Btree file
871
872 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
873 {
874 int flag = PROT_READ | PROT_WRITE;
875 uint page_size = mgr->page_size;
876 uint off = 0, segment, fragment;
877 unsigned char *perm;
878
879 //if( !leaf && !page->lvl )
880 //abort();
881 //if( leaf && page->lvl )
882 //abort();
883 //if( !page->lvl && mgr->leaf_xtra == 8 )
884 //fprintf(stderr, "WrPage %d\n", (uint)page_no);
885   if( leaf )
886         page_size <<= mgr->leaf_xtra;
887
888   fragment = page_no & 0xffff;
889   segment = page_no >> 16;
890   mgr->writes++;
891
892   while( off < page_size ) {
893         if( fragment >> 16 )
894                 segment++, fragment = 0;
895
896         if( segment < mgr->segments ) {
897           perm = mgr->pages[segment] + (fragment << mgr->page_bits);
898           memcpy (perm, (unsigned char *)page + off, mgr->page_size);
899           off += mgr->page_size;
900           fragment++;
901           continue;
902         }
903
904         bt_mutexlock (mgr->maps);
905
906         if( segment < mgr->segments ) {
907                 bt_releasemutex (mgr->maps);
908                 continue;
909         }
910
911         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
912         mgr->segments++;
913         bt_releasemutex (mgr->maps);
914   }
915
916   return 0;
917 }
918
919 //      set CLOCK bit in latch
920 //      decrement pin count
921
922 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
923 {
924         bt_mutexlock(latch->modify);
925 //if( !(latch->pin & ~CLOCK_bit) )
926 //abort();
927         latch->pin |= CLOCK_bit;
928         latch->pin--;
929
930         bt_releasemutex(latch->modify);
931 }
932
933 //  return the btree cached page address
934
935 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
936 {
937 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
938 BtPage page;
939
940         if( latch->leaf )
941                 page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
942         else
943                 page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
944 //if( latch->leaf )
945 //if( page->lvl )
946 //abort();
947         return page;
948 }
949
950 //  return next available leaf entry
951 //        and with latch entry locked
952
953 uint bt_availleaf (BtMgr *mgr)
954 {
955 BtLatchSet *latch;
956 uint entry;
957
958   while( 1 ) {
959 #ifdef unix
960         entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
961 #else
962         entry = _InterlockedIncrement (&mgr->leafvictim);
963 #endif
964         entry %= mgr->leaftotal;
965
966         if( !entry )
967                 continue;
968
969         latch = mgr->leafsets + entry;
970
971         if( !bt_mutextry(latch->modify) )
972                 continue;
973
974         //  return this entry if it is not pinned
975
976         if( !latch->pin )
977                 return entry;
978
979         //      if the CLOCK bit is set
980         //      reset it to zero.
981
982         latch->pin &= ~CLOCK_bit;
983         bt_releasemutex(latch->modify);
984   }
985 }
986
987 //  return next available latch entry
988 //        and with latch entry locked
989
990 uint bt_availnext (BtMgr *mgr)
991 {
992 BtLatchSet *latch;
993 uint entry;
994
995   while( 1 ) {
996 #ifdef unix
997         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
998 #else
999         entry = _InterlockedIncrement (&mgr->latchvictim);
1000 #endif
1001         entry %= mgr->latchtotal;
1002
1003         if( !entry )
1004                 continue;
1005
1006         latch = mgr->latchsets + entry;
1007
1008         if( !bt_mutextry(latch->modify) )
1009                 continue;
1010
1011         //  return this entry if it is not pinned
1012
1013         if( !latch->pin )
1014                 return entry;
1015
1016         //      if the CLOCK bit is set
1017         //      reset it to zero.
1018
1019         latch->pin &= ~CLOCK_bit;
1020         bt_releasemutex(latch->modify);
1021   }
1022 }
1023
1024 //      pin leaf in leaf buffer pool
1025 //      return with latchset pinned
1026
1027 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
1028 {
1029 uint hashidx = page_no % mgr->leafhash;
1030 BtLatchSet *latch;
1031 uint entry, idx;
1032 BtPage page;
1033
1034   //  try to find our entry
1035
1036   bt_mutexlock(mgr->leaftable[hashidx].latch);
1037
1038   if( entry = mgr->leaftable[hashidx].entry ) do
1039   {
1040         latch = mgr->leafsets + entry;
1041         if( page_no == latch->page_no )
1042                 break;
1043   } while( entry = latch->next );
1044
1045   //  found our entry: increment pin
1046
1047   if( entry ) {
1048         latch = mgr->leafsets + entry;
1049         bt_mutexlock(latch->modify);
1050         latch->pin |= CLOCK_bit;
1051         latch->pin++;
1052 //if( !latch->leaf )
1053 //abort();
1054         bt_releasemutex(latch->modify);
1055         bt_releasemutex(mgr->leaftable[hashidx].latch);
1056         return latch;
1057   }
1058
1059   //  find and reuse unpinned entry
1060
1061 trynext:
1062
1063   entry = bt_availleaf (mgr);
1064   latch = mgr->leafsets + entry;
1065 //if( latch->page_no )
1066 //if( !latch->leaf )
1067 //abort();
1068
1069   idx = latch->page_no % mgr->leafhash;
1070
1071   //  if latch is on a different hash chain
1072   //    unlink from the old page_no chain
1073
1074   if( latch->page_no )
1075    if( idx != hashidx ) {
1076
1077         //  skip over this entry if latch not available
1078
1079     if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
1080           bt_releasemutex(latch->modify);
1081           goto trynext;
1082         }
1083
1084         if( latch->prev )
1085           mgr->leafsets[latch->prev].next = latch->next;
1086         else
1087           mgr->leaftable[idx].entry = latch->next;
1088
1089         if( latch->next )
1090           mgr->leafsets[latch->next].prev = latch->prev;
1091
1092     bt_releasemutex (mgr->leaftable[idx].latch);
1093    }
1094
1095   page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1096
1097   //  update permanent page area in btree from buffer pool
1098   //    no read-lock is required since page is not pinned.
1099
1100 //if( latch->page_no )
1101 //if( !latch->leaf )
1102 //abort();
1103   if( latch->dirty )
1104         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
1105           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1106         else
1107           latch->dirty = 0;
1108
1109   if( bt_readpage (mgr, page, page_no, 1) )
1110         return mgr->line = __LINE__, NULL;
1111
1112   //  link page as head of hash table chain
1113   //  if this is a never before used entry,
1114   //  or it was previously on a different
1115   //  hash table chain. Otherwise, just
1116   //  leave it in its current hash table
1117   //  chain position.
1118
1119   if( !latch->page_no || hashidx != idx ) {
1120         if( latch->next = mgr->leaftable[hashidx].entry )
1121           mgr->leafsets[latch->next].prev = entry;
1122
1123         mgr->leaftable[hashidx].entry = entry;
1124     latch->prev = 0;
1125   }
1126
1127   //  fill in latch structure
1128
1129   latch->pin = CLOCK_bit | 1;
1130   latch->page_no = page_no;
1131   latch->leaf = 1;
1132
1133   bt_releasemutex (latch->modify);
1134   bt_releasemutex (mgr->leaftable[hashidx].latch);
1135   return latch;
1136 }
1137
1138 //      pin page in non-leaf buffer pool
1139 //      return with latchset pinned
1140
1141 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1142 {
1143 uint hashidx = page_no % mgr->latchhash;
1144 BtLatchSet *latch;
1145 uint entry, idx;
1146 BtPage page;
1147
1148   //  try to find our entry
1149
1150   bt_mutexlock(mgr->hashtable[hashidx].latch);
1151
1152   if( entry = mgr->hashtable[hashidx].entry ) do
1153   {
1154         latch = mgr->latchsets + entry;
1155         if( page_no == latch->page_no )
1156                 break;
1157   } while( entry = latch->next );
1158
1159   //  found our entry: increment pin
1160
1161   if( entry ) {
1162         latch = mgr->latchsets + entry;
1163         bt_mutexlock(latch->modify);
1164         latch->pin |= CLOCK_bit;
1165         latch->pin++;
1166         bt_releasemutex(latch->modify);
1167         bt_releasemutex(mgr->hashtable[hashidx].latch);
1168         return latch;
1169   }
1170
1171   //  find and reuse unpinned entry
1172
1173 trynext:
1174
1175   entry = bt_availnext (mgr);
1176   latch = mgr->latchsets + entry;
1177
1178   idx = latch->page_no % mgr->latchhash;
1179
1180   //  if latch is on a different hash chain
1181   //    unlink from the old page_no chain
1182
1183   if( latch->page_no )
1184    if( idx != hashidx ) {
1185
1186         //  skip over this entry if latch not available
1187
1188     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1189           bt_releasemutex(latch->modify);
1190           goto trynext;
1191         }
1192
1193         if( latch->prev )
1194           mgr->latchsets[latch->prev].next = latch->next;
1195         else
1196           mgr->hashtable[idx].entry = latch->next;
1197
1198         if( latch->next )
1199           mgr->latchsets[latch->next].prev = latch->prev;
1200
1201     bt_releasemutex (mgr->hashtable[idx].latch);
1202    }
1203
1204   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1205
1206   //  update permanent page area in btree from buffer pool
1207   //    no read-lock is required since page is not pinned.
1208
1209 //if( latch->page_no )
1210 //if( latch->leaf )
1211 //abort();
1212   if( latch->dirty )
1213         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1214           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1215         else
1216           latch->dirty = 0;
1217
1218   if( bt_readpage (mgr, page, page_no, 0) )
1219         return mgr->line = __LINE__, NULL;
1220
1221   //  link page as head of hash table chain
1222   //  if this is a never before used entry,
1223   //  or it was previously on a different
1224   //  hash table chain. Otherwise, just
1225   //  leave it in its current hash table
1226   //  chain position.
1227
1228   if( !latch->page_no || hashidx != idx ) {
1229         if( latch->next = mgr->hashtable[hashidx].entry )
1230           mgr->latchsets[latch->next].prev = entry;
1231
1232         mgr->hashtable[hashidx].entry = entry;
1233     latch->prev = 0;
1234   }
1235
1236   //  fill in latch structure
1237
1238   latch->pin = CLOCK_bit | 1;
1239   latch->page_no = page_no;
1240   latch->leaf = 0;
1241
1242   bt_releasemutex (latch->modify);
1243   bt_releasemutex (mgr->hashtable[hashidx].latch);
1244   return latch;
1245 }
1246   
1247 void bt_mgrclose (BtMgr *mgr)
1248 {
1249 BtLatchSet *latch;
1250 BtLogHdr *eof;
1251 uint num = 0;
1252 BtPage page;
1253 uint slot;
1254
1255         //      flush previously written dirty pages
1256         //      and write recovery buffer to disk
1257
1258         fdatasync (mgr->idx);
1259
1260         if( mgr->redoend ) {
1261                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1262                 memset (eof, 0, sizeof(BtLogHdr));
1263         }
1264
1265         //      write remaining dirty pool pages to the btree
1266
1267         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1268                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1269                 latch = mgr->latchsets + slot;
1270
1271                 if( latch->dirty ) {
1272                         bt_writepage(mgr, page, latch->page_no, 0);
1273                         latch->dirty = 0, num++;
1274                 }
1275         }
1276
1277         //      write remaining dirty leaf pages to the btree
1278
1279         for( slot = 1; slot < mgr->leaftotal; slot++ ) {
1280                 page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1281                 latch = mgr->leafsets + slot;
1282
1283                 if( latch->dirty ) {
1284                         bt_writepage(mgr, page, latch->page_no, 1);
1285                         latch->dirty = 0, num++;
1286                 }
1287         }
1288
1289         //      clear redo recovery buffer on disk.
1290
1291         if( mgr->pagezero->redopages ) {
1292                 eof = (BtLogHdr *)mgr->redobuff;
1293                 memset (eof, 0, sizeof(BtLogHdr));
1294                 eof->lsn = mgr->lsn;
1295                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1296                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1297         }
1298
1299         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1300
1301 #ifdef unix
1302         while( mgr->segments )
1303                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1304
1305         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1306         munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1307         munmap (mgr->pagezero, mgr->page_size);
1308 #else
1309         FlushViewOfFile(mgr->pagezero, 0);
1310         UnmapViewOfFile(mgr->pagezero);
1311         UnmapViewOfFile(mgr->pagepool);
1312         CloseHandle(mgr->halloc);
1313         CloseHandle(mgr->hpool);
1314 #endif
1315 #ifdef unix
1316         close (mgr->idx);
1317         free (mgr);
1318 #else
1319         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1320         FlushFileBuffers(mgr->idx);
1321         CloseHandle(mgr->idx);
1322         GlobalFree (mgr);
1323 #endif
1324 }
1325
1326 //      close and release memory
1327
1328 void bt_close (BtDb *bt)
1329 {
1330 #ifdef unix
1331         if( bt->cursor )
1332                 free (bt->cursor);
1333 #else
1334         if( bt->cursor)
1335                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1336 #endif
1337         free (bt);
1338 }
1339
1340 //  open/create new btree buffer manager
1341
1342 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1343 //              size of page pool (e.g. 262144) and leaf pool
1344
1345 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1346 {
1347 uint lvl, attr, last, slot, idx;
1348 uint nlatchpage, latchhash;
1349 uint nleafpage, leafhash;
1350 unsigned char value[BtId];
1351 int flag, initit = 0;
1352 BtPageZero *pagezero;
1353 BtLatchSet *latch;
1354 off64_t size;
1355 uint amt[1];
1356 BtMgr* mgr;
1357 BtKey* key;
1358 BtVal *val;
1359
1360         // determine sanity of page size and buffer pool
1361
1362         if( leafxtra + pagebits > BT_maxbits )
1363                 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1364
1365         if( pagebits < BT_minbits )
1366                 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1367
1368 #ifdef unix
1369         mgr = calloc (1, sizeof(BtMgr));
1370
1371         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1372
1373         if( mgr->idx == -1 ) {
1374                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1375                 return free(mgr), NULL;
1376         }
1377 #else
1378         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1379         attr = FILE_ATTRIBUTE_NORMAL;
1380         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1381
1382         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1383                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1384                 return GlobalFree(mgr), NULL;
1385         }
1386 #endif
1387
1388 #ifdef unix
1389         pagezero = valloc (BT_maxpage);
1390         *amt = 0;
1391
1392         // read minimum page size to get root info
1393         //      to support raw disk partition files
1394         //      check if page_bits == 0 on the disk.
1395
1396         if( size = lseek (mgr->idx, 0L, 2) )
1397                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1398                         if( pagezero->page_bits ) {
1399                                 pagebits = pagezero->page_bits;
1400                                 leafxtra = pagezero->leaf_xtra;
1401                         } else
1402                                 initit = 1;
1403                 else
1404                         return free(mgr), free(pagezero), NULL;
1405         else
1406                 initit = 1;
1407 #else
1408         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1409         size = GetFileSize(mgr->idx, amt);
1410
1411         if( size || *amt ) {
1412                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1413                         return bt_mgrclose (mgr), NULL;
1414                 pagebits = pagezero->page_bits;
1415                 leafxtra = pagezero->leaf_xtra;
1416         } else
1417                 initit = 1;
1418 #endif
1419
1420         mgr->page_size = 1 << pagebits;
1421         mgr->page_bits = pagebits;
1422         mgr->leaf_xtra = leafxtra;
1423
1424         //  calculate number of latch hash table entries
1425
1426         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1427
1428         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1429         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1430         mgr->latchtotal = nodemax;
1431
1432         //  calculate number of leaf hash table entries
1433
1434         mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1435
1436         mgr->nleafpage += leafmax << leafxtra;          // size of the leaf pool in pages
1437         mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1438         mgr->leaftotal = leafmax;
1439
1440         mgr->redopage = LEAF_page + (1 << leafxtra);
1441
1442         if( !initit )
1443                 goto mgrlatch;
1444
1445         // initialize an empty b-tree with latch page, root page, page of leaves
1446         // and page(s) of latches and page pool cache
1447
1448         ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1449         memset (pagezero, 0, 1 << pagebits);
1450         pagezero->alloc->lvl = MIN_lvl - 1;
1451         pagezero->redopages = redopages;
1452         pagezero->page_bits = mgr->page_bits;
1453         pagezero->leaf_xtra = leafxtra;
1454
1455         bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1456         pagezero->leafpages = 1;
1457
1458         //  initialize left-most LEAF page in
1459         //      alloc->left and count of active leaf pages.
1460
1461         bt_putid (pagezero->alloc->left, LEAF_page);
1462
1463         if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1464                 fprintf (stderr, "Unable to create btree page zero\n");
1465                 return bt_mgrclose (mgr), NULL;
1466         }
1467
1468         memset (pagezero, 0, 1 << pagebits);
1469
1470         for( lvl=MIN_lvl; lvl--; ) {
1471         BtSlot *node = slotptr(pagezero->alloc, 1);
1472                 node->off = mgr->page_size;
1473
1474                 if( !lvl )
1475                         node->off <<= mgr->leaf_xtra;
1476
1477                 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1478                 key = keyptr(pagezero->alloc, 1);
1479                 key->len = 2;           // create stopper key
1480                 key->key[0] = 0xff;
1481                 key->key[1] = 0xff;
1482
1483                 bt_putid(value, MIN_lvl - lvl + 1);
1484                 val = valptr(pagezero->alloc, 1);
1485                 val->len = lvl ? BtId : 0;
1486                 memcpy (val->value, value, val->len);
1487
1488                 pagezero->alloc->min = node->off;
1489                 pagezero->alloc->lvl = lvl;
1490                 pagezero->alloc->cnt = 1;
1491                 pagezero->alloc->act = 1;
1492                 pagezero->alloc->page_no = MIN_lvl - lvl;
1493
1494                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1495                         fprintf (stderr, "Unable to create btree page\n");
1496                         return bt_mgrclose (mgr), NULL;
1497                 }
1498         }
1499
1500 mgrlatch:
1501 #ifdef unix
1502         free (pagezero);
1503 #else
1504         VirtualFree (pagezero, 0, MEM_RELEASE);
1505 #endif
1506
1507         // mlock the first segment of 64K pages
1508
1509         flag = PROT_READ | PROT_WRITE;
1510         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1511         mgr->segments = 1;
1512
1513         if( mgr->pages[0] == MAP_FAILED ) {
1514                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1515                 return bt_mgrclose (mgr), NULL;
1516         }
1517
1518         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1519         mlock (mgr->pagezero, mgr->page_size);
1520
1521         mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
1522         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1523
1524         //      allocate pool buffers
1525
1526         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1527         if( mgr->pagepool == MAP_FAILED ) {
1528                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1529                 return bt_mgrclose (mgr), NULL;
1530         }
1531
1532         mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1533         if( mgr->leafpool == MAP_FAILED ) {
1534                 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1535                 return bt_mgrclose (mgr), NULL;
1536         }
1537
1538         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1539         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1540         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1541
1542         mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1543         mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1544         mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1545
1546         return mgr;
1547 }
1548
1549 //      open BTree access method
1550 //      based on buffer manager
1551
1552 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1553 {
1554 BtDb *bt = malloc (sizeof(*bt));
1555
1556         memset (bt, 0, sizeof(*bt));
1557         bt->main = main;
1558         bt->mgr = mgr;
1559 #ifdef unix
1560         bt->cursor = valloc (mgr->page_size << mgr->leaf_xtra);
1561 #else
1562         bt->cursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
1563 #endif
1564 #ifdef unix
1565         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1566 #else
1567         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1568 #endif
1569         return bt;
1570 }
1571
1572 //  compare two keys, return > 0, = 0, or < 0
1573 //  =0: keys are same
1574 //  -1: key2 > key1
1575 //  +1: key2 < key1
1576 //  as the comparison value
1577
1578 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1579 {
1580 uint len1 = key1->len;
1581 int ans;
1582
1583         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1584                 return ans;
1585
1586         if( len1 > len2 )
1587                 return 1;
1588         if( len1 < len2 )
1589                 return -1;
1590
1591         return 0;
1592 }
1593
1594 // place write, read, or parent lock on requested page_no.
1595
1596 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1597 {
1598         switch( mode ) {
1599         case BtLockRead:
1600                 ReadLock (latch->readwr, thread_no);
1601                 break;
1602         case BtLockWrite:
1603 //if(latch->leaf)
1604 //fprintf(stderr, "WrtRqst %d by %d at %d\n", (uint)latch->page_no, thread_no, line); 
1605                 WriteLock (latch->readwr, thread_no, line);
1606 //if(latch->leaf)
1607 //fprintf(stderr, "WrtLock %d by %d at %d\n", (uint)latch->page_no, thread_no, line); 
1608                 break;
1609         case BtLockAccess:
1610                 ReadLock (latch->access, thread_no);
1611                 break;
1612         case BtLockDelete:
1613                 WriteLock (latch->access, thread_no, line);
1614                 break;
1615         case BtLockParent:
1616                 WriteLock (latch->parent, thread_no, line);
1617                 break;
1618         case BtLockLink:
1619                 WriteLock (latch->link, thread_no, line);
1620                 break;
1621         }
1622 }
1623
1624 // remove write, read, or parent lock on requested page
1625
1626 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1627 {
1628         switch( mode ) {
1629         case BtLockRead:
1630                 ReadRelease (latch->readwr);
1631                 break;
1632         case BtLockWrite:
1633 //if(latch->leaf)
1634 //fprintf(stderr, "Un Lock %d by %d at %d\n", (uint)latch->page_no, thread_no, line); 
1635                 WriteRelease (latch->readwr);
1636                 break;
1637         case BtLockAccess:
1638                 ReadRelease (latch->access);
1639                 break;
1640         case BtLockDelete:
1641                 WriteRelease (latch->access);
1642                 break;
1643         case BtLockParent:
1644                 WriteRelease (latch->parent);
1645                 break;
1646         case BtLockLink:
1647                 WriteRelease (latch->link);
1648                 break;
1649         }
1650 }
1651
1652 //      allocate a new page
1653 //      return with page latched, but unlocked.
1654
1655 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1656 {
1657 uint page_size = mgr->page_size, page_xtra = 0;
1658 unsigned char *freechain;
1659 uid page_no;
1660
1661         if( contents->lvl )
1662                 freechain = mgr->pagezero->freechain;
1663         else {
1664                 freechain = mgr->pagezero->leafchain;
1665                 mgr->pagezero->leafpages++;
1666                 page_xtra = mgr->leaf_xtra;
1667                 page_size <<= page_xtra;
1668         }
1669
1670         //      lock allocation page
1671
1672         bt_mutexlock(mgr->lock);
1673
1674         // use empty chain first
1675         // else allocate new page
1676
1677         if( page_no = bt_getid(freechain) ) {
1678                 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1679                         set->page = bt_mappage (mgr, set->latch);
1680                 else
1681                         return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1682
1683                 bt_putid(freechain, bt_getid(set->page->right));
1684
1685                 //  the page is currently nopromote and this
1686                 //  will keep bt_txnpromote out.
1687
1688                 //      contents will replace this bit
1689                 //  and pin will keep bt_txnpromote out
1690
1691                 contents->page_no = page_no;
1692                 contents->nopromote = 0;
1693                 set->latch->dirty = 1;
1694
1695                 memcpy (set->page, contents, page_size);
1696
1697 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1698 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1699
1700                 bt_releasemutex(mgr->lock);
1701                 return 0;
1702         }
1703
1704         page_no = bt_getid(mgr->pagezero->alloc->right);
1705         bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
1706
1707         // unlock allocation latch and
1708         //      extend file into new page.
1709
1710 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1711 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1712         bt_releasemutex(mgr->lock);
1713
1714         //      keep bt_txnpromote out of this page
1715         contents->nopromote = 1;
1716         contents->page_no = page_no;
1717         if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1718                 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1719         if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1720                 set->page = bt_mappage (mgr, set->latch);
1721         else
1722                 return mgr->err_thread = thread_no, mgr->err;
1723
1724         // now pin will keep bt_txnpromote out
1725
1726         set->page->nopromote = 0;
1727         set->latch->dirty = 1;
1728         return 0;
1729 }
1730
1731 //  find slot in page for given key at a given level
1732
1733 int bt_findslot (BtPage page, unsigned char *key, uint len)
1734 {
1735 uint diff, higher = page->cnt, low = 1, slot;
1736 uint good = 0;
1737
1738         //        make stopper key an infinite fence value
1739
1740         if( bt_getid (page->right) )
1741                 higher++;
1742         else
1743                 good++;
1744
1745         //      low is the lowest candidate.
1746         //  loop ends when they meet
1747
1748         //  higher is already
1749         //      tested as .ge. the passed key.
1750
1751         while( diff = higher - low ) {
1752                 slot = low + ( diff >> 1 );
1753                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1754                         low = slot + 1;
1755                 else
1756                         higher = slot, good++;
1757         }
1758
1759         //      return zero if key is on right link page
1760
1761         return good ? higher : 0;
1762 }
1763
1764 //  find and load page at given level for given key
1765 //      leave page rd or wr locked as requested
1766
1767 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1768 {
1769 uid page_no = ROOT_page, prevpage_no = 0;
1770 uint drill = 0xff, slot;
1771 BtLatchSet *prevlatch;
1772 uint mode, prevmode;
1773 BtPage prevpage;
1774 BtVal *val;
1775 BtKey *ptr;
1776
1777   //  start at root of btree and drill down
1778
1779   do {
1780         // determine lock mode of drill level
1781         mode = (drill == lvl) ? lock : BtLockRead; 
1782
1783         if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1784           set->page = bt_mappage (mgr, set->latch);
1785         else
1786           return 0;
1787
1788         // obtain access lock using lock chaining with Access mode
1789
1790         if( page_no > ROOT_page )
1791           bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1792
1793         //      release & unpin parent or left sibling page
1794
1795         if( prevpage_no ) {
1796           bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1797           bt_unpinlatch (prevlatch, thread_no, __LINE__);
1798           prevpage_no = 0;
1799         }
1800
1801         // obtain mode lock using lock coupling through AccessLock
1802
1803         bt_lockpage(mode, set->latch, thread_no, __LINE__);
1804
1805         if( set->page->free )
1806                 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1807
1808         if( page_no > ROOT_page )
1809           bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1810
1811         // re-read and re-lock root after determining actual level of root
1812
1813         if( set->page->lvl != drill) {
1814                 if( set->latch->page_no != ROOT_page )
1815                         return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1816                         
1817                 drill = set->page->lvl;
1818
1819                 if( lock != BtLockRead && drill == lvl ) {
1820                   bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1821                   bt_unpinlatch (set->latch, thread_no, __LINE__);
1822                   continue;
1823                 }
1824         }
1825
1826         prevpage_no = set->latch->page_no;
1827         prevlatch = set->latch;
1828         prevpage = set->page;
1829         prevmode = mode;
1830
1831         //  find key on page at this level
1832         //  and descend to requested level
1833
1834         if( !set->page->kill )
1835          if( slot = bt_findslot (set->page, key, len) ) {
1836           if( drill == lvl )
1837                 return slot;
1838
1839           // find next non-dead slot -- the fence key if nothing else
1840
1841           while( slotptr(set->page, slot)->dead )
1842                 if( slot++ < set->page->cnt )
1843                   continue;
1844                 else
1845                   return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1846
1847           val = valptr(set->page, slot);
1848
1849           if( val->len == BtId )
1850                 page_no = bt_getid(valptr(set->page, slot)->value);
1851           else
1852                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1853
1854           drill--;
1855           continue;
1856          }
1857
1858         //  slide right into next page
1859
1860         bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1861         page_no = bt_getid(set->page->right);
1862         bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1863
1864   } while( page_no );
1865
1866   // return error on end of right chain
1867
1868   mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1869   return 0;     // return error
1870 }
1871
1872 //      return page to free list
1873 //      page must be delete & write locked
1874 //      and have no keys pointing to it.
1875
1876 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1877 {
1878 unsigned char *freechain;
1879
1880 //if( (set->latch->pin & ~CLOCK_bit) > 1 )
1881 //abort();
1882         if( set->page->lvl )
1883                 freechain = mgr->pagezero->freechain;
1884         else {
1885                 freechain = mgr->pagezero->leafchain;
1886                 mgr->pagezero->leafpages--;
1887         }
1888
1889         //      lock allocation page
1890
1891         bt_mutexlock (mgr->lock);
1892
1893         //      store chain
1894
1895         memcpy(set->page->right, freechain, BtId);
1896         bt_putid(freechain, set->latch->page_no);
1897         set->latch->dirty = 1;
1898         set->page->free = 1;
1899
1900 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1901 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1902
1903         // unlock released page
1904         // and unlock allocation page
1905
1906         bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1907         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1908         bt_unpinlatch (set->latch, thread_no, __LINE__);
1909         bt_releasemutex (mgr->lock);
1910 }
1911
1912 //      a fence key was deleted from a page
1913 //      push new fence value upwards
1914
1915 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1916 {
1917 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1918 unsigned char value[BtId];
1919 BtKey* ptr;
1920 uint idx;
1921
1922         //      remove the old fence value
1923
1924         ptr = keyptr(set->page, set->page->cnt);
1925         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1926         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1927         set->latch->dirty = 1;
1928
1929         //  cache new fence value
1930
1931         ptr = keyptr(set->page, set->page->cnt);
1932         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1933
1934         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1935         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1936
1937         //      insert new (now smaller) fence key
1938
1939         bt_putid (value, set->latch->page_no);
1940         ptr = (BtKey*)leftkey;
1941
1942         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1943           return mgr->err_thread = thread_no, mgr->err;
1944
1945         //      now delete old fence key
1946
1947         ptr = (BtKey*)rightkey;
1948
1949         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1950                 return mgr->err_thread = thread_no, mgr->err;
1951
1952         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1953         bt_unpinlatch(set->latch, thread_no, __LINE__);
1954         return 0;
1955 }
1956
1957 //      root has a single child
1958 //      collapse a level from the tree
1959
1960 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1961 {
1962 BtPageSet child[1];
1963 uid page_no;
1964 BtVal *val;
1965 uint idx;
1966
1967   // find the child entry and promote as new root contents
1968
1969   do {
1970         for( idx = 0; idx++ < root->page->cnt; )
1971           if( !slotptr(root->page, idx)->dead )
1972                 break;
1973
1974         val = valptr(root->page, idx);
1975
1976         if( val->len == BtId )
1977                 page_no = bt_getid (valptr(root->page, idx)->value);
1978         else
1979                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1980
1981         if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1982                 child->page = bt_mappage (mgr, child->latch);
1983         else
1984                 return mgr->err_thread = thread_no, mgr->err;
1985
1986         bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1987         bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1988
1989         memcpy (root->page, child->page, mgr->page_size);
1990         root->latch->dirty = 1;
1991
1992         bt_freepage (mgr, child, thread_no);
1993
1994   } while( root->page->lvl > 1 && root->page->act == 1 );
1995
1996   bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1997   bt_unpinlatch (root->latch, thread_no, __LINE__);
1998   return 0;
1999 }
2000
2001 //  delete a page and manage key
2002 //  call with page writelocked
2003
2004 //      returns with page unpinned
2005 //      from the page pool.
2006
2007 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2008 {
2009 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
2010 uint page_size = mgr->page_size;
2011 unsigned char value[BtId];
2012 uint lvl = set->page->lvl;
2013 BtPageSet right[1];
2014 uid page_no;
2015 BtKey *ptr;
2016
2017         if( !lvl )
2018                 page_size <<= mgr->leaf_xtra;
2019
2020         //      cache copy of fence key
2021         //      to remove in parent
2022
2023         ptr = keyptr(set->page, set->page->cnt);
2024         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2025
2026         //      obtain locks on right page
2027
2028         page_no = bt_getid(set->page->right);
2029
2030         if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
2031                 right->page = bt_mappage (mgr, right->latch);
2032         else
2033                 return 0;
2034 //if( right->page->lvl )
2035 //abort();
2036         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2037
2038         // cache copy of key to update
2039
2040         ptr = keyptr(right->page, right->page->cnt);
2041         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
2042
2043         if( right->page->kill )
2044                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2045
2046         // pull contents of right peer into our empty page
2047
2048         memcpy (right->page->left, set->page->left, BtId);
2049         memcpy (set->page, right->page, page_size);
2050         set->page->page_no = set->latch->page_no;
2051         set->latch->dirty = 1;
2052
2053         // mark right page deleted and point it to left page
2054         //      until we can post parent updates that remove access
2055         //      to the deleted page.
2056
2057         bt_putid (right->page->right, set->latch->page_no);
2058         right->latch->dirty = 1;
2059         right->page->kill = 1;
2060
2061         bt_lockpage (BtLockParent, right->latch, thread_no, __LINE__);
2062         bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2063
2064         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2065         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2066
2067         // redirect higher key directly to our new node contents
2068
2069         bt_putid (value, set->latch->page_no);
2070         ptr = (BtKey*)higherfence;
2071
2072         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2073           return mgr->err;
2074
2075         //      delete old lower key to our node
2076
2077         ptr = (BtKey*)lowerfence;
2078
2079         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2080           return mgr->err;
2081
2082         //      obtain delete and write locks to right node
2083
2084         bt_unlockpage (BtLockParent, right->latch, thread_no, __LINE__);
2085         bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2086         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2087 while( (right->latch->pin & ~CLOCK_bit) > 1 )
2088 sched_yield();
2089         bt_freepage (mgr, right, thread_no);
2090 //fprintf(stderr, "DelPage %d by %d at %d\n", (uint)right->latch->page_no, thread_no, __LINE__); 
2091         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2092         bt_unpinlatch (set->latch, thread_no, __LINE__);
2093         return 0;
2094 }
2095
2096 //  find and delete key on page by marking delete flag bit
2097 //  if page becomes empty, delete it from the btree
2098
2099 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2100 {
2101 uint slot, idx, found, fence;
2102 BtPageSet set[1];
2103 BtSlot *node;
2104 BtKey *ptr;
2105 BtVal *val;
2106
2107         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2108                 node = slotptr(set->page, slot);
2109                 ptr = keyptr(set->page, slot);
2110         } else
2111                 return mgr->err_thread = thread_no, mgr->err;
2112
2113         // if librarian slot, advance to real slot
2114
2115         if( node->type == Librarian ) {
2116                 ptr = keyptr(set->page, ++slot);
2117                 node = slotptr(set->page, slot);
2118         }
2119
2120         fence = slot == set->page->cnt;
2121
2122         // delete the key, ignore request if already dead
2123
2124         if( found = !keycmp (ptr, key, len) )
2125           if( found = node->dead == 0 ) {
2126                 val = valptr(set->page,slot);
2127                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2128                 set->page->act--;
2129
2130                 //  mark node type as delete
2131
2132                 node->type = Delete;
2133                 node->dead = 1;
2134
2135                 // collapse empty slots beneath the fence
2136                 // on interiour nodes
2137
2138                 if( lvl )
2139                  while( idx = set->page->cnt - 1 )
2140                   if( slotptr(set->page, idx)->dead ) {
2141                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2142                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2143                   } else
2144                         break;
2145           }
2146
2147         if( !found )
2148                 return 0;
2149
2150         //      did we delete a fence key in an upper level?
2151
2152         if( lvl && set->page->act && fence )
2153           if( bt_fixfence (mgr, set, lvl, thread_no) )
2154                 return mgr->err_thread = thread_no, mgr->err;
2155           else
2156                 return 0;
2157
2158         //      do we need to collapse root?
2159
2160         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2161           if( bt_collapseroot (mgr, set, thread_no) )
2162                 return mgr->err_thread = thread_no, mgr->err;
2163           else
2164                 return 0;
2165
2166         //      delete empty page
2167
2168         if( !set->page->act )
2169           return bt_deletepage (mgr, set, thread_no);
2170
2171         set->latch->dirty = 1;
2172         bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2173         bt_unpinlatch (set->latch, thread_no, __LINE__);
2174         return 0;
2175 }
2176
2177 //      advance to next slot
2178
2179 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2180 {
2181 BtLatchSet *prevlatch;
2182 uid page_no;
2183
2184         if( slot < set->page->cnt )
2185                 return slot + 1;
2186
2187         prevlatch = set->latch;
2188
2189         if( page_no = bt_getid(set->page->right) )
2190           if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
2191                 set->page = bt_mappage (bt->mgr, set->latch);
2192           else
2193                 return 0;
2194         else
2195           return bt->mgr->err = BTERR_struct, bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__, 0;
2196
2197         // obtain access lock using lock chaining with Access mode
2198
2199         bt_lockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
2200
2201         bt_unlockpage(BtLockRead, prevlatch, bt->thread_no, __LINE__);
2202         bt_unpinlatch (prevlatch, bt->thread_no, __LINE__);
2203
2204         bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
2205         bt_unlockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
2206         return 1;
2207 }
2208
2209 //      find unique key == given key, or first duplicate key in
2210 //      leaf level and return number of value bytes
2211 //      or (-1) if not found.
2212
2213 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2214 {
2215 BtPageSet set[1];
2216 uint len, slot;
2217 int ret = -1;
2218 BtKey *ptr;
2219 BtVal *val;
2220
2221   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2222    do {
2223         ptr = keyptr(set->page, slot);
2224
2225         //      skip librarian slot place holder
2226
2227         if( slotptr(set->page, slot)->type == Librarian )
2228                 ptr = keyptr(set->page, ++slot);
2229
2230         //      return actual key found
2231
2232         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2233         len = ptr->len;
2234
2235         if( slotptr(set->page, slot)->type == Duplicate )
2236                 len -= BtId;
2237
2238         //      not there if we reach the stopper key
2239
2240         if( slot == set->page->cnt )
2241           if( !bt_getid (set->page->right) )
2242                 break;
2243
2244         // if key exists, return >= 0 value bytes copied
2245         //      otherwise return (-1)
2246
2247         if( slotptr(set->page, slot)->dead )
2248                 continue;
2249
2250         if( keylen == len )
2251           if( !memcmp (ptr->key, key, len) ) {
2252                 val = valptr (set->page,slot);
2253                 if( valmax > val->len )
2254                         valmax = val->len;
2255                 memcpy (value, val->value, valmax);
2256                 ret = valmax;
2257           }
2258
2259         break;
2260
2261    } while( slot = bt_findnext (bt, set, slot) );
2262
2263   bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
2264   bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
2265   return ret;
2266 }
2267
2268 //      check page for space available,
2269 //      clean if necessary and return
2270 //      0 - page needs splitting
2271 //      >0  new slot value
2272
2273 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2274 {
2275 uint page_size = mgr->page_size;
2276 BtPage page = set->page, frame;
2277 uint cnt = 0, idx = 0;
2278 uint max = page->cnt;
2279 uint newslot = max;
2280 BtKey *key;
2281 BtVal *val;
2282
2283         if( !set->page->lvl )
2284                 page_size <<= mgr->leaf_xtra;
2285
2286         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2287                 return slot;
2288
2289         //      skip cleanup and proceed to split
2290         //      if there's not enough garbage
2291         //      to bother with.
2292
2293         if( page->garbage < page_size / 5 )
2294                 return 0;
2295
2296         frame = malloc (page_size);
2297         memcpy (frame, page, page_size);
2298
2299         // skip page info and set rest of page to zero
2300
2301         memset (page+1, 0, page_size - sizeof(*page));
2302         set->latch->dirty = 1;
2303
2304         page->min = page_size;
2305         page->garbage = 0;
2306         page->act = 0;
2307
2308         // clean up page first by
2309         // removing deleted keys
2310
2311         while( cnt++ < max ) {
2312                 if( cnt == slot )
2313                         newslot = idx + 2;
2314
2315                 if( cnt < max || frame->lvl )
2316                   if( slotptr(frame,cnt)->dead )
2317                         continue;
2318
2319                 // copy the value across
2320
2321                 val = valptr(frame, cnt);
2322                 page->min -= val->len + sizeof(BtVal);
2323                 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2324
2325                 // copy the key across
2326
2327                 key = keyptr(frame, cnt);
2328                 page->min -= key->len + sizeof(BtKey);
2329                 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2330
2331                 // make a librarian slot
2332
2333                 slotptr(page, ++idx)->off = page->min;
2334                 slotptr(page, idx)->type = Librarian;
2335                 slotptr(page, idx)->dead = 1;
2336
2337                 // set up the slot
2338
2339                 slotptr(page, ++idx)->off = page->min;
2340                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2341
2342                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2343                         page->act++;
2344         }
2345
2346         page->cnt = idx;
2347         free (frame);
2348
2349         //      see if page has enough space now, or does it need splitting?
2350
2351         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2352                 return newslot;
2353
2354         return 0;
2355 }
2356
2357 // split the root and raise the height of the btree
2358
2359 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2360 {  
2361 unsigned char leftkey[BT_keyarray];
2362 uint nxt = mgr->page_size;
2363 unsigned char value[BtId];
2364 BtPageSet left[1];
2365 uid left_page_no;
2366 BtPage frame;
2367 BtKey *ptr;
2368 BtVal *val;
2369
2370         frame = malloc (mgr->page_size);
2371         memcpy (frame, root->page, mgr->page_size);
2372
2373         //      save left page fence key for new root
2374
2375         ptr = keyptr(root->page, root->page->cnt);
2376         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2377
2378         //  Obtain an empty page to use, and copy the current
2379         //  root contents into it, e.g. lower keys
2380
2381         if( bt_newpage(mgr, left, frame, thread_no) )
2382                 return mgr->err_thread = thread_no, mgr->err;
2383
2384         left_page_no = left->latch->page_no;
2385         bt_unpinlatch (left->latch, thread_no, __LINE__);
2386         free (frame);
2387
2388         // preserve the page info at the bottom
2389         // of higher keys and set rest to zero
2390
2391         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2392
2393         // insert stopper key at top of newroot page
2394         // and increase the root height
2395
2396         nxt -= BtId + sizeof(BtVal);
2397         bt_putid (value, right->page_no);
2398         val = (BtVal *)((unsigned char *)root->page + nxt);
2399         memcpy (val->value, value, BtId);
2400         val->len = BtId;
2401
2402         nxt -= 2 + sizeof(BtKey);
2403         slotptr(root->page, 2)->off = nxt;
2404         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2405         ptr->len = 2;
2406         ptr->key[0] = 0xff;
2407         ptr->key[1] = 0xff;
2408
2409         // insert lower keys page fence key on newroot page as first key
2410
2411         nxt -= BtId + sizeof(BtVal);
2412         bt_putid (value, left_page_no);
2413         val = (BtVal *)((unsigned char *)root->page + nxt);
2414         memcpy (val->value, value, BtId);
2415         val->len = BtId;
2416
2417         ptr = (BtKey *)leftkey;
2418         nxt -= ptr->len + sizeof(BtKey);
2419         slotptr(root->page, 1)->off = nxt;
2420         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2421         
2422         bt_putid(root->page->right, 0);
2423         root->page->min = nxt;          // reset lowest used offset and key count
2424         root->page->cnt = 2;
2425         root->page->act = 2;
2426         root->page->lvl++;
2427
2428         mgr->pagezero->alloc->lvl = root->page->lvl;
2429
2430         // release and unpin root pages
2431
2432         bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2433         bt_unpinlatch (root->latch, thread_no, __LINE__);
2434
2435         bt_unpinlatch (right, thread_no, __LINE__);
2436         return 0;
2437 }
2438
2439 //  split already locked full node
2440 //      leave it locked.
2441 //      return pool entry for new right
2442 //      page, pinned & unlocked
2443
2444 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2445 {
2446 uint page_size = mgr->page_size;
2447 uint cnt = 0, idx = 0, max;
2448 uint lvl = set->page->lvl;
2449 BtPageSet right[1];
2450 BtKey *key, *ptr;
2451 BtVal *val, *src;
2452 BtPage frame;
2453 uid right2;
2454 uint prev;
2455
2456         if( !set->page->lvl )
2457                 page_size <<= mgr->leaf_xtra;
2458
2459         //  split higher half of keys to frame
2460
2461         frame = malloc (page_size);
2462         memset (frame, 0, page_size);
2463         frame->min = page_size;
2464         max = set->page->cnt;
2465         cnt = max / 2;
2466         idx = 0;
2467
2468         while( cnt++ < max ) {
2469                 if( cnt < max || set->page->lvl )
2470                   if( slotptr(set->page, cnt)->dead )
2471                         continue;
2472
2473                 src = valptr(set->page, cnt);
2474                 frame->min -= src->len + sizeof(BtVal);
2475                 memcpy ((unsigned char *)frame + frame->min, src, src->len + sizeof(BtVal));
2476
2477                 key = keyptr(set->page, cnt);
2478                 frame->min -= key->len + sizeof(BtKey);
2479                 ptr = (BtKey*)((unsigned char *)frame + frame->min);
2480                 memcpy (ptr, key, key->len + sizeof(BtKey));
2481
2482                 //      add librarian slot
2483
2484                 slotptr(frame, ++idx)->off = frame->min;
2485                 slotptr(frame, idx)->type = Librarian;
2486                 slotptr(frame, idx)->dead = 1;
2487
2488                 //  add actual slot
2489
2490                 slotptr(frame, ++idx)->off = frame->min;
2491                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2492
2493                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2494                         frame->act++;
2495         }
2496
2497         frame->cnt = idx;
2498         frame->lvl = lvl;
2499
2500         // link right node
2501
2502         if( set->latch->page_no > ROOT_page )
2503                 bt_putid (frame->right, bt_getid (set->page->right));
2504
2505         //      get new free page and write higher keys to it.
2506
2507         if( bt_newpage(mgr, right, frame, thread_no) )
2508                 return 0;
2509
2510         // process lower keys
2511
2512         memcpy (frame, set->page, page_size);
2513         memset (set->page+1, 0, page_size - sizeof(*set->page));
2514         set->latch->dirty = 1;
2515
2516         set->page->min = page_size;
2517         set->page->garbage = 0;
2518         set->page->act = 0;
2519         max /= 2;
2520         cnt = 0;
2521         idx = 0;
2522
2523         if( slotptr(frame, max)->type == Librarian )
2524                 max--;
2525
2526         //  assemble page of smaller keys
2527
2528         while( cnt++ < max ) {
2529                 if( slotptr(frame, cnt)->dead )
2530                         continue;
2531                 val = valptr(frame, cnt);
2532                 set->page->min -= val->len + sizeof(BtVal);
2533                 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2534
2535                 key = keyptr(frame, cnt);
2536                 set->page->min -= key->len + sizeof(BtKey);
2537                 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2538
2539                 //      add librarian slot
2540
2541                 slotptr(set->page, ++idx)->off = set->page->min;
2542                 slotptr(set->page, idx)->type = Librarian;
2543                 slotptr(set->page, idx)->dead = 1;
2544
2545                 //      add actual slot
2546
2547                 slotptr(set->page, ++idx)->off = set->page->min;
2548                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2549                 set->page->act++;
2550         }
2551
2552         bt_putid(set->page->right, right->latch->page_no);
2553         set->page->cnt = idx;
2554         free(frame);
2555
2556         return right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2557 }
2558
2559 //      fix keys for newly split page
2560 //      call with both pages pinned & locked
2561 //  return unlocked and unpinned
2562
2563 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2564 {
2565 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2566 unsigned char value[BtId];
2567 uint lvl = set->page->lvl;
2568 BtPage page;
2569 BtKey *ptr;
2570
2571 //if( !lvl )
2572 //abort();
2573         // if current page is the root page, split it
2574
2575         if( set->latch->page_no == ROOT_page )
2576                 return bt_splitroot (mgr, set, right, thread_no);
2577
2578         ptr = keyptr(set->page, set->page->cnt);
2579         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2580
2581         page = bt_mappage (mgr, right);
2582
2583         ptr = keyptr(page, page->cnt);
2584         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2585
2586         // insert new fences in their parent pages
2587
2588         bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2589
2590         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2591         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2592
2593         // insert new fence for reformulated left block of smaller keys
2594
2595         bt_putid (value, set->latch->page_no);
2596         ptr = (BtKey *)leftkey;
2597
2598         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2599                 return mgr->err_thread = thread_no, mgr->err;
2600
2601         // switch fence for right block of larger keys to new right page
2602
2603         bt_putid (value, right->page_no);
2604         ptr = (BtKey *)rightkey;
2605
2606         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2607                 return mgr->err_thread = thread_no, mgr->err;
2608
2609         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2610         bt_unpinlatch (set->latch, thread_no, __LINE__);
2611
2612         bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2613         bt_unpinlatch (right, thread_no, __LINE__);
2614         return 0;
2615 }
2616
2617 //      install new key and value onto page
2618 //      page must already be checked for
2619 //      adequate space
2620
2621 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2622 {
2623 uint idx, librarian;
2624 BtSlot *node;
2625 BtKey *ptr;
2626 BtVal *val;
2627 int rate;
2628
2629         //      if found slot > desired slot and previous slot
2630         //      is a librarian slot, use it
2631
2632         if( slot > 1 )
2633           if( slotptr(set->page, slot-1)->type == Librarian )
2634                 slot--;
2635
2636         // copy value onto page
2637
2638         set->page->min -= vallen + sizeof(BtVal);
2639         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2640         memcpy (val->value, value, vallen);
2641         val->len = vallen;
2642
2643         // copy key onto page
2644
2645         set->page->min -= keylen + sizeof(BtKey);
2646         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2647         memcpy (ptr->key, key, keylen);
2648         ptr->len = keylen;
2649         
2650         //      find first empty slot
2651
2652         for( idx = slot; idx < set->page->cnt; idx++ )
2653           if( slotptr(set->page, idx)->dead )
2654                 break;
2655
2656         // now insert key into array before slot,
2657         //      adding as many librarian slots as
2658         //      makes sense.
2659
2660         if( idx == set->page->cnt ) {
2661         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2662
2663                 librarian = ++idx - slot;
2664                 avail /= sizeof(BtSlot);
2665
2666                 if( avail < 0 )
2667                         avail = 0;
2668
2669                 if( librarian > avail )
2670                         librarian = avail;
2671
2672                 if( librarian ) {
2673                         rate = (idx - slot) / librarian;
2674                         set->page->cnt += librarian;
2675                         idx += librarian;
2676                 } else
2677                         rate = 0;
2678         } else
2679                 librarian = 0, rate = 0;
2680
2681         while( idx > slot ) {
2682                 //  transfer slot
2683                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2684                 idx--;
2685
2686                 //      add librarian slot per rate
2687
2688                 if( librarian )
2689                  if( (idx - slot + 1)/2 <= librarian * rate ) {
2690                         node = slotptr(set->page, idx--);
2691                         node->off = node[1].off;
2692                         node->type = Librarian;
2693                         node->dead = 1;
2694                         librarian--;
2695                   }
2696         }
2697
2698         set->latch->dirty = 1;
2699         set->page->act++;
2700
2701         //      fill in new slot
2702
2703         node = slotptr(set->page, slot);
2704         node->off = set->page->min;
2705         node->type = type;
2706         node->dead = 0;
2707         return 0;
2708 }
2709
2710 //  Insert new key into the btree at given level.
2711 //      either add a new key or update/add an existing one
2712
2713 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2714 {
2715 uint slot, idx, len, entry;
2716 BtPageSet set[1];
2717 BtSlot *node;
2718 BtKey *ptr;
2719 BtVal *val;
2720
2721   while( 1 ) { // find the page and slot for the current key
2722         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2723                 node = slotptr(set->page, slot);
2724                 ptr = keyptr(set->page, slot);
2725         } else {
2726                 if( !mgr->err )
2727                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2728                 return mgr->err_thread = thread_no, mgr->err;
2729         }
2730
2731         // if librarian slot == found slot, advance to real slot
2732
2733         if( node->type == Librarian )
2734           if( !keycmp (ptr, key, keylen) ) {
2735                 ptr = keyptr(set->page, ++slot);
2736                 node = slotptr(set->page, slot);
2737           }
2738
2739         //  if inserting a duplicate key or unique
2740         //      key that doesn't exist on the page,
2741         //      check for adequate space on the page
2742         //      and insert the new key before slot.
2743
2744         switch( type ) {
2745         case Unique:
2746         case Duplicate:
2747           if( keycmp (ptr, key, keylen) )
2748            if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) ) {
2749             if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2750                   return mgr->err;
2751
2752                 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2753                 bt_unpinlatch (set->latch, thread_no, __LINE__);
2754                 return 0;
2755            } else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2756                 return mgr->err_thread = thread_no, mgr->err;
2757            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2758                 return mgr->err_thread = thread_no, mgr->err;
2759            else
2760                 continue;
2761
2762           // if key already exists, update value and return
2763
2764           val = valptr(set->page, slot);
2765
2766           if( val->len >= vallen ) {
2767                 if( slotptr(set->page, slot)->dead )
2768                         set->page->act++;
2769                 node->type = type;
2770                 node->dead = 0;
2771
2772                 set->page->garbage += val->len - vallen;
2773                 set->latch->dirty = 1;
2774                 val->len = vallen;
2775                 memcpy (val->value, value, vallen);
2776                 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2777                 bt_unpinlatch (set->latch, thread_no, __LINE__);
2778                 return 0;
2779           }
2780
2781           //  new update value doesn't fit in existing value area
2782           //    make sure page has room
2783
2784           if( !node->dead )
2785                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2786           else
2787                 set->page->act++;
2788
2789           node->type = type;
2790           node->dead = 0;
2791
2792           if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2793            if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2794                 return mgr->err_thread = thread_no, mgr->err;
2795            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2796                 return mgr->err_thread = thread_no, mgr->err;
2797            else
2798                 continue;
2799
2800           //  copy key and value onto page and update slot
2801
2802           set->page->min -= vallen + sizeof(BtVal);
2803           val = (BtVal*)((unsigned char *)set->page + set->page->min);
2804           memcpy (val->value, value, vallen);
2805           val->len = vallen;
2806
2807           set->latch->dirty = 1;
2808           set->page->min -= keylen + sizeof(BtKey);
2809           ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2810           memcpy (ptr->key, key, keylen);
2811           ptr->len = keylen;
2812         
2813           node->off = set->page->min;
2814           bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2815           bt_unpinlatch (set->latch, thread_no, __LINE__);
2816           return 0;
2817     }
2818   }
2819   return 0;
2820 }
2821
2822 //      determine actual page where key is located
2823 //  return slot number
2824
2825 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2826 {
2827 BtKey *key = keyptr(source,src), *ptr;
2828 unsigned char fence[BT_keyarray];
2829 uint entry, slot;
2830
2831         if( locks[src].reuse )
2832           entry = locks[src-1].entry;
2833         else
2834           entry = locks[src].entry;
2835
2836         //      find where our key is located 
2837         //      on current page or pages split on
2838         //      same page txn operations.
2839
2840         do {
2841                 set->latch = mgr->leafsets + entry;
2842                 set->page = bt_mappage (mgr, set->latch);
2843
2844                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2845                   if( slotptr(set->page, slot)->type == Librarian )
2846                         slot++;
2847                   if( locks[src].reuse )
2848                         locks[src].entry = entry;
2849                   return slot;
2850                 }
2851         } while( entry = set->latch->split );
2852
2853         ptr = keyptr (set->page, set->page->cnt);
2854         memcpy (fence, ptr, ptr->len + 1);
2855
2856         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2857         return 0;
2858 }
2859
2860 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2861 {
2862 BtKey *key = keyptr(source, src);
2863 BtVal *val = valptr(source, src);
2864 BtLatchSet *latch;
2865 BtPageSet set[1];
2866 uint entry, slot;
2867
2868   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2869         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2870           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2871                 return mgr->err_thread = thread_no, mgr->err;
2872
2873           set->page->lsn = lsn;
2874           return 0;
2875         }
2876
2877         //  split page
2878
2879         if( entry = bt_splitpage (mgr, set, thread_no) )
2880           latch = mgr->leafsets + entry;
2881         else
2882           return mgr->err_thread = thread_no, mgr->err;
2883
2884         //      splice right page into split chain
2885         //      and WriteLock it
2886
2887 //fprintf(stderr, "SplitPg %d by %d at %d\n", (uint)latch->page_no, thread_no, __LINE__); 
2888         bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2889         latch->split = set->latch->split;
2890         set->latch->split = entry;
2891   }
2892
2893   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2894 }
2895
2896 //      perform delete from smaller btree
2897 //  insert a delete slot if not found there
2898
2899 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2900 {
2901 BtKey *key = keyptr(source, src);
2902 BtPageSet set[1];
2903 uint idx, slot;
2904 BtSlot *node;
2905 BtKey *ptr;
2906 BtVal *val;
2907
2908         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2909           node = slotptr(set->page, slot);
2910           ptr = keyptr(set->page, slot);
2911           val = valptr(set->page, slot);
2912         } else
2913           return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2914
2915         //  if slot is not found, insert a delete slot
2916
2917         if( keycmp (ptr, key->key, key->len) )
2918           if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2919                 return mgr->err;
2920
2921         //      if node is already dead,
2922         //      ignore the request.
2923
2924         if( node->dead )
2925                 return 0;
2926
2927         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2928         set->latch->dirty = 1;
2929         set->page->lsn = lsn;
2930         set->page->act--;
2931
2932         node->dead = 0;
2933         __sync_fetch_and_add(&mgr->found, 1);
2934         return 0;
2935 }
2936
2937 //      release master's splits from right to left
2938
2939 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2940 {
2941 BtLatchSet *latch = mgr->leafsets + entry;
2942
2943         if( latch->split )
2944                 bt_atomicrelease (mgr, latch->split, thread_no);
2945
2946         latch->split = 0;
2947         bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2948         bt_unpinlatch(latch, thread_no, __LINE__);
2949 }
2950
2951 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2952 {
2953 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2954 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2955
2956         return keycmp (key1, key2->key, key2->len);
2957 }
2958 //      atomic modification of a batch of keys.
2959
2960 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2961 {
2962 uint src, idx, slot, samepage, entry, que = 0;
2963 BtKey *key, *ptr, *key2;
2964 int result = 0;
2965 BtSlot temp[1];
2966 logseqno lsn;
2967 int type;
2968
2969   // stable sort the list of keys into order to
2970   //    prevent deadlocks between threads.
2971 /*
2972   for( src = 1; src++ < source->cnt; ) {
2973         *temp = *slotptr(source,src);
2974         key = keyptr (source,src);
2975
2976         for( idx = src; --idx; ) {
2977           key2 = keyptr (source,idx);
2978           if( keycmp (key, key2->key, key2->len) < 0 ) {
2979                 *slotptr(source,idx+1) = *slotptr(source,idx);
2980                 *slotptr(source,idx) = *temp;
2981           } else
2982                 break;
2983         }
2984   }
2985 */
2986         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2987   //  add entries to redo log
2988
2989   if( bt->mgr->pagezero->redopages )
2990         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2991   else
2992         lsn = 0;
2993
2994   //  perform the individual actions in the transaction
2995
2996   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2997         return bt->mgr->err;
2998
2999   // if number of active pages
3000   // is greater than the buffer pool
3001   // promote page into larger btree
3002
3003   if( bt->main )
3004    while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->thread_no )
3005         if( bt_txnpromote (bt) )
3006           return bt->mgr->err;
3007
3008   // return success
3009
3010   return 0;
3011 }
3012
3013 //      execute the source list of inserts/deletes
3014
3015 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
3016 {
3017 unsigned char fencekey[BT_keyarray], prvfence[BT_keyarray];
3018 uint src, idx, samepage, entry;
3019 BtPageSet set[1], prev[1];
3020 unsigned char value[BtId];
3021 BtLatchSet *latch;
3022 uid right_page_no;
3023 AtomicTxn *locks;
3024 BtKey *key, *ptr, *prv, *cur;
3025 BtPage page;
3026 BtVal *val;
3027
3028 uint slot, prvslot;
3029
3030   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
3031
3032   // Load the leaf page for each key
3033   // group same page references with reuse bit
3034
3035   for( src = 0; src++ < source->cnt; ) {
3036         key = keyptr(source, src);
3037
3038         // first determine if this modification falls
3039         // on the same page as the previous modification
3040         //      note that the far right leaf page is a special case
3041
3042         if( samepage = src > 1 )
3043           samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
3044
3045         if( !samepage )
3046           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
3047 //memcpy (prvfence, fencekey, 10),
3048 //src>1 ?  memcpy (fencekey, ptr->key, 10):NULL,
3049 //prv = ptr,
3050 //cur = keyptr(set->page, slot),
3051                 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
3052           else
3053                 return mgr->err;
3054
3055         entry = set->latch - mgr->leafsets;
3056
3057         //  is this actually on the same page?
3058
3059 //if( !samepage )
3060 //      for( idx = src; --idx; )
3061 //if( entry == locks[idx].entry ) {
3062 //fprintf(stderr, "Dup page %d by thread %d\n", (uint)set->latch->page_no, thread_no);
3063 //              abort();
3064 //        }
3065
3066         locks[src].reuse = samepage;
3067         locks[src].entry = entry;
3068
3069         //      capture current lsn for master page
3070
3071         locks[src].reqlsn = set->page->lsn;
3072   }
3073
3074   // insert or delete each key
3075   // process any splits or merges
3076   // run through txn list backwards
3077
3078   samepage = source->cnt + 1;
3079
3080   for( src = source->cnt; src; src-- ) {
3081         if( locks[src].reuse )
3082           continue;
3083
3084         //  perform the txn operations
3085         //      from smaller to larger on
3086         //  the same page
3087
3088         for( idx = src; idx < samepage; idx++ )
3089          switch( slotptr(source,idx)->type ) {
3090          case Delete:
3091           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3092                 return mgr->err;
3093           break;
3094
3095         case Duplicate:
3096         case Unique:
3097           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
3098                 return mgr->err;
3099           break;
3100
3101         default:
3102           bt_atomicpage (mgr, source, locks, idx, set);
3103           break;
3104         }
3105
3106         //      after the same page operations have finished,
3107         //  process master page for splits or deletion.
3108
3109         latch = prev->latch = mgr->leafsets + locks[src].entry;
3110         prev->page = bt_mappage (mgr, prev->latch);
3111         samepage = src;
3112
3113         //  pick-up all splits from master page
3114         //      each one is already pinned & WriteLocked.
3115
3116         while( entry = prev->latch->split ) {
3117           set->latch = mgr->leafsets + entry;
3118           set->page = bt_mappage (mgr, set->latch);
3119
3120           // delete empty master page by undoing its split
3121           //  (this is potentially another empty page)
3122           //  note that there are no pointers to it yet
3123
3124           if( !prev->page->act ) {
3125                 memcpy (set->page->left, prev->page->left, BtId);
3126                 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3127                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3128                 prev->latch->split = set->latch->split;
3129                 prev->latch->dirty = 1;
3130                 bt_freepage (mgr, set, thread_no);
3131                 continue;
3132           }
3133
3134           // remove empty split page from the split chain
3135           // and return it to the free list. No other
3136           // thread has its page number yet.
3137
3138           if( !set->page->act ) {
3139                 memcpy (prev->page->right, set->page->right, BtId);
3140                 prev->latch->split = set->latch->split;
3141
3142                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3143                 bt_freepage (mgr, set, thread_no);
3144                 continue;
3145           }
3146
3147           //  update prev's fence key
3148
3149           ptr = keyptr(prev->page,prev->page->cnt);
3150           bt_putid (value, prev->latch->page_no);
3151 //fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__);
3152           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3153                 return mgr->err;
3154
3155           // splice in the left link into the split page
3156
3157           bt_putid (set->page->left, prev->latch->page_no);
3158
3159           if( lsm )
3160                 bt_syncpage (mgr, prev->page, prev->latch);
3161
3162           *prev = *set;
3163         }
3164
3165         //  update left pointer in next right page from last split page
3166         //      (if all splits were reversed or none occurred, latch->split == 0)
3167
3168         if( latch->split ) {
3169           //  fix left pointer in master's original (now split)
3170           //  far right sibling or set rightmost page in page zero
3171
3172           if( right_page_no = bt_getid (prev->page->right) ) {
3173                 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3174                   set->page = bt_mappage (mgr, set->latch);
3175                 else
3176                   return mgr->err;
3177
3178             bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3179             bt_putid (set->page->left, prev->latch->page_no);
3180                 set->latch->dirty = 1;
3181
3182             bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3183                 bt_unpinlatch (set->latch, thread_no, __LINE__);
3184           } else {      // prev is rightmost page
3185             bt_mutexlock (mgr->lock);
3186                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3187             bt_releasemutex(mgr->lock);
3188           }
3189
3190           //  switch the original fence key from the
3191           //  master page to the last split page.
3192
3193           ptr = keyptr(prev->page,prev->page->cnt);
3194           bt_putid (value, prev->latch->page_no);
3195
3196 //fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__);
3197           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3198                 return mgr->err;
3199
3200           if( lsm )
3201                 bt_syncpage (mgr, prev->page, prev->latch);
3202
3203           //  unlock and unpin the split pages
3204
3205           bt_atomicrelease (mgr, latch->split, thread_no);
3206
3207           //  unlock and unpin the master page
3208
3209           latch->split = 0;
3210           bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3211           bt_unpinlatch(latch, thread_no, __LINE__);
3212           continue;
3213         }
3214
3215         //      since there are no splits remaining, we're
3216         //  finished if master page occupied
3217
3218         if( prev->page->act ) {
3219           bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3220
3221           if( lsm )
3222                 bt_syncpage (mgr, prev->page, prev->latch);
3223
3224           bt_unpinlatch(prev->latch, thread_no, __LINE__);
3225           continue;
3226         }
3227
3228         // any and all splits were reversed, and the
3229         // master page located in prev is empty, delete it
3230
3231         if( bt_deletepage (mgr, prev, thread_no) )
3232                 return mgr->err;
3233   }
3234
3235   free (locks);
3236   return 0;
3237 }
3238
3239 //  pick & promote a page into the larger btree
3240
3241 BTERR bt_txnpromote (BtDb *bt)
3242 {
3243 uint entry, slot, idx;
3244 BtPageSet set[1];
3245 BtSlot *node;
3246 BtKey *ptr;
3247 BtVal *val;
3248
3249   while( 1 ) {
3250 #ifdef unix
3251         entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3252 #else
3253         entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3254 #endif
3255         entry %= bt->mgr->leaftotal;
3256
3257         if( !entry )
3258                 continue;
3259
3260         set->latch = bt->mgr->leafsets + entry;
3261
3262         if( !bt_mutextry(set->latch->modify) )
3263                 continue;
3264
3265         //  skip this entry if it is pinned
3266
3267         if( set->latch->pin & ~CLOCK_bit ) {
3268           bt_releasemutex(set->latch->modify);
3269           continue;
3270         }
3271
3272         set->page = bt_mappage (bt->mgr, set->latch);
3273
3274         // entry never used or has no left or right sibling
3275
3276         if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3277           bt_releasemutex(set->latch->modify);
3278           continue;
3279         }
3280
3281         // entry interiour node or being killed or constructed
3282
3283         if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3284           bt_releasemutex(set->latch->modify);
3285           continue;
3286         }
3287
3288         //  pin the page for our access
3289         //      and leave it locked for the
3290         //      duration of the promotion.
3291
3292         set->latch->pin++;
3293         bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3294         bt_releasemutex(set->latch->modify);
3295
3296         // transfer slots in our selected page to the main btree
3297 if( !(entry % 100) )
3298 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3299
3300         if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3301                 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3302                 return bt->main->err;
3303         }
3304
3305         //  now delete the page
3306
3307         if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3308                 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3309
3310         return bt->mgr->err;
3311   }
3312 }
3313
3314 //      set cursor to highest slot on highest page
3315
3316 uint bt_lastkey (BtDb *bt)
3317 {
3318 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3319 BtPageSet set[1];
3320
3321         if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3322                 set->page = bt_mappage (bt->mgr, set->latch);
3323         else
3324                 return 0;
3325
3326     bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3327         memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3328     bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3329         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3330         return bt->cursor->cnt;
3331 }
3332
3333 //      return previous slot on cursor page
3334
3335 uint bt_prevkey (BtDb *bt, uint slot)
3336 {
3337 uid cursor_page = bt->cursor->page_no;
3338 uid ourright, next, us = cursor_page;
3339 BtPageSet set[1];
3340
3341         if( --slot )
3342                 return slot;
3343
3344         ourright = bt_getid(bt->cursor->right);
3345
3346 goleft:
3347         if( !(next = bt_getid(bt->cursor->left)) )
3348                 return 0;
3349
3350 findourself:
3351         cursor_page = next;
3352
3353         if( set->latch = bt_pinleaf (bt->mgr, next, bt->thread_no) )
3354                 set->page = bt_mappage (bt->mgr, set->latch);
3355         else
3356                 return 0;
3357
3358     bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3359         memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3360         bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3361         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3362         
3363         next = bt_getid (bt->cursor->right);
3364
3365         if( bt->cursor->kill )
3366                 goto findourself;
3367
3368         if( next != us )
3369           if( next == ourright )
3370                 goto goleft;
3371           else
3372                 goto findourself;
3373
3374         return bt->cursor->cnt;
3375 }
3376
3377 //  return next slot on cursor page
3378 //  or slide cursor right into next page
3379
3380 uint bt_nextkey (BtDb *bt, uint slot)
3381 {
3382 BtPageSet set[1];
3383 uid right;
3384
3385   do {
3386         right = bt_getid(bt->cursor->right);
3387
3388         while( slot++ < bt->cursor->cnt )
3389           if( slotptr(bt->cursor,slot)->dead )
3390                 continue;
3391           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3392                 return slot;
3393           else
3394                 break;
3395
3396         if( !right )
3397                 break;
3398
3399         if( set->latch = bt_pinleaf (bt->mgr, right, bt->thread_no) )
3400                 set->page = bt_mappage (bt->mgr, set->latch);
3401         else
3402                 return 0;
3403
3404     bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3405         memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3406         bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3407         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3408         slot = 0;
3409
3410   } while( 1 );
3411
3412   return bt->mgr->err = 0;
3413 }
3414
3415 //  cache page of keys into cursor and return starting slot for given key
3416
3417 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3418 {
3419 BtPageSet set[1];
3420 uint slot;
3421
3422         // cache page for retrieval
3423
3424         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3425           memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3426         else
3427           return 0;
3428
3429         bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3430         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3431         return slot;
3432 }
3433
3434 #ifdef STANDALONE
3435
3436 #ifndef unix
3437 double getCpuTime(int type)
3438 {
3439 FILETIME crtime[1];
3440 FILETIME xittime[1];
3441 FILETIME systime[1];
3442 FILETIME usrtime[1];
3443 SYSTEMTIME timeconv[1];
3444 double ans = 0;
3445
3446         memset (timeconv, 0, sizeof(SYSTEMTIME));
3447
3448         switch( type ) {
3449         case 0:
3450                 GetSystemTimeAsFileTime (xittime);
3451                 FileTimeToSystemTime (xittime, timeconv);
3452                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3453                 break;
3454         case 1:
3455                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3456                 FileTimeToSystemTime (usrtime, timeconv);
3457                 break;
3458         case 2:
3459                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3460                 FileTimeToSystemTime (systime, timeconv);
3461                 break;
3462         }
3463
3464         ans += (double)timeconv->wHour * 3600;
3465         ans += (double)timeconv->wMinute * 60;
3466         ans += (double)timeconv->wSecond;
3467         ans += (double)timeconv->wMilliseconds / 1000;
3468         return ans;
3469 }
3470 #else
3471 #include <time.h>
3472 #include <sys/resource.h>
3473
3474 double getCpuTime(int type)
3475 {
3476 struct rusage used[1];
3477 struct timeval tv[1];
3478
3479         switch( type ) {
3480         case 0:
3481                 gettimeofday(tv, NULL);
3482                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3483
3484         case 1:
3485                 getrusage(RUSAGE_SELF, used);
3486                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3487
3488         case 2:
3489                 getrusage(RUSAGE_SELF, used);
3490                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3491         }
3492
3493         return 0;
3494 }
3495 #endif
3496
3497 void bt_poolaudit (BtMgr *mgr)
3498 {
3499 BtLatchSet *latch;
3500 uint entry = 0;
3501
3502         while( ++entry < mgr->latchtotal ) {
3503                 latch = mgr->latchsets + entry;
3504
3505                 if( *latch->readwr->wrt->value )
3506                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3507
3508 //              if( *latch->access->bits->tid )
3509 //                      fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3510
3511 //              if( *latch->parent->bits->tid )
3512 //                      fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3513
3514                 if( *latch->modify->value )
3515                         fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3516
3517                 if( latch->pin & ~CLOCK_bit )
3518                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3519         }
3520 }
3521
3522 typedef struct {
3523         char idx;
3524         char *type;
3525         char *infile;
3526         BtMgr *main;
3527         BtMgr *mgr;
3528         int num;
3529 } ThreadArg;
3530
3531 //  standalone program to index file of keys
3532 //  then list them onto std-out
3533
3534 #ifdef unix
3535 void *index_file (void *arg)
3536 #else
3537 uint __stdcall index_file (void *arg)
3538 #endif
3539 {
3540 int line = 0, found = 0, cnt = 0, idx;
3541 uid next, page_no = LEAF_page;  // start on first page of leaves
3542 int ch, len = 0, slot, type = 0;
3543 unsigned char key[BT_maxkey];
3544 unsigned char txn[65536];
3545 ThreadArg *args = arg;
3546 BtPage page, frame;
3547 BtPageSet set[1];
3548 uint nxt = 65536;
3549 BtKey *ptr;
3550 BtVal *val;
3551 BtDb *bt;
3552 FILE *in;
3553
3554         bt = bt_open (args->mgr, args->main);
3555         page = (BtPage)txn;
3556
3557         if( args->idx < strlen (args->type) )
3558                 ch = args->type[args->idx];
3559         else
3560                 ch = args->type[strlen(args->type) - 1];
3561
3562         switch(ch | 0x20)
3563         {
3564         case 'd':
3565                 type = Delete;
3566
3567         case 'p':
3568                 if( !type )
3569                         type = Unique;
3570
3571                 if( args->num )
3572                  if( type == Delete )
3573                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3574                  else
3575                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3576                 else
3577                  if( type == Delete )
3578                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3579                  else
3580                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3581
3582                 if( in = fopen (args->infile, "rb") )
3583                   while( ch = getc(in), ch != EOF )
3584                         if( ch == '\n' )
3585                         {
3586                           line++;
3587
3588                           if( !args->num ) {
3589                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3590                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3591                             len = 0;
3592                                 continue;
3593                           }
3594
3595                           nxt -= len - 10;
3596                           memcpy (txn + nxt, key + 10, len - 10);
3597                           nxt -= 1;
3598                           txn[nxt] = len - 10;
3599                           nxt -= 10;
3600                           memcpy (txn + nxt, key, 10);
3601                           nxt -= 1;
3602                           txn[nxt] = 10;
3603                           slotptr(page,++cnt)->off  = nxt;
3604                           slotptr(page,cnt)->type = type;
3605                           len = 0;
3606
3607                           if( cnt < args->num )
3608                                 continue;
3609
3610                           page->cnt = cnt;
3611                           page->act = cnt;
3612                           page->min = nxt;
3613
3614                           if( bt_atomictxn (bt, page) )
3615                                 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3616                           nxt = sizeof(txn);
3617                           cnt = 0;
3618
3619                         }
3620                         else if( len < BT_maxkey )
3621                                 key[len++] = ch;
3622                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3623                 break;
3624
3625         case 'w':
3626                 fprintf(stderr, "started indexing for %s\n", args->infile);
3627                 if( in = fopen (args->infile, "r") )
3628                   while( ch = getc(in), ch != EOF )
3629                         if( ch == '\n' )
3630                         {
3631                           line++;
3632
3633                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3634                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3635                           len = 0;
3636                         }
3637                         else if( len < BT_maxkey )
3638                                 key[len++] = ch;
3639                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3640                 break;
3641
3642         case 'f':
3643                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3644                 if( in = fopen (args->infile, "rb") )
3645                   while( ch = getc(in), ch != EOF )
3646                         if( ch == '\n' )
3647                         {
3648                           line++;
3649                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3650                                 found++;
3651                           else if( bt->mgr->err )
3652                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3653                           len = 0;
3654                         }
3655                         else if( len < BT_maxkey )
3656                                 key[len++] = ch;
3657                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3658                 break;
3659
3660         case 's':
3661                 fprintf(stderr, "started scanning\n");
3662                 
3663                 do {
3664                         if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3665                                 set->page = bt_mappage (bt->mgr, set->latch);
3666                         else
3667                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3668
3669                         bt_lockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3670                         next = bt_getid (set->page->right);
3671
3672                         for( slot = 0; slot++ < set->page->cnt; )
3673                          if( next || slot < set->page->cnt )
3674                           if( !slotptr(set->page, slot)->dead ) {
3675                                 ptr = keyptr(set->page, slot);
3676                                 len = ptr->len;
3677
3678                             if( slotptr(set->page, slot)->type == Duplicate )
3679                                         len -= BtId;
3680
3681                                 fwrite (ptr->key, len, 1, stdout);
3682                                 val = valptr(set->page, slot);
3683                                 fwrite (val->value, val->len, 1, stdout);
3684                                 fputc ('\n', stdout);
3685                                 cnt++;
3686                            }
3687
3688                         bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3689                         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3690                 } while( page_no = next );
3691
3692                 fprintf(stderr, " Total keys read %d\n", cnt);
3693                 break;
3694
3695         case 'r':
3696                 fprintf(stderr, "started reverse scan\n");
3697                 if( slot = bt_lastkey (bt) )
3698                    while( slot = bt_prevkey (bt, slot) ) {
3699                         if( slotptr(bt->cursor, slot)->dead )
3700                           continue;
3701
3702                         ptr = keyptr(bt->cursor, slot);
3703                         len = ptr->len;
3704
3705                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3706                                 len -= BtId;
3707
3708                         fwrite (ptr->key, len, 1, stdout);
3709                         val = valptr(bt->cursor, slot);
3710                         fwrite (val->value, val->len, 1, stdout);
3711                         fputc ('\n', stdout);
3712                         cnt++;
3713                   }
3714
3715                 fprintf(stderr, " Total keys read %d\n", cnt);
3716                 break;
3717
3718         case 'c':
3719 #ifdef unix
3720                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3721 #endif
3722                 fprintf(stderr, "started counting\n");
3723                 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3724
3725                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3726                         if( bt_readpage (bt->mgr, bt->cursor, page_no, 1) )
3727                                 break;
3728
3729                         if( !bt->cursor->free && !bt->cursor->lvl )
3730                                 cnt += bt->cursor->act;
3731
3732                         bt->mgr->reads++;
3733                         page_no = next++;
3734                 }
3735                 
3736                 cnt--;  // remove stopper key
3737                 fprintf(stderr, " Total keys counted %d\n", cnt);
3738                 break;
3739         }
3740
3741         bt_close (bt);
3742 #ifdef unix
3743         return NULL;
3744 #else
3745         return 0;
3746 #endif
3747 }
3748
3749 typedef struct timeval timer;
3750
3751 int main (int argc, char **argv)
3752 {
3753 int idx, cnt, len, slot, err;
3754 double start, stop;
3755 #ifdef unix
3756 pthread_t *threads;
3757 #else
3758 HANDLE *threads;
3759 #endif
3760 ThreadArg *args;
3761 uint mainleafpool = 0;
3762 uint mainleafxtra = 0;
3763 uint redopages = 0;
3764 uint poolsize = 0;
3765 uint leafpool = 0;
3766 uint leafxtra = 0;
3767 uint mainpool = 0;
3768 uint mainbits = 0;
3769 int bits = 16;
3770 float elapsed;
3771 int num = 0;
3772 char key[1];
3773 BtMgr *main;
3774 BtMgr *mgr;
3775 BtKey *ptr;
3776
3777         if( argc < 3 ) {
3778                 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3779                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3780                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3781                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3782                 fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
3783                 fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
3784                 fprintf (stderr, "  poolsize is the number of pages in buffer pool for the cache btree\n");
3785                 fprintf (stderr, "  leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3786                 fprintf (stderr, "  txnsize = n to block transactions into n units, or zero for no transactions\n");
3787                 fprintf (stderr, "  redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3788                 fprintf (stderr, "  mainbits is the page size of the main btree in bits\n");
3789                 fprintf (stderr, "  mainpool is the number of main pages in the main buffer pool\n");
3790                 fprintf (stderr, "  mainleaf is the number of main pages in the main leaf pool\n");
3791                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3792                 exit(0);
3793         }
3794
3795         start = getCpuTime(0);
3796
3797         if( argc > 4 )
3798                 bits = atoi(argv[4]);
3799
3800         if( argc > 5 )
3801                 leafxtra = atoi(argv[5]);
3802
3803         if( argc > 6 )
3804                 poolsize = atoi(argv[6]);
3805
3806         if( !poolsize )
3807                 fprintf (stderr, "no page pool\n"), exit(1);
3808
3809         if( argc > 7 )
3810                 leafpool = atoi(argv[7]);
3811
3812         if( argc > 8 )
3813                 num = atoi(argv[8]);
3814
3815         if( argc > 9 )
3816                 redopages = atoi(argv[9]);
3817
3818         if( redopages > 65535 )
3819                 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3820
3821         if( argc > 10 )
3822                 mainbits = atoi(argv[10]);
3823
3824         if( argc > 11 )
3825                 mainleafxtra = atoi(argv[11]);
3826
3827         if( argc > 12 )
3828                 mainpool = atoi(argv[12]);
3829
3830         if( argc > 13 )
3831                 mainleafpool = atoi(argv[13]);
3832
3833         cnt = argc - 14;
3834 #ifdef unix
3835         threads = malloc (cnt * sizeof(pthread_t));
3836 #else
3837         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3838 #endif
3839         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3840
3841         mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3842
3843         if( !mgr ) {
3844                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3845                 exit (1);
3846         }
3847
3848         if( mainbits ) {
3849                 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3850
3851                 if( !main ) {
3852                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3853                         exit (1);
3854                 }
3855         } else
3856                 main = NULL;
3857
3858         //      fire off threads
3859
3860         if( cnt )
3861           for( idx = 0; idx < cnt; idx++ ) {
3862                 args[idx].infile = argv[idx + 14];
3863                 args[idx].type = argv[3];
3864                 args[idx].main = main;
3865                 args[idx].mgr = mgr;
3866                 args[idx].num = num;
3867                 args[idx].idx = idx;
3868 #ifdef unix
3869                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3870                         fprintf(stderr, "Error creating thread %d\n", err);
3871 #else
3872                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3873 #endif
3874           }
3875         else {
3876                 args[0].infile = argv[idx + 10];
3877                 args[0].type = argv[3];
3878                 args[0].main = main;
3879                 args[0].mgr = mgr;
3880                 args[0].num = num;
3881                 args[0].idx = 0;
3882                 index_file (args);
3883         }
3884
3885         //      wait for termination
3886
3887 #ifdef unix
3888         for( idx = 0; idx < cnt; idx++ )
3889                 pthread_join (threads[idx], NULL);
3890 #else
3891         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3892
3893         for( idx = 0; idx < cnt; idx++ )
3894                 CloseHandle(threads[idx]);
3895 #endif
3896         bt_poolaudit(mgr);
3897
3898         if( main )
3899                 bt_poolaudit(main);
3900
3901         fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3902
3903         if( main )
3904                 bt_mgrclose (main);
3905         bt_mgrclose (mgr);
3906
3907         elapsed = getCpuTime(0) - start;
3908         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3909         elapsed = getCpuTime(1);
3910         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3911         elapsed = getCpuTime(2);
3912         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3913 }
3914
3915 #endif  //STANDALONE