]> pd.if.org Git - btree/blob - threadskv10.c
Remove extra assert traps, finalize mutex lock.
[btree] / threadskv10.c
1 // btree version threadskv10 futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      and LSM B-trees for write optimization
11
12 // 11 OCT 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <linux/futex.h>
39 #define SYS_futex 202
40 #endif
41
42 #ifdef unix
43 #include <unistd.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <fcntl.h>
47 #include <sys/time.h>
48 #include <sys/mman.h>
49 #include <errno.h>
50 #include <pthread.h>
51 #include <limits.h>
52 #else
53 #define WIN32_LEAN_AND_MEAN
54 #include <windows.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <time.h>
58 #include <fcntl.h>
59 #include <process.h>
60 #include <intrin.h>
61 #endif
62
63 #include <memory.h>
64 #include <string.h>
65 #include <stddef.h>
66
67 typedef unsigned long long      uid;
68 typedef unsigned long long      logseqno;
69
70 #ifndef unix
71 typedef unsigned long long      off64_t;
72 typedef unsigned short          ushort;
73 typedef unsigned int            uint;
74 #endif
75
76 #define BT_ro 0x6f72    // ro
77 #define BT_rw 0x7772    // rw
78
79 #define BT_maxbits              26                                      // maximum page size in bits
80 #define BT_minbits              9                                       // minimum page size in bits
81 #define BT_minpage              (1 << BT_minbits)       // minimum page size
82 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
83
84 //  BTree page number constants
85 #define ALLOC_page              0       // allocation page
86 #define ROOT_page               1       // root of the btree
87 #define LEAF_page               2       // first page of leaves
88 #define REDO_page               3       // first page of redo buffer
89
90 //      Number of levels to create in a new BTree
91
92 #define MIN_lvl                 2
93
94 /*
95 There are six lock types for each node in four independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
102 */
103
104 typedef enum{
105         BtLockAccess = 1,
106         BtLockDelete = 2,
107         BtLockRead   = 4,
108         BtLockWrite  = 8,
109         BtLockParent = 16,
110         BtLockAtomic = 32
111 } BtLock;
112
113 typedef struct {
114   union {
115         struct {
116           volatile uint xlock:1;        // one writer has exclusive lock
117           volatile uint wrt:31;         // count of other writers waiting
118         } bits[1];
119         uint value[1];
120   };
121 } BtMutexLatch;
122
123 #define XCL 1
124 #define WRT 2
125
126 //      definition for phase-fair reader/writer lock implementation
127
128 typedef struct {
129         volatile ushort rin[1];
130         volatile ushort rout[1];
131         volatile ushort ticket[1];
132         volatile ushort serving[1];
133         volatile ushort tid[1];
134         volatile ushort dup[1];
135 } RWLock;
136
137 //      write only lock
138
139 typedef struct {
140         BtMutexLatch xcl[1];
141         volatile ushort tid[1];
142         volatile ushort dup[1];
143 } WOLock;
144
145 #define PHID 0x1
146 #define PRES 0x2
147 #define MASK 0x3
148 #define RINC 0x4
149
150 //      mode & definition for lite latch implementation
151
152 enum {
153         QueRd = 1,              // reader queue
154         QueWr = 2               // writer queue
155 } RWQueue;
156
157 //  hash table entries
158
159 typedef struct {
160         uint entry;             // Latch table entry at head of chain
161         BtMutexLatch latch[1];
162 } BtHashEntry;
163
164 //      latch manager table structure
165
166 typedef struct {
167         uid page_no;                    // latch set page number
168         RWLock readwr[1];               // read/write page lock
169         RWLock access[1];               // Access Intent/Page delete
170         WOLock parent[1];               // Posting of fence key in parent
171         WOLock atomic[1];               // Atomic update in progress
172         uint split;                             // right split page atomic insert
173         uint entry;                             // entry slot in latch table
174         uint next;                              // next entry in hash table chain
175         uint prev;                              // prev entry in hash table chain
176         ushort pin;                             // number of accessing threads
177         unsigned char dirty;    // page in cache is dirty (atomic set)
178         BtMutexLatch modify[1]; // modify entry lite latch
179 } BtLatchSet;
180
181 //      Define the length of the page record numbers
182
183 #define BtId 6
184
185 //      Page key slot definition.
186
187 //      Keys are marked dead, but remain on the page until
188 //      it cleanup is called. The fence key (highest key) for
189 //      a leaf page is always present, even after cleanup.
190
191 //      Slot types
192
193 //      In addition to the Unique keys that occupy slots
194 //      there are Librarian and Duplicate key
195 //      slots occupying the key slot array.
196
197 //      The Librarian slots are dead keys that
198 //      serve as filler, available to add new Unique
199 //      or Dup slots that are inserted into the B-tree.
200
201 //      The Duplicate slots have had their key bytes extended
202 //      by 6 bytes to contain a binary duplicate key uniqueifier.
203
204 typedef enum {
205         Unique,
206         Librarian,
207         Duplicate,
208         Delete
209 } BtSlotType;
210
211 typedef struct {
212         uint off:BT_maxbits;    // page offset for key start
213         uint type:3;                    // type of slot
214         uint dead:1;                    // set for deleted slot
215 } BtSlot;
216
217 //      The key structure occupies space at the upper end of
218 //      each page.  It's a length byte followed by the key
219 //      bytes.
220
221 typedef struct {
222         unsigned char len;              // this can be changed to a ushort or uint
223         unsigned char key[0];
224 } BtKey;
225
226 //      the value structure also occupies space at the upper
227 //      end of the page. Each key is immediately followed by a value.
228
229 typedef struct {
230         unsigned char len;              // this can be changed to a ushort or uint
231         unsigned char value[0];
232 } BtVal;
233
234 #define BT_maxkey       255             // maximum number of bytes in a key
235 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
236
237 //      The first part of an index page.
238 //      It is immediately followed
239 //      by the BtSlot array of keys.
240
241 //      note that this structure size
242 //      must be a multiple of 8 bytes
243 //      in order to place dups correctly.
244
245 typedef struct BtPage_ {
246         uint cnt;                                       // count of keys in page
247         uint act;                                       // count of active keys
248         uint min;                                       // next key offset
249         uint garbage;                           // page garbage in bytes
250         unsigned char bits:7;           // page size in bits
251         unsigned char free:1;           // page is on free chain
252         unsigned char lvl:7;            // level of page
253         unsigned char kill:1;           // page is being deleted
254         unsigned char right[BtId];      // page number to right
255         unsigned char left[BtId];       // page number to left
256         unsigned char filler[2];        // padding to multiple of 8
257         logseqno lsn;                           // log sequence number applied
258         uid page_no;                            // this page number
259 } *BtPage;
260
261 //  The loadpage interface object
262
263 typedef struct {
264         BtPage page;            // current page pointer
265         BtLatchSet *latch;      // current page latch set
266 } BtPageSet;
267
268 //      structure for latch manager on ALLOC_page
269
270 typedef struct {
271         struct BtPage_ alloc[1];                // next page_no in right ptr
272         unsigned long long dups[1];             // global duplicate key uniqueifier
273         unsigned char freechain[BtId];  // head of free page_nos chain
274         unsigned long long activepages; // number of active pages pages
275 } BtPageZero;
276
277 //      The object structure for Btree access
278
279 typedef struct {
280         uint page_size;                         // page size    
281         uint page_bits;                         // page size in bits    
282 #ifdef unix
283         int idx;
284 #else
285         HANDLE idx;
286 #endif
287         BtPageZero *pagezero;           // mapped allocation page
288         BtHashEntry *hashtable;         // the buffer pool hash table entries
289         BtLatchSet *latchsets;          // mapped latch set from buffer pool
290         unsigned char *pagepool;        // mapped to the buffer pool pages
291         unsigned char *redobuff;        // mapped recovery buffer pointer
292         logseqno lsn, flushlsn;         // current & first lsn flushed
293         BtMutexLatch redo[1];           // redo area lite latch
294         BtMutexLatch lock[1];           // allocation area lite latch
295         BtMutexLatch maps[1];           // mapping segments lite latch
296         ushort thread_no[1];            // next thread number
297         uint nlatchpage;                        // number of latch pages at BT_latch
298         uint latchtotal;                        // number of page latch entries
299         uint latchhash;                         // number of latch hash table slots
300         uint latchvictim;                       // next latch entry to examine
301         uint latchpromote;                      // next latch entry to promote
302         uint redopages;                         // size of recovery buff in pages
303         uint redolast;                          // last msync size of recovery buff
304         uint redoend;                           // eof/end element in recovery buff
305         int err;                                        // last error
306         int line;                                       // last error line no
307         int found;                                      // number of keys found by delete
308         int reads, writes;                      // number of reads and writes
309 #ifndef unix
310         HANDLE halloc;                          // allocation handle
311         HANDLE hpool;                           // buffer pool handle
312 #endif
313         uint segments;                          // number of memory mapped segments
314         unsigned char *pages[64000];// memory mapped segments of b-tree
315 } BtMgr;
316
317 typedef struct {
318         BtMgr *mgr;                                     // buffer manager for entire process
319         BtMgr *main;                            // buffer manager for main btree
320         BtPage cursor;                          // cached page frame for start/next
321         ushort thread_no;                       // thread number
322         unsigned char key[BT_keyarray]; // last found complete key
323 } BtDb;
324
325 //      Catastrophic errors
326
327 typedef enum {
328         BTERR_ok = 0,
329         BTERR_struct,
330         BTERR_ovflw,
331         BTERR_lock,
332         BTERR_map,
333         BTERR_read,
334         BTERR_wrt,
335         BTERR_atomic,
336         BTERR_recovery
337 } BTERR;
338
339 #define CLOCK_bit 0x8000
340
341 // recovery manager entry types
342
343 typedef enum {
344         BTRM_eof = 0,   // rest of buffer is emtpy
345         BTRM_add,               // add a unique key-value to btree
346         BTRM_dup,               // add a duplicate key-value to btree
347         BTRM_del,               // delete a key-value from btree
348         BTRM_upd,               // update a key with a new value
349         BTRM_new,               // allocate a new empty page
350         BTRM_old                // reuse an old empty page
351 } BTRM;
352
353 // recovery manager entry
354 //      structure followed by BtKey & BtVal
355
356 typedef struct {
357         logseqno reqlsn;        // log sequence number required
358         logseqno lsn;           // log sequence number for entry
359         uint len;                       // length of entry
360         unsigned char type;     // type of entry
361         unsigned char lvl;      // level of btree entry pertains to
362 } BtLogHdr;
363
364 // B-Tree functions
365
366 extern void bt_close (BtDb *bt);
367 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
368 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
369 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
370 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
371 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
372 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
373 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
374
375 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
376
377 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
378 extern uint bt_nextkey   (BtDb *bt, uint slot);
379 extern uint bt_prevkey (BtDb *db, uint slot);
380 extern uint bt_lastkey (BtDb *db);
381
382 //      manager functions
383 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
384 extern void bt_mgrclose (BtMgr *mgr);
385 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
386 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
387
388 //      transaction functions
389 BTERR bt_txnpromote (BtDb *bt);
390
391 //  The page is allocated from low and hi ends.
392 //  The key slots are allocated from the bottom,
393 //      while the text and value of the key
394 //  are allocated from the top.  When the two
395 //  areas meet, the page is split into two.
396
397 //  A key consists of a length byte, two bytes of
398 //  index number (0 - 65535), and up to 253 bytes
399 //  of key value.
400
401 //  Associated with each key is a value byte string
402 //      containing any value desired.
403
404 //  The b-tree root is always located at page 1.
405 //      The first leaf page of level zero is always
406 //      located on page 2.
407
408 //      The b-tree pages are linked with next
409 //      pointers to facilitate enumerators,
410 //      and provide for concurrency.
411
412 //      When to root page fills, it is split in two and
413 //      the tree height is raised by a new root at page
414 //      one with two keys.
415
416 //      Deleted keys are marked with a dead bit until
417 //      page cleanup. The fence key for a leaf node is
418 //      always present
419
420 //  To achieve maximum concurrency one page is locked at a time
421 //  as the tree is traversed to find leaf key in question. The right
422 //  page numbers are used in cases where the page is being split,
423 //      or consolidated.
424
425 //  Page 0 is dedicated to lock for new page extensions,
426 //      and chains empty pages together for reuse. It also
427 //      contains the latch manager hash table.
428
429 //      The ParentModification lock on a node is obtained to serialize posting
430 //      or changing the fence key for a node.
431
432 //      Empty pages are chained together through the ALLOC page and reused.
433
434 //      Access macros to address slot and key values from the page
435 //      Page slots use 1 based indexing.
436
437 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
438 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
439 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
440
441 void bt_putid(unsigned char *dest, uid id)
442 {
443 int i = BtId;
444
445         while( i-- )
446                 dest[i] = (unsigned char)id, id >>= 8;
447 }
448
449 uid bt_getid(unsigned char *src)
450 {
451 uid id = 0;
452 int i;
453
454         for( i = 0; i < BtId; i++ )
455                 id <<= 8, id |= *src++; 
456
457         return id;
458 }
459
460 uid bt_newdup (BtMgr *mgr)
461 {
462 #ifdef unix
463         return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
464 #else
465         return _InterlockedIncrement64(mgr->pagezero->dups, 1);
466 #endif
467 }
468
469 //      lite weight spin lock Latch Manager
470
471 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
472 {
473         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
474 }
475
476 void bt_mutexlock(BtMutexLatch *latch)
477 {
478 BtMutexLatch prev[1];
479 uint slept = 0;
480
481   while( 1 ) {
482         *prev->value = __sync_fetch_and_or(latch->value, XCL);
483
484         if( !prev->bits->xlock ) {                      // did we set XCL bit?
485             if( slept )
486                   __sync_fetch_and_sub(latch->value, WRT);
487                 return;
488         }
489
490         if( !slept ) {
491                 prev->bits->wrt++;
492                 __sync_fetch_and_add(latch->value, WRT);
493         }
494
495         sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
496         slept = 1;
497   }
498 }
499
500 //      try to obtain write lock
501
502 //      return 1 if obtained,
503 //              0 otherwise
504
505 int bt_mutextry(BtMutexLatch *latch)
506 {
507 BtMutexLatch prev[1];
508
509         *prev->value = __sync_fetch_and_or(latch->value, XCL);
510
511         //      take write access if exclusive bit is clear
512
513         return !prev->bits->xlock;
514 }
515
516 //      clear write mode
517
518 void bt_releasemutex(BtMutexLatch *latch)
519 {
520 BtMutexLatch prev[1];
521
522         *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
523
524         if( prev->bits->wrt )
525           sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
526 }
527
528 //      Write-Only Queue Lock
529
530 void WriteOLock (WOLock *lock, ushort tid)
531 {
532         if( *lock->tid == tid ) {
533                 *lock->dup += 1;
534                 return;
535         }
536
537         bt_mutexlock(lock->xcl);
538         *lock->tid = tid;
539 }
540
541 void WriteORelease (WOLock *lock)
542 {
543         if( *lock->dup ) {
544                 *lock->dup -= 1;
545                 return;
546         }
547
548         *lock->tid = 0;
549         bt_releasemutex(lock->xcl);
550 }
551
552 //      Phase-Fair reader/writer lock implementation
553
554 void WriteLock (RWLock *lock, ushort tid)
555 {
556 ushort w, r, tix;
557
558         if( *lock->tid == tid ) {
559                 *lock->dup += 1;
560                 return;
561         }
562 #ifdef unix
563         tix = __sync_fetch_and_add (lock->ticket, 1);
564 #else
565         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
566 #endif
567         // wait for our ticket to come up
568
569         while( tix != lock->serving[0] )
570 #ifdef unix
571                 sched_yield();
572 #else
573                 SwitchToThread ();
574 #endif
575
576         w = PRES | (tix & PHID);
577 #ifdef  unix
578         r = __sync_fetch_and_add (lock->rin, w);
579 #else
580         r = _InterlockedExchangeAdd16 (lock->rin, w);
581 #endif
582         while( r != *lock->rout )
583 #ifdef unix
584                 sched_yield();
585 #else
586                 SwitchToThread();
587 #endif
588         *lock->tid = tid;
589 }
590
591 void WriteRelease (RWLock *lock)
592 {
593         if( *lock->dup ) {
594                 *lock->dup -= 1;
595                 return;
596         }
597
598         *lock->tid = 0;
599 #ifdef unix
600         __sync_fetch_and_and (lock->rin, ~MASK);
601 #else
602         _InterlockedAnd16 (lock->rin, ~MASK);
603 #endif
604         lock->serving[0]++;
605 }
606
607 //      try to obtain read lock
608 //  return 1 if successful
609
610 int ReadTry (RWLock *lock, ushort tid)
611 {
612 ushort w;
613
614         //  OK if write lock already held by same thread
615
616         if( *lock->tid == tid ) {
617                 *lock->dup += 1;
618                 return 1;
619         }
620 #ifdef unix
621         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
622 #else
623         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
624 #endif
625         if( w )
626           if( w == (*lock->rin & MASK) ) {
627 #ifdef unix
628                 __sync_fetch_and_add (lock->rin, -RINC);
629 #else
630                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
631 #endif
632                 return 0;
633           }
634
635         return 1;
636 }
637
638 void ReadLock (RWLock *lock, ushort tid)
639 {
640 ushort w;
641
642         if( *lock->tid == tid ) {
643                 *lock->dup += 1;
644                 return;
645         }
646 #ifdef unix
647         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
648 #else
649         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
650 #endif
651         if( w )
652           while( w == (*lock->rin & MASK) )
653 #ifdef unix
654                 sched_yield ();
655 #else
656                 SwitchToThread ();
657 #endif
658 }
659
660 void ReadRelease (RWLock *lock)
661 {
662         if( *lock->dup ) {
663                 *lock->dup -= 1;
664                 return;
665         }
666
667 #ifdef unix
668         __sync_fetch_and_add (lock->rout, RINC);
669 #else
670         _InterlockedExchangeAdd16 (lock->rout, RINC);
671 #endif
672 }
673
674 //      recovery manager -- flush dirty pages
675
676 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
677 {
678 uint cnt3 = 0, cnt2 = 0, cnt = 0;
679 BtLatchSet *latch;
680 BtPage page;
681 uint entry;
682
683   //    flush dirty pool pages to the btree
684
685 fprintf(stderr, "Start flushlsn  ");
686   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
687         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
688         latch = mgr->latchsets + entry;
689         bt_mutexlock (latch->modify);
690         bt_lockpage(BtLockRead, latch, thread_no);
691
692         if( latch->dirty ) {
693           bt_writepage(mgr, page, latch->page_no, 0);
694           latch->dirty = 0, cnt++;
695         }
696 if( latch->pin & ~CLOCK_bit )
697 cnt2++;
698         bt_unlockpage(BtLockRead, latch);
699         bt_releasemutex (latch->modify);
700   }
701 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
702 fprintf(stderr, "begin sync");
703         sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
704 fprintf(stderr, " end sync\n");
705 }
706
707 //      recovery manager -- process current recovery buff on startup
708 //      this won't do much if previous session was properly closed.
709
710 BTERR bt_recoveryredo (BtMgr *mgr)
711 {
712 BtLogHdr *hdr, *eof;
713 uint offset = 0;
714 BtKey *key;
715 BtVal *val;
716
717   pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
718
719   hdr = (BtLogHdr *)mgr->redobuff;
720   mgr->flushlsn = hdr->lsn;
721
722   while( 1 ) {
723         hdr = (BtLogHdr *)(mgr->redobuff + offset);
724         switch( hdr->type ) {
725         case BTRM_eof:
726                 mgr->lsn = hdr->lsn;
727                 return 0;
728         case BTRM_add:          // add a unique key-value to btree
729         
730         case BTRM_dup:          // add a duplicate key-value to btree
731         case BTRM_del:          // delete a key-value from btree
732         case BTRM_upd:          // update a key with a new value
733         case BTRM_new:          // allocate a new empty page
734         case BTRM_old:          // reuse an old empty page
735                 return 0;
736         }
737   }
738 }
739
740 //      recovery manager -- append new entry to recovery log
741 //      flush to disk when it overflows.
742
743 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
744 {
745 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
746 uint amt = sizeof(BtLogHdr);
747 BtLogHdr *hdr, *eof;
748 uint last, end;
749
750         bt_mutexlock (mgr->redo);
751
752         if( key )
753           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
754
755         //      see if new entry fits in buffer
756         //      flush and reset if it doesn't
757
758         if( amt > size - mgr->redoend ) {
759           mgr->flushlsn = mgr->lsn;
760           msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
761           mgr->redolast = 0;
762           mgr->redoend = 0;
763           bt_flushlsn(mgr, thread_no);
764         }
765
766         //      fill in new entry & either eof or end block
767
768         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
769
770         hdr->len = amt;
771         hdr->type = type;
772         hdr->lvl = lvl;
773         hdr->lsn = ++mgr->lsn;
774
775         mgr->redoend += amt;
776
777         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
778         memset (eof, 0, sizeof(BtLogHdr));
779
780         //  fill in key and value
781
782         if( key ) {
783           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
784           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
785         }
786
787         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
788         memset (eof, 0, sizeof(BtLogHdr));
789         eof->lsn = mgr->lsn;
790
791         last = mgr->redolast & 0xfff;
792         end = mgr->redoend;
793         mgr->redolast = end;
794
795         bt_releasemutex(mgr->redo);
796
797         msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
798         return hdr->lsn;
799 }
800
801 //      recovery manager -- append transaction to recovery log
802 //      flush to disk when it overflows.
803
804 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
805 {
806 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
807 uint amt = 0, src, type;
808 BtLogHdr *hdr, *eof;
809 uint last, end;
810 logseqno lsn;
811 BtKey *key;
812 BtVal *val;
813
814         //      determine amount of redo recovery log space required
815
816         for( src = 0; src++ < source->cnt; ) {
817           key = keyptr(source,src);
818           val = valptr(source,src);
819           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
820           amt += sizeof(BtLogHdr);
821         }
822
823         bt_mutexlock (mgr->redo);
824
825         //      see if new entry fits in buffer
826         //      flush and reset if it doesn't
827
828         if( amt > size - mgr->redoend ) {
829           mgr->flushlsn = mgr->lsn;
830           msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
831           mgr->redolast = 0;
832           mgr->redoend = 0;
833           bt_flushlsn (mgr, thread_no);
834         }
835
836         //      assign new lsn to transaction
837
838         lsn = ++mgr->lsn;
839
840         //      fill in new entries
841
842         for( src = 0; src++ < source->cnt; ) {
843           key = keyptr(source, src);
844           val = valptr(source, src);
845
846           switch( slotptr(source, src)->type ) {
847           case Unique:
848                 type = BTRM_add;
849                 break;
850           case Duplicate:
851                 type = BTRM_dup;
852                 break;
853           case Delete:
854                 type = BTRM_del;
855                 break;
856           }
857
858           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
859           amt += sizeof(BtLogHdr);
860
861           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
862           hdr->len = amt;
863           hdr->type = type;
864           hdr->lsn = lsn;
865           hdr->lvl = 0;
866
867           //  fill in key and value
868
869           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
870           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
871
872           mgr->redoend += amt;
873         }
874
875         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
876         memset (eof, 0, sizeof(BtLogHdr));
877         eof->lsn = lsn;
878
879         last = mgr->redolast & 0xfff;
880         end = mgr->redoend;
881         mgr->redolast = end;
882         bt_releasemutex(mgr->redo);
883
884         msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
885         bt_releasemutex(mgr->redo);
886         return lsn;
887 }
888
889 //      read page into buffer pool from permanent location in Btree file
890
891 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
892 {
893 int flag = PROT_READ | PROT_WRITE;
894 uint segment = page_no >> 32;
895 unsigned char *perm;
896
897   while( 1 ) {
898         if( segment < mgr->segments ) {
899           perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
900                 memcpy (page, perm, mgr->page_size);
901 if( page->page_no != page_no )
902 abort();
903                 mgr->reads++;
904                 return 0;
905         }
906
907         bt_mutexlock (mgr->maps);
908
909         if( segment < mgr->segments ) {
910                 bt_releasemutex (mgr->maps);
911                 continue;
912         }
913
914         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
915         mgr->segments++;
916
917         bt_releasemutex (mgr->maps);
918   }
919 }
920
921 //      write page to permanent location in Btree file
922 //      clear the dirty bit
923
924 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
925 {
926 int flag = PROT_READ | PROT_WRITE;
927 uint segment = page_no >> 32;
928 unsigned char *perm;
929
930   while( 1 ) {
931         if( segment < mgr->segments ) {
932           perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
933                 memcpy (perm, page, mgr->page_size);
934                 if( syncit )
935                         msync (perm, mgr->page_size, MS_SYNC);
936                 mgr->writes++;
937                 return 0;
938         }
939
940         bt_mutexlock (mgr->maps);
941
942         if( segment < mgr->segments ) {
943                 bt_releasemutex (mgr->maps);
944                 continue;
945         }
946
947         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
948         bt_releasemutex (mgr->maps);
949         mgr->segments++;
950   }
951 }
952
953 //      set CLOCK bit in latch
954 //      decrement pin count
955
956 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
957 {
958         bt_mutexlock(latch->modify);
959         latch->pin |= CLOCK_bit;
960         latch->pin--;
961
962         bt_releasemutex(latch->modify);
963 }
964
965 //  return the btree cached page address
966
967 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
968 {
969 BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
970
971   return page;
972 }
973
974 //  return next available latch entry
975 //        and with latch entry locked
976
977 uint bt_availnext (BtMgr *mgr)
978 {
979 BtLatchSet *latch;
980 uint entry;
981
982   while( 1 ) {
983 #ifdef unix
984         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
985 #else
986         entry = _InterlockedIncrement (&mgr->latchvictim);
987 #endif
988         entry %= mgr->latchtotal;
989
990         if( !entry )
991                 continue;
992
993         latch = mgr->latchsets + entry;
994
995         if( !bt_mutextry(latch->modify) )
996                 continue;
997
998         //  return this entry if it is not pinned
999
1000         if( !latch->pin )
1001                 return entry;
1002
1003         //      if the CLOCK bit is set
1004         //      reset it to zero.
1005
1006         latch->pin &= ~CLOCK_bit;
1007         bt_releasemutex(latch->modify);
1008   }
1009 }
1010
1011 //      pin page in buffer pool
1012 //      return with latchset pinned
1013
1014 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1015 {
1016 uint hashidx = page_no % mgr->latchhash;
1017 BtLatchSet *latch;
1018 uint entry, idx;
1019 BtPage page;
1020
1021   //  try to find our entry
1022
1023   bt_mutexlock(mgr->hashtable[hashidx].latch);
1024
1025   if( entry = mgr->hashtable[hashidx].entry ) do
1026   {
1027         latch = mgr->latchsets + entry;
1028         if( page_no == latch->page_no )
1029                 break;
1030   } while( entry = latch->next );
1031
1032   //  found our entry: increment pin
1033
1034   if( entry ) {
1035         latch = mgr->latchsets + entry;
1036         bt_mutexlock(latch->modify);
1037         latch->pin |= CLOCK_bit;
1038         latch->pin++;
1039 if(contents)
1040 abort();
1041         bt_releasemutex(latch->modify);
1042         bt_releasemutex(mgr->hashtable[hashidx].latch);
1043         return latch;
1044   }
1045
1046   //  find and reuse unpinned entry
1047
1048 trynext:
1049
1050   entry = bt_availnext (mgr);
1051   latch = mgr->latchsets + entry;
1052
1053   idx = latch->page_no % mgr->latchhash;
1054
1055   //  if latch is on a different hash chain
1056   //    unlink from the old page_no chain
1057
1058   if( latch->page_no )
1059    if( idx != hashidx ) {
1060
1061         //  skip over this entry if latch not available
1062
1063     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1064           bt_releasemutex(latch->modify);
1065           goto trynext;
1066         }
1067
1068         if( latch->prev )
1069           mgr->latchsets[latch->prev].next = latch->next;
1070         else
1071           mgr->hashtable[idx].entry = latch->next;
1072
1073         if( latch->next )
1074           mgr->latchsets[latch->next].prev = latch->prev;
1075
1076     bt_releasemutex (mgr->hashtable[idx].latch);
1077    }
1078
1079   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1080
1081   //  update permanent page area in btree from buffer pool
1082   //    no read-lock is required since page is not pinned.
1083
1084   if( latch->dirty )
1085         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1086           return mgr->line = __LINE__, NULL;
1087         else
1088           latch->dirty = 0;
1089
1090   if( contents ) {
1091         memcpy (page, contents, mgr->page_size);
1092         latch->dirty = 1;
1093   } else if( bt_readpage (mgr, page, page_no) )
1094           return mgr->line = __LINE__, NULL;
1095
1096   //  link page as head of hash table chain
1097   //  if this is a never before used entry,
1098   //  or it was previously on a different
1099   //  hash table chain. Otherwise, just
1100   //  leave it in its current hash table
1101   //  chain position.
1102
1103   if( !latch->page_no || hashidx != idx ) {
1104         if( latch->next = mgr->hashtable[hashidx].entry )
1105           mgr->latchsets[latch->next].prev = entry;
1106
1107         mgr->hashtable[hashidx].entry = entry;
1108     latch->prev = 0;
1109   }
1110
1111   //  fill in latch structure
1112
1113   latch->pin = CLOCK_bit | 1;
1114   latch->page_no = page_no;
1115   latch->entry = entry;
1116   latch->split = 0;
1117
1118   bt_releasemutex (latch->modify);
1119   bt_releasemutex (mgr->hashtable[hashidx].latch);
1120   return latch;
1121 }
1122   
1123 void bt_mgrclose (BtMgr *mgr)
1124 {
1125 BtLatchSet *latch;
1126 BtLogHdr *eof;
1127 uint num = 0;
1128 BtPage page;
1129 uint slot;
1130
1131         //      flush previously written dirty pages
1132         //      and write recovery buffer to disk
1133
1134         fdatasync (mgr->idx);
1135
1136         if( mgr->redoend ) {
1137                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1138                 memset (eof, 0, sizeof(BtLogHdr));
1139
1140                 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1141         }
1142
1143         //      write remaining dirty pool pages to the btree
1144
1145         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1146                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1147                 latch = mgr->latchsets + slot;
1148
1149                 if( latch->dirty ) {
1150                         bt_writepage(mgr, page, latch->page_no, 0);
1151                         latch->dirty = 0, num++;
1152                 }
1153         }
1154
1155         //      flush last batch to disk and clear
1156         //      redo recovery buffer on disk.
1157
1158         fdatasync (mgr->idx);
1159
1160         if( mgr->redopages ) {
1161                 eof = (BtLogHdr *)mgr->redobuff;
1162                 memset (eof, 0, sizeof(BtLogHdr));
1163                 eof->lsn = mgr->lsn;
1164
1165                 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1166
1167                 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1168         }
1169
1170         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1171
1172 #ifdef unix
1173         while( mgr->segments )
1174                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1175
1176         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1177         munmap (mgr->pagezero, mgr->page_size);
1178 #else
1179         FlushViewOfFile(mgr->pagezero, 0);
1180         UnmapViewOfFile(mgr->pagezero);
1181         UnmapViewOfFile(mgr->pagepool);
1182         CloseHandle(mgr->halloc);
1183         CloseHandle(mgr->hpool);
1184 #endif
1185 #ifdef unix
1186         free (mgr->redobuff);
1187         close (mgr->idx);
1188         free (mgr);
1189 #else
1190         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1191         FlushFileBuffers(mgr->idx);
1192         CloseHandle(mgr->idx);
1193         GlobalFree (mgr);
1194 #endif
1195 }
1196
1197 //      close and release memory
1198
1199 void bt_close (BtDb *bt)
1200 {
1201 #ifdef unix
1202         if( bt->cursor )
1203                 free (bt->cursor);
1204 #else
1205         if( bt->cursor)
1206                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1207 #endif
1208         free (bt);
1209 }
1210
1211 //  open/create new btree buffer manager
1212
1213 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1214 //              size of page pool (e.g. 262144)
1215
1216 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1217 {
1218 uint lvl, attr, last, slot, idx;
1219 uint nlatchpage, latchhash;
1220 unsigned char value[BtId];
1221 int flag, initit = 0;
1222 BtPageZero *pagezero;
1223 BtLatchSet *latch;
1224 off64_t size;
1225 uint amt[1];
1226 BtMgr* mgr;
1227 BtKey* key;
1228 BtVal *val;
1229
1230         // determine sanity of page size and buffer pool
1231
1232         if( bits > BT_maxbits )
1233                 bits = BT_maxbits;
1234         else if( bits < BT_minbits )
1235                 bits = BT_minbits;
1236 #ifdef unix
1237         mgr = calloc (1, sizeof(BtMgr));
1238
1239         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1240
1241         if( mgr->idx == -1 ) {
1242                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1243                 return free(mgr), NULL;
1244         }
1245 #else
1246         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1247         attr = FILE_ATTRIBUTE_NORMAL;
1248         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1249
1250         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1251                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1252                 return GlobalFree(mgr), NULL;
1253         }
1254 #endif
1255
1256 #ifdef unix
1257         pagezero = valloc (BT_maxpage);
1258         *amt = 0;
1259
1260         // read minimum page size to get root info
1261         //      to support raw disk partition files
1262         //      check if bits == 0 on the disk.
1263
1264         if( size = lseek (mgr->idx, 0L, 2) )
1265                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1266                         if( pagezero->alloc->bits )
1267                                 bits = pagezero->alloc->bits;
1268                         else
1269                                 initit = 1;
1270                 else
1271                         return free(mgr), free(pagezero), NULL;
1272         else
1273                 initit = 1;
1274 #else
1275         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1276         size = GetFileSize(mgr->idx, amt);
1277
1278         if( size || *amt ) {
1279                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1280                         return bt_mgrclose (mgr), NULL;
1281                 bits = pagezero->alloc->bits;
1282         } else
1283                 initit = 1;
1284 #endif
1285
1286         mgr->page_size = 1 << bits;
1287         mgr->page_bits = bits;
1288
1289         //  calculate number of latch hash table entries
1290
1291         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1292
1293         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1294         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1295         mgr->latchtotal = nodemax;
1296
1297         if( !initit )
1298                 goto mgrlatch;
1299
1300         // initialize an empty b-tree with latch page, root page, page of leaves
1301         // and page(s) of latches and page pool cache
1302
1303         memset (pagezero, 0, 1 << bits);
1304         pagezero->alloc->lvl = MIN_lvl - 1;
1305         pagezero->alloc->bits = mgr->page_bits;
1306         bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1307         pagezero->activepages = 2;
1308
1309         //  initialize left-most LEAF page in
1310         //      alloc->left and count of active leaf pages.
1311
1312         bt_putid (pagezero->alloc->left, LEAF_page);
1313
1314         ftruncate (mgr->idx, REDO_page << mgr->page_bits);
1315
1316         if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
1317                 fprintf (stderr, "Unable to create btree page zero\n");
1318                 return bt_mgrclose (mgr), NULL;
1319         }
1320
1321         memset (pagezero, 0, 1 << bits);
1322         pagezero->alloc->bits = mgr->page_bits;
1323
1324         for( lvl=MIN_lvl; lvl--; ) {
1325         BtSlot *node = slotptr(pagezero->alloc, 1);
1326                 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1327                 key = keyptr(pagezero->alloc, 1);
1328                 key->len = 2;           // create stopper key
1329                 key->key[0] = 0xff;
1330                 key->key[1] = 0xff;
1331
1332                 bt_putid(value, MIN_lvl - lvl + 1);
1333                 val = valptr(pagezero->alloc, 1);
1334                 val->len = lvl ? BtId : 0;
1335                 memcpy (val->value, value, val->len);
1336
1337                 pagezero->alloc->min = node->off;
1338                 pagezero->alloc->lvl = lvl;
1339                 pagezero->alloc->cnt = 1;
1340                 pagezero->alloc->act = 1;
1341                 pagezero->alloc->page_no = MIN_lvl - lvl;
1342
1343                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
1344                         fprintf (stderr, "Unable to create btree page\n");
1345                         return bt_mgrclose (mgr), NULL;
1346                 }
1347         }
1348
1349 mgrlatch:
1350 #ifdef unix
1351         free (pagezero);
1352 #else
1353         VirtualFree (pagezero, 0, MEM_RELEASE);
1354 #endif
1355 #ifdef unix
1356         // mlock the pagezero page
1357
1358         flag = PROT_READ | PROT_WRITE;
1359         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1360         if( mgr->pagezero == MAP_FAILED ) {
1361                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1362                 return bt_mgrclose (mgr), NULL;
1363         }
1364         mlock (mgr->pagezero, mgr->page_size);
1365
1366         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1367         if( mgr->pagepool == MAP_FAILED ) {
1368                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1369                 return bt_mgrclose (mgr), NULL;
1370         }
1371         if( mgr->redopages = redopages ) {
1372                 ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
1373                 mgr->redobuff = valloc (redopages * mgr->page_size);
1374         }
1375 #else
1376         flag = PAGE_READWRITE;
1377         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1378         if( !mgr->halloc ) {
1379                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1380                 return bt_mgrclose (mgr), NULL;
1381         }
1382
1383         flag = FILE_MAP_WRITE;
1384         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1385         if( !mgr->pagezero ) {
1386                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1387                 return bt_mgrclose (mgr), NULL;
1388         }
1389
1390         flag = PAGE_READWRITE;
1391         size = (uid)mgr->nlatchpage << mgr->page_bits;
1392         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1393         if( !mgr->hpool ) {
1394                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1395                 return bt_mgrclose (mgr), NULL;
1396         }
1397
1398         flag = FILE_MAP_WRITE;
1399         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1400         if( !mgr->pagepool ) {
1401                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1402                 return bt_mgrclose (mgr), NULL;
1403         }
1404         if( mgr->redopages = redopages )
1405                 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1406 #endif
1407
1408         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1409         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1410         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1411
1412         return mgr;
1413 }
1414
1415 //      open BTree access method
1416 //      based on buffer manager
1417
1418 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1419 {
1420 BtDb *bt = malloc (sizeof(*bt));
1421
1422         memset (bt, 0, sizeof(*bt));
1423         bt->main = main;
1424         bt->mgr = mgr;
1425 #ifdef unix
1426         bt->cursor = valloc (mgr->page_size);
1427 #else
1428         bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1429 #endif
1430 #ifdef unix
1431         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1432 #else
1433         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1434 #endif
1435         return bt;
1436 }
1437
1438 //  compare two keys, return > 0, = 0, or < 0
1439 //  =0: keys are same
1440 //  -1: key2 > key1
1441 //  +1: key2 < key1
1442 //  as the comparison value
1443
1444 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1445 {
1446 uint len1 = key1->len;
1447 int ans;
1448
1449         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1450                 return ans;
1451
1452         if( len1 > len2 )
1453                 return 1;
1454         if( len1 < len2 )
1455                 return -1;
1456
1457         return 0;
1458 }
1459
1460 // place write, read, or parent lock on requested page_no.
1461
1462 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1463 {
1464         switch( mode ) {
1465         case BtLockRead:
1466                 ReadLock (latch->readwr, thread_no);
1467                 break;
1468         case BtLockWrite:
1469                 WriteLock (latch->readwr, thread_no);
1470                 break;
1471         case BtLockAccess:
1472                 ReadLock (latch->access, thread_no);
1473                 break;
1474         case BtLockDelete:
1475                 WriteLock (latch->access, thread_no);
1476                 break;
1477         case BtLockParent:
1478                 WriteOLock (latch->parent, thread_no);
1479                 break;
1480         case BtLockAtomic:
1481                 WriteOLock (latch->atomic, thread_no);
1482                 break;
1483         case BtLockAtomic | BtLockRead:
1484                 WriteOLock (latch->atomic, thread_no);
1485                 ReadLock (latch->readwr, thread_no);
1486                 break;
1487         }
1488 }
1489
1490 // remove write, read, or parent lock on requested page
1491
1492 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1493 {
1494         switch( mode ) {
1495         case BtLockRead:
1496                 ReadRelease (latch->readwr);
1497                 break;
1498         case BtLockWrite:
1499                 WriteRelease (latch->readwr);
1500                 break;
1501         case BtLockAccess:
1502                 ReadRelease (latch->access);
1503                 break;
1504         case BtLockDelete:
1505                 WriteRelease (latch->access);
1506                 break;
1507         case BtLockParent:
1508                 WriteORelease (latch->parent);
1509                 break;
1510         case BtLockAtomic:
1511                 WriteORelease (latch->atomic);
1512                 break;
1513         case BtLockAtomic | BtLockRead:
1514                 WriteORelease (latch->atomic);
1515                 ReadRelease (latch->readwr);
1516                 break;
1517         }
1518 }
1519
1520 //      allocate a new page
1521 //      return with page latched, but unlocked.
1522
1523 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1524 {
1525 uid page_no;
1526 int blk;
1527
1528         //      lock allocation page
1529
1530         bt_mutexlock(mgr->lock);
1531
1532         // use empty chain first
1533         // else allocate empty page
1534
1535         if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1536                 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1537                         set->page = bt_mappage (mgr, set->latch);
1538                 else
1539                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1540
1541                 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1542                 mgr->pagezero->activepages++;
1543
1544                 bt_releasemutex(mgr->lock);
1545
1546                 memcpy (set->page, contents, mgr->page_size);
1547                 set->latch->dirty = 1;
1548                 return 0;
1549         }
1550
1551         page_no = bt_getid(mgr->pagezero->alloc->right);
1552         bt_putid(mgr->pagezero->alloc->right, page_no+1);
1553
1554         // unlock allocation latch and
1555         //      extend file into new page.
1556
1557         mgr->pagezero->activepages++;
1558         contents->page_no = page_no;
1559
1560         pwrite (mgr->idx, contents, mgr->page_size, (uid)(page_no + 1) << mgr->page_bits);
1561         bt_releasemutex(mgr->lock);
1562
1563         //      don't load cache from btree page, load it from contents
1564
1565         if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1566                 set->page = bt_mappage (mgr, set->latch);
1567         else
1568                 return mgr->err;
1569
1570         return 0;
1571 }
1572
1573 //  find slot in page for given key at a given level
1574
1575 int bt_findslot (BtPage page, unsigned char *key, uint len)
1576 {
1577 uint diff, higher = page->cnt, low = 1, slot;
1578 uint good = 0;
1579
1580         //        make stopper key an infinite fence value
1581
1582         if( bt_getid (page->right) )
1583                 higher++;
1584         else
1585                 good++;
1586
1587         //      low is the lowest candidate.
1588         //  loop ends when they meet
1589
1590         //  higher is already
1591         //      tested as .ge. the passed key.
1592
1593         while( diff = higher - low ) {
1594                 slot = low + ( diff >> 1 );
1595                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1596                         low = slot + 1;
1597                 else
1598                         higher = slot, good++;
1599         }
1600
1601         //      return zero if key is on right link page
1602
1603         return good ? higher : 0;
1604 }
1605
1606 //  find and load page at given level for given key
1607 //      leave page rd or wr locked as requested
1608
1609 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1610 {
1611 uid page_no = ROOT_page, prevpage = 0;
1612 uint drill = 0xff, slot;
1613 BtLatchSet *prevlatch;
1614 uint mode, prevmode;
1615 BtVal *val;
1616
1617   //  start at root of btree and drill down
1618
1619   do {
1620         // determine lock mode of drill level
1621         mode = (drill == lvl) ? lock : BtLockRead; 
1622
1623         if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1624           return 0;
1625
1626         // obtain access lock using lock chaining with Access mode
1627
1628         if( page_no > ROOT_page )
1629           bt_lockpage(BtLockAccess, set->latch, thread_no);
1630
1631         set->page = bt_mappage (mgr, set->latch);
1632
1633         //      release & unpin parent or left sibling page
1634
1635         if( prevpage ) {
1636           bt_unlockpage(prevmode, prevlatch);
1637           bt_unpinlatch (mgr, prevlatch);
1638           prevpage = 0;
1639         }
1640
1641         // obtain mode lock using lock chaining through AccessLock
1642
1643         bt_lockpage(mode, set->latch, thread_no);
1644
1645         if( set->page->free )
1646                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1647
1648         if( page_no > ROOT_page )
1649           bt_unlockpage(BtLockAccess, set->latch);
1650
1651         // re-read and re-lock root after determining actual level of root
1652
1653         if( set->page->lvl != drill) {
1654                 if( set->latch->page_no != ROOT_page )
1655                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1656                         
1657                 drill = set->page->lvl;
1658
1659                 if( lock != BtLockRead && drill == lvl ) {
1660                   bt_unlockpage(mode, set->latch);
1661                   bt_unpinlatch (mgr, set->latch);
1662                   continue;
1663                 }
1664         }
1665
1666         prevpage = set->latch->page_no;
1667         prevlatch = set->latch;
1668         prevmode = mode;
1669
1670         //  find key on page at this level
1671         //  and descend to requested level
1672
1673         if( !set->page->kill )
1674          if( slot = bt_findslot (set->page, key, len) ) {
1675           if( drill == lvl )
1676                 return slot;
1677
1678           // find next non-dead slot -- the fence key if nothing else
1679
1680           while( slotptr(set->page, slot)->dead )
1681                 if( slot++ < set->page->cnt )
1682                   continue;
1683                 else
1684                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1685
1686           val = valptr(set->page, slot);
1687
1688           if( val->len == BtId )
1689                 page_no = bt_getid(valptr(set->page, slot)->value);
1690           else
1691                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1692
1693           drill--;
1694           continue;
1695          }
1696
1697         //  slide right into next page
1698
1699         page_no = bt_getid(set->page->right);
1700   } while( page_no );
1701
1702   // return error on end of right chain
1703
1704   mgr->line = __LINE__, mgr->err = BTERR_struct;
1705   return 0;     // return error
1706 }
1707
1708 //      return page to free list
1709 //      page must be delete & write locked
1710
1711 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1712 {
1713         //      lock allocation page
1714
1715         bt_mutexlock (mgr->lock);
1716
1717         //      store chain
1718
1719         memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1720         bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1721         set->latch->dirty = 1;
1722         set->page->free = 1;
1723
1724         // decrement active page count
1725         // and unlock allocation page
1726
1727         mgr->pagezero->activepages--;
1728         bt_releasemutex (mgr->lock);
1729
1730         // unlock released page
1731
1732         bt_unlockpage (BtLockDelete, set->latch);
1733         bt_unlockpage (BtLockWrite, set->latch);
1734         bt_unpinlatch (mgr, set->latch);
1735 }
1736
1737 //      a fence key was deleted from a page
1738 //      push new fence value upwards
1739
1740 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1741 {
1742 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1743 unsigned char value[BtId];
1744 BtKey* ptr;
1745 uint idx;
1746
1747         //      remove the old fence value
1748
1749         ptr = keyptr(set->page, set->page->cnt);
1750         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1751         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1752         set->latch->dirty = 1;
1753
1754         //  cache new fence value
1755
1756         ptr = keyptr(set->page, set->page->cnt);
1757         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1758
1759         bt_lockpage (BtLockParent, set->latch, thread_no);
1760         bt_unlockpage (BtLockWrite, set->latch);
1761
1762         //      insert new (now smaller) fence key
1763
1764         bt_putid (value, set->latch->page_no);
1765         ptr = (BtKey*)leftkey;
1766
1767         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1768           return mgr->err;
1769
1770         //      now delete old fence key
1771
1772         ptr = (BtKey*)rightkey;
1773
1774         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1775                 return mgr->err;
1776
1777         bt_unlockpage (BtLockParent, set->latch);
1778         bt_unpinlatch(mgr, set->latch);
1779         return 0;
1780 }
1781
1782 //      root has a single child
1783 //      collapse a level from the tree
1784
1785 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1786 {
1787 BtPageSet child[1];
1788 uid page_no;
1789 BtVal *val;
1790 uint idx;
1791
1792   // find the child entry and promote as new root contents
1793
1794   do {
1795         for( idx = 0; idx++ < root->page->cnt; )
1796           if( !slotptr(root->page, idx)->dead )
1797                 break;
1798
1799         val = valptr(root->page, idx);
1800
1801         if( val->len == BtId )
1802                 page_no = bt_getid (valptr(root->page, idx)->value);
1803         else
1804                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1805
1806         if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1807                 child->page = bt_mappage (mgr, child->latch);
1808         else
1809                 return mgr->err;
1810
1811         bt_lockpage (BtLockDelete, child->latch, thread_no);
1812         bt_lockpage (BtLockWrite, child->latch, thread_no);
1813
1814         memcpy (root->page, child->page, mgr->page_size);
1815         root->latch->dirty = 1;
1816
1817         bt_freepage (mgr, child);
1818
1819   } while( root->page->lvl > 1 && root->page->act == 1 );
1820
1821   bt_unlockpage (BtLockWrite, root->latch);
1822   bt_unpinlatch (mgr, root->latch);
1823   return 0;
1824 }
1825
1826 //  delete a page and manage keys
1827 //  call with page writelocked
1828
1829 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1830 {
1831 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1832 unsigned char value[BtId];
1833 uint lvl = set->page->lvl;
1834 BtPageSet right[1];
1835 uid page_no;
1836 BtKey *ptr;
1837
1838         //      cache copy of fence key
1839         //      to remove in parent
1840
1841         ptr = keyptr(set->page, set->page->cnt);
1842         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1843
1844         //      obtain lock on right page
1845
1846         page_no = bt_getid(set->page->right);
1847
1848         if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1849                 right->page = bt_mappage (mgr, right->latch);
1850         else
1851                 return 0;
1852
1853         bt_lockpage (BtLockWrite, right->latch, thread_no);
1854
1855         // cache copy of key to update
1856
1857         ptr = keyptr(right->page, right->page->cnt);
1858         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1859
1860         if( right->page->kill )
1861                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1862
1863         // pull contents of right peer into our empty page
1864
1865         memcpy (set->page, right->page, mgr->page_size);
1866         set->latch->dirty = 1;
1867
1868         // mark right page deleted and point it to left page
1869         //      until we can post parent updates that remove access
1870         //      to the deleted page.
1871
1872         bt_putid (right->page->right, set->latch->page_no);
1873         right->latch->dirty = 1;
1874         right->page->kill = 1;
1875
1876         bt_lockpage (BtLockParent, right->latch, thread_no);
1877         bt_unlockpage (BtLockWrite, right->latch);
1878
1879         bt_lockpage (BtLockParent, set->latch, thread_no);
1880         bt_unlockpage (BtLockWrite, set->latch);
1881
1882         // redirect higher key directly to our new node contents
1883
1884         bt_putid (value, set->latch->page_no);
1885         ptr = (BtKey*)higherfence;
1886
1887         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1888           return mgr->err;
1889
1890         //      delete old lower key to our node
1891
1892         ptr = (BtKey*)lowerfence;
1893
1894         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1895           return mgr->err;
1896
1897         //      obtain delete and write locks to right node
1898
1899         bt_unlockpage (BtLockParent, right->latch);
1900         bt_lockpage (BtLockDelete, right->latch, thread_no);
1901         bt_lockpage (BtLockWrite, right->latch, thread_no);
1902         bt_freepage (mgr, right);
1903
1904         bt_unlockpage (BtLockParent, set->latch);
1905         return 0;
1906 }
1907
1908 //  find and delete key on page by marking delete flag bit
1909 //  if page becomes empty, delete it from the btree
1910
1911 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1912 {
1913 uint slot, idx, found, fence;
1914 BtPageSet set[1];
1915 BtSlot *node;
1916 BtKey *ptr;
1917 BtVal *val;
1918
1919         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1920                 node = slotptr(set->page, slot);
1921                 ptr = keyptr(set->page, slot);
1922         } else
1923                 return mgr->err;
1924
1925         // if librarian slot, advance to real slot
1926
1927         if( node->type == Librarian ) {
1928                 ptr = keyptr(set->page, ++slot);
1929                 node = slotptr(set->page, slot);
1930         }
1931
1932         fence = slot == set->page->cnt;
1933
1934         // delete the key, ignore request if already dead
1935
1936         if( found = !keycmp (ptr, key, len) )
1937           if( found = node->dead == 0 ) {
1938                 val = valptr(set->page,slot);
1939                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1940                 set->page->act--;
1941
1942                 //  mark node type as delete
1943
1944                 node->type = Delete;
1945                 node->dead = 1;
1946
1947                 // collapse empty slots beneath the fence
1948                 // on interiour nodes
1949
1950                 if( lvl )
1951                  while( idx = set->page->cnt - 1 )
1952                   if( slotptr(set->page, idx)->dead ) {
1953                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1954                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1955                   } else
1956                         break;
1957           }
1958
1959         //      did we delete a fence key in an upper level?
1960
1961         if( found && lvl && set->page->act && fence )
1962           if( bt_fixfence (mgr, set, lvl, thread_no) )
1963                 return mgr->err;
1964           else
1965                 return 0;
1966
1967         //      do we need to collapse root?
1968
1969         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1970           if( bt_collapseroot (mgr, set, thread_no) )
1971                 return mgr->err;
1972           else
1973                 return 0;
1974
1975         //      delete empty page
1976
1977         if( !set->page->act ) {
1978           if( bt_deletepage (mgr, set, thread_no) )
1979                 return mgr->err;
1980         } else {
1981           set->latch->dirty = 1;
1982           bt_unlockpage(BtLockWrite, set->latch);
1983         }
1984
1985         bt_unpinlatch (mgr, set->latch);
1986         return 0;
1987 }
1988
1989 //      advance to next slot
1990
1991 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1992 {
1993 BtLatchSet *prevlatch;
1994 uid page_no;
1995
1996         if( slot < set->page->cnt )
1997                 return slot + 1;
1998
1999         prevlatch = set->latch;
2000
2001         if( page_no = bt_getid(set->page->right) )
2002           if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2003                 set->page = bt_mappage (bt->mgr, set->latch);
2004           else
2005                 return 0;
2006         else
2007           return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2008
2009         // obtain access lock using lock chaining with Access mode
2010
2011         bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2012
2013         bt_unlockpage(BtLockRead, prevlatch);
2014         bt_unpinlatch (bt->mgr, prevlatch);
2015
2016         bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2017         bt_unlockpage(BtLockAccess, set->latch);
2018         return 1;
2019 }
2020
2021 //      find unique key == given key, or first duplicate key in
2022 //      leaf level and return number of value bytes
2023 //      or (-1) if not found.
2024
2025 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2026 {
2027 BtPageSet set[1];
2028 uint len, slot;
2029 int ret = -1;
2030 BtKey *ptr;
2031 BtVal *val;
2032
2033   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2034    do {
2035         ptr = keyptr(set->page, slot);
2036
2037         //      skip librarian slot place holder
2038
2039         if( slotptr(set->page, slot)->type == Librarian )
2040                 ptr = keyptr(set->page, ++slot);
2041
2042         //      return actual key found
2043
2044         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2045         len = ptr->len;
2046
2047         if( slotptr(set->page, slot)->type == Duplicate )
2048                 len -= BtId;
2049
2050         //      not there if we reach the stopper key
2051
2052         if( slot == set->page->cnt )
2053           if( !bt_getid (set->page->right) )
2054                 break;
2055
2056         // if key exists, return >= 0 value bytes copied
2057         //      otherwise return (-1)
2058
2059         if( slotptr(set->page, slot)->dead )
2060                 continue;
2061
2062         if( keylen == len )
2063           if( !memcmp (ptr->key, key, len) ) {
2064                 val = valptr (set->page,slot);
2065                 if( valmax > val->len )
2066                         valmax = val->len;
2067                 memcpy (value, val->value, valmax);
2068                 ret = valmax;
2069           }
2070
2071         break;
2072
2073    } while( slot = bt_findnext (bt, set, slot) );
2074
2075   bt_unlockpage (BtLockRead, set->latch);
2076   bt_unpinlatch (bt->mgr, set->latch);
2077   return ret;
2078 }
2079
2080 //      check page for space available,
2081 //      clean if necessary and return
2082 //      0 - page needs splitting
2083 //      >0  new slot value
2084
2085 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2086 {
2087 BtPage page = set->page, frame;
2088 uint nxt = mgr->page_size;
2089 uint cnt = 0, idx = 0;
2090 uint max = page->cnt;
2091 uint newslot = max;
2092 BtKey *key;
2093 BtVal *val;
2094
2095         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2096                 return slot;
2097
2098         //      skip cleanup and proceed to split
2099         //      if there's not enough garbage
2100         //      to bother with.
2101
2102         if( page->garbage < nxt / 5 )
2103                 return 0;
2104
2105         frame = malloc (mgr->page_size);
2106         memcpy (frame, page, mgr->page_size);
2107
2108         // skip page info and set rest of page to zero
2109
2110         memset (page+1, 0, mgr->page_size - sizeof(*page));
2111         set->latch->dirty = 1;
2112
2113         page->garbage = 0;
2114         page->act = 0;
2115
2116         // clean up page first by
2117         // removing deleted keys
2118
2119         while( cnt++ < max ) {
2120                 if( cnt == slot )
2121                         newslot = idx + 2;
2122
2123                 if( cnt < max || frame->lvl )
2124                   if( slotptr(frame,cnt)->dead )
2125                         continue;
2126
2127                 // copy the value across
2128
2129                 val = valptr(frame, cnt);
2130                 nxt -= val->len + sizeof(BtVal);
2131                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2132
2133                 // copy the key across
2134
2135                 key = keyptr(frame, cnt);
2136                 nxt -= key->len + sizeof(BtKey);
2137                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2138
2139                 // make a librarian slot
2140
2141                 slotptr(page, ++idx)->off = nxt;
2142                 slotptr(page, idx)->type = Librarian;
2143                 slotptr(page, idx)->dead = 1;
2144
2145                 // set up the slot
2146
2147                 slotptr(page, ++idx)->off = nxt;
2148                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2149
2150                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2151                         page->act++;
2152         }
2153
2154         page->min = nxt;
2155         page->cnt = idx;
2156         free (frame);
2157
2158         //      see if page has enough space now, or does it need splitting?
2159
2160         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2161                 return newslot;
2162
2163         return 0;
2164 }
2165
2166 // split the root and raise the height of the btree
2167
2168 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2169 {  
2170 unsigned char leftkey[BT_keyarray];
2171 uint nxt = mgr->page_size;
2172 unsigned char value[BtId];
2173 BtPageSet left[1];
2174 uid left_page_no;
2175 BtKey *ptr;
2176 BtVal *val;
2177
2178         //      save left page fence key for new root
2179
2180         ptr = keyptr(root->page, root->page->cnt);
2181         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2182
2183         //  Obtain an empty page to use, and copy the current
2184         //  root contents into it, e.g. lower keys
2185
2186         if( bt_newpage(mgr, left, root->page, page_no) )
2187                 return mgr->err;
2188
2189         left_page_no = left->latch->page_no;
2190         bt_unpinlatch (mgr, left->latch);
2191
2192         // preserve the page info at the bottom
2193         // of higher keys and set rest to zero
2194
2195         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2196
2197         // insert stopper key at top of newroot page
2198         // and increase the root height
2199
2200         nxt -= BtId + sizeof(BtVal);
2201         bt_putid (value, right->page_no);
2202         val = (BtVal *)((unsigned char *)root->page + nxt);
2203         memcpy (val->value, value, BtId);
2204         val->len = BtId;
2205
2206         nxt -= 2 + sizeof(BtKey);
2207         slotptr(root->page, 2)->off = nxt;
2208         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2209         ptr->len = 2;
2210         ptr->key[0] = 0xff;
2211         ptr->key[1] = 0xff;
2212
2213         // insert lower keys page fence key on newroot page as first key
2214
2215         nxt -= BtId + sizeof(BtVal);
2216         bt_putid (value, left_page_no);
2217         val = (BtVal *)((unsigned char *)root->page + nxt);
2218         memcpy (val->value, value, BtId);
2219         val->len = BtId;
2220
2221         ptr = (BtKey *)leftkey;
2222         nxt -= ptr->len + sizeof(BtKey);
2223         slotptr(root->page, 1)->off = nxt;
2224         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2225         
2226         bt_putid(root->page->right, 0);
2227         root->page->min = nxt;          // reset lowest used offset and key count
2228         root->page->cnt = 2;
2229         root->page->act = 2;
2230         root->page->lvl++;
2231
2232         mgr->pagezero->alloc->lvl = root->page->lvl;
2233
2234         // release and unpin root pages
2235
2236         bt_unlockpage(BtLockWrite, root->latch);
2237         bt_unpinlatch (mgr, root->latch);
2238
2239         bt_unpinlatch (mgr, right);
2240         return 0;
2241 }
2242
2243 //  split already locked full node
2244 //      leave it locked.
2245 //      return pool entry for new right
2246 //      page, unlocked
2247
2248 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2249 {
2250 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2251 BtPage frame = malloc (mgr->page_size);
2252 uint lvl = set->page->lvl;
2253 BtPageSet right[1];
2254 BtKey *key, *ptr;
2255 BtVal *val, *src;
2256 uid right2;
2257 uint prev;
2258
2259         //  split higher half of keys to frame
2260
2261         memset (frame, 0, mgr->page_size);
2262         max = set->page->cnt;
2263         cnt = max / 2;
2264         idx = 0;
2265
2266         while( cnt++ < max ) {
2267                 if( cnt < max || set->page->lvl )
2268                   if( slotptr(set->page, cnt)->dead )
2269                         continue;
2270
2271                 src = valptr(set->page, cnt);
2272                 nxt -= src->len + sizeof(BtVal);
2273                 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2274
2275                 key = keyptr(set->page, cnt);
2276                 nxt -= key->len + sizeof(BtKey);
2277                 ptr = (BtKey*)((unsigned char *)frame + nxt);
2278                 memcpy (ptr, key, key->len + sizeof(BtKey));
2279
2280                 //      add librarian slot
2281
2282                 slotptr(frame, ++idx)->off = nxt;
2283                 slotptr(frame, idx)->type = Librarian;
2284                 slotptr(frame, idx)->dead = 1;
2285
2286                 //  add actual slot
2287
2288                 slotptr(frame, ++idx)->off = nxt;
2289                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2290
2291                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2292                         frame->act++;
2293         }
2294
2295         frame->bits = mgr->page_bits;
2296         frame->min = nxt;
2297         frame->cnt = idx;
2298         frame->lvl = lvl;
2299
2300         // link right node
2301
2302         if( set->latch->page_no > ROOT_page )
2303                 bt_putid (frame->right, bt_getid (set->page->right));
2304
2305         //      get new free page and write higher keys to it.
2306
2307         if( bt_newpage(mgr, right, frame, thread_no) )
2308                 return 0;
2309
2310         // process lower keys
2311
2312         memcpy (frame, set->page, mgr->page_size);
2313         memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2314         set->latch->dirty = 1;
2315
2316         nxt = mgr->page_size;
2317         set->page->garbage = 0;
2318         set->page->act = 0;
2319         max /= 2;
2320         cnt = 0;
2321         idx = 0;
2322
2323         if( slotptr(frame, max)->type == Librarian )
2324                 max--;
2325
2326         //  assemble page of smaller keys
2327
2328         while( cnt++ < max ) {
2329                 if( slotptr(frame, cnt)->dead )
2330                         continue;
2331                 val = valptr(frame, cnt);
2332                 nxt -= val->len + sizeof(BtVal);
2333                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2334
2335                 key = keyptr(frame, cnt);
2336                 nxt -= key->len + sizeof(BtKey);
2337                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2338
2339                 //      add librarian slot
2340
2341                 slotptr(set->page, ++idx)->off = nxt;
2342                 slotptr(set->page, idx)->type = Librarian;
2343                 slotptr(set->page, idx)->dead = 1;
2344
2345                 //      add actual slot
2346
2347                 slotptr(set->page, ++idx)->off = nxt;
2348                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2349                 set->page->act++;
2350         }
2351
2352         bt_putid(set->page->right, right->latch->page_no);
2353         set->page->min = nxt;
2354         set->page->cnt = idx;
2355
2356         return right->latch->entry;
2357 }
2358
2359 //      fix keys for newly split page
2360 //      call with page locked, return
2361 //      unlocked
2362
2363 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2364 {
2365 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2366 unsigned char value[BtId];
2367 uint lvl = set->page->lvl;
2368 BtPage page;
2369 BtKey *ptr;
2370
2371         // if current page is the root page, split it
2372
2373         if( set->latch->page_no == ROOT_page )
2374                 return bt_splitroot (mgr, set, right, thread_no);
2375
2376         ptr = keyptr(set->page, set->page->cnt);
2377         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2378
2379         page = bt_mappage (mgr, right);
2380
2381         ptr = keyptr(page, page->cnt);
2382         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2383
2384         // insert new fences in their parent pages
2385
2386         bt_lockpage (BtLockParent, right, thread_no);
2387
2388         bt_lockpage (BtLockParent, set->latch, thread_no);
2389         bt_unlockpage (BtLockWrite, set->latch);
2390
2391         // insert new fence for reformulated left block of smaller keys
2392
2393         bt_putid (value, set->latch->page_no);
2394         ptr = (BtKey *)leftkey;
2395
2396         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2397                 return mgr->err;
2398
2399         // switch fence for right block of larger keys to new right page
2400
2401         bt_putid (value, right->page_no);
2402         ptr = (BtKey *)rightkey;
2403
2404         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2405                 return mgr->err;
2406
2407         bt_unlockpage (BtLockParent, set->latch);
2408         bt_unpinlatch (mgr, set->latch);
2409
2410         bt_unlockpage (BtLockParent, right);
2411         bt_unpinlatch (mgr, right);
2412         return 0;
2413 }
2414
2415 //      install new key and value onto page
2416 //      page must already be checked for
2417 //      adequate space
2418
2419 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2420 {
2421 uint idx, librarian;
2422 BtSlot *node;
2423 BtKey *ptr;
2424 BtVal *val;
2425
2426         //      if found slot > desired slot and previous slot
2427         //      is a librarian slot, use it
2428
2429         if( slot > 1 )
2430           if( slotptr(set->page, slot-1)->type == Librarian )
2431                 slot--;
2432
2433         // copy value onto page
2434
2435         set->page->min -= vallen + sizeof(BtVal);
2436         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2437         memcpy (val->value, value, vallen);
2438         val->len = vallen;
2439
2440         // copy key onto page
2441
2442         set->page->min -= keylen + sizeof(BtKey);
2443         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2444         memcpy (ptr->key, key, keylen);
2445         ptr->len = keylen;
2446         
2447         //      find first empty slot
2448
2449         for( idx = slot; idx < set->page->cnt; idx++ )
2450           if( slotptr(set->page, idx)->dead )
2451                 break;
2452
2453         // now insert key into array before slot
2454
2455         if( idx == set->page->cnt )
2456                 idx += 2, set->page->cnt += 2, librarian = 2;
2457         else
2458                 librarian = 1;
2459
2460         set->latch->dirty = 1;
2461         set->page->act++;
2462
2463         while( idx > slot + librarian - 1 )
2464                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2465
2466         //      add librarian slot
2467
2468         if( librarian > 1 ) {
2469                 node = slotptr(set->page, slot++);
2470                 node->off = set->page->min;
2471                 node->type = Librarian;
2472                 node->dead = 1;
2473         }
2474
2475         //      fill in new slot
2476
2477         node = slotptr(set->page, slot);
2478         node->off = set->page->min;
2479         node->type = type;
2480         node->dead = 0;
2481
2482         if( release ) {
2483                 bt_unlockpage (BtLockWrite, set->latch);
2484                 bt_unpinlatch (mgr, set->latch);
2485         }
2486
2487         return 0;
2488 }
2489
2490 //  Insert new key into the btree at given level.
2491 //      either add a new key or update/add an existing one
2492
2493 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2494 {
2495 uint slot, idx, len, entry;
2496 BtPageSet set[1];
2497 BtSlot *node;
2498 BtKey *ptr;
2499 BtVal *val;
2500
2501   while( 1 ) { // find the page and slot for the current key
2502         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2503                 node = slotptr(set->page, slot);
2504                 ptr = keyptr(set->page, slot);
2505         } else {
2506                 if( !mgr->err )
2507                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2508                 return mgr->err;
2509         }
2510
2511         // if librarian slot == found slot, advance to real slot
2512
2513         if( node->type == Librarian )
2514           if( !keycmp (ptr, key, keylen) ) {
2515                 ptr = keyptr(set->page, ++slot);
2516                 node = slotptr(set->page, slot);
2517           }
2518
2519         //  if inserting a duplicate key or unique
2520         //      key that doesn't exist on the page,
2521         //      check for adequate space on the page
2522         //      and insert the new key before slot.
2523
2524         switch( type ) {
2525         case Unique:
2526         case Duplicate:
2527           if( keycmp (ptr, key, keylen) )
2528            if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2529             return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2530            else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2531                 return mgr->err;
2532            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2533                 return mgr->err;
2534            else
2535                 continue;
2536
2537           // if key already exists, update value and return
2538
2539           val = valptr(set->page, slot);
2540
2541           if( val->len >= vallen ) {
2542                 if( slotptr(set->page, slot)->dead )
2543                         set->page->act++;
2544                 node->type = type;
2545                 node->dead = 0;
2546
2547                 set->page->garbage += val->len - vallen;
2548                 set->latch->dirty = 1;
2549                 val->len = vallen;
2550                 memcpy (val->value, value, vallen);
2551                 bt_unlockpage(BtLockWrite, set->latch);
2552                 bt_unpinlatch (mgr, set->latch);
2553                 return 0;
2554           }
2555
2556           //  new update value doesn't fit in existing value area
2557           //    make sure page has room
2558
2559           if( !node->dead )
2560                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2561           else
2562                 set->page->act++;
2563
2564           node->type = type;
2565           node->dead = 0;
2566
2567           if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2568            if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2569                 return mgr->err;
2570            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2571                 return mgr->err;
2572            else
2573                 continue;
2574
2575           //  copy key and value onto page and update slot
2576
2577           set->page->min -= vallen + sizeof(BtVal);
2578           val = (BtVal*)((unsigned char *)set->page + set->page->min);
2579           memcpy (val->value, value, vallen);
2580           val->len = vallen;
2581
2582           set->latch->dirty = 1;
2583           set->page->min -= keylen + sizeof(BtKey);
2584           ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2585           memcpy (ptr->key, key, keylen);
2586           ptr->len = keylen;
2587         
2588           node->off = set->page->min;
2589           bt_unlockpage(BtLockWrite, set->latch);
2590           bt_unpinlatch (mgr, set->latch);
2591           return 0;
2592     }
2593   }
2594   return 0;
2595 }
2596
2597 typedef struct {
2598         logseqno reqlsn;        // redo log seq no required
2599         logseqno lsn;           // redo log sequence number
2600         uint entry;                     // latch table entry number
2601         uint slot:31;           // page slot number
2602         uint reuse:1;           // reused previous page
2603 } AtomicTxn;
2604
2605 typedef struct {
2606         uid page_no;            // page number for split leaf
2607         void *next;                     // next key to insert
2608         uint entry:29;          // latch table entry number
2609         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2610         uint nounlock:1;        // don't unlock ParentModification
2611         unsigned char leafkey[BT_keyarray];
2612 } AtomicKey;
2613
2614 //      determine actual page where key is located
2615 //  return slot number
2616
2617 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2618 {
2619 BtKey *key = keyptr(source,src);
2620 uint slot = locks[src].slot;
2621 uint entry;
2622
2623         if( src > 1 && locks[src].reuse )
2624           entry = locks[src-1].entry, slot = 0;
2625         else
2626           entry = locks[src].entry;
2627
2628         if( slot ) {
2629                 set->latch = mgr->latchsets + entry;
2630                 set->page = bt_mappage (mgr, set->latch);
2631                 return slot;
2632         }
2633
2634         //      is locks->reuse set? or was slot zeroed?
2635         //      if so, find where our key is located 
2636         //      on current page or pages split on
2637         //      same page txn operations.
2638
2639         do {
2640                 set->latch = mgr->latchsets + entry;
2641                 set->page = bt_mappage (mgr, set->latch);
2642
2643                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2644                   if( slotptr(set->page, slot)->type == Librarian )
2645                         slot++;
2646                   if( locks[src].reuse )
2647                         locks[src].entry = entry;
2648                   return slot;
2649                 }
2650         } while( entry = set->latch->split );
2651
2652         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2653         return 0;
2654 }
2655
2656 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2657 {
2658 BtKey *key = keyptr(source, src);
2659 BtVal *val = valptr(source, src);
2660 BtLatchSet *latch;
2661 BtPageSet set[1];
2662 uint entry, slot;
2663
2664   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2665         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2666           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2667                 return mgr->err;
2668           set->page->lsn = locks[src].lsn;
2669           return 0;
2670         }
2671
2672         if( entry = bt_splitpage (mgr, set, thread_no) )
2673           latch = mgr->latchsets + entry;
2674         else
2675           return mgr->err;
2676
2677         //      splice right page into split chain
2678         //      and WriteLock it.
2679
2680         bt_lockpage(BtLockWrite, latch, thread_no);
2681         latch->split = set->latch->split;
2682         set->latch->split = entry;
2683         locks[src].slot = 0;
2684   }
2685
2686   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2687 }
2688
2689 //      perform delete from smaller btree
2690 //  insert a delete slot if not found there
2691
2692 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2693 {
2694 BtKey *key = keyptr(source, src);
2695 BtPageSet set[1];
2696 uint idx, slot;
2697 BtSlot *node;
2698 BtKey *ptr;
2699 BtVal *val;
2700
2701         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2702           node = slotptr(set->page, slot);
2703           ptr = keyptr(set->page, slot);
2704           val = valptr(set->page, slot);
2705         } else
2706           return mgr->line = __LINE__, mgr->err = BTERR_struct;
2707
2708         //  if slot is not found, insert a delete slot
2709
2710         if( keycmp (ptr, key->key, key->len) )
2711           return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2712
2713         //      if node is already dead,
2714         //      ignore the request.
2715
2716         if( node->dead )
2717                 return 0;
2718
2719         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2720         set->latch->dirty = 1;
2721         set->page->lsn = locks[src].lsn;
2722         set->page->act--;
2723
2724         node->dead = 0;
2725         mgr->found++;
2726         return 0;
2727 }
2728
2729 //      delete an empty master page for a transaction
2730
2731 //      note that the far right page never empties because
2732 //      it always contains (at least) the infinite stopper key
2733 //      and that all pages that don't contain any keys are
2734 //      deleted, or are being held under Atomic lock.
2735
2736 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2737 {
2738 BtPageSet right[1], temp[1];
2739 unsigned char value[BtId];
2740 uid right_page_no;
2741 BtKey *ptr;
2742
2743         bt_lockpage(BtLockWrite, prev->latch, thread_no);
2744
2745         //      grab the right sibling
2746
2747         if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2748                 right->page = bt_mappage (mgr, right->latch);
2749         else
2750                 return mgr->err;
2751
2752         bt_lockpage(BtLockAtomic, right->latch, thread_no);
2753         bt_lockpage(BtLockWrite, right->latch, thread_no);
2754
2755         //      and pull contents over empty page
2756         //      while preserving master's left link
2757
2758         memcpy (right->page->left, prev->page->left, BtId);
2759         memcpy (prev->page, right->page, mgr->page_size);
2760
2761         //      forward seekers to old right sibling
2762         //      to new page location in set
2763
2764         bt_putid (right->page->right, prev->latch->page_no);
2765         right->latch->dirty = 1;
2766         right->page->kill = 1;
2767
2768         //      remove pointer to right page for searchers by
2769         //      changing right fence key to point to the master page
2770
2771         ptr = keyptr(right->page,right->page->cnt);
2772         bt_putid (value, prev->latch->page_no);
2773
2774         if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
2775                 return mgr->err;
2776
2777         //  now that master page is in good shape we can
2778         //      remove its locks.
2779
2780         bt_unlockpage (BtLockAtomic, prev->latch);
2781         bt_unlockpage (BtLockWrite, prev->latch);
2782
2783         //  fix master's right sibling's left pointer
2784         //      to remove scanner's poiner to the right page
2785
2786         if( right_page_no = bt_getid (prev->page->right) ) {
2787           if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2788                 temp->page = bt_mappage (mgr, temp->latch);
2789           else
2790                 return mgr->err;
2791
2792           bt_lockpage (BtLockWrite, temp->latch, thread_no);
2793           bt_putid (temp->page->left, prev->latch->page_no);
2794           temp->latch->dirty = 1;
2795
2796           bt_unlockpage (BtLockWrite, temp->latch);
2797           bt_unpinlatch (mgr, temp->latch);
2798         } else {        // master is now the far right page
2799           bt_mutexlock (mgr->lock);
2800           bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2801           bt_releasemutex(mgr->lock);
2802         }
2803
2804         //      now that there are no pointers to the right page
2805         //      we can delete it after the last read access occurs
2806
2807         bt_unlockpage (BtLockWrite, right->latch);
2808         bt_unlockpage (BtLockAtomic, right->latch);
2809         bt_lockpage (BtLockDelete, right->latch, thread_no);
2810         bt_lockpage (BtLockWrite, right->latch, thread_no);
2811         bt_freepage (mgr, right);
2812         return 0;
2813 }
2814
2815 //      atomic modification of a batch of keys.
2816
2817 //      return -1 if BTERR is set
2818 //      otherwise return slot number
2819 //      causing the key constraint violation
2820 //      or zero on successful completion.
2821
2822 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2823 {
2824 uint src, idx, slot, samepage, entry, que = 0;
2825 AtomicKey *head, *tail, *leaf;
2826 BtPageSet set[1], prev[1];
2827 unsigned char value[BtId];
2828 BtKey *key, *ptr, *key2;
2829 BtLatchSet *latch;
2830 AtomicTxn *locks;
2831 int result = 0;
2832 BtSlot temp[1];
2833 logseqno lsn;
2834 BtPage page;
2835 BtVal *val;
2836 uid right;
2837 int type;
2838
2839   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2840   head = NULL;
2841   tail = NULL;
2842
2843   // stable sort the list of keys into order to
2844   //    prevent deadlocks between threads.
2845
2846   for( src = 1; src++ < source->cnt; ) {
2847         *temp = *slotptr(source,src);
2848         key = keyptr (source,src);
2849
2850         for( idx = src; --idx; ) {
2851           key2 = keyptr (source,idx);
2852           if( keycmp (key, key2->key, key2->len) < 0 ) {
2853                 *slotptr(source,idx+1) = *slotptr(source,idx);
2854                 *slotptr(source,idx) = *temp;
2855           } else
2856                 break;
2857         }
2858   }
2859
2860   //  add entries to redo log
2861
2862   if( bt->mgr->redopages )
2863    if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
2864     for( src = 0; src++ < source->cnt; )
2865          locks[src].lsn = lsn;
2866     else
2867          return bt->mgr->err;
2868
2869   // Load the leaf page for each key
2870   // group same page references with reuse bit
2871   // and determine any constraint violations
2872
2873   for( src = 0; src++ < source->cnt; ) {
2874         key = keyptr(source, src);
2875         slot = 0;
2876
2877         // first determine if this modification falls
2878         // on the same page as the previous modification
2879         //      note that the far right leaf page is a special case
2880
2881         if( samepage = src > 1 )
2882           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2883                 slot = bt_findslot(set->page, key->key, key->len);
2884           else
2885                 bt_unlockpage(BtLockRead, set->latch); 
2886
2887         if( !slot )
2888           if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
2889                 set->latch->split = 0;
2890           else
2891                 return bt->mgr->err;
2892
2893         if( slotptr(set->page, slot)->type == Librarian )
2894           ptr = keyptr(set->page, ++slot);
2895         else
2896           ptr = keyptr(set->page, slot);
2897
2898         if( !samepage ) {
2899           locks[src].entry = set->latch->entry;
2900           locks[src].slot = slot;
2901           locks[src].reuse = 0;
2902         } else {
2903           locks[src].entry = 0;
2904           locks[src].slot = 0;
2905           locks[src].reuse = 1;
2906         }
2907
2908         //      capture current lsn for master page
2909
2910         locks[src].reqlsn = set->page->lsn;
2911   }
2912
2913   //  unlock last loadpage lock
2914
2915   if( source->cnt )
2916         bt_unlockpage(BtLockRead, set->latch);
2917
2918   //  obtain write lock for each master page
2919   //  sync flushed pages to disk
2920
2921   for( src = 0; src++ < source->cnt; ) {
2922         if( locks[src].reuse )
2923           continue;
2924
2925         set->latch = bt->mgr->latchsets + locks[src].entry;
2926         bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
2927   }
2928
2929   // insert or delete each key
2930   // process any splits or merges
2931   // release Write & Atomic latches
2932   // set ParentModifications and build
2933   // queue of keys to insert for split pages
2934   // or delete for deleted pages.
2935
2936   // run through txn list backwards
2937
2938   samepage = source->cnt + 1;
2939
2940   for( src = source->cnt; src; src-- ) {
2941         if( locks[src].reuse )
2942           continue;
2943
2944         //  perform the txn operations
2945         //      from smaller to larger on
2946         //  the same page
2947
2948         for( idx = src; idx < samepage; idx++ )
2949          switch( slotptr(source,idx)->type ) {
2950          case Delete:
2951           if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
2952                 return bt->mgr->err;
2953           break;
2954
2955         case Duplicate:
2956         case Unique:
2957           if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
2958                 return bt->mgr->err;
2959           break;
2960         }
2961
2962         //      after the same page operations have finished,
2963         //  process master page for splits or deletion.
2964
2965         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2966         prev->page = bt_mappage (bt->mgr, prev->latch);
2967         samepage = src;
2968
2969         //  pick-up all splits from master page
2970         //      each one is already WriteLocked.
2971
2972         entry = prev->latch->split;
2973
2974         while( entry ) {
2975           set->latch = bt->mgr->latchsets + entry;
2976           set->page = bt_mappage (bt->mgr, set->latch);
2977           entry = set->latch->split;
2978
2979           // delete empty master page by undoing its split
2980           //  (this is potentially another empty page)
2981           //  note that there are no new left pointers yet
2982
2983           if( !prev->page->act ) {
2984                 memcpy (set->page->left, prev->page->left, BtId);
2985                 memcpy (prev->page, set->page, bt->mgr->page_size);
2986                 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
2987                 bt_freepage (bt->mgr, set);
2988
2989                 prev->latch->split = set->latch->split;
2990                 prev->latch->dirty = 1;
2991                 continue;
2992           }
2993
2994           // remove empty page from the split chain
2995           // and return it to the free list.
2996
2997           if( !set->page->act ) {
2998                 memcpy (prev->page->right, set->page->right, BtId);
2999                 prev->latch->split = set->latch->split;
3000                 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3001                 bt_freepage (bt->mgr, set);
3002                 continue;
3003           }
3004
3005           //  schedule prev fence key update
3006
3007           ptr = keyptr(prev->page,prev->page->cnt);
3008           leaf = calloc (sizeof(AtomicKey), 1), que++;
3009
3010           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3011           leaf->page_no = prev->latch->page_no;
3012           leaf->entry = prev->latch->entry;
3013           leaf->type = 0;
3014
3015           if( tail )
3016                 tail->next = leaf;
3017           else
3018                 head = leaf;
3019
3020           tail = leaf;
3021
3022           // splice in the left link into the split page
3023
3024           bt_putid (set->page->left, prev->latch->page_no);
3025           bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3026           bt_unlockpage(BtLockWrite, prev->latch);
3027           *prev = *set;
3028         }
3029
3030         //  update left pointer in next right page from last split page
3031         //      (if all splits were reversed, latch->split == 0)
3032
3033         if( latch->split ) {
3034           //  fix left pointer in master's original (now split)
3035           //  far right sibling or set rightmost page in page zero
3036
3037           if( right = bt_getid (prev->page->right) ) {
3038                 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3039                   set->page = bt_mappage (bt->mgr, set->latch);
3040                 else
3041                   return bt->mgr->err;
3042
3043             bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3044             bt_putid (set->page->left, prev->latch->page_no);
3045                 set->latch->dirty = 1;
3046             bt_unlockpage (BtLockWrite, set->latch);
3047                 bt_unpinlatch (bt->mgr, set->latch);
3048           } else {      // prev is rightmost page
3049             bt_mutexlock (bt->mgr->lock);
3050                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3051             bt_releasemutex(bt->mgr->lock);
3052           }
3053
3054           //  Process last page split in chain
3055
3056           ptr = keyptr(prev->page,prev->page->cnt);
3057           leaf = calloc (sizeof(AtomicKey), 1), que++;
3058
3059           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3060           leaf->page_no = prev->latch->page_no;
3061           leaf->entry = prev->latch->entry;
3062           leaf->type = 0;
3063   
3064           if( tail )
3065                 tail->next = leaf;
3066           else
3067                 head = leaf;
3068
3069           tail = leaf;
3070
3071           bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3072           bt_unlockpage(BtLockWrite, prev->latch);
3073
3074           //  remove atomic lock on master page
3075
3076           bt_unlockpage(BtLockAtomic, latch);
3077           continue;
3078         }
3079
3080         //  finished if prev page occupied (either master or final split)
3081
3082         if( prev->page->act ) {
3083           bt_unlockpage(BtLockWrite, latch);
3084           bt_unlockpage(BtLockAtomic, latch);
3085           bt_unpinlatch(bt->mgr, latch);
3086           continue;
3087         }
3088
3089         // any and all splits were reversed, and the
3090         // master page located in prev is empty, delete it
3091         // by pulling over master's right sibling.
3092
3093         // Remove empty master's fence key
3094
3095         ptr = keyptr(prev->page,prev->page->cnt);
3096
3097         if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3098                 return bt->mgr->err;
3099
3100         //      perform the remainder of the delete
3101         //      from the FIFO queue
3102
3103         leaf = calloc (sizeof(AtomicKey), 1), que++;
3104
3105         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3106         leaf->page_no = prev->latch->page_no;
3107         leaf->entry = prev->latch->entry;
3108         leaf->nounlock = 1;
3109         leaf->type = 2;
3110   
3111         if( tail )
3112           tail->next = leaf;
3113         else
3114           head = leaf;
3115
3116         tail = leaf;
3117
3118         //      leave atomic lock in place until
3119         //      deletion completes in next phase.
3120
3121         bt_unlockpage(BtLockWrite, prev->latch);
3122   }
3123
3124   //  add & delete keys for any pages split or merged during transaction
3125
3126   if( leaf = head )
3127     do {
3128           set->latch = bt->mgr->latchsets + leaf->entry;
3129           set->page = bt_mappage (bt->mgr, set->latch);
3130
3131           bt_putid (value, leaf->page_no);
3132           ptr = (BtKey *)leaf->leafkey;
3133
3134           switch( leaf->type ) {
3135           case 0:       // insert key
3136             if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
3137                   return bt->mgr->err;
3138
3139                 break;
3140
3141           case 1:       // delete key
3142                 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3143                   return bt->mgr->err;
3144
3145                 break;
3146
3147           case 2:       // free page
3148                 if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
3149                   return bt->mgr->err;
3150
3151                 break;
3152           }
3153
3154           if( !leaf->nounlock )
3155             bt_unlockpage (BtLockParent, set->latch);
3156
3157           bt_unpinlatch (bt->mgr, set->latch);
3158           tail = leaf->next;
3159           free (leaf);
3160         } while( leaf = tail );
3161
3162   // if number of active pages
3163   // is greater than the buffer pool
3164   // promote page into larger btree
3165
3166   while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
3167         if( bt_txnpromote (bt) )
3168           return bt->mgr->err;
3169
3170   // return success
3171
3172   free (locks);
3173   return 0;
3174 }
3175
3176 //  promote a page into the larger btree
3177
3178 BTERR bt_txnpromote (BtDb *bt)
3179 {
3180 uint entry, slot, idx;
3181 BtPageSet set[1];
3182 BtSlot *node;
3183 BtKey *ptr;
3184 BtVal *val;
3185
3186   while( 1 ) {
3187 #ifdef unix
3188         entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3189 #else
3190         entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3191 #endif
3192         entry %= bt->mgr->latchtotal;
3193
3194         if( !entry )
3195                 continue;
3196
3197         set->latch = bt->mgr->latchsets + entry;
3198         idx = set->latch->page_no % bt->mgr->latchhash;
3199
3200         if( !bt_mutextry(set->latch->modify) )
3201                 continue;
3202
3203 //    if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
3204 //              bt_releasemutex(set->latch->modify);
3205 //              continue;
3206 //      }
3207
3208         //  skip this entry if it is pinned
3209
3210         if( set->latch->pin & ~CLOCK_bit ) {
3211           bt_releasemutex(set->latch->modify);
3212 //      bt_releasemutex(bt->mgr->hashtable[idx].latch);
3213           continue;
3214         }
3215
3216         set->page = bt_mappage (bt->mgr, set->latch);
3217
3218         // entry never used or has no right sibling
3219
3220         if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3221           bt_releasemutex(set->latch->modify);
3222 //      bt_releasemutex(bt->mgr->hashtable[idx].latch);
3223           continue;
3224         }
3225
3226         bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
3227         bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3228     bt_unlockpage(BtLockAccess, set->latch);
3229
3230         // entry interiour node or being or was killed
3231
3232         if( set->page->lvl || set->page->free || set->page->kill ) {
3233           bt_releasemutex(set->latch->modify);
3234 //      bt_releasemutex(bt->mgr->hashtable[idx].latch);
3235       bt_unlockpage(BtLockWrite, set->latch);
3236           continue;
3237         }
3238
3239         //  pin the page for our useage
3240
3241         set->latch->pin++;
3242         bt_releasemutex(set->latch->modify);
3243 //    bt_releasemutex(bt->mgr->hashtable[idx].latch);
3244
3245         //      if page is dirty, then
3246         //      sync it to the disk first.
3247
3248         if( set->latch->dirty )
3249          if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
3250           return bt->mgr->line = __LINE__, bt->mgr->err;
3251          else
3252           set->latch->dirty = 0;
3253
3254         // transfer slots in our selected page to larger btree
3255 if( !(set->latch->page_no % 100) )
3256 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
3257
3258         for( slot = 0; slot++ < set->page->cnt; ) {
3259                 ptr = keyptr (set->page, slot);
3260                 val = valptr (set->page, slot);
3261                 node = slotptr(set->page, slot);
3262
3263                 switch( node->type ) {
3264                 case Duplicate:
3265                 case Unique:
3266                   if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
3267                         return bt->main->err;
3268
3269                   continue;
3270
3271                 case Delete:
3272                   if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
3273                         return bt->main->err;
3274
3275                   continue;
3276                 }
3277         }
3278
3279         //  now delete the page
3280
3281         if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3282                 return bt->mgr->err;
3283
3284         bt_unpinlatch (bt->mgr, set->latch);
3285         return 0;
3286   }
3287 }
3288
3289 //      set cursor to highest slot on highest page
3290
3291 uint bt_lastkey (BtDb *bt)
3292 {
3293 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3294 BtPageSet set[1];
3295
3296         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3297                 set->page = bt_mappage (bt->mgr, set->latch);
3298         else
3299                 return 0;
3300
3301     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3302         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3303     bt_unlockpage(BtLockRead, set->latch);
3304         bt_unpinlatch (bt->mgr, set->latch);
3305         return bt->cursor->cnt;
3306 }
3307
3308 //      return previous slot on cursor page
3309
3310 uint bt_prevkey (BtDb *bt, uint slot)
3311 {
3312 uid cursor_page = bt->cursor->page_no;
3313 uid ourright, next, us = cursor_page;
3314 BtPageSet set[1];
3315
3316         if( --slot )
3317                 return slot;
3318
3319         ourright = bt_getid(bt->cursor->right);
3320
3321 goleft:
3322         if( !(next = bt_getid(bt->cursor->left)) )
3323                 return 0;
3324
3325 findourself:
3326         cursor_page = next;
3327
3328         if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3329                 set->page = bt_mappage (bt->mgr, set->latch);
3330         else
3331                 return 0;
3332
3333     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3334         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3335         bt_unlockpage(BtLockRead, set->latch);
3336         bt_unpinlatch (bt->mgr, set->latch);
3337         
3338         next = bt_getid (bt->cursor->right);
3339
3340         if( bt->cursor->kill )
3341                 goto findourself;
3342
3343         if( next != us )
3344           if( next == ourright )
3345                 goto goleft;
3346           else
3347                 goto findourself;
3348
3349         return bt->cursor->cnt;
3350 }
3351
3352 //  return next slot on cursor page
3353 //  or slide cursor right into next page
3354
3355 uint bt_nextkey (BtDb *bt, uint slot)
3356 {
3357 BtPageSet set[1];
3358 uid right;
3359
3360   do {
3361         right = bt_getid(bt->cursor->right);
3362
3363         while( slot++ < bt->cursor->cnt )
3364           if( slotptr(bt->cursor,slot)->dead )
3365                 continue;
3366           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3367                 return slot;
3368           else
3369                 break;
3370
3371         if( !right )
3372                 break;
3373
3374         if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3375                 set->page = bt_mappage (bt->mgr, set->latch);
3376         else
3377                 return 0;
3378
3379     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3380         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3381         bt_unlockpage(BtLockRead, set->latch);
3382         bt_unpinlatch (bt->mgr, set->latch);
3383         slot = 0;
3384
3385   } while( 1 );
3386
3387   return bt->mgr->err = 0;
3388 }
3389
3390 //  cache page of keys into cursor and return starting slot for given key
3391
3392 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3393 {
3394 BtPageSet set[1];
3395 uint slot;
3396
3397         // cache page for retrieval
3398
3399         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3400           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3401         else
3402           return 0;
3403
3404         bt_unlockpage(BtLockRead, set->latch);
3405         bt_unpinlatch (bt->mgr, set->latch);
3406         return slot;
3407 }
3408
3409 #ifdef STANDALONE
3410
3411 #ifndef unix
3412 double getCpuTime(int type)
3413 {
3414 FILETIME crtime[1];
3415 FILETIME xittime[1];
3416 FILETIME systime[1];
3417 FILETIME usrtime[1];
3418 SYSTEMTIME timeconv[1];
3419 double ans = 0;
3420
3421         memset (timeconv, 0, sizeof(SYSTEMTIME));
3422
3423         switch( type ) {
3424         case 0:
3425                 GetSystemTimeAsFileTime (xittime);
3426                 FileTimeToSystemTime (xittime, timeconv);
3427                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3428                 break;
3429         case 1:
3430                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3431                 FileTimeToSystemTime (usrtime, timeconv);
3432                 break;
3433         case 2:
3434                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3435                 FileTimeToSystemTime (systime, timeconv);
3436                 break;
3437         }
3438
3439         ans += (double)timeconv->wHour * 3600;
3440         ans += (double)timeconv->wMinute * 60;
3441         ans += (double)timeconv->wSecond;
3442         ans += (double)timeconv->wMilliseconds / 1000;
3443         return ans;
3444 }
3445 #else
3446 #include <time.h>
3447 #include <sys/resource.h>
3448
3449 double getCpuTime(int type)
3450 {
3451 struct rusage used[1];
3452 struct timeval tv[1];
3453
3454         switch( type ) {
3455         case 0:
3456                 gettimeofday(tv, NULL);
3457                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3458
3459         case 1:
3460                 getrusage(RUSAGE_SELF, used);
3461                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3462
3463         case 2:
3464                 getrusage(RUSAGE_SELF, used);
3465                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3466         }
3467
3468         return 0;
3469 }
3470 #endif
3471
3472 void bt_poolaudit (BtMgr *mgr)
3473 {
3474 BtLatchSet *latch;
3475 uint entry = 0;
3476
3477         while( ++entry < mgr->latchtotal ) {
3478                 latch = mgr->latchsets + entry;
3479
3480                 if( *latch->readwr->rin & MASK )
3481                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3482
3483                 if( *latch->access->rin & MASK )
3484                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3485
3486 //              if( *latch->parent->xcl->value )
3487 //                      fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3488
3489 //              if( *latch->atomic->xcl->value )
3490 //                      fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3491
3492 //              if( *latch->modify->value )
3493 //                      fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3494
3495                 if( latch->pin & ~CLOCK_bit )
3496                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3497         }
3498 }
3499
3500 typedef struct {
3501         char idx;
3502         char *type;
3503         char *infile;
3504         BtMgr *main;
3505         BtMgr *mgr;
3506         int num;
3507 } ThreadArg;
3508
3509 //  standalone program to index file of keys
3510 //  then list them onto std-out
3511
3512 #ifdef unix
3513 void *index_file (void *arg)
3514 #else
3515 uint __stdcall index_file (void *arg)
3516 #endif
3517 {
3518 int line = 0, found = 0, cnt = 0, idx;
3519 uid next, page_no = LEAF_page;  // start on first page of leaves
3520 int ch, len = 0, slot, type = 0;
3521 unsigned char key[BT_maxkey];
3522 unsigned char txn[65536];
3523 ThreadArg *args = arg;
3524 BtPage page, frame;
3525 BtPageSet set[1];
3526 uint nxt = 65536;
3527 BtKey *ptr;
3528 BtVal *val;
3529 BtDb *bt;
3530 FILE *in;
3531
3532         bt = bt_open (args->mgr, args->main);
3533         page = (BtPage)txn;
3534
3535         if( args->idx < strlen (args->type) )
3536                 ch = args->type[args->idx];
3537         else
3538                 ch = args->type[strlen(args->type) - 1];
3539
3540         switch(ch | 0x20)
3541         {
3542         case 'd':
3543                 type = Delete;
3544
3545         case 'p':
3546                 if( !type )
3547                         type = Unique;
3548
3549                 if( args->num )
3550                  if( type == Delete )
3551                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3552                  else
3553                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3554                 else
3555                  if( type == Delete )
3556                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3557                  else
3558                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3559
3560                 if( in = fopen (args->infile, "rb") )
3561                   while( ch = getc(in), ch != EOF )
3562                         if( ch == '\n' )
3563                         {
3564                           line++;
3565
3566                           if( !args->num ) {
3567                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3568                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3569                             len = 0;
3570                                 continue;
3571                           }
3572
3573                           nxt -= len - 10;
3574                           memcpy (txn + nxt, key + 10, len - 10);
3575                           nxt -= 1;
3576                           txn[nxt] = len - 10;
3577                           nxt -= 10;
3578                           memcpy (txn + nxt, key, 10);
3579                           nxt -= 1;
3580                           txn[nxt] = 10;
3581                           slotptr(page,++cnt)->off  = nxt;
3582                           slotptr(page,cnt)->type = type;
3583                           len = 0;
3584
3585                           if( cnt < args->num )
3586                                 continue;
3587
3588                           page->cnt = cnt;
3589                           page->act = cnt;
3590                           page->min = nxt;
3591
3592                           if( bt_atomictxn (bt, page) )
3593                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3594                           nxt = sizeof(txn);
3595                           cnt = 0;
3596
3597                         }
3598                         else if( len < BT_maxkey )
3599                                 key[len++] = ch;
3600                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3601                 break;
3602
3603         case 'w':
3604                 fprintf(stderr, "started indexing for %s\n", args->infile);
3605                 if( in = fopen (args->infile, "r") )
3606                   while( ch = getc(in), ch != EOF )
3607                         if( ch == '\n' )
3608                         {
3609                           line++;
3610
3611                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3612                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3613                           len = 0;
3614                         }
3615                         else if( len < BT_maxkey )
3616                                 key[len++] = ch;
3617                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3618                 break;
3619
3620         case 'f':
3621                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3622                 if( in = fopen (args->infile, "rb") )
3623                   while( ch = getc(in), ch != EOF )
3624                         if( ch == '\n' )
3625                         {
3626                           line++;
3627                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3628                                 found++;
3629                           else if( bt->mgr->err )
3630                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3631                           len = 0;
3632                         }
3633                         else if( len < BT_maxkey )
3634                                 key[len++] = ch;
3635                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3636                 break;
3637
3638         case 's':
3639                 fprintf(stderr, "started scanning\n");
3640                 
3641                 do {
3642                         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3643                                 set->page = bt_mappage (bt->mgr, set->latch);
3644                         else
3645                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3646
3647                         bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3648                         next = bt_getid (set->page->right);
3649
3650                         for( slot = 0; slot++ < set->page->cnt; )
3651                          if( next || slot < set->page->cnt )
3652                           if( !slotptr(set->page, slot)->dead ) {
3653                                 ptr = keyptr(set->page, slot);
3654                                 len = ptr->len;
3655
3656                             if( slotptr(set->page, slot)->type == Duplicate )
3657                                         len -= BtId;
3658
3659                                 fwrite (ptr->key, len, 1, stdout);
3660                                 val = valptr(set->page, slot);
3661                                 fwrite (val->value, val->len, 1, stdout);
3662                                 fputc ('\n', stdout);
3663                                 cnt++;
3664                            }
3665
3666                         bt_unlockpage (BtLockRead, set->latch);
3667                         bt_unpinlatch (bt->mgr, set->latch);
3668                 } while( page_no = next );
3669
3670                 fprintf(stderr, " Total keys read %d\n", cnt);
3671                 break;
3672
3673         case 'r':
3674                 fprintf(stderr, "started reverse scan\n");
3675                 if( slot = bt_lastkey (bt) )
3676                    while( slot = bt_prevkey (bt, slot) ) {
3677                         if( slotptr(bt->cursor, slot)->dead )
3678                           continue;
3679
3680                         ptr = keyptr(bt->cursor, slot);
3681                         len = ptr->len;
3682
3683                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3684                                 len -= BtId;
3685
3686                         fwrite (ptr->key, len, 1, stdout);
3687                         val = valptr(bt->cursor, slot);
3688                         fwrite (val->value, val->len, 1, stdout);
3689                         fputc ('\n', stdout);
3690                         cnt++;
3691                   }
3692
3693                 fprintf(stderr, " Total keys read %d\n", cnt);
3694                 break;
3695
3696         case 'c':
3697 #ifdef unix
3698                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3699 #endif
3700                 fprintf(stderr, "started counting\n");
3701                 next = LEAF_page + bt->mgr->redopages + 1;
3702
3703                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3704                         if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3705                                 break;
3706
3707                         if( !bt->cursor->free && !bt->cursor->lvl )
3708                                 cnt += bt->cursor->act;
3709
3710                         bt->mgr->reads++;
3711                         page_no = next++;
3712                 }
3713                 
3714                 cnt--;  // remove stopper key
3715                 fprintf(stderr, " Total keys counted %d\n", cnt);
3716                 break;
3717         }
3718
3719         fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
3720         bt_close (bt);
3721 #ifdef unix
3722         return NULL;
3723 #else
3724         return 0;
3725 #endif
3726 }
3727
3728 typedef struct timeval timer;
3729
3730 int main (int argc, char **argv)
3731 {
3732 int idx, cnt, len, slot, err;
3733 int segsize, bits = 16;
3734 double start, stop;
3735 #ifdef unix
3736 pthread_t *threads;
3737 #else
3738 HANDLE *threads;
3739 #endif
3740 ThreadArg *args;
3741 uint poolsize = 0;
3742 uint recovery = 0;
3743 uint mainpool = 0;
3744 uint mainbits = 0;
3745 float elapsed;
3746 int num = 0;
3747 char key[1];
3748 BtMgr *main;
3749 BtMgr *mgr;
3750 BtKey *ptr;
3751
3752         if( argc < 3 ) {
3753                 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3754                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3755                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3756                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3757                 fprintf (stderr, "  page_bits is the page size in bits for the cache btree\n");
3758                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3759                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3760                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3761                 fprintf (stderr, "  main_bits is the page size of the main btree in bits\n");
3762                 fprintf (stderr, "  main_pool is the number of main pages in the main buffer pool\n");
3763                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3764                 exit(0);
3765         }
3766
3767         start = getCpuTime(0);
3768
3769         if( argc > 4 )
3770                 bits = atoi(argv[4]);
3771
3772         if( argc > 5 )
3773                 poolsize = atoi(argv[5]);
3774
3775         if( !poolsize )
3776                 fprintf (stderr, "Warning: no mapped_pool\n");
3777
3778         if( argc > 6 )
3779                 num = atoi(argv[6]);
3780
3781         if( argc > 7 )
3782                 recovery = atoi(argv[7]);
3783
3784         if( argc > 8 )
3785                 mainbits = atoi(argv[8]);
3786
3787         if( argc > 9 )
3788                 mainpool = atoi(argv[9]);
3789
3790         cnt = argc - 10;
3791 #ifdef unix
3792         threads = malloc (cnt * sizeof(pthread_t));
3793 #else
3794         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3795 #endif
3796         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3797
3798         mgr = bt_mgr (argv[1], bits, poolsize, recovery);
3799
3800         if( !mgr ) {
3801                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3802                 exit (1);
3803         }
3804
3805         main = bt_mgr (argv[2], mainbits, mainpool, 0);
3806
3807         if( !main ) {
3808                 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3809                 exit (1);
3810         }
3811
3812         //      fire off threads
3813
3814         if( cnt )
3815           for( idx = 0; idx < cnt; idx++ ) {
3816                 args[idx].infile = argv[idx + 10];
3817                 args[idx].type = argv[3];
3818                 args[idx].main = main;
3819                 args[idx].mgr = mgr;
3820                 args[idx].num = num;
3821                 args[idx].idx = idx;
3822 #ifdef unix
3823                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3824                         fprintf(stderr, "Error creating thread %d\n", err);
3825 #else
3826                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3827 #endif
3828           }
3829         else {
3830                 args[0].infile = argv[idx + 10];
3831                 args[0].type = argv[3];
3832                 args[0].main = main;
3833                 args[0].mgr = mgr;
3834                 args[0].num = num;
3835                 args[0].idx = 0;
3836                 index_file (args);
3837         }
3838
3839         //      wait for termination
3840
3841 #ifdef unix
3842         for( idx = 0; idx < cnt; idx++ )
3843                 pthread_join (threads[idx], NULL);
3844 #else
3845         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3846
3847         for( idx = 0; idx < cnt; idx++ )
3848                 CloseHandle(threads[idx]);
3849 #endif
3850         bt_poolaudit(mgr);
3851         bt_poolaudit(main);
3852
3853         bt_mgrclose (main);
3854         bt_mgrclose (mgr);
3855
3856         elapsed = getCpuTime(0) - start;
3857         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3858         elapsed = getCpuTime(1);
3859         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3860         elapsed = getCpuTime(2);
3861         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3862 }
3863
3864 #endif  //STANDALONE