]> pd.if.org Git - btree/blob - threadskv9b.c
Release durable threadskv9b.c
[btree] / threadskv9b.c
1 // btree version threadskv9a sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      and redo log for failure recovery
10
11 // 05 OCT 2014
12
13 // author: karl malbrain, malbrain@cal.berkeley.edu
14
15 /*
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
18
19 The orginal author is Karl Malbrain.
20
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
27 */
28
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
31
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
34
35 #ifdef linux
36 #define _GNU_SOURCE
37 #include <linux/futex.h>
38 #define SYS_futex 202
39 #endif
40
41 #ifdef unix
42 #include <unistd.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/mman.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <limits.h>
51 #else
52 #define WIN32_LEAN_AND_MEAN
53 #include <windows.h>
54 #include <stdio.h>
55 #include <stdlib.h>
56 #include <time.h>
57 #include <fcntl.h>
58 #include <process.h>
59 #include <intrin.h>
60 #endif
61
62 #include <memory.h>
63 #include <string.h>
64 #include <stddef.h>
65
66 typedef unsigned long long      uid;
67 typedef unsigned long long      logseqno;
68
69 #ifndef unix
70 typedef unsigned long long      off64_t;
71 typedef unsigned short          ushort;
72 typedef unsigned int            uint;
73 #endif
74
75 #define BT_ro 0x6f72    // ro
76 #define BT_rw 0x7772    // rw
77
78 #define BT_maxbits              24                                      // maximum page size in bits
79 #define BT_minbits              9                                       // minimum page size in bits
80 #define BT_minpage              (1 << BT_minbits)       // minimum page size
81 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
82
83 //  BTree page number constants
84 #define ALLOC_page              0       // allocation page
85 #define ROOT_page               1       // root of the btree
86 #define LEAF_page               2       // first page of leaves
87 #define REDO_page               3       // first page of redo buffer
88
89 //      Number of levels to create in a new BTree
90
91 #define MIN_lvl                 2
92
93 /*
94 There are six lock types for each node in four independent sets: 
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
101 */
102
103 typedef enum{
104         BtLockAccess = 1,
105         BtLockDelete = 2,
106         BtLockRead   = 4,
107         BtLockWrite  = 8,
108         BtLockParent = 16,
109         BtLockAtomic = 32
110 } BtLock;
111
112 //      definition for phase-fair reader/writer lock implementation
113
114 typedef struct {
115         volatile ushort rin[1];
116         volatile ushort rout[1];
117         volatile ushort ticket[1];
118         volatile ushort serving[1];
119         ushort tid;
120         ushort dup;
121 } RWLock;
122
123 //      write only queue lock
124
125 typedef struct {
126         volatile ushort ticket[1];
127         volatile ushort serving[1];
128         ushort tid;
129         ushort dup;
130 } WOLock;
131
132 #define PHID 0x1
133 #define PRES 0x2
134 #define MASK 0x3
135 #define RINC 0x4
136
137 //      lite weight mutex
138
139 // exclusive is set for write access
140
141 volatile typedef struct {
142         unsigned char exclusive[1];
143         unsigned char filler;
144 } BtMutexLatch;
145
146 #define XCL 1
147
148 /*
149 typedef struct {
150   union {
151         struct {
152           uint xlock:1;         // owner has exclusive lock
153           uint filler:16;
154           uint wrt:15;          // count of writers waiting
155         } bits[1];
156         uint value[1];
157   };
158 } BtMutexLatch;
159 */
160
161 //      lite weight spin latch
162 /*
163 typedef struct {
164   union {
165         struct {
166           uint xlock:1;         // writer has exclusive lock
167           uint share:15;        // count of readers with lock
168           uint read:1;          // readers are waiting
169           uint wrt:15;          // count of writers waiting
170         } bits[1];
171         uint value[1];
172   };
173 } BtSpinLatch;
174
175
176 //      mode & definition for lite latch implementation
177
178 enum {
179         QueRd = 1,              // reader queue
180         QueWr = 2               // writer queue
181 } RWQueue;
182
183 #define XCL 1
184 #define SHARE 2
185 #define READ (SHARE * 32768)
186 #define WRT (READ * 2)
187 */
188 //  hash table entries
189
190 typedef struct {
191         volatile uint slot;             // Latch table entry at head of chain
192         BtMutexLatch latch[1];
193 } BtHashEntry;
194
195 //      latch manager table structure
196
197 typedef struct {
198         uid page_no;                    // latch set page number
199         RWLock readwr[1];               // read/write page lock
200         RWLock access[1];               // Access Intent/Page delete
201         WOLock parent[1];               // Posting of fence key in parent
202         WOLock atomic[1];               // Atomic update in progress
203         uint split;                             // right split page atomic insert
204         uint entry;                             // entry slot in latch table
205         uint next;                              // next entry in hash table chain
206         uint prev;                              // prev entry in hash table chain
207         ushort pin[1];                  // number of outstanding threads
208         ushort dirty:1;                 // page in cache is dirty
209 } BtLatchSet;
210
211 //      Define the length of the page record numbers
212
213 #define BtId 6
214
215 //      Page key slot definition.
216
217 //      Keys are marked dead, but remain on the page until
218 //      it cleanup is called. The fence key (highest key) for
219 //      a leaf page is always present, even after cleanup.
220
221 //      Slot types
222
223 //      In addition to the Unique keys that occupy slots
224 //      there are Librarian and Duplicate key
225 //      slots occupying the key slot array.
226
227 //      The Librarian slots are dead keys that
228 //      serve as filler, available to add new Unique
229 //      or Dup slots that are inserted into the B-tree.
230
231 //      The Duplicate slots have had their key bytes extended
232 //      by 6 bytes to contain a binary duplicate key uniqueifier.
233
234 typedef enum {
235         Unique,
236         Librarian,
237         Duplicate,
238         Delete,
239         Update
240 } BtSlotType;
241
242 typedef struct {
243         uint off:BT_maxbits;    // page offset for key start
244         uint type:3;                    // type of slot
245         uint dead:1;                    // set for deleted slot
246 } BtSlot;
247
248 //      The key structure occupies space at the upper end of
249 //      each page.  It's a length byte followed by the key
250 //      bytes.
251
252 typedef struct {
253         unsigned char len;              // this can be changed to a ushort or uint
254         unsigned char key[0];
255 } BtKey;
256
257 //      the value structure also occupies space at the upper
258 //      end of the page. Each key is immediately followed by a value.
259
260 typedef struct {
261         unsigned char len;              // this can be changed to a ushort or uint
262         unsigned char value[0];
263 } BtVal;
264
265 #define BT_maxkey       255             // maximum number of bytes in a key
266 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
267
268 //      The first part of an index page.
269 //      It is immediately followed
270 //      by the BtSlot array of keys.
271
272 //      note that this structure size
273 //      must be a multiple of 8 bytes
274 //      in order to place dups correctly.
275
276 typedef struct BtPage_ {
277         uint cnt;                                       // count of keys in page
278         uint act;                                       // count of active keys
279         uint min;                                       // next key offset
280         uint garbage;                           // page garbage in bytes
281         unsigned char bits:7;           // page size in bits
282         unsigned char free:1;           // page is on free chain
283         unsigned char lvl:7;            // level of page
284         unsigned char kill:1;           // page is being deleted
285         unsigned char right[BtId];      // page number to right
286         unsigned char left[BtId];       // page number to left
287         unsigned char filler[2];        // padding to multiple of 8
288         logseqno lsn;                           // log sequence number applied
289 } *BtPage;
290
291 //  The loadpage interface object
292
293 typedef struct {
294         BtPage page;            // current page pointer
295         BtLatchSet *latch;      // current page latch set
296 } BtPageSet;
297
298 //      structure for latch manager on ALLOC_page
299
300 typedef struct {
301         struct BtPage_ alloc[1];        // next page_no in right ptr
302         unsigned long long dups[1];     // global duplicate key uniqueifier
303         unsigned char chain[BtId];      // head of free page_nos chain
304 } BtPageZero;
305
306 //      The object structure for Btree access
307
308 typedef struct {
309         uint page_size;                         // page size    
310         uint page_bits;                         // page size in bits    
311 #ifdef unix
312         int idx;
313 #else
314         HANDLE idx;
315 #endif
316         BtPageZero *pagezero;           // mapped allocation page
317         BtHashEntry *hashtable;         // the buffer pool hash table entries
318         BtLatchSet *latchsets;          // mapped latch set from buffer pool
319         unsigned char *pagepool;        // mapped to the buffer pool pages
320         unsigned char *redobuff;        // mapped recovery buffer pointer
321         logseqno lsn, flushlsn;         // current & first lsn flushed
322         BtMutexLatch redo[1];           // redo area lite latch
323         BtMutexLatch lock[1];           // allocation area lite latch
324         BtMutexLatch maps[1];           // mapping segments lite latch
325         ushort thread_no[1];            // next thread number
326         uint latchdeployed;                     // highest number of latch entries deployed
327         uint nlatchpage;                        // number of latch pages at BT_latch
328         uint latchtotal;                        // number of page latch entries
329         uint latchhash;                         // number of latch hash table slots
330         uint latchvictim;                       // next latch entry to examine
331         uint redopages;                         // size of recovery buff in pages
332         uint redolast;                          // last msync size of recovery buff
333         uint redoend;                           // eof/end element in recovery buff
334 #ifndef unix
335         HANDLE halloc;                          // allocation handle
336         HANDLE hpool;                           // buffer pool handle
337 #endif
338         uint segments;                          // number of memory mapped segments
339         unsigned char *pages[64000];// memory mapped segments of b-tree
340 } BtMgr;
341
342 typedef struct {
343         BtMgr *mgr;                             // buffer manager for thread
344         BtPage cursor;                  // cached frame for start/next (never mapped)
345         BtPage frame;                   // spare frame for the page split (never mapped)
346         uid cursor_page;                                // current cursor page number   
347         unsigned char *mem;                             // frame, cursor, page memory buffer
348         unsigned char key[BT_keyarray]; // last found complete key
349         int found;                              // last delete or insert was found
350         int err;                                // last error
351         int line;                               // last error line no
352         int reads, writes;              // number of reads and writes from the btree
353         ushort thread_no;               // thread number
354 } BtDb;
355
356 //      Catastrophic errors
357
358 typedef enum {
359         BTERR_ok = 0,
360         BTERR_struct,
361         BTERR_ovflw,
362         BTERR_lock,
363         BTERR_map,
364         BTERR_read,
365         BTERR_wrt,
366         BTERR_atomic,
367         BTERR_recovery
368 } BTERR;
369
370 #define CLOCK_bit 0x8000
371
372 // recovery manager entry types
373
374 typedef enum {
375         BTRM_eof = 0,   // rest of buffer is emtpy
376         BTRM_add,               // add a unique key-value to btree
377         BTRM_dup,               // add a duplicate key-value to btree
378         BTRM_del,               // delete a key-value from btree
379         BTRM_upd,               // update a key with a new value
380         BTRM_new,               // allocate a new empty page
381         BTRM_old                // reuse an old empty page
382 } BTRM;
383
384 // recovery manager entry
385 //      structure followed by BtKey & BtVal
386
387 typedef struct {
388         logseqno reqlsn;        // log sequence number required
389         logseqno lsn;           // log sequence number for entry
390         uint len;                       // length of entry
391         unsigned char type;     // type of entry
392         unsigned char lvl;      // level of btree entry pertains to
393 } BtLogHdr;
394
395 // B-Tree functions
396
397 extern void bt_close (BtDb *bt);
398 extern BtDb *bt_open (BtMgr *mgr);
399 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
400 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
401 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
402 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
403 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
404 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
405 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
406 extern BtKey *bt_foundkey (BtDb *bt);
407 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
408 extern uint bt_nextkey   (BtDb *bt, uint slot);
409
410 //      manager functions
411 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
412 extern void bt_mgrclose (BtMgr *mgr);
413 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
414 extern logseqno bt_txnredo (BtDb *bt, BtPage page);
415
416 //  Helper functions to return slot values
417 //      from the cursor page.
418
419 extern BtKey *bt_key (BtDb *bt, uint slot);
420 extern BtVal *bt_val (BtDb *bt, uint slot);
421
422 //  The page is allocated from low and hi ends.
423 //  The key slots are allocated from the bottom,
424 //      while the text and value of the key
425 //  are allocated from the top.  When the two
426 //  areas meet, the page is split into two.
427
428 //  A key consists of a length byte, two bytes of
429 //  index number (0 - 65535), and up to 253 bytes
430 //  of key value.
431
432 //  Associated with each key is a value byte string
433 //      containing any value desired.
434
435 //  The b-tree root is always located at page 1.
436 //      The first leaf page of level zero is always
437 //      located on page 2.
438
439 //      The b-tree pages are linked with next
440 //      pointers to facilitate enumerators,
441 //      and provide for concurrency.
442
443 //      When to root page fills, it is split in two and
444 //      the tree height is raised by a new root at page
445 //      one with two keys.
446
447 //      Deleted keys are marked with a dead bit until
448 //      page cleanup. The fence key for a leaf node is
449 //      always present
450
451 //  To achieve maximum concurrency one page is locked at a time
452 //  as the tree is traversed to find leaf key in question. The right
453 //  page numbers are used in cases where the page is being split,
454 //      or consolidated.
455
456 //  Page 0 is dedicated to lock for new page extensions,
457 //      and chains empty pages together for reuse. It also
458 //      contains the latch manager hash table.
459
460 //      The ParentModification lock on a node is obtained to serialize posting
461 //      or changing the fence key for a node.
462
463 //      Empty pages are chained together through the ALLOC page and reused.
464
465 //      Access macros to address slot and key values from the page
466 //      Page slots use 1 based indexing.
467
468 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
469 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
470 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
471
472 void bt_putid(unsigned char *dest, uid id)
473 {
474 int i = BtId;
475
476         while( i-- )
477                 dest[i] = (unsigned char)id, id >>= 8;
478 }
479
480 uid bt_getid(unsigned char *src)
481 {
482 uid id = 0;
483 int i;
484
485         for( i = 0; i < BtId; i++ )
486                 id <<= 8, id |= *src++; 
487
488         return id;
489 }
490
491 uid bt_newdup (BtDb *bt)
492 {
493 #ifdef unix
494         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
495 #else
496         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
497 #endif
498 }
499
500 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
501 {
502         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
503 }
504
505 //      Write-Only Queue Lock
506
507 void WriteOLock (WOLock *lock, ushort tid)
508 {
509 ushort tix;
510
511         if( lock->tid == tid ) {
512                 lock->dup++;
513                 return;
514         }
515 #ifdef unix
516         tix = __sync_fetch_and_add (lock->ticket, 1);
517 #else
518         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
519 #endif
520         // wait for our ticket to come up
521
522         while( tix != lock->serving[0] )
523 #ifdef unix
524                 sched_yield();
525 #else
526                 SwitchToThread ();
527 #endif
528         lock->tid = tid;
529 }
530
531 void WriteORelease (WOLock *lock)
532 {
533         if( lock->dup ) {
534                 lock->dup--;
535                 return;
536         }
537
538         lock->tid = 0;
539         lock->serving[0]++;
540 }
541
542 //      Phase-Fair reader/writer lock implementation
543
544 void WriteLock (RWLock *lock, ushort tid)
545 {
546 ushort w, r, tix;
547
548         if( lock->tid == tid ) {
549                 lock->dup++;
550                 return;
551         }
552 #ifdef unix
553         tix = __sync_fetch_and_add (lock->ticket, 1);
554 #else
555         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
556 #endif
557         // wait for our ticket to come up
558
559         while( tix != lock->serving[0] )
560 #ifdef unix
561                 sched_yield();
562 #else
563                 SwitchToThread ();
564 #endif
565
566         w = PRES | (tix & PHID);
567 #ifdef  unix
568         r = __sync_fetch_and_add (lock->rin, w);
569 #else
570         r = _InterlockedExchangeAdd16 (lock->rin, w);
571 #endif
572         while( r != *lock->rout )
573 #ifdef unix
574                 sched_yield();
575 #else
576                 SwitchToThread();
577 #endif
578         lock->tid = tid;
579 }
580
581 void WriteRelease (RWLock *lock)
582 {
583         if( lock->dup ) {
584                 lock->dup--;
585                 return;
586         }
587
588         lock->tid = 0;
589 #ifdef unix
590         __sync_fetch_and_and (lock->rin, ~MASK);
591 #else
592         _InterlockedAnd16 (lock->rin, ~MASK);
593 #endif
594         lock->serving[0]++;
595 }
596
597 //      try to obtain read lock
598 //  return 1 if successful
599
600 int ReadTry (RWLock *lock, ushort tid)
601 {
602 ushort w;
603
604         //  OK if write lock already held by same thread
605
606         if( lock->tid == tid ) {
607                 lock->dup++;
608                 return 1;
609         }
610 #ifdef unix
611         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
612 #else
613         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
614 #endif
615         if( w )
616           if( w == (*lock->rin & MASK) ) {
617 #ifdef unix
618                 __sync_fetch_and_add (lock->rin, -RINC);
619 #else
620                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
621 #endif
622                 return 0;
623           }
624
625         return 1;
626 }
627
628 void ReadLock (RWLock *lock, ushort tid)
629 {
630 ushort w;
631         if( lock->tid == tid ) {
632                 lock->dup++;
633                 return;
634         }
635 #ifdef unix
636         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
637 #else
638         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
639 #endif
640         if( w )
641           while( w == (*lock->rin & MASK) )
642 #ifdef unix
643                 sched_yield ();
644 #else
645                 SwitchToThread ();
646 #endif
647 }
648
649 void ReadRelease (RWLock *lock)
650 {
651         if( lock->dup ) {
652                 lock->dup--;
653                 return;
654         }
655
656 #ifdef unix
657         __sync_fetch_and_add (lock->rout, RINC);
658 #else
659         _InterlockedExchangeAdd16 (lock->rout, RINC);
660 #endif
661 }
662
663 //      lite weight spin lock Latch Manager
664
665 void bt_spinmutexlock(BtMutexLatch *latch)
666 {
667 unsigned char prev;
668
669   do {
670 #ifdef  unix
671         prev = __sync_fetch_and_or(latch->exclusive, XCL);
672 #else
673         prev = _InterlockedOr8(latch->exclusive, XCL);
674 #endif
675         if( !(prev & XCL) )
676                 return;
677 #ifdef  unix
678   } while( sched_yield(), 1 );
679 #else
680   } while( SwitchToThread(), 1 );
681 #endif
682 }
683
684 //      try to obtain write lock
685
686 //      return 1 if obtained,
687 //              0 otherwise
688
689 int bt_spinmutextry(BtMutexLatch *latch)
690 {
691 unsigned char prev;
692
693 #ifdef  unix
694         prev = __sync_fetch_and_or(latch->exclusive, XCL);
695 #else
696         prev = _InterlockedOr8(latch->exclusive, XCL);
697 #endif
698         //      take write access if all bits are clear
699
700         return !(prev & XCL);
701 }
702
703 //      clear write mode
704
705 void bt_spinreleasemutex(BtMutexLatch *latch)
706 {
707         *latch->exclusive = 0;
708 }
709
710 //      recovery manager -- flush dirty pages
711
712 void bt_flushlsn (BtDb *bt)
713 {
714 uint cnt2 = 0, cnt = 0;
715 BtLatchSet *latch;
716 BtPage page;
717 uint entry;
718
719   //    flush dirty pool pages to the btree
720
721 fprintf(stderr, "Start flushlsn\n");
722   for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
723         page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
724         latch = bt->mgr->latchsets + entry;
725         bt_lockpage(bt, BtLockRead, latch);
726
727         if( latch->dirty ) {
728           bt_writepage(bt->mgr, page, latch->page_no);
729           latch->dirty = 0, cnt++;
730         }
731 if( *latch->pin & ~CLOCK_bit )
732 cnt2++;
733         bt_unlockpage(BtLockRead, latch);
734   }
735 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
736 fprintf(stderr, "begin sync");
737         sync_file_range (bt->mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
738 fprintf(stderr, " end sync\n");
739 }
740
741 //      recovery manager -- process current recovery buff on startup
742 //      this won't do much if previous session was properly closed.
743
744 BTERR bt_recoveryredo (BtMgr *mgr)
745 {
746 BtLogHdr *hdr, *eof;
747 uint offset = 0;
748 BtKey *key;
749 BtVal *val;
750
751   hdr = (BtLogHdr *)mgr->redobuff;
752   mgr->flushlsn = hdr->lsn;
753
754   while( 1 ) {
755         hdr = (BtLogHdr *)(mgr->redobuff + offset);
756         switch( hdr->type ) {
757         case BTRM_eof:
758                 mgr->lsn = hdr->lsn;
759                 return 0;
760         case BTRM_add:          // add a unique key-value to btree
761         
762         case BTRM_dup:          // add a duplicate key-value to btree
763         case BTRM_del:          // delete a key-value from btree
764         case BTRM_upd:          // update a key with a new value
765         case BTRM_new:          // allocate a new empty page
766         case BTRM_old:          // reuse an old empty page
767                 return 0;
768         }
769   }
770 }
771
772 //      recovery manager -- append new entry to recovery log
773 //      flush to disk when it overflows.
774
775 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
776 {
777 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
778 uint amt = sizeof(BtLogHdr);
779 BtLogHdr *hdr, *eof;
780 uint last, end;
781
782         bt_spinmutexlock (bt->mgr->redo);
783
784         if( key )
785           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
786
787         //      see if new entry fits in buffer
788         //      flush and reset if it doesn't
789         //  fix this for circular buffer
790
791         if( amt > size - bt->mgr->redoend ) {
792           bt->mgr->flushlsn = bt->mgr->lsn;
793           msync (bt->mgr->redobuff + (bt->mgr->redolast & 0xfff), bt->mgr->redoend - bt->mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
794           bt->mgr->redolast = 0;
795           bt->mgr->redoend = 0;
796           bt_flushlsn(bt);
797         }
798
799         //      fill in new entry & either eof or end block
800
801         hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
802
803         hdr->len = amt;
804         hdr->type = type;
805         hdr->lvl = lvl;
806         hdr->lsn = ++bt->mgr->lsn;
807
808         bt->mgr->redoend += amt;
809
810         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
811         memset (eof, 0, sizeof(BtLogHdr));
812         eof->lsn = bt->mgr->lsn;
813
814         //  fill in key and value
815
816         if( key ) {
817           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
818           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
819         }
820
821         last = bt->mgr->redolast & 0xfff;
822         end = bt->mgr->redoend;
823         bt->mgr->redolast = end;
824
825         bt_spinreleasemutex(bt->mgr->redo);
826
827         msync (bt->mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
828         return hdr->lsn;
829 }
830
831 //      recovery manager -- append transaction to recovery log
832 //      flush to disk when it overflows.
833
834 logseqno bt_txnredo (BtDb *bt, BtPage source)
835 {
836 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
837 uint amt = 0, src, type;
838 BtLogHdr *hdr, *eof;
839 uint last, end;
840 logseqno lsn;
841 BtKey *key;
842 BtVal *val;
843
844         //      determine amount of redo recovery log space required
845
846         for( src = 0; src++ < source->cnt; ) {
847           key = keyptr(source,src);
848           val = valptr(source,src);
849           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
850           amt += sizeof(BtLogHdr);
851         }
852
853         bt_spinmutexlock (bt->mgr->redo);
854
855         //      see if new entry fits in buffer
856         //      flush and reset if it doesn't
857         // fix this for circular buffer
858
859         if( amt > size - bt->mgr->redoend ) {
860           bt->mgr->flushlsn = bt->mgr->lsn;
861           msync (bt->mgr->redobuff + (bt->mgr->redolast & 0xfff), bt->mgr->redoend - bt->mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
862           bt->mgr->redolast = 0;
863           bt_flushlsn(bt);
864           bt->mgr->redoend = 0;
865         }
866
867         //      assign new lsn to transaction
868
869         lsn = ++bt->mgr->lsn;
870
871         //      fill in new entries
872
873         for( src = 0; src++ < source->cnt; ) {
874           key = keyptr(source, src);
875           val = valptr(source, src);
876
877           switch( slotptr(source, src)->type ) {
878           case Unique:
879                 type = BTRM_add;
880                 break;
881           case Duplicate:
882                 type = BTRM_dup;
883                 break;
884           case Delete:
885                 type = BTRM_del;
886                 break;
887           case Update:
888                 type = BTRM_upd;
889                 break;
890           }
891
892           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
893           amt += sizeof(BtLogHdr);
894
895           hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
896           hdr->len = amt;
897           hdr->type = type;
898           hdr->lsn = lsn;
899           hdr->lvl = 0;
900
901           //  fill in key and value
902
903           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
904           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
905
906           bt->mgr->redoend += amt;
907         }
908
909         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
910         memset (eof, 0, sizeof(BtLogHdr));
911         eof->lsn = lsn;
912
913         end = bt->mgr->redoend;
914         last = bt->mgr->redolast & 0xfff;
915         end = bt->mgr->redoend;
916         bt->mgr->redolast = end;
917         bt_spinreleasemutex(bt->mgr->redo);
918
919         msync (bt->mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
920         return lsn;
921 }
922
923 //      read page into buffer pool from permanent location in Btree file
924
925 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
926 {
927 int flag = PROT_READ | PROT_WRITE;
928 uint segment = page_no >> 32;
929 unsigned char *perm;
930
931   while( 1 ) {
932         if( segment < mgr->segments ) {
933           perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
934                 memcpy (page, perm, mgr->page_size);
935                 return 0;
936         }
937
938         bt_spinmutexlock (mgr->maps);
939
940         if( segment < mgr->segments ) {
941                 bt_spinreleasemutex (mgr->maps);
942                 continue;
943         }
944
945         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
946         mgr->segments++;
947
948         bt_spinreleasemutex (mgr->maps);
949   }
950 }
951
952 //      copy page from buffer pool to permanent location in Btree file
953
954 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
955 {
956 int flag = PROT_READ | PROT_WRITE;
957 uint segment = page_no >> 32;
958 unsigned char *perm;
959
960   while( 1 ) {
961         if( segment < mgr->segments ) {
962           perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
963                 memcpy (perm, page, mgr->page_size);
964                 return 0;
965         }
966
967         bt_spinmutexlock (mgr->maps);
968
969         if( segment < mgr->segments ) {
970                 bt_spinreleasemutex (mgr->maps);
971                 continue;
972         }
973
974         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
975         bt_spinreleasemutex (mgr->maps);
976         mgr->segments++;
977   }
978 }
979
980 //      link latch table entry into head of latch hash table
981
982 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
983 {
984 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
985 BtLatchSet *latch = bt->mgr->latchsets + slot;
986
987         if( latch->next = bt->mgr->hashtable[hashidx].slot )
988                 bt->mgr->latchsets[latch->next].prev = slot;
989
990         bt->mgr->hashtable[hashidx].slot = slot;
991         latch->page_no = page_no;
992         latch->entry = slot;
993         latch->split = 0;
994         latch->prev = 0;
995         *latch->pin = 1;
996
997         if( loadit )
998           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
999                 return bt->line = __LINE__, bt->err;
1000           else
1001                 bt->reads++;
1002
1003         return bt->err = 0;
1004 }
1005
1006 //      set CLOCK bit in latch
1007 //      decrement pin count
1008
1009 void bt_unpinlatch (BtLatchSet *latch)
1010 {
1011 #ifdef unix
1012         if( ~*latch->pin & CLOCK_bit )
1013                 __sync_fetch_and_or(latch->pin, CLOCK_bit);
1014         __sync_fetch_and_add(latch->pin, -1);
1015 #else
1016         if( ~*latch->pin & CLOCK_bit )
1017                 _InterlockedOr16 (latch->pin, CLOCK_bit);
1018         _InterlockedDecrement16 (latch->pin);
1019 #endif
1020 }
1021
1022 //  return the btree cached page address
1023
1024 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1025 {
1026 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1027
1028   return page;
1029 }
1030
1031 //      find existing latchset or inspire new one
1032 //      return with latchset pinned
1033
1034 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
1035 {
1036 uint hashidx = page_no % bt->mgr->latchhash;
1037 uint cnt, slot, idx;
1038 BtLatchSet *latch;
1039 uint attempts = 0;
1040 BtPage page;
1041 int flush;
1042
1043   //  try to find our entry
1044
1045   bt_spinmutexlock(bt->mgr->hashtable[hashidx].latch);
1046
1047   if( slot = bt->mgr->hashtable[hashidx].slot ) do
1048   {
1049         latch = bt->mgr->latchsets + slot;
1050         if( page_no == latch->page_no )
1051                 break;
1052   } while( slot = latch->next );
1053
1054   //  found our entry
1055   //  increment pin
1056
1057   if( slot ) {
1058         latch = bt->mgr->latchsets + slot;
1059 #ifdef unix
1060         __sync_fetch_and_add(latch->pin, 1);
1061 #else
1062         _InterlockedIncrement16 (latch->pin);
1063 #endif
1064         bt_spinreleasemutex(bt->mgr->hashtable[hashidx].latch);
1065         return latch;
1066   }
1067
1068         //  see if there are any unused pool entries
1069 #ifdef unix
1070         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
1071 #else
1072         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
1073 #endif
1074
1075         if( slot < bt->mgr->latchtotal ) {
1076                 latch = bt->mgr->latchsets + slot;
1077                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
1078                         return NULL;
1079                 bt_spinreleasemutex (bt->mgr->hashtable[hashidx].latch);
1080                 return latch;
1081         }
1082
1083 #ifdef unix
1084         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
1085 #else
1086         _InterlockedDecrement (&bt->mgr->latchdeployed);
1087 #endif
1088   //  find and reuse previous entry on victim
1089
1090   while( 1 ) {
1091 #ifdef unix
1092         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
1093 #else
1094         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
1095 #endif
1096         // try to get write lock on hash chain
1097         //      skip entry if not obtained
1098         //      or has outstanding pins
1099
1100         slot %= bt->mgr->latchtotal;
1101
1102         if( !slot )
1103                 continue;
1104
1105         latch = bt->mgr->latchsets + slot;
1106         idx = latch->page_no % bt->mgr->latchhash;
1107
1108         //      see we are on same chain as hashidx
1109
1110         if( idx == hashidx )
1111                 continue;
1112
1113         if( !bt_spinmutextry (bt->mgr->hashtable[idx].latch) )
1114                 continue;
1115
1116         //  skip this slot if it is pinned
1117         //      or the CLOCK bit is set
1118
1119         if( *latch->pin ) {
1120           if( *latch->pin & CLOCK_bit ) {
1121 #ifdef unix
1122                 __sync_fetch_and_and(latch->pin, ~CLOCK_bit);
1123 #else
1124                 _InterlockedAnd16 (latch->pin, ~CLOCK_bit);
1125 #endif
1126           }
1127
1128           bt_spinreleasemutex (bt->mgr->hashtable[idx].latch);
1129           continue;
1130         }
1131
1132         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
1133
1134         //  update permanent page area in btree from buffer pool
1135         //      no read-lock is required since page is not pinned.
1136
1137         if( latch->dirty )
1138           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1139                 return bt->line = __LINE__, NULL;
1140           else
1141                 latch->dirty = 0, bt->writes++;
1142
1143         //  unlink our available slot from its hash chain
1144
1145         if( latch->prev )
1146                 bt->mgr->latchsets[latch->prev].next = latch->next;
1147         else
1148                 bt->mgr->hashtable[idx].slot = latch->next;
1149
1150         if( latch->next )
1151                 bt->mgr->latchsets[latch->next].prev = latch->prev;
1152
1153         bt_spinreleasemutex (bt->mgr->hashtable[idx].latch);
1154
1155         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
1156                 return NULL;
1157
1158         bt_spinreleasemutex (bt->mgr->hashtable[hashidx].latch);
1159         return latch;
1160   }
1161 }
1162
1163 void bt_mgrclose (BtMgr *mgr)
1164 {
1165 BtLatchSet *latch;
1166 BtLogHdr *eof;
1167 uint num = 0;
1168 BtPage page;
1169 uint slot;
1170
1171         //      write remaining dirty pool pages to the btree
1172
1173         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
1174                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1175                 latch = mgr->latchsets + slot;
1176
1177                 if( latch->dirty ) {
1178                         bt_writepage(mgr, page, latch->page_no);
1179                         latch->dirty = 0, num++;
1180                 }
1181         }
1182
1183         fdatasync (mgr->idx);
1184         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1185
1186 #ifdef unix
1187         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
1188         munmap (mgr->pagezero, mgr->page_size);
1189
1190         if( mgr->redopages )
1191                 munmap (mgr->redobuff, mgr->redopages << mgr->page_bits);
1192 #else
1193         FlushViewOfFile(mgr->pagezero, 0);
1194         UnmapViewOfFile(mgr->pagezero);
1195         UnmapViewOfFile(mgr->hashtable);
1196         CloseHandle(mgr->halloc);
1197         CloseHandle(mgr->hpool);
1198 #endif
1199 #ifdef unix
1200         close (mgr->idx);
1201         free (mgr);
1202 #else
1203         FlushFileBuffers(mgr->idx);
1204         CloseHandle(mgr->idx);
1205         GlobalFree (mgr);
1206 #endif
1207 }
1208
1209 //      close and release memory
1210
1211 void bt_close (BtDb *bt)
1212 {
1213 #ifdef unix
1214         if( bt->mem )
1215                 free (bt->mem);
1216 #else
1217         if( bt->mem)
1218                 VirtualFree (bt->mem, 0, MEM_RELEASE);
1219 #endif
1220         free (bt);
1221 }
1222
1223 //  open/create new btree buffer manager
1224
1225 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1226 //              size of page pool (e.g. 262144)
1227
1228 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1229 {
1230 uint lvl, attr, last, slot, idx;
1231 uint nlatchpage, latchhash;
1232 unsigned char value[BtId];
1233 int flag, initit = 0;
1234 BtPageZero *pagezero;
1235 off64_t size;
1236 uint amt[1];
1237 BtMgr* mgr;
1238 BtKey* key;
1239 BtVal *val;
1240
1241         // determine sanity of page size and buffer pool
1242
1243         if( bits > BT_maxbits )
1244                 bits = BT_maxbits;
1245         else if( bits < BT_minbits )
1246                 bits = BT_minbits;
1247
1248         if( nodemax < 16 ) {
1249                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
1250                 return NULL;
1251         }
1252
1253 #ifdef unix
1254         mgr = calloc (1, sizeof(BtMgr));
1255
1256         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1257
1258         if( mgr->idx == -1 ) {
1259                 fprintf (stderr, "Unable to open btree file\n");
1260                 return free(mgr), NULL;
1261         }
1262 #else
1263         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1264         attr = FILE_ATTRIBUTE_NORMAL;
1265         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1266
1267         if( mgr->idx == INVALID_HANDLE_VALUE )
1268                 return GlobalFree(mgr), NULL;
1269 #endif
1270
1271 #ifdef unix
1272         pagezero = valloc (BT_maxpage);
1273         *amt = 0;
1274
1275         // read minimum page size to get root info
1276         //      to support raw disk partition files
1277         //      check if bits == 0 on the disk.
1278
1279         if( size = lseek (mgr->idx, 0L, 2) )
1280                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1281                         if( pagezero->alloc->bits )
1282                                 bits = pagezero->alloc->bits;
1283                         else
1284                                 initit = 1;
1285                 else
1286                         return free(mgr), free(pagezero), NULL;
1287         else
1288                 initit = 1;
1289 #else
1290         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1291         size = GetFileSize(mgr->idx, amt);
1292
1293         if( size || *amt ) {
1294                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1295                         return bt_mgrclose (mgr), NULL;
1296                 bits = pagezero->alloc->bits;
1297         } else
1298                 initit = 1;
1299 #endif
1300
1301         mgr->page_size = 1 << bits;
1302         mgr->page_bits = bits;
1303
1304         //  calculate number of latch hash table entries
1305
1306         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1307         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
1308
1309         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1310         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1311         mgr->latchtotal = nodemax;
1312
1313         if( !initit )
1314                 goto mgrlatch;
1315
1316         // initialize an empty b-tree with latch page, root page, page of leaves
1317         // and page(s) of latches and page pool cache
1318
1319         memset (pagezero, 0, 1 << bits);
1320         pagezero->alloc->bits = mgr->page_bits;
1321         bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1322
1323         //  initialize left-most LEAF page in
1324         //      alloc->left.
1325
1326         bt_putid (pagezero->alloc->left, LEAF_page);
1327
1328         ftruncate (mgr->idx, REDO_page << mgr->page_bits);
1329
1330         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1331                 fprintf (stderr, "Unable to create btree page zero\n");
1332                 return bt_mgrclose (mgr), NULL;
1333         }
1334
1335         memset (pagezero, 0, 1 << bits);
1336         pagezero->alloc->bits = mgr->page_bits;
1337
1338         for( lvl=MIN_lvl; lvl--; ) {
1339                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1340                 key = keyptr(pagezero->alloc, 1);
1341                 key->len = 2;           // create stopper key
1342                 key->key[0] = 0xff;
1343                 key->key[1] = 0xff;
1344
1345                 bt_putid(value, MIN_lvl - lvl + 1);
1346                 val = valptr(pagezero->alloc, 1);
1347                 val->len = lvl ? BtId : 0;
1348                 memcpy (val->value, value, val->len);
1349
1350                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1351                 pagezero->alloc->lvl = lvl;
1352                 pagezero->alloc->cnt = 1;
1353                 pagezero->alloc->act = 1;
1354
1355                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1356                         fprintf (stderr, "Unable to create btree page zero\n");
1357                         return bt_mgrclose (mgr), NULL;
1358                 }
1359         }
1360
1361 mgrlatch:
1362 #ifdef unix
1363         free (pagezero);
1364 #else
1365         VirtualFree (pagezero, 0, MEM_RELEASE);
1366 #endif
1367 #ifdef unix
1368         // mlock the pagezero page
1369
1370         flag = PROT_READ | PROT_WRITE;
1371         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1372         if( mgr->pagezero == MAP_FAILED ) {
1373                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1374                 return bt_mgrclose (mgr), NULL;
1375         }
1376         mlock (mgr->pagezero, mgr->page_size);
1377
1378         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1379         if( mgr->hashtable == MAP_FAILED ) {
1380                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1381                 return bt_mgrclose (mgr), NULL;
1382         }
1383         if( mgr->redopages = redopages ) {
1384                 ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
1385                 mgr->redobuff = mmap (0, redopages << mgr->page_bits, flag, MAP_SHARED, mgr->idx, REDO_page << mgr->page_bits);
1386                 mlock (mgr->redobuff, redopages << mgr->page_bits);
1387         }
1388 #else
1389         flag = PAGE_READWRITE;
1390         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1391         if( !mgr->halloc ) {
1392                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1393                 return bt_mgrclose (mgr), NULL;
1394         }
1395
1396         flag = FILE_MAP_WRITE;
1397         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1398         if( !mgr->pagezero ) {
1399                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1400                 return bt_mgrclose (mgr), NULL;
1401         }
1402
1403         flag = PAGE_READWRITE;
1404         size = (uid)mgr->nlatchpage << mgr->page_bits;
1405         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1406         if( !mgr->hpool ) {
1407                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1408                 return bt_mgrclose (mgr), NULL;
1409         }
1410
1411         flag = FILE_MAP_WRITE;
1412         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1413         if( !mgr->hashtable ) {
1414                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1415                 return bt_mgrclose (mgr), NULL;
1416         }
1417         if( mgr->redopages = redopages )
1418                 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1419 #endif
1420
1421         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1422         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1423
1424         return mgr;
1425 }
1426
1427 //      open BTree access method
1428 //      based on buffer manager
1429
1430 BtDb *bt_open (BtMgr *mgr)
1431 {
1432 BtDb *bt = malloc (sizeof(*bt));
1433
1434         memset (bt, 0, sizeof(*bt));
1435         bt->mgr = mgr;
1436 #ifdef unix
1437         bt->mem = valloc (2 *mgr->page_size);
1438 #else
1439         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1440 #endif
1441         bt->frame = (BtPage)bt->mem;
1442         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1443 #ifdef unix
1444         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1445 #else
1446         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1447 #endif
1448         return bt;
1449 }
1450
1451 //  compare two keys, return > 0, = 0, or < 0
1452 //  =0: keys are same
1453 //  -1: key2 > key1
1454 //  +1: key2 < key1
1455 //  as the comparison value
1456
1457 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1458 {
1459 uint len1 = key1->len;
1460 int ans;
1461
1462         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1463                 return ans;
1464
1465         if( len1 > len2 )
1466                 return 1;
1467         if( len1 < len2 )
1468                 return -1;
1469
1470         return 0;
1471 }
1472
1473 // place write, read, or parent lock on requested page_no.
1474
1475 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1476 {
1477         switch( mode ) {
1478         case BtLockRead:
1479                 ReadLock (latch->readwr, bt->thread_no);
1480                 break;
1481         case BtLockWrite:
1482                 WriteLock (latch->readwr, bt->thread_no);
1483                 break;
1484         case BtLockAccess:
1485                 ReadLock (latch->access, bt->thread_no);
1486                 break;
1487         case BtLockDelete:
1488                 WriteLock (latch->access, bt->thread_no);
1489                 break;
1490         case BtLockParent:
1491                 WriteOLock (latch->parent, bt->thread_no);
1492                 break;
1493         case BtLockAtomic:
1494                 WriteOLock (latch->atomic, bt->thread_no);
1495                 break;
1496         case BtLockAtomic | BtLockRead:
1497                 WriteOLock (latch->atomic, bt->thread_no);
1498                 ReadLock (latch->readwr, bt->thread_no);
1499                 break;
1500         }
1501 }
1502
1503 // remove write, read, or parent lock on requested page
1504
1505 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1506 {
1507         switch( mode ) {
1508         case BtLockRead:
1509                 ReadRelease (latch->readwr);
1510                 break;
1511         case BtLockWrite:
1512                 WriteRelease (latch->readwr);
1513                 break;
1514         case BtLockAccess:
1515                 ReadRelease (latch->access);
1516                 break;
1517         case BtLockDelete:
1518                 WriteRelease (latch->access);
1519                 break;
1520         case BtLockParent:
1521                 WriteORelease (latch->parent);
1522                 break;
1523         case BtLockAtomic:
1524                 WriteORelease (latch->atomic);
1525                 break;
1526         case BtLockAtomic | BtLockRead:
1527                 WriteORelease (latch->atomic);
1528                 ReadRelease (latch->readwr);
1529                 break;
1530         }
1531 }
1532
1533 //      allocate a new page
1534 //      return with page latched, but unlocked.
1535
1536 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1537 {
1538 uid page_no;
1539 int blk;
1540
1541         //      lock allocation page
1542
1543         bt_spinmutexlock(bt->mgr->lock);
1544
1545         // use empty chain first
1546         // else allocate empty page
1547
1548         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1549                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1550                         set->page = bt_mappage (bt, set->latch);
1551                 else
1552                         return bt->err = BTERR_struct, bt->line = __LINE__, -1;
1553
1554                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1555                 bt_spinreleasemutex(bt->mgr->lock);
1556
1557                 memcpy (set->page, contents, bt->mgr->page_size);
1558                 set->latch->dirty = 1;
1559                 return 0;
1560         }
1561
1562         //  otherwise extend btree file
1563
1564         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1565         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1566
1567         //      don't load cache from btree page
1568
1569         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1570                 set->page = bt_mappage (bt, set->latch);
1571         else
1572                 return bt->line = __LINE__, bt->err = BTERR_struct;
1573
1574         ftruncate (bt->mgr->idx, (uid)(page_no + 1) << bt->mgr->page_bits);
1575
1576         // unlock allocation latch
1577
1578         bt_spinreleasemutex(bt->mgr->lock);
1579
1580         memcpy (set->page, contents, bt->mgr->page_size);
1581         set->latch->dirty = 1;
1582         return 0;
1583 }
1584
1585 //  find slot in page for given key at a given level
1586
1587 int bt_findslot (BtPage page, unsigned char *key, uint len)
1588 {
1589 uint diff, higher = page->cnt, low = 1, slot;
1590 uint good = 0;
1591
1592         //        make stopper key an infinite fence value
1593
1594         if( bt_getid (page->right) )
1595                 higher++;
1596         else
1597                 good++;
1598
1599         //      low is the lowest candidate.
1600         //  loop ends when they meet
1601
1602         //  higher is already
1603         //      tested as .ge. the passed key.
1604
1605         while( diff = higher - low ) {
1606                 slot = low + ( diff >> 1 );
1607                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1608                         low = slot + 1;
1609                 else
1610                         higher = slot, good++;
1611         }
1612
1613         //      return zero if key is on right link page
1614
1615         return good ? higher : 0;
1616 }
1617
1618 //  find and load page at given level for given key
1619 //      leave page rd or wr locked as requested
1620
1621 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1622 {
1623 uid page_no = ROOT_page, prevpage = 0;
1624 uint drill = 0xff, slot;
1625 BtLatchSet *prevlatch;
1626 uint mode, prevmode;
1627 BtVal *val;
1628
1629   //  start at root of btree and drill down
1630
1631   do {
1632         // determine lock mode of drill level
1633         mode = (drill == lvl) ? lock : BtLockRead; 
1634
1635         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1636           return 0;
1637
1638         // obtain access lock using lock chaining with Access mode
1639
1640         if( page_no > ROOT_page )
1641           bt_lockpage(bt, BtLockAccess, set->latch);
1642
1643         set->page = bt_mappage (bt, set->latch);
1644
1645         //      release & unpin parent or left sibling page
1646
1647         if( prevpage ) {
1648           bt_unlockpage(prevmode, prevlatch);
1649           bt_unpinlatch (prevlatch);
1650           prevpage = 0;
1651         }
1652
1653         // obtain mode lock using lock chaining through AccessLock
1654
1655         bt_lockpage(bt, mode, set->latch);
1656
1657         if( set->page->free )
1658                 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1659
1660         if( page_no > ROOT_page )
1661           bt_unlockpage(BtLockAccess, set->latch);
1662
1663         // re-read and re-lock root after determining actual level of root
1664
1665         if( set->page->lvl != drill) {
1666                 if( set->latch->page_no != ROOT_page )
1667                         return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1668                         
1669                 drill = set->page->lvl;
1670
1671                 if( lock != BtLockRead && drill == lvl ) {
1672                   bt_unlockpage(mode, set->latch);
1673                   bt_unpinlatch (set->latch);
1674                   continue;
1675                 }
1676         }
1677
1678         prevpage = set->latch->page_no;
1679         prevlatch = set->latch;
1680         prevmode = mode;
1681
1682         //  find key on page at this level
1683         //  and descend to requested level
1684
1685         if( !set->page->kill )
1686          if( slot = bt_findslot (set->page, key, len) ) {
1687           if( drill == lvl )
1688                 return slot;
1689
1690           // find next non-dead slot -- the fence key if nothing else
1691
1692           while( slotptr(set->page, slot)->dead )
1693                 if( slot++ < set->page->cnt )
1694                   continue;
1695                 else
1696                   return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1697
1698           val = valptr(set->page, slot);
1699
1700           if( val->len == BtId )
1701                 page_no = bt_getid(valptr(set->page, slot)->value);
1702           else
1703                 return bt->line = __LINE__, bt->err = BTERR_struct, 0;
1704
1705           drill--;
1706           continue;
1707          }
1708
1709         //  or slide right into next page
1710
1711         page_no = bt_getid(set->page->right);
1712   } while( page_no );
1713
1714   // return error on end of right chain
1715
1716   bt->line = __LINE__, bt->err = BTERR_struct;
1717   return 0;     // return error
1718 }
1719
1720 //      return page to free list
1721 //      page must be delete & write locked
1722
1723 void bt_freepage (BtDb *bt, BtPageSet *set)
1724 {
1725         //      lock allocation page
1726
1727         bt_spinmutexlock (bt->mgr->lock);
1728
1729         //      store chain
1730
1731         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1732         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1733         set->latch->dirty = 1;
1734         set->page->free = 1;
1735
1736         // unlock released page
1737
1738         bt_unlockpage (BtLockDelete, set->latch);
1739         bt_unlockpage (BtLockWrite, set->latch);
1740         bt_unpinlatch (set->latch);
1741
1742         // unlock allocation page
1743
1744         bt_spinreleasemutex (bt->mgr->lock);
1745 }
1746
1747 //      a fence key was deleted from a page
1748 //      push new fence value upwards
1749
1750 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1751 {
1752 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1753 unsigned char value[BtId];
1754 BtKey* ptr;
1755 uint idx;
1756
1757         //      remove the old fence value
1758
1759         ptr = keyptr(set->page, set->page->cnt);
1760         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1761         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1762         set->latch->dirty = 1;
1763
1764         //  cache new fence value
1765
1766         ptr = keyptr(set->page, set->page->cnt);
1767         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1768
1769         bt_lockpage (bt, BtLockParent, set->latch);
1770         bt_unlockpage (BtLockWrite, set->latch);
1771
1772         //      insert new (now smaller) fence key
1773
1774         bt_putid (value, set->latch->page_no);
1775         ptr = (BtKey*)leftkey;
1776
1777         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1778           return bt->err;
1779
1780         //      now delete old fence key
1781
1782         ptr = (BtKey*)rightkey;
1783
1784         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1785                 return bt->err;
1786
1787         bt_unlockpage (BtLockParent, set->latch);
1788         bt_unpinlatch(set->latch);
1789         return 0;
1790 }
1791
1792 //      root has a single child
1793 //      collapse a level from the tree
1794
1795 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1796 {
1797 BtPageSet child[1];
1798 uid page_no;
1799 BtVal *val;
1800 uint idx;
1801
1802   // find the child entry and promote as new root contents
1803
1804   do {
1805         for( idx = 0; idx++ < root->page->cnt; )
1806           if( !slotptr(root->page, idx)->dead )
1807                 break;
1808
1809         val = valptr(root->page, idx);
1810
1811         if( val->len == BtId )
1812                 page_no = bt_getid (valptr(root->page, idx)->value);
1813         else
1814                 return bt->line = __LINE__, bt->err = BTERR_struct;
1815
1816         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1817                 child->page = bt_mappage (bt, child->latch);
1818         else
1819                 return bt->err;
1820
1821         bt_lockpage (bt, BtLockDelete, child->latch);
1822         bt_lockpage (bt, BtLockWrite, child->latch);
1823
1824         memcpy (root->page, child->page, bt->mgr->page_size);
1825         root->latch->dirty = 1;
1826
1827         bt_freepage (bt, child);
1828
1829   } while( root->page->lvl > 1 && root->page->act == 1 );
1830
1831   bt_unlockpage (BtLockWrite, root->latch);
1832   bt_unpinlatch (root->latch);
1833   return 0;
1834 }
1835
1836 //  delete a page and manage keys
1837 //  call with page writelocked
1838 //      returns with page unpinned
1839
1840 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1841 {
1842 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1843 unsigned char value[BtId];
1844 uint lvl = set->page->lvl;
1845 BtPageSet right[1];
1846 uid page_no;
1847 BtKey *ptr;
1848
1849         //      cache copy of fence key
1850         //      to post in parent
1851
1852         ptr = keyptr(set->page, set->page->cnt);
1853         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1854
1855         //      obtain lock on right page
1856
1857         page_no = bt_getid(set->page->right);
1858
1859         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1860                 right->page = bt_mappage (bt, right->latch);
1861         else
1862                 return 0;
1863
1864         bt_lockpage (bt, BtLockWrite, right->latch);
1865
1866         // cache copy of key to update
1867
1868         ptr = keyptr(right->page, right->page->cnt);
1869         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1870
1871         if( right->page->kill )
1872                 return bt->line = __LINE__, bt->err = BTERR_struct;
1873
1874         // pull contents of right peer into our empty page
1875
1876         memcpy (set->page, right->page, bt->mgr->page_size);
1877         set->latch->dirty = 1;
1878
1879         // mark right page deleted and point it to left page
1880         //      until we can post parent updates that remove access
1881         //      to the deleted page.
1882
1883         bt_putid (right->page->right, set->latch->page_no);
1884         right->latch->dirty = 1;
1885         right->page->kill = 1;
1886
1887         bt_lockpage (bt, BtLockParent, right->latch);
1888         bt_unlockpage (BtLockWrite, right->latch);
1889
1890         bt_lockpage (bt, BtLockParent, set->latch);
1891         bt_unlockpage (BtLockWrite, set->latch);
1892
1893         // redirect higher key directly to our new node contents
1894
1895         bt_putid (value, set->latch->page_no);
1896         ptr = (BtKey*)higherfence;
1897
1898         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1899           return bt->err;
1900
1901         //      delete old lower key to our node
1902
1903         ptr = (BtKey*)lowerfence;
1904
1905         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1906           return bt->err;
1907
1908         //      obtain delete and write locks to right node
1909
1910         bt_unlockpage (BtLockParent, right->latch);
1911         bt_lockpage (bt, BtLockDelete, right->latch);
1912         bt_lockpage (bt, BtLockWrite, right->latch);
1913         bt_freepage (bt, right);
1914
1915         bt_unlockpage (BtLockParent, set->latch);
1916         bt_unpinlatch (set->latch);
1917         return 0;
1918 }
1919
1920 //  find and delete key on page by marking delete flag bit
1921 //  if page becomes empty, delete it from the btree
1922
1923 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1924 {
1925 uint slot, idx, found, fence;
1926 BtPageSet set[1];
1927 BtKey *ptr;
1928 BtVal *val;
1929
1930         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1931                 ptr = keyptr(set->page, slot);
1932         else
1933                 return bt->err;
1934
1935         // if librarian slot, advance to real slot
1936
1937         if( slotptr(set->page, slot)->type == Librarian )
1938                 ptr = keyptr(set->page, ++slot);
1939
1940         fence = slot == set->page->cnt;
1941
1942         // if key is found delete it, otherwise ignore request
1943
1944         if( found = !keycmp (ptr, key, len) )
1945           if( found = slotptr(set->page, slot)->dead == 0 ) {
1946                 val = valptr(set->page,slot);
1947                 slotptr(set->page, slot)->dead = 1;
1948                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1949                 set->page->act--;
1950
1951                 // collapse empty slots beneath the fence
1952
1953                 while( idx = set->page->cnt - 1 )
1954                   if( slotptr(set->page, idx)->dead ) {
1955                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1956                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1957                   } else
1958                         break;
1959           }
1960
1961         //      did we delete a fence key in an upper level?
1962
1963         if( found && lvl && set->page->act && fence )
1964           if( bt_fixfence (bt, set, lvl) )
1965                 return bt->err;
1966           else
1967                 return 0;
1968
1969         //      do we need to collapse root?
1970
1971         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1972           if( bt_collapseroot (bt, set) )
1973                 return bt->err;
1974           else
1975                 return 0;
1976
1977         //      delete empty page
1978
1979         if( !set->page->act )
1980                 return bt_deletepage (bt, set);
1981
1982         set->latch->dirty = 1;
1983         bt_unlockpage(BtLockWrite, set->latch);
1984         bt_unpinlatch (set->latch);
1985         return 0;
1986 }
1987
1988 BtKey *bt_foundkey (BtDb *bt)
1989 {
1990         return (BtKey*)(bt->key);
1991 }
1992
1993 //      advance to next slot
1994
1995 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1996 {
1997 BtLatchSet *prevlatch;
1998 uid page_no;
1999
2000         if( slot < set->page->cnt )
2001                 return slot + 1;
2002
2003         prevlatch = set->latch;
2004
2005         if( page_no = bt_getid(set->page->right) )
2006           if( set->latch = bt_pinlatch (bt, page_no, 1) )
2007                 set->page = bt_mappage (bt, set->latch);
2008           else
2009                 return 0;
2010         else
2011           return bt->err = BTERR_struct, bt->line = __LINE__, 0;
2012
2013         // obtain access lock using lock chaining with Access mode
2014
2015         bt_lockpage(bt, BtLockAccess, set->latch);
2016
2017         bt_unlockpage(BtLockRead, prevlatch);
2018         bt_unpinlatch (prevlatch);
2019
2020         bt_lockpage(bt, BtLockRead, set->latch);
2021         bt_unlockpage(BtLockAccess, set->latch);
2022         return 1;
2023 }
2024
2025 //      find unique key or first duplicate key in
2026 //      leaf level and return number of value bytes
2027 //      or (-1) if not found.  Setup key for bt_foundkey
2028
2029 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2030 {
2031 BtPageSet set[1];
2032 uint len, slot;
2033 int ret = -1;
2034 BtKey *ptr;
2035 BtVal *val;
2036
2037   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
2038    do {
2039         ptr = keyptr(set->page, slot);
2040
2041         //      skip librarian slot place holder
2042
2043         if( slotptr(set->page, slot)->type == Librarian )
2044                 ptr = keyptr(set->page, ++slot);
2045
2046         //      return actual key found
2047
2048         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2049         len = ptr->len;
2050
2051         if( slotptr(set->page, slot)->type == Duplicate )
2052                 len -= BtId;
2053
2054         //      not there if we reach the stopper key
2055
2056         if( slot == set->page->cnt )
2057           if( !bt_getid (set->page->right) )
2058                 break;
2059
2060         // if key exists, return >= 0 value bytes copied
2061         //      otherwise return (-1)
2062
2063         if( slotptr(set->page, slot)->dead )
2064                 continue;
2065
2066         if( keylen == len )
2067           if( !memcmp (ptr->key, key, len) ) {
2068                 val = valptr (set->page,slot);
2069                 if( valmax > val->len )
2070                         valmax = val->len;
2071                 memcpy (value, val->value, valmax);
2072                 ret = valmax;
2073           }
2074
2075         break;
2076
2077    } while( slot = bt_findnext (bt, set, slot) );
2078
2079   bt_unlockpage (BtLockRead, set->latch);
2080   bt_unpinlatch (set->latch);
2081   return ret;
2082 }
2083
2084 //      check page for space available,
2085 //      clean if necessary and return
2086 //      0 - page needs splitting
2087 //      >0  new slot value
2088
2089 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
2090 {
2091 uint nxt = bt->mgr->page_size;
2092 BtPage page = set->page;
2093 uint cnt = 0, idx = 0;
2094 uint max = page->cnt;
2095 uint newslot = max;
2096 BtKey *key;
2097 BtVal *val;
2098
2099         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2100                 return slot;
2101
2102         //      skip cleanup and proceed to split
2103         //      if there's not enough garbage
2104         //      to bother with.
2105
2106         if( page->garbage < nxt / 5 )
2107                 return 0;
2108
2109         memcpy (bt->frame, page, bt->mgr->page_size);
2110
2111         // skip page info and set rest of page to zero
2112
2113         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2114         set->latch->dirty = 1;
2115         page->garbage = 0;
2116         page->act = 0;
2117
2118         // clean up page first by
2119         // removing deleted keys
2120
2121         while( cnt++ < max ) {
2122                 if( cnt == slot )
2123                         newslot = idx + 2;
2124
2125                 if( cnt < max || bt->frame->lvl )
2126                   if( slotptr(bt->frame,cnt)->dead )
2127                         continue;
2128
2129                 // copy the value across
2130
2131                 val = valptr(bt->frame, cnt);
2132                 nxt -= val->len + sizeof(BtVal);
2133                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2134
2135                 // copy the key across
2136
2137                 key = keyptr(bt->frame, cnt);
2138                 nxt -= key->len + sizeof(BtKey);
2139                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2140
2141                 // make a librarian slot
2142
2143                 slotptr(page, ++idx)->off = nxt;
2144                 slotptr(page, idx)->type = Librarian;
2145                 slotptr(page, idx)->dead = 1;
2146
2147                 // set up the slot
2148
2149                 slotptr(page, ++idx)->off = nxt;
2150                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2151
2152                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2153                         page->act++;
2154         }
2155
2156         page->min = nxt;
2157         page->cnt = idx;
2158
2159         //      see if page has enough space now, or does it need splitting?
2160
2161         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2162                 return newslot;
2163
2164         return 0;
2165 }
2166
2167 // split the root and raise the height of the btree
2168
2169 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2170 {  
2171 unsigned char leftkey[BT_keyarray];
2172 uint nxt = bt->mgr->page_size;
2173 unsigned char value[BtId];
2174 BtPageSet left[1];
2175 uid left_page_no;
2176 BtKey *ptr;
2177 BtVal *val;
2178
2179         //      save left page fence key for new root
2180
2181         ptr = keyptr(root->page, root->page->cnt);
2182         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2183
2184         //  Obtain an empty page to use, and copy the current
2185         //  root contents into it, e.g. lower keys
2186
2187         if( bt_newpage(bt, left, root->page) )
2188                 return bt->err;
2189
2190         left_page_no = left->latch->page_no;
2191         bt_unpinlatch (left->latch);
2192
2193         // preserve the page info at the bottom
2194         // of higher keys and set rest to zero
2195
2196         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2197
2198         // insert stopper key at top of newroot page
2199         // and increase the root height
2200
2201         nxt -= BtId + sizeof(BtVal);
2202         bt_putid (value, right->page_no);
2203         val = (BtVal *)((unsigned char *)root->page + nxt);
2204         memcpy (val->value, value, BtId);
2205         val->len = BtId;
2206
2207         nxt -= 2 + sizeof(BtKey);
2208         slotptr(root->page, 2)->off = nxt;
2209         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2210         ptr->len = 2;
2211         ptr->key[0] = 0xff;
2212         ptr->key[1] = 0xff;
2213
2214         // insert lower keys page fence key on newroot page as first key
2215
2216         nxt -= BtId + sizeof(BtVal);
2217         bt_putid (value, left_page_no);
2218         val = (BtVal *)((unsigned char *)root->page + nxt);
2219         memcpy (val->value, value, BtId);
2220         val->len = BtId;
2221
2222         ptr = (BtKey *)leftkey;
2223         nxt -= ptr->len + sizeof(BtKey);
2224         slotptr(root->page, 1)->off = nxt;
2225         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2226         
2227         bt_putid(root->page->right, 0);
2228         root->page->min = nxt;          // reset lowest used offset and key count
2229         root->page->cnt = 2;
2230         root->page->act = 2;
2231         root->page->lvl++;
2232
2233         // release and unpin root pages
2234
2235         bt_unlockpage(BtLockWrite, root->latch);
2236         bt_unpinlatch (root->latch);
2237
2238         bt_unpinlatch (right);
2239         return 0;
2240 }
2241
2242 //  split already locked full node
2243 //      leave it locked.
2244 //      return pool entry for new right
2245 //      page, unlocked
2246
2247 uint bt_splitpage (BtDb *bt, BtPageSet *set)
2248 {
2249 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2250 uint lvl = set->page->lvl;
2251 BtPageSet right[1];
2252 BtKey *key, *ptr;
2253 BtVal *val, *src;
2254 uid right2;
2255 uint prev;
2256
2257         //  split higher half of keys to bt->frame
2258
2259         memset (bt->frame, 0, bt->mgr->page_size);
2260         max = set->page->cnt;
2261         cnt = max / 2;
2262         idx = 0;
2263
2264         while( cnt++ < max ) {
2265                 if( cnt < max || set->page->lvl )
2266                   if( slotptr(set->page, cnt)->dead )
2267                         continue;
2268
2269                 src = valptr(set->page, cnt);
2270                 nxt -= src->len + sizeof(BtVal);
2271                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2272
2273                 key = keyptr(set->page, cnt);
2274                 nxt -= key->len + sizeof(BtKey);
2275                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2276                 memcpy (ptr, key, key->len + sizeof(BtKey));
2277
2278                 //      add librarian slot
2279
2280                 slotptr(bt->frame, ++idx)->off = nxt;
2281                 slotptr(bt->frame, idx)->type = Librarian;
2282                 slotptr(bt->frame, idx)->dead = 1;
2283
2284                 //  add actual slot
2285
2286                 slotptr(bt->frame, ++idx)->off = nxt;
2287                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2288
2289                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2290                         bt->frame->act++;
2291         }
2292
2293         bt->frame->bits = bt->mgr->page_bits;
2294         bt->frame->min = nxt;
2295         bt->frame->cnt = idx;
2296         bt->frame->lvl = lvl;
2297
2298         // link right node
2299
2300         if( set->latch->page_no > ROOT_page )
2301                 bt_putid (bt->frame->right, bt_getid (set->page->right));
2302
2303         //      get new free page and write higher keys to it.
2304
2305         if( bt_newpage(bt, right, bt->frame) )
2306                 return 0;
2307
2308         memcpy (bt->frame, set->page, bt->mgr->page_size);
2309         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2310         set->latch->dirty = 1;
2311
2312         nxt = bt->mgr->page_size;
2313         set->page->garbage = 0;
2314         set->page->act = 0;
2315         max /= 2;
2316         cnt = 0;
2317         idx = 0;
2318
2319         if( slotptr(bt->frame, max)->type == Librarian )
2320                 max--;
2321
2322         //  assemble page of smaller keys
2323
2324         while( cnt++ < max ) {
2325                 if( slotptr(bt->frame, cnt)->dead )
2326                         continue;
2327                 val = valptr(bt->frame, cnt);
2328                 nxt -= val->len + sizeof(BtVal);
2329                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2330
2331                 key = keyptr(bt->frame, cnt);
2332                 nxt -= key->len + sizeof(BtKey);
2333                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2334
2335                 //      add librarian slot
2336
2337                 slotptr(set->page, ++idx)->off = nxt;
2338                 slotptr(set->page, idx)->type = Librarian;
2339                 slotptr(set->page, idx)->dead = 1;
2340
2341                 //      add actual slot
2342
2343                 slotptr(set->page, ++idx)->off = nxt;
2344                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2345                 set->page->act++;
2346         }
2347
2348         bt_putid(set->page->right, right->latch->page_no);
2349         set->page->min = nxt;
2350         set->page->cnt = idx;
2351
2352         return right->latch->entry;
2353 }
2354
2355 //      fix keys for newly split page
2356 //      call with page locked, return
2357 //      unlocked
2358
2359 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2360 {
2361 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2362 unsigned char value[BtId];
2363 uint lvl = set->page->lvl;
2364 BtPage page;
2365 BtKey *ptr;
2366
2367         // if current page is the root page, split it
2368
2369         if( set->latch->page_no == ROOT_page )
2370                 return bt_splitroot (bt, set, right);
2371
2372         ptr = keyptr(set->page, set->page->cnt);
2373         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2374
2375         page = bt_mappage (bt, right);
2376
2377         ptr = keyptr(page, page->cnt);
2378         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2379
2380         // insert new fences in their parent pages
2381
2382         bt_lockpage (bt, BtLockParent, right);
2383
2384         bt_lockpage (bt, BtLockParent, set->latch);
2385         bt_unlockpage (BtLockWrite, set->latch);
2386
2387         // insert new fence for reformulated left block of smaller keys
2388
2389         bt_putid (value, set->latch->page_no);
2390         ptr = (BtKey *)leftkey;
2391
2392         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2393                 return bt->err;
2394
2395         // switch fence for right block of larger keys to new right page
2396
2397         bt_putid (value, right->page_no);
2398         ptr = (BtKey *)rightkey;
2399
2400         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2401                 return bt->err;
2402
2403         bt_unlockpage (BtLockParent, set->latch);
2404         bt_unpinlatch (set->latch);
2405
2406         bt_unlockpage (BtLockParent, right);
2407         bt_unpinlatch (right);
2408         return 0;
2409 }
2410
2411 //      install new key and value onto page
2412 //      page must already be checked for
2413 //      adequate space
2414
2415 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2416 {
2417 uint idx, librarian;
2418 BtSlot *node;
2419 BtKey *ptr;
2420 BtVal *val;
2421
2422         //      if found slot > desired slot and previous slot
2423         //      is a librarian slot, use it
2424
2425         if( slot > 1 )
2426           if( slotptr(set->page, slot-1)->type == Librarian )
2427                 slot--;
2428
2429         // copy value onto page
2430
2431         set->page->min -= vallen + sizeof(BtVal);
2432         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2433         memcpy (val->value, value, vallen);
2434         val->len = vallen;
2435
2436         // copy key onto page
2437
2438         set->page->min -= keylen + sizeof(BtKey);
2439         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2440         memcpy (ptr->key, key, keylen);
2441         ptr->len = keylen;
2442         
2443         //      find first empty slot
2444
2445         for( idx = slot; idx < set->page->cnt; idx++ )
2446           if( slotptr(set->page, idx)->dead )
2447                 break;
2448
2449         // now insert key into array before slot
2450
2451         if( idx == set->page->cnt )
2452                 idx += 2, set->page->cnt += 2, librarian = 2;
2453         else
2454                 librarian = 1;
2455
2456         set->latch->dirty = 1;
2457         set->page->act++;
2458
2459         while( idx > slot + librarian - 1 )
2460                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2461
2462         //      add librarian slot
2463
2464         if( librarian > 1 ) {
2465                 node = slotptr(set->page, slot++);
2466                 node->off = set->page->min;
2467                 node->type = Librarian;
2468                 node->dead = 1;
2469         }
2470
2471         //      fill in new slot
2472
2473         node = slotptr(set->page, slot);
2474         node->off = set->page->min;
2475         node->type = type;
2476         node->dead = 0;
2477
2478         if( release ) {
2479                 bt_unlockpage (BtLockWrite, set->latch);
2480                 bt_unpinlatch (set->latch);
2481         }
2482
2483         return 0;
2484 }
2485
2486 //  Insert new key into the btree at given level.
2487 //      either add a new key or update/add an existing one
2488
2489 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2490 {
2491 unsigned char newkey[BT_keyarray];
2492 uint slot, idx, len, entry;
2493 BtPageSet set[1];
2494 BtKey *ptr, *ins;
2495 uid sequence;
2496 BtVal *val;
2497 uint type;
2498
2499   // set up the key we're working on
2500
2501   ins = (BtKey*)newkey;
2502   memcpy (ins->key, key, keylen);
2503   ins->len = keylen;
2504
2505   // is this a non-unique index value?
2506
2507   if( unique )
2508         type = Unique;
2509   else {
2510         type = Duplicate;
2511         sequence = bt_newdup (bt);
2512         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2513         ins->len += BtId;
2514   }
2515
2516   while( 1 ) { // find the page and slot for the current key
2517         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2518                 ptr = keyptr(set->page, slot);
2519         else {
2520                 if( !bt->err )
2521                         bt->line = __LINE__, bt->err = BTERR_ovflw;
2522                 return bt->err;
2523         }
2524
2525         // if librarian slot == found slot, advance to real slot
2526
2527         if( slotptr(set->page, slot)->type == Librarian )
2528           if( !keycmp (ptr, key, keylen) )
2529                 ptr = keyptr(set->page, ++slot);
2530
2531         len = ptr->len;
2532
2533         if( slotptr(set->page, slot)->type == Duplicate )
2534                 len -= BtId;
2535
2536         //  if inserting a duplicate key or unique key
2537         //      check for adequate space on the page
2538         //      and insert the new key before slot.
2539
2540         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2541           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2542                 if( !(entry = bt_splitpage (bt, set)) )
2543                   return bt->err;
2544                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2545                   return bt->err;
2546                 else
2547                   continue;
2548
2549           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2550         }
2551
2552         // if key already exists, update value and return
2553
2554         val = valptr(set->page, slot);
2555
2556         if( val->len >= vallen ) {
2557                 if( slotptr(set->page, slot)->dead )
2558                         set->page->act++;
2559                 set->page->garbage += val->len - vallen;
2560                 set->latch->dirty = 1;
2561                 slotptr(set->page, slot)->dead = 0;
2562                 val->len = vallen;
2563                 memcpy (val->value, value, vallen);
2564                 bt_unlockpage(BtLockWrite, set->latch);
2565                 bt_unpinlatch (set->latch);
2566                 return 0;
2567         }
2568
2569         //      new update value doesn't fit in existing value area
2570
2571         if( !slotptr(set->page, slot)->dead )
2572                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2573         else {
2574                 slotptr(set->page, slot)->dead = 0;
2575                 set->page->act++;
2576         }
2577
2578         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2579           if( !(entry = bt_splitpage (bt, set)) )
2580                 return bt->err;
2581           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2582                 return bt->err;
2583           else
2584                 continue;
2585
2586         set->page->min -= vallen + sizeof(BtVal);
2587         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2588         memcpy (val->value, value, vallen);
2589         val->len = vallen;
2590
2591         set->latch->dirty = 1;
2592         set->page->min -= keylen + sizeof(BtKey);
2593         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2594         memcpy (ptr->key, key, keylen);
2595         ptr->len = keylen;
2596         
2597         slotptr(set->page, slot)->off = set->page->min;
2598         bt_unlockpage(BtLockWrite, set->latch);
2599         bt_unpinlatch (set->latch);
2600         return 0;
2601   }
2602   return 0;
2603 }
2604
2605 typedef struct {
2606         logseqno reqlsn;        // redo log seq no required
2607         logseqno lsn;           // redo log sequence number
2608         uint entry;                     // latch table entry number
2609         uint slot:31;           // page slot number
2610         uint reuse:1;           // reused previous page
2611 } AtomicTxn;
2612
2613 typedef struct {
2614         uid page_no;            // page number for split leaf
2615         void *next;                     // next key to insert
2616         uint entry:29;          // latch table entry number
2617         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2618         uint nounlock:1;        // don't unlock ParentModification
2619         unsigned char leafkey[BT_keyarray];
2620 } AtomicKey;
2621
2622 //      determine actual page where key is located
2623 //  return slot number
2624
2625 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2626 {
2627 BtKey *key = keyptr(source,src);
2628 uint slot = locks[src].slot;
2629 uint entry;
2630
2631         if( src > 1 && locks[src].reuse )
2632           entry = locks[src-1].entry, slot = 0;
2633         else
2634           entry = locks[src].entry;
2635
2636         if( slot ) {
2637                 set->latch = bt->mgr->latchsets + entry;
2638                 set->page = bt_mappage (bt, set->latch);
2639                 return slot;
2640         }
2641
2642         //      is locks->reuse set? or was slot zeroed?
2643         //      if so, find where our key is located 
2644         //      on current page or pages split on
2645         //      same page txn operations.
2646
2647         do {
2648                 set->latch = bt->mgr->latchsets + entry;
2649                 set->page = bt_mappage (bt, set->latch);
2650
2651                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2652                   if( slotptr(set->page, slot)->type == Librarian )
2653                         slot++;
2654                   if( locks[src].reuse )
2655                         locks[src].entry = entry;
2656                   return slot;
2657                 }
2658         } while( entry = set->latch->split );
2659
2660         bt->line = __LINE__, bt->err = BTERR_atomic;
2661         return 0;
2662 }
2663
2664 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2665 {
2666 BtKey *key = keyptr(source, src);
2667 BtVal *val = valptr(source, src);
2668 BtLatchSet *latch;
2669 BtPageSet set[1];
2670 uint entry, slot;
2671
2672   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2673         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
2674           if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2675                 return bt->err;
2676           set->page->lsn = locks[src].lsn;
2677           return 0;
2678         }
2679
2680         if( entry = bt_splitpage (bt, set) )
2681           latch = bt->mgr->latchsets + entry;
2682         else
2683           return bt->err;
2684
2685         //      splice right page into split chain
2686         //      and WriteLock it.
2687
2688         bt_lockpage(bt, BtLockWrite, latch);
2689         latch->split = set->latch->split;
2690         set->latch->split = entry;
2691         locks[src].slot = 0;
2692   }
2693
2694   return bt->line = __LINE__, bt->err = BTERR_atomic;
2695 }
2696
2697 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2698 {
2699 BtKey *key = keyptr(source, src);
2700 BtPageSet set[1];
2701 uint idx, slot;
2702 BtKey *ptr;
2703 BtVal *val;
2704
2705         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2706           ptr = keyptr(set->page, slot);
2707         else
2708           return bt->line = __LINE__, bt->err = BTERR_struct;
2709
2710         if( !keycmp (ptr, key->key, key->len) )
2711           if( !slotptr(set->page, slot)->dead )
2712                 slotptr(set->page, slot)->dead = 1;
2713           else
2714                 return 0;
2715         else
2716                 return 0;
2717
2718         val = valptr(set->page, slot);
2719         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2720         set->page->lsn = locks[src].lsn;
2721         set->latch->dirty = 1;
2722         set->page->act--;
2723         bt->found++;
2724         return 0;
2725 }
2726
2727 //      delete an empty master page for a transaction
2728
2729 //      note that the far right page never empties because
2730 //      it always contains (at least) the infinite stopper key
2731 //      and that all pages that don't contain any keys are
2732 //      deleted, or are being held under Atomic lock.
2733
2734 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2735 {
2736 BtPageSet right[1], temp[1];
2737 unsigned char value[BtId];
2738 uid right_page_no;
2739 BtKey *ptr;
2740
2741         bt_lockpage(bt, BtLockWrite, prev->latch);
2742
2743         //      grab the right sibling
2744
2745         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2746                 right->page = bt_mappage (bt, right->latch);
2747         else
2748                 return bt->err;
2749
2750         bt_lockpage(bt, BtLockAtomic, right->latch);
2751         bt_lockpage(bt, BtLockWrite, right->latch);
2752
2753         //      and pull contents over empty page
2754         //      while preserving master's left link
2755
2756         memcpy (right->page->left, prev->page->left, BtId);
2757         memcpy (prev->page, right->page, bt->mgr->page_size);
2758
2759         //      forward seekers to old right sibling
2760         //      to new page location in set
2761
2762         bt_putid (right->page->right, prev->latch->page_no);
2763         right->latch->dirty = 1;
2764         right->page->kill = 1;
2765
2766         //      remove pointer to right page for searchers by
2767         //      changing right fence key to point to the master page
2768
2769         ptr = keyptr(right->page,right->page->cnt);
2770         bt_putid (value, prev->latch->page_no);
2771
2772         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2773                 return bt->err;
2774
2775         //  now that master page is in good shape we can
2776         //      remove its locks.
2777
2778         bt_unlockpage (BtLockAtomic, prev->latch);
2779         bt_unlockpage (BtLockWrite, prev->latch);
2780
2781         //  fix master's right sibling's left pointer
2782         //      to remove scanner's poiner to the right page
2783
2784         if( right_page_no = bt_getid (prev->page->right) ) {
2785           if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2786                 temp->page = bt_mappage (bt, temp->latch);
2787
2788           bt_lockpage (bt, BtLockWrite, temp->latch);
2789           bt_putid (temp->page->left, prev->latch->page_no);
2790           temp->latch->dirty = 1;
2791
2792           bt_unlockpage (BtLockWrite, temp->latch);
2793           bt_unpinlatch (temp->latch);
2794         } else {        // master is now the far right page
2795           bt_spinmutexlock (bt->mgr->lock);
2796           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2797           bt_spinreleasemutex(bt->mgr->lock);
2798         }
2799
2800         //      now that there are no pointers to the right page
2801         //      we can delete it after the last read access occurs
2802
2803         bt_unlockpage (BtLockWrite, right->latch);
2804         bt_unlockpage (BtLockAtomic, right->latch);
2805         bt_lockpage (bt, BtLockDelete, right->latch);
2806         bt_lockpage (bt, BtLockWrite, right->latch);
2807         bt_freepage (bt, right);
2808         return 0;
2809 }
2810
2811 //      atomic modification of a batch of keys.
2812
2813 //      return -1 if BTERR is set
2814 //      otherwise return slot number
2815 //      causing the key constraint violation
2816 //      or zero on successful completion.
2817
2818 int bt_atomictxn (BtDb *bt, BtPage source)
2819 {
2820 uint src, idx, slot, samepage, entry;
2821 AtomicKey *head, *tail, *leaf;
2822 BtPageSet set[1], prev[1];
2823 unsigned char value[BtId];
2824 BtKey *key, *ptr, *key2;
2825 BtLatchSet *latch;
2826 AtomicTxn *locks;
2827 int result = 0;
2828 BtSlot temp[1];
2829 logseqno lsn;
2830 BtPage page;
2831 BtVal *val;
2832 uid right;
2833 int type;
2834
2835   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2836   head = NULL;
2837   tail = NULL;
2838
2839   // stable sort the list of keys into order to
2840   //    prevent deadlocks between threads.
2841
2842   for( src = 1; src++ < source->cnt; ) {
2843         *temp = *slotptr(source,src);
2844         key = keyptr (source,src);
2845
2846         for( idx = src; --idx; ) {
2847           key2 = keyptr (source,idx);
2848           if( keycmp (key, key2->key, key2->len) < 0 ) {
2849                 *slotptr(source,idx+1) = *slotptr(source,idx);
2850                 *slotptr(source,idx) = *temp;
2851           } else
2852                 break;
2853         }
2854   }
2855
2856   // Load the leaf page for each key
2857   // group same page references with reuse bit
2858   // and determine any constraint violations
2859
2860   for( src = 0; src++ < source->cnt; ) {
2861         key = keyptr(source, src);
2862         slot = 0;
2863
2864         // first determine if this modification falls
2865         // on the same page as the previous modification
2866         //      note that the far right leaf page is a special case
2867
2868         if( samepage = src > 1 )
2869           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2870                 slot = bt_findslot(set->page, key->key, key->len);
2871           else
2872                 bt_unlockpage(BtLockRead, set->latch); 
2873
2874         if( !slot )
2875           if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
2876                 set->latch->split = 0;
2877           else
2878                 goto atomicerr;
2879
2880         if( slotptr(set->page, slot)->type == Librarian )
2881           ptr = keyptr(set->page, ++slot);
2882         else
2883           ptr = keyptr(set->page, slot);
2884
2885         if( !samepage ) {
2886           locks[src].entry = set->latch->entry;
2887           locks[src].slot = slot;
2888           locks[src].reuse = 0;
2889         } else {
2890           locks[src].entry = 0;
2891           locks[src].slot = 0;
2892           locks[src].reuse = 1;
2893         }
2894
2895         //      capture current lsn for master page
2896
2897         locks[src].reqlsn = set->page->lsn;
2898
2899         //      perform constraint checks
2900
2901         switch( slotptr(source, src)->type ) {
2902         case Duplicate:
2903         case Unique:
2904           if( !slotptr(set->page, slot)->dead )
2905            if( slot < set->page->cnt || bt_getid (set->page->right) )
2906             if( !keycmp (ptr, key->key, key->len) ) {
2907
2908                   // return constraint violation if key already exists
2909
2910                   bt_unlockpage(BtLockRead, set->latch);
2911                   result = src;
2912
2913                   while( src ) {
2914                         if( locks[src].entry ) {
2915                           set->latch = bt->mgr->latchsets + locks[src].entry;
2916                           bt_unlockpage(BtLockAtomic, set->latch);
2917                           bt_unpinlatch (set->latch);
2918                         }
2919                         src--;
2920                   }
2921                   free (locks);
2922                   return result;
2923                 }
2924           break;
2925         }
2926   }
2927
2928   //  unlock last loadpage lock
2929
2930   if( source->cnt )
2931         bt_unlockpage(BtLockRead, set->latch);
2932
2933   //  and add entries to redo log
2934
2935   if( bt->mgr->redopages )
2936    if( lsn = bt_txnredo (bt, source) )
2937         for( src = 0; src++ < source->cnt; )
2938          locks[src].lsn = lsn;
2939         else
2940          goto atomicerr;
2941
2942   //  obtain write lock for each master page
2943   //  sync flushed pages to disk
2944
2945   for( src = 0; src++ < source->cnt; ) {
2946         if( locks[src].reuse )
2947           continue;
2948
2949         set->latch = bt->mgr->latchsets + locks[src].entry;
2950         bt_lockpage(bt, BtLockWrite, set->latch);
2951   }
2952
2953   // insert or delete each key
2954   // process any splits or merges
2955   // release Write & Atomic latches
2956   // set ParentModifications and build
2957   // queue of keys to insert for split pages
2958   // or delete for deleted pages.
2959
2960   // run through txn list backwards
2961
2962   samepage = source->cnt + 1;
2963
2964   for( src = source->cnt; src; src-- ) {
2965         if( locks[src].reuse )
2966           continue;
2967
2968         //  perform the txn operations
2969         //      from smaller to larger on
2970         //  the same page
2971
2972         for( idx = src; idx < samepage; idx++ )
2973          switch( slotptr(source,idx)->type ) {
2974          case Delete:
2975           if( bt_atomicdelete (bt, source, locks, idx) )
2976                 goto atomicerr;
2977           break;
2978
2979         case Duplicate:
2980         case Unique:
2981           if( bt_atomicinsert (bt, source, locks, idx) )
2982                 goto atomicerr;
2983           break;
2984         }
2985
2986         //      after the same page operations have finished,
2987         //  process master page for splits or deletion.
2988
2989         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2990         prev->page = bt_mappage (bt, prev->latch);
2991         samepage = src;
2992
2993         //  pick-up all splits from master page
2994         //      each one is already WriteLocked.
2995
2996         entry = prev->latch->split;
2997
2998         while( entry ) {
2999           set->latch = bt->mgr->latchsets + entry;
3000           set->page = bt_mappage (bt, set->latch);
3001           entry = set->latch->split;
3002
3003           // delete empty master page by undoing its split
3004           //  (this is potentially another empty page)
3005           //  note that there are no new left pointers yet
3006
3007           if( !prev->page->act ) {
3008                 memcpy (set->page->left, prev->page->left, BtId);
3009                 memcpy (prev->page, set->page, bt->mgr->page_size);
3010                 bt_lockpage (bt, BtLockDelete, set->latch);
3011                 bt_freepage (bt, set);
3012
3013                 prev->latch->split = set->latch->split;
3014                 prev->latch->dirty = 1;
3015                 continue;
3016           }
3017
3018           // remove empty page from the split chain
3019           // and return it to the free list.
3020
3021           if( !set->page->act ) {
3022                 memcpy (prev->page->right, set->page->right, BtId);
3023                 prev->latch->split = set->latch->split;
3024                 bt_lockpage (bt, BtLockDelete, set->latch);
3025                 bt_freepage (bt, set);
3026                 continue;
3027           }
3028
3029           //  schedule prev fence key update
3030
3031           ptr = keyptr(prev->page,prev->page->cnt);
3032           leaf = calloc (sizeof(AtomicKey), 1);
3033
3034           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3035           leaf->page_no = prev->latch->page_no;
3036           leaf->entry = prev->latch->entry;
3037           leaf->type = 0;
3038
3039           if( tail )
3040                 tail->next = leaf;
3041           else
3042                 head = leaf;
3043
3044           tail = leaf;
3045
3046           // splice in the left link into the split page
3047
3048           bt_putid (set->page->left, prev->latch->page_no);
3049           bt_lockpage(bt, BtLockParent, prev->latch);
3050           bt_unlockpage(BtLockWrite, prev->latch);
3051           *prev = *set;
3052         }
3053
3054         //  update left pointer in next right page from last split page
3055         //      (if all splits were reversed, latch->split == 0)
3056
3057         if( latch->split ) {
3058           //  fix left pointer in master's original (now split)
3059           //  far right sibling or set rightmost page in page zero
3060
3061           if( right = bt_getid (prev->page->right) ) {
3062                 if( set->latch = bt_pinlatch (bt, right, 1) )
3063                   set->page = bt_mappage (bt, set->latch);
3064                 else
3065                   goto atomicerr;
3066
3067             bt_lockpage (bt, BtLockWrite, set->latch);
3068             bt_putid (set->page->left, prev->latch->page_no);
3069                 set->latch->dirty = 1;
3070             bt_unlockpage (BtLockWrite, set->latch);
3071                 bt_unpinlatch (set->latch);
3072           } else {      // prev is rightmost page
3073             bt_spinmutexlock (bt->mgr->lock);
3074                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3075             bt_spinreleasemutex(bt->mgr->lock);
3076           }
3077
3078           //  Process last page split in chain
3079
3080           ptr = keyptr(prev->page,prev->page->cnt);
3081           leaf = calloc (sizeof(AtomicKey), 1);
3082
3083           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3084           leaf->page_no = prev->latch->page_no;
3085           leaf->entry = prev->latch->entry;
3086           leaf->type = 0;
3087   
3088           if( tail )
3089                 tail->next = leaf;
3090           else
3091                 head = leaf;
3092
3093           tail = leaf;
3094
3095           bt_lockpage(bt, BtLockParent, prev->latch);
3096           bt_unlockpage(BtLockWrite, prev->latch);
3097
3098           //  remove atomic lock on master page
3099
3100           bt_unlockpage(BtLockAtomic, latch);
3101           continue;
3102         }
3103
3104         //  finished if prev page occupied (either master or final split)
3105
3106         if( prev->page->act ) {
3107           bt_unlockpage(BtLockWrite, latch);
3108           bt_unlockpage(BtLockAtomic, latch);
3109           bt_unpinlatch(latch);
3110           continue;
3111         }
3112
3113         // any and all splits were reversed, and the
3114         // master page located in prev is empty, delete it
3115         // by pulling over master's right sibling.
3116
3117         // Remove empty master's fence key
3118
3119         ptr = keyptr(prev->page,prev->page->cnt);
3120
3121         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3122                 goto atomicerr;
3123
3124         //      perform the remainder of the delete
3125         //      from the FIFO queue
3126
3127         leaf = calloc (sizeof(AtomicKey), 1);
3128
3129         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3130         leaf->page_no = prev->latch->page_no;
3131         leaf->entry = prev->latch->entry;
3132         leaf->nounlock = 1;
3133         leaf->type = 2;
3134   
3135         if( tail )
3136           tail->next = leaf;
3137         else
3138           head = leaf;
3139
3140         tail = leaf;
3141
3142         //      leave atomic lock in place until
3143         //      deletion completes in next phase.
3144
3145         bt_unlockpage(BtLockWrite, prev->latch);
3146   }
3147
3148   //  add & delete keys for any pages split or merged during transaction
3149
3150   if( leaf = head )
3151     do {
3152           set->latch = bt->mgr->latchsets + leaf->entry;
3153           set->page = bt_mappage (bt, set->latch);
3154
3155           bt_putid (value, leaf->page_no);
3156           ptr = (BtKey *)leaf->leafkey;
3157
3158           switch( leaf->type ) {
3159           case 0:       // insert key
3160             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3161                   goto atomicerr;
3162
3163                 break;
3164
3165           case 1:       // delete key
3166                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3167                   goto atomicerr;
3168
3169                 break;
3170
3171           case 2:       // free page
3172                 if( bt_atomicfree (bt, set) )
3173                   goto atomicerr;
3174
3175                 break;
3176           }
3177
3178           if( !leaf->nounlock )
3179             bt_unlockpage (BtLockParent, set->latch);
3180
3181           bt_unpinlatch (set->latch);
3182           tail = leaf->next;
3183           free (leaf);
3184         } while( leaf = tail );
3185
3186   // return success
3187
3188   free (locks);
3189   return 0;
3190 atomicerr:
3191   return -1;
3192 }
3193
3194 //      set cursor to highest slot on highest page
3195
3196 uint bt_lastkey (BtDb *bt)
3197 {
3198 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3199 BtPageSet set[1];
3200
3201         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3202                 set->page = bt_mappage (bt, set->latch);
3203         else
3204                 return 0;
3205
3206     bt_lockpage(bt, BtLockRead, set->latch);
3207         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3208     bt_unlockpage(BtLockRead, set->latch);
3209         bt_unpinlatch (set->latch);
3210
3211         bt->cursor_page = page_no;
3212         return bt->cursor->cnt;
3213 }
3214
3215 //      return previous slot on cursor page
3216
3217 uint bt_prevkey (BtDb *bt, uint slot)
3218 {
3219 uid ourright, next, us = bt->cursor_page;
3220 BtPageSet set[1];
3221
3222         if( --slot )
3223                 return slot;
3224
3225         ourright = bt_getid(bt->cursor->right);
3226
3227 goleft:
3228         if( !(next = bt_getid(bt->cursor->left)) )
3229                 return 0;
3230
3231 findourself:
3232         bt->cursor_page = next;
3233
3234         if( set->latch = bt_pinlatch (bt, next, 1) )
3235                 set->page = bt_mappage (bt, set->latch);
3236         else
3237                 return 0;
3238
3239     bt_lockpage(bt, BtLockRead, set->latch);
3240         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3241         bt_unlockpage(BtLockRead, set->latch);
3242         bt_unpinlatch (set->latch);
3243         
3244         next = bt_getid (bt->cursor->right);
3245
3246         if( bt->cursor->kill )
3247                 goto findourself;
3248
3249         if( next != us )
3250           if( next == ourright )
3251                 goto goleft;
3252           else
3253                 goto findourself;
3254
3255         return bt->cursor->cnt;
3256 }
3257
3258 //  return next slot on cursor page
3259 //  or slide cursor right into next page
3260
3261 uint bt_nextkey (BtDb *bt, uint slot)
3262 {
3263 BtPageSet set[1];
3264 uid right;
3265
3266   do {
3267         right = bt_getid(bt->cursor->right);
3268
3269         while( slot++ < bt->cursor->cnt )
3270           if( slotptr(bt->cursor,slot)->dead )
3271                 continue;
3272           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3273                 return slot;
3274           else
3275                 break;
3276
3277         if( !right )
3278                 break;
3279
3280         bt->cursor_page = right;
3281
3282         if( set->latch = bt_pinlatch (bt, right, 1) )
3283                 set->page = bt_mappage (bt, set->latch);
3284         else
3285                 return 0;
3286
3287     bt_lockpage(bt, BtLockRead, set->latch);
3288
3289         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3290
3291         bt_unlockpage(BtLockRead, set->latch);
3292         bt_unpinlatch (set->latch);
3293         slot = 0;
3294
3295   } while( 1 );
3296
3297   return bt->err = 0;
3298 }
3299
3300 //  cache page of keys into cursor and return starting slot for given key
3301
3302 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3303 {
3304 BtPageSet set[1];
3305 uint slot;
3306
3307         // cache page for retrieval
3308
3309         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3310           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3311         else
3312           return 0;
3313
3314         bt->cursor_page = set->latch->page_no;
3315
3316         bt_unlockpage(BtLockRead, set->latch);
3317         bt_unpinlatch (set->latch);
3318         return slot;
3319 }
3320
3321 BtKey *bt_key(BtDb *bt, uint slot)
3322 {
3323         return keyptr(bt->cursor, slot);
3324 }
3325
3326 BtVal *bt_val(BtDb *bt, uint slot)
3327 {
3328         return valptr(bt->cursor,slot);
3329 }
3330
3331 #ifdef STANDALONE
3332
3333 #ifndef unix
3334 double getCpuTime(int type)
3335 {
3336 FILETIME crtime[1];
3337 FILETIME xittime[1];
3338 FILETIME systime[1];
3339 FILETIME usrtime[1];
3340 SYSTEMTIME timeconv[1];
3341 double ans = 0;
3342
3343         memset (timeconv, 0, sizeof(SYSTEMTIME));
3344
3345         switch( type ) {
3346         case 0:
3347                 GetSystemTimeAsFileTime (xittime);
3348                 FileTimeToSystemTime (xittime, timeconv);
3349                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3350                 break;
3351         case 1:
3352                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3353                 FileTimeToSystemTime (usrtime, timeconv);
3354                 break;
3355         case 2:
3356                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3357                 FileTimeToSystemTime (systime, timeconv);
3358                 break;
3359         }
3360
3361         ans += (double)timeconv->wHour * 3600;
3362         ans += (double)timeconv->wMinute * 60;
3363         ans += (double)timeconv->wSecond;
3364         ans += (double)timeconv->wMilliseconds / 1000;
3365         return ans;
3366 }
3367 #else
3368 #include <time.h>
3369 #include <sys/resource.h>
3370
3371 double getCpuTime(int type)
3372 {
3373 struct rusage used[1];
3374 struct timeval tv[1];
3375
3376         switch( type ) {
3377         case 0:
3378                 gettimeofday(tv, NULL);
3379                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3380
3381         case 1:
3382                 getrusage(RUSAGE_SELF, used);
3383                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3384
3385         case 2:
3386                 getrusage(RUSAGE_SELF, used);
3387                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3388         }
3389
3390         return 0;
3391 }
3392 #endif
3393
3394 void bt_poolaudit (BtMgr *mgr)
3395 {
3396 BtLatchSet *latch;
3397 uint slot = 0;
3398
3399         while( slot++ < mgr->latchdeployed ) {
3400                 latch = mgr->latchsets + slot;
3401
3402                 if( *latch->readwr->rin & MASK )
3403                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3404                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3405
3406                 if( *latch->access->rin & MASK )
3407                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3408                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3409
3410                 if( *latch->parent->ticket != *latch->parent->serving )
3411                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3412                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3413
3414                 if( *latch->pin & ~CLOCK_bit ) {
3415                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3416                         *latch->pin = 0;
3417                 }
3418         }
3419 }
3420
3421 uint bt_latchaudit (BtDb *bt)
3422 {
3423 ushort idx, hashidx;
3424 uid next, page_no;
3425 BtLatchSet *latch;
3426 uint cnt = 0;
3427 BtKey *ptr;
3428
3429         if( *(ushort *)(bt->mgr->lock) )
3430                 fprintf(stderr, "Alloc page locked\n");
3431         *(ushort *)(bt->mgr->lock) = 0;
3432
3433         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3434                 latch = bt->mgr->latchsets + idx;
3435                 if( *latch->readwr->rin & MASK )
3436                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3437                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3438
3439                 if( *latch->access->rin & MASK )
3440                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3441                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3442
3443                 if( *latch->parent->ticket != *latch->parent->serving )
3444                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3445                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3446
3447                 if( *latch->pin ) {
3448                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3449                         *latch->pin = 0;
3450                 }
3451         }
3452
3453         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3454           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3455                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3456
3457           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3458
3459           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3460                 latch = bt->mgr->latchsets + idx;
3461                 if( *latch->pin )
3462                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3463           } while( idx = latch->next );
3464         }
3465
3466         page_no = LEAF_page;
3467
3468         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3469         uid off = page_no << bt->mgr->page_bits;
3470 #ifdef unix
3471           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3472 #else
3473         DWORD amt[1];
3474
3475           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3476
3477           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3478                 return bt->line = __LINE__, bt->err = BTERR_map;
3479
3480           if( *amt <  bt->mgr->page_size )
3481                 return bt->line = __LINE__, bt->err = BTERR_map;
3482 #endif
3483                 if( !bt->frame->free && !bt->frame->lvl )
3484                         cnt += bt->frame->act;
3485                 page_no++;
3486         }
3487                 
3488         cnt--;  // remove stopper key
3489         fprintf(stderr, " Total keys read %d\n", cnt);
3490
3491         bt_close (bt);
3492         return 0;
3493 }
3494
3495 typedef struct {
3496         char idx;
3497         char *type;
3498         char *infile;
3499         BtMgr *mgr;
3500         int num;
3501 } ThreadArg;
3502
3503 //  standalone program to index file of keys
3504 //  then list them onto std-out
3505
3506 #ifdef unix
3507 void *index_file (void *arg)
3508 #else
3509 uint __stdcall index_file (void *arg)
3510 #endif
3511 {
3512 int line = 0, found = 0, cnt = 0, idx;
3513 uid next, page_no = LEAF_page;  // start on first page of leaves
3514 int ch, len = 0, slot, type = 0;
3515 unsigned char key[BT_maxkey];
3516 unsigned char txn[65536];
3517 ThreadArg *args = arg;
3518 BtPageSet set[1];
3519 uint nxt = 65536;
3520 BtPage page;
3521 BtKey *ptr;
3522 BtVal *val;
3523 BtDb *bt;
3524 FILE *in;
3525
3526         bt = bt_open (args->mgr);
3527         page = (BtPage)txn;
3528
3529         if( args->idx < strlen (args->type) )
3530                 ch = args->type[args->idx];
3531         else
3532                 ch = args->type[strlen(args->type) - 1];
3533
3534         switch(ch | 0x20)
3535         {
3536         case 'a':
3537                 fprintf(stderr, "started latch mgr audit\n");
3538                 cnt = bt_latchaudit (bt);
3539                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3540                 break;
3541
3542         case 'd':
3543                 type = Delete;
3544
3545         case 'p':
3546                 if( !type )
3547                         type = Unique;
3548
3549                 if( args->num )
3550                  if( type == Delete )
3551                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3552                  else
3553                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3554                 else
3555                  if( type == Delete )
3556                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3557                  else
3558                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3559
3560                 if( in = fopen (args->infile, "rb") )
3561                   while( ch = getc(in), ch != EOF )
3562                         if( ch == '\n' )
3563                         {
3564                           line++;
3565
3566                           if( !args->num ) {
3567                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3568                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3569                             len = 0;
3570                                 continue;
3571                           }
3572
3573                           nxt -= len - 10;
3574                           memcpy (txn + nxt, key + 10, len - 10);
3575                           nxt -= 1;
3576                           txn[nxt] = len - 10;
3577                           nxt -= 10;
3578                           memcpy (txn + nxt, key, 10);
3579                           nxt -= 1;
3580                           txn[nxt] = 10;
3581                           slotptr(page,++cnt)->off  = nxt;
3582                           slotptr(page,cnt)->type = type;
3583                           len = 0;
3584
3585                           if( cnt < args->num )
3586                                 continue;
3587
3588                           page->cnt = cnt;
3589                           page->act = cnt;
3590                           page->min = nxt;
3591
3592                           if( bt_atomictxn (bt, page) )
3593                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3594                           nxt = sizeof(txn);
3595                           cnt = 0;
3596
3597                         }
3598                         else if( len < BT_maxkey )
3599                                 key[len++] = ch;
3600                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3601                 break;
3602
3603         case 'w':
3604                 fprintf(stderr, "started indexing for %s\n", args->infile);
3605                 if( in = fopen (args->infile, "r") )
3606                   while( ch = getc(in), ch != EOF )
3607                         if( ch == '\n' )
3608                         {
3609                           line++;
3610
3611                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3612                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3613                           len = 0;
3614                         }
3615                         else if( len < BT_maxkey )
3616                                 key[len++] = ch;
3617                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3618                 break;
3619
3620         case 'f':
3621                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3622                 if( in = fopen (args->infile, "rb") )
3623                   while( ch = getc(in), ch != EOF )
3624                         if( ch == '\n' )
3625                         {
3626                           line++;
3627                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3628                                 found++;
3629                           else if( bt->err )
3630                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
3631                           len = 0;
3632                         }
3633                         else if( len < BT_maxkey )
3634                                 key[len++] = ch;
3635                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3636                 break;
3637
3638         case 's':
3639                 fprintf(stderr, "started scanning\n");
3640                 do {
3641                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3642                                 set->page = bt_mappage (bt, set->latch);
3643                         else
3644                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3645                         bt_lockpage (bt, BtLockRead, set->latch);
3646                         next = bt_getid (set->page->right);
3647
3648                         for( slot = 0; slot++ < set->page->cnt; )
3649                          if( next || slot < set->page->cnt )
3650                           if( !slotptr(set->page, slot)->dead ) {
3651                                 ptr = keyptr(set->page, slot);
3652                                 len = ptr->len;
3653
3654                             if( slotptr(set->page, slot)->type == Duplicate )
3655                                         len -= BtId;
3656
3657                                 fwrite (ptr->key, len, 1, stdout);
3658                                 val = valptr(set->page, slot);
3659                                 fwrite (val->value, val->len, 1, stdout);
3660                                 fputc ('\n', stdout);
3661                                 cnt++;
3662                            }
3663
3664                         bt_unlockpage (BtLockRead, set->latch);
3665                         bt_unpinlatch (set->latch);
3666                 } while( page_no = next );
3667
3668                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3669                 break;
3670
3671         case 'r':
3672                 fprintf(stderr, "started reverse scan\n");
3673                 if( slot = bt_lastkey (bt) )
3674                    while( slot = bt_prevkey (bt, slot) ) {
3675                         if( slotptr(bt->cursor, slot)->dead )
3676                           continue;
3677
3678                         ptr = keyptr(bt->cursor, slot);
3679                         len = ptr->len;
3680
3681                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3682                                 len -= BtId;
3683
3684                         fwrite (ptr->key, len, 1, stdout);
3685                         val = valptr(bt->cursor, slot);
3686                         fwrite (val->value, val->len, 1, stdout);
3687                         fputc ('\n', stdout);
3688                         cnt++;
3689                   }
3690
3691                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3692                 break;
3693
3694         case 'c':
3695 #ifdef unix
3696                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3697 #endif
3698                 fprintf(stderr, "started counting\n");
3699                 page_no = LEAF_page;
3700
3701                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3702                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3703                                 break;
3704
3705                         if( !bt->frame->free && !bt->frame->lvl )
3706                                 cnt += bt->frame->act;
3707
3708                         bt->reads++;
3709                         page_no++;
3710                 }
3711                 
3712                 cnt--;  // remove stopper key
3713                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3714                 break;
3715         }
3716
3717         bt_close (bt);
3718 #ifdef unix
3719         return NULL;
3720 #else
3721         return 0;
3722 #endif
3723 }
3724
3725 typedef struct timeval timer;
3726
3727 int main (int argc, char **argv)
3728 {
3729 int idx, cnt, len, slot, err;
3730 int segsize, bits = 16;
3731 double start, stop;
3732 #ifdef unix
3733 pthread_t *threads;
3734 #else
3735 HANDLE *threads;
3736 #endif
3737 ThreadArg *args;
3738 uint poolsize = 0;
3739 uint recovery = 0;
3740 float elapsed;
3741 int num = 0;
3742 char key[1];
3743 BtMgr *mgr;
3744 BtKey *ptr;
3745 BtDb *bt;
3746
3747         if( argc < 3 ) {
3748                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3749                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3750                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3751                 fprintf (stderr, "  page_bits is the page size in bits\n");
3752                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3753                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3754                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3755                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3756                 exit(0);
3757         }
3758
3759         start = getCpuTime(0);
3760
3761         if( argc > 3 )
3762                 bits = atoi(argv[3]);
3763
3764         if( argc > 4 )
3765                 poolsize = atoi(argv[4]);
3766
3767         if( !poolsize )
3768                 fprintf (stderr, "Warning: no mapped_pool\n");
3769
3770         if( argc > 5 )
3771                 num = atoi(argv[5]);
3772
3773         if( argc > 6 )
3774                 recovery = atoi(argv[6]);
3775
3776         cnt = argc - 7;
3777 #ifdef unix
3778         threads = malloc (cnt * sizeof(pthread_t));
3779 #else
3780         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3781 #endif
3782         args = malloc (cnt * sizeof(ThreadArg));
3783
3784         mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3785
3786         if( !mgr ) {
3787                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3788                 exit (1);
3789         }
3790
3791         //      fire off threads
3792
3793         for( idx = 0; idx < cnt; idx++ ) {
3794                 args[idx].infile = argv[idx + 7];
3795                 args[idx].type = argv[2];
3796                 args[idx].mgr = mgr;
3797                 args[idx].num = num;
3798                 args[idx].idx = idx;
3799 #ifdef unix
3800                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3801                         fprintf(stderr, "Error creating thread %d\n", err);
3802 #else
3803                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3804 #endif
3805         }
3806
3807         //      wait for termination
3808
3809 #ifdef unix
3810         for( idx = 0; idx < cnt; idx++ )
3811                 pthread_join (threads[idx], NULL);
3812 #else
3813         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3814
3815         for( idx = 0; idx < cnt; idx++ )
3816                 CloseHandle(threads[idx]);
3817
3818 #endif
3819         bt_poolaudit(mgr);
3820         bt_mgrclose (mgr);
3821
3822         elapsed = getCpuTime(0) - start;
3823         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3824         elapsed = getCpuTime(1);
3825         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3826         elapsed = getCpuTime(2);
3827         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3828 }
3829
3830 #endif  //STANDALONE