]> pd.if.org Git - btree/blob - threadskv9.c
Upload experimental threadskv9.c version with durability.
[btree] / threadskv9.c
1 // btree version threadskv9 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      and redo log for failure recovery
10
11 // 01 OCT 2014
12
13 // author: karl malbrain, malbrain@cal.berkeley.edu
14
15 /*
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
18
19 The orginal author is Karl Malbrain.
20
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
27 */
28
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
31
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
34
35 #ifdef linux
36 #define _GNU_SOURCE
37 #endif
38
39 #ifdef unix
40 #include <unistd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <fcntl.h>
44 #include <sys/time.h>
45 #include <sys/mman.h>
46 #include <errno.h>
47 #include <pthread.h>
48 #else
49 #define WIN32_LEAN_AND_MEAN
50 #include <windows.h>
51 #include <stdio.h>
52 #include <stdlib.h>
53 #include <time.h>
54 #include <fcntl.h>
55 #include <process.h>
56 #include <intrin.h>
57 #endif
58
59 #include <memory.h>
60 #include <string.h>
61 #include <stddef.h>
62
63 typedef unsigned long long      uid;
64 typedef unsigned long long      logseqno;
65
66 #ifndef unix
67 typedef unsigned long long      off64_t;
68 typedef unsigned short          ushort;
69 typedef unsigned int            uint;
70 #endif
71
72 #define BT_ro 0x6f72    // ro
73 #define BT_rw 0x7772    // rw
74
75 #define BT_maxbits              24                                      // maximum page size in bits
76 #define BT_minbits              9                                       // minimum page size in bits
77 #define BT_minpage              (1 << BT_minbits)       // minimum page size
78 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
79
80 //  BTree page number constants
81 #define ALLOC_page              0       // allocation page
82 #define ROOT_page               1       // root of the btree
83 #define LEAF_page               2       // first page of leaves
84 #define REDO_page               3       // first page of redo buffer
85
86 //      Number of levels to create in a new BTree
87
88 #define MIN_lvl                 2
89
90 /*
91 There are six lock types for each node in four independent sets: 
92 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
93 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
94 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
95 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
96 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
97 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
98 */
99
100 typedef enum{
101         BtLockAccess = 1,
102         BtLockDelete = 2,
103         BtLockRead   = 4,
104         BtLockWrite  = 8,
105         BtLockParent = 16,
106         BtLockAtomic = 32
107 } BtLock;
108
109 //      definition for phase-fair reader/writer lock implementation
110
111 typedef struct {
112         volatile ushort rin[1];
113         volatile ushort rout[1];
114         volatile ushort ticket[1];
115         volatile ushort serving[1];
116         ushort tid;
117         ushort dup;
118 } RWLock;
119
120 //      write only queue lock
121
122 typedef struct {
123         volatile ushort ticket[1];
124         volatile ushort serving[1];
125         ushort tid;
126         ushort dup;
127 } WOLock;
128
129 #define PHID 0x1
130 #define PRES 0x2
131 #define MASK 0x3
132 #define RINC 0x4
133
134 //      definition for spin latch implementation
135
136 // exclusive is set for write access
137 // share is count of read accessors
138 // grant write lock when share == 0
139
140 volatile typedef struct {
141         ushort exclusive:1;
142         ushort pending:1;
143         ushort share:14;
144 } BtSpinLatch;
145
146 #define XCL 1
147 #define PEND 2
148 #define BOTH 3
149 #define SHARE 4
150
151 //  hash table entries
152
153 typedef struct {
154         volatile uint slot;             // Latch table entry at head of chain
155         BtSpinLatch latch[1];
156 } BtHashEntry;
157
158 //      latch manager table structure
159
160 typedef struct {
161         uid page_no;                    // latch set page number
162         RWLock readwr[1];               // read/write page lock
163         RWLock access[1];               // Access Intent/Page delete
164         WOLock parent[1];               // Posting of fence key in parent
165         WOLock atomic[1];               // Atomic update in progress
166         uint split;                             // right split page atomic insert
167         uint entry;                             // entry slot in latch table
168         uint next;                              // next entry in hash table chain
169         uint prev;                              // prev entry in hash table chain
170         volatile ushort pin;    // number of outstanding threads
171         ushort dirty:1;                 // page in cache is dirty
172 } BtLatchSet;
173
174 //      Define the length of the page record numbers
175
176 #define BtId 6
177
178 //      Page key slot definition.
179
180 //      Keys are marked dead, but remain on the page until
181 //      it cleanup is called. The fence key (highest key) for
182 //      a leaf page is always present, even after cleanup.
183
184 //      Slot types
185
186 //      In addition to the Unique keys that occupy slots
187 //      there are Librarian and Duplicate key
188 //      slots occupying the key slot array.
189
190 //      The Librarian slots are dead keys that
191 //      serve as filler, available to add new Unique
192 //      or Dup slots that are inserted into the B-tree.
193
194 //      The Duplicate slots have had their key bytes extended
195 //      by 6 bytes to contain a binary duplicate key uniqueifier.
196
197 typedef enum {
198         Unique,
199         Librarian,
200         Duplicate,
201         Delete,
202         Update
203 } BtSlotType;
204
205 typedef struct {
206         uint off:BT_maxbits;    // page offset for key start
207         uint type:3;                    // type of slot
208         uint dead:1;                    // set for deleted slot
209 } BtSlot;
210
211 //      The key structure occupies space at the upper end of
212 //      each page.  It's a length byte followed by the key
213 //      bytes.
214
215 typedef struct {
216         unsigned char len;              // this can be changed to a ushort or uint
217         unsigned char key[0];
218 } BtKey;
219
220 //      the value structure also occupies space at the upper
221 //      end of the page. Each key is immediately followed by a value.
222
223 typedef struct {
224         unsigned char len;              // this can be changed to a ushort or uint
225         unsigned char value[0];
226 } BtVal;
227
228 #define BT_maxkey       255             // maximum number of bytes in a key
229 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
230
231 //      The first part of an index page.
232 //      It is immediately followed
233 //      by the BtSlot array of keys.
234
235 //      note that this structure size
236 //      must be a multiple of 8 bytes
237 //      in order to place dups correctly.
238
239 typedef struct BtPage_ {
240         uint cnt;                                       // count of keys in page
241         uint act;                                       // count of active keys
242         uint min;                                       // next key offset
243         uint garbage;                           // page garbage in bytes
244         unsigned char bits:7;           // page size in bits
245         unsigned char free:1;           // page is on free chain
246         unsigned char lvl:7;            // level of page
247         unsigned char kill:1;           // page is being deleted
248         unsigned char right[BtId];      // page number to right
249         unsigned char left[BtId];       // page number to left
250         unsigned char filler[2];        // padding to multiple of 8
251         logseqno lsn;                           // last LogSeqNo applied to page
252 } *BtPage;
253
254 //  The loadpage interface object
255
256 typedef struct {
257         BtPage page;            // current page pointer
258         BtLatchSet *latch;      // current page latch set
259 } BtPageSet;
260
261 //      structure for latch manager on ALLOC_page
262
263 typedef struct {
264         struct BtPage_ alloc[1];        // next page_no in right ptr
265         unsigned long long dups[1];     // global duplicate key uniqueifier
266         unsigned char chain[BtId];      // head of free page_nos chain
267 } BtPageZero;
268
269 //      The object structure for Btree access
270
271 typedef struct {
272         uint page_size;                         // page size    
273         uint page_bits;                         // page size in bits    
274 #ifdef unix
275         int idx;
276 #else
277         HANDLE idx;
278 #endif
279         BtPageZero *pagezero;           // mapped allocation page
280         BtHashEntry *hashtable;         // the buffer pool hash table entries
281         BtLatchSet *latchsets;          // mapped latch set from buffer pool
282         unsigned char *pagepool;        // mapped to the buffer pool pages
283         unsigned char *redobuff;        // mapped recovery buffer pointer
284         logseqno flushlsn;                      // first lsn flushed w/msync
285         BtSpinLatch redo[1];            // redo area lite latch
286         BtSpinLatch lock[1];            // allocation area lite latch
287         ushort thread_no[1];            // next thread number
288         uint latchdeployed;                     // highest number of latch entries deployed
289         uint nlatchpage;                        // number of latch pages at BT_latch
290         uint latchtotal;                        // number of page latch entries
291         uint latchhash;                         // number of latch hash table slots
292         uint latchvictim;                       // next latch entry to examine
293         uint redopages;                         // size of recovery buff in pages
294         uint redoend;                           // eof/end element in recovery buff
295 #ifndef unix
296         HANDLE halloc;                          // allocation handle
297         HANDLE hpool;                           // buffer pool handle
298 #endif
299 } BtMgr;
300
301 typedef struct {
302         BtMgr *mgr;                             // buffer manager for thread
303         BtPage cursor;                  // cached frame for start/next (never mapped)
304         BtPage frame;                   // spare frame for the page split (never mapped)
305         uid cursor_page;                                // current cursor page number   
306         unsigned char *mem;                             // frame, cursor, page memory buffer
307         unsigned char key[BT_keyarray]; // last found complete key
308         int found;                              // last delete or insert was found
309         int err;                                // last error
310         int reads, writes;              // number of reads and writes from the btree
311         ushort thread_no;               // thread number
312 } BtDb;
313
314 //      Catastrophic errors
315
316 typedef enum {
317         BTERR_ok = 0,
318         BTERR_struct,
319         BTERR_ovflw,
320         BTERR_lock,
321         BTERR_map,
322         BTERR_read,
323         BTERR_wrt,
324         BTERR_atomic,
325         BTERR_recovery
326 } BTERR;
327
328 #define CLOCK_bit 0x8000
329
330 // recovery manager entry types
331
332 typedef enum {
333         BTRM_eof = 0,   // rest of buffer is emtpy
334         BTRM_add,               // add a unique key-value to btree
335         BTRM_dup,               // add a duplicate key-value to btree
336         BTRM_del,               // delete a key-value from btree
337         BTRM_upd,               // update a key with a new value
338         BTRM_new,               // allocate a new empty page
339         BTRM_old,               // reuse an old empty page
340         BTRM_end = 255  // circular buffer inter-gap
341 } BTRM;
342
343 // recovery manager entry
344 //      structure followed by BtKey & BtVal
345
346 typedef struct {
347         logseqno lsn;           // log sequence number for entry
348         uint len;                       // length of entry
349         unsigned char type;     // type of entry
350         unsigned char lvl;      // level of btree entry pertains to
351 } BtLogHdr;
352
353 // B-Tree functions
354
355 extern void bt_close (BtDb *bt);
356 extern BtDb *bt_open (BtMgr *mgr);
357 extern void bt_flushlsn (BtDb *bt);
358 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
359 extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
360 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
361 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
362 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
363 extern BtKey *bt_foundkey (BtDb *bt);
364 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
365 extern uint bt_nextkey   (BtDb *bt, uint slot);
366
367 //      manager functions
368 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
369 extern void bt_mgrclose (BtMgr *mgr);
370 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
371
372 //  Helper functions to return slot values
373 //      from the cursor page.
374
375 extern BtKey *bt_key (BtDb *bt, uint slot);
376 extern BtVal *bt_val (BtDb *bt, uint slot);
377
378 //  The page is allocated from low and hi ends.
379 //  The key slots are allocated from the bottom,
380 //      while the text and value of the key
381 //  are allocated from the top.  When the two
382 //  areas meet, the page is split into two.
383
384 //  A key consists of a length byte, two bytes of
385 //  index number (0 - 65535), and up to 253 bytes
386 //  of key value.
387
388 //  Associated with each key is a value byte string
389 //      containing any value desired.
390
391 //  The b-tree root is always located at page 1.
392 //      The first leaf page of level zero is always
393 //      located on page 2.
394
395 //      The b-tree pages are linked with next
396 //      pointers to facilitate enumerators,
397 //      and provide for concurrency.
398
399 //      When to root page fills, it is split in two and
400 //      the tree height is raised by a new root at page
401 //      one with two keys.
402
403 //      Deleted keys are marked with a dead bit until
404 //      page cleanup. The fence key for a leaf node is
405 //      always present
406
407 //  To achieve maximum concurrency one page is locked at a time
408 //  as the tree is traversed to find leaf key in question. The right
409 //  page numbers are used in cases where the page is being split,
410 //      or consolidated.
411
412 //  Page 0 is dedicated to lock for new page extensions,
413 //      and chains empty pages together for reuse. It also
414 //      contains the latch manager hash table.
415
416 //      The ParentModification lock on a node is obtained to serialize posting
417 //      or changing the fence key for a node.
418
419 //      Empty pages are chained together through the ALLOC page and reused.
420
421 //      Access macros to address slot and key values from the page
422 //      Page slots use 1 based indexing.
423
424 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
425 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
426 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
427
428 void bt_putid(unsigned char *dest, uid id)
429 {
430 int i = BtId;
431
432         while( i-- )
433                 dest[i] = (unsigned char)id, id >>= 8;
434 }
435
436 uid bt_getid(unsigned char *src)
437 {
438 uid id = 0;
439 int i;
440
441         for( i = 0; i < BtId; i++ )
442                 id <<= 8, id |= *src++; 
443
444         return id;
445 }
446
447 uid bt_newdup (BtDb *bt)
448 {
449 #ifdef unix
450         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
451 #else
452         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
453 #endif
454 }
455
456 //      Write-Only Queue Lock
457
458 void WriteOLock (WOLock *lock, ushort tid)
459 {
460 ushort tix;
461
462         if( lock->tid == tid ) {
463                 lock->dup++;
464                 return;
465         }
466 #ifdef unix
467         tix = __sync_fetch_and_add (lock->ticket, 1);
468 #else
469         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
470 #endif
471         // wait for our ticket to come up
472
473         while( tix != lock->serving[0] )
474 #ifdef unix
475                 sched_yield();
476 #else
477                 SwitchToThread ();
478 #endif
479         lock->tid = tid;
480 }
481
482 void WriteORelease (WOLock *lock)
483 {
484         if( lock->dup ) {
485                 lock->dup--;
486                 return;
487         }
488
489         lock->tid = 0;
490         lock->serving[0]++;
491 }
492
493 //      Phase-Fair reader/writer lock implementation
494
495 void WriteLock (RWLock *lock, ushort tid)
496 {
497 ushort w, r, tix;
498
499         if( lock->tid == tid ) {
500                 lock->dup++;
501                 return;
502         }
503 #ifdef unix
504         tix = __sync_fetch_and_add (lock->ticket, 1);
505 #else
506         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
507 #endif
508         // wait for our ticket to come up
509
510         while( tix != lock->serving[0] )
511 #ifdef unix
512                 sched_yield();
513 #else
514                 SwitchToThread ();
515 #endif
516
517         w = PRES | (tix & PHID);
518 #ifdef  unix
519         r = __sync_fetch_and_add (lock->rin, w);
520 #else
521         r = _InterlockedExchangeAdd16 (lock->rin, w);
522 #endif
523         while( r != *lock->rout )
524 #ifdef unix
525                 sched_yield();
526 #else
527                 SwitchToThread();
528 #endif
529         lock->tid = tid;
530 }
531
532 void WriteRelease (RWLock *lock)
533 {
534         if( lock->dup ) {
535                 lock->dup--;
536                 return;
537         }
538
539         lock->tid = 0;
540 #ifdef unix
541         __sync_fetch_and_and (lock->rin, ~MASK);
542 #else
543         _InterlockedAnd16 (lock->rin, ~MASK);
544 #endif
545         lock->serving[0]++;
546 }
547
548 void ReadLock (RWLock *lock, ushort tid)
549 {
550 ushort w;
551         if( lock->tid == tid ) {
552                 lock->dup++;
553                 return;
554         }
555 #ifdef unix
556         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
557 #else
558         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
559 #endif
560         if( w )
561           while( w == (*lock->rin & MASK) )
562 #ifdef unix
563                 sched_yield ();
564 #else
565                 SwitchToThread ();
566 #endif
567 }
568
569 void ReadRelease (RWLock *lock)
570 {
571         if( lock->dup ) {
572                 lock->dup--;
573                 return;
574         }
575
576 #ifdef unix
577         __sync_fetch_and_add (lock->rout, RINC);
578 #else
579         _InterlockedExchangeAdd16 (lock->rout, RINC);
580 #endif
581 }
582
583 //      Spin Latch Manager
584
585 //      wait until write lock mode is clear
586 //      and add 1 to the share count
587
588 void bt_spinreadlock(BtSpinLatch *latch)
589 {
590 ushort prev;
591
592   do {
593 #ifdef unix
594         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
595 #else
596         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
597 #endif
598         //  see if exclusive request is granted or pending
599
600         if( !(prev & BOTH) )
601                 return;
602 #ifdef unix
603         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
604 #else
605         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
606 #endif
607 #ifdef  unix
608   } while( sched_yield(), 1 );
609 #else
610   } while( SwitchToThread(), 1 );
611 #endif
612 }
613
614 //      wait for other read and write latches to relinquish
615
616 void bt_spinwritelock(BtSpinLatch *latch)
617 {
618 ushort prev;
619
620   do {
621 #ifdef  unix
622         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
623 #else
624         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
625 #endif
626         if( !(prev & XCL) )
627           if( !(prev & ~BOTH) )
628                 return;
629           else
630 #ifdef unix
631                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
632 #else
633                 _InterlockedAnd16((ushort *)latch, ~XCL);
634 #endif
635 #ifdef  unix
636   } while( sched_yield(), 1 );
637 #else
638   } while( SwitchToThread(), 1 );
639 #endif
640 }
641
642 //      try to obtain write lock
643
644 //      return 1 if obtained,
645 //              0 otherwise
646
647 int bt_spinwritetry(BtSpinLatch *latch)
648 {
649 ushort prev;
650
651 #ifdef  unix
652         prev = __sync_fetch_and_or((ushort *)latch, XCL);
653 #else
654         prev = _InterlockedOr16((ushort *)latch, XCL);
655 #endif
656         //      take write access if all bits are clear
657
658         if( !(prev & XCL) )
659           if( !(prev & ~BOTH) )
660                 return 1;
661           else
662 #ifdef unix
663                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
664 #else
665                 _InterlockedAnd16((ushort *)latch, ~XCL);
666 #endif
667         return 0;
668 }
669
670 //      clear write mode
671
672 void bt_spinreleasewrite(BtSpinLatch *latch)
673 {
674 #ifdef unix
675         __sync_fetch_and_and((ushort *)latch, ~BOTH);
676 #else
677         _InterlockedAnd16((ushort *)latch, ~BOTH);
678 #endif
679 }
680
681 //      decrement reader count
682
683 void bt_spinreleaseread(BtSpinLatch *latch)
684 {
685 #ifdef unix
686         __sync_fetch_and_add((ushort *)latch, -SHARE);
687 #else
688         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
689 #endif
690 }
691
692 //      recovery manager -- dump current recovery buff & flush
693
694 BTERR bt_dumpredo (BtDb *bt)
695 {
696 BtLogHdr *eof;
697
698         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
699         memset (eof, 0, sizeof(BtLogHdr));
700
701         pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
702
703         //  flush pages written at beginning of this redo buffer
704         //      along with the redo buffer out to disk
705
706         fdatasync (bt->mgr->idx);
707
708         bt->mgr->flushlsn = bt->mgr->pagezero->alloc->lsn;
709         bt->mgr->redoend = 0;
710         return 0;
711 }
712
713 //      recovery manager -- append new entry to recovery log
714 //      flush to disk when it overflows.
715
716 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
717 {
718 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
719 uint amt = sizeof(BtLogHdr);
720 BtLogHdr *hdr, *eof;
721 uint flush;
722
723         bt_spinwritelock (bt->mgr->redo);
724
725         if( key )
726           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
727
728         //      see if new entry fits in buffer
729         //      flush and reset if it doesn't
730
731         if( flush = amt > size - bt->mgr->redoend )
732           if( bt_dumpredo (bt) )
733                 return 0;
734
735         //      fill in new entry & either eof or end block
736
737         hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
738
739         hdr->len = amt;
740         hdr->type = type;
741         hdr->lvl = lvl;
742         hdr->lsn = ++bt->mgr->pagezero->alloc->lsn;
743
744         bt->mgr->redoend += amt;
745
746         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
747         memset (eof, 0, sizeof(BtLogHdr));
748
749         //  fill in key and value
750
751         if( key ) {
752           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
753           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
754         }
755
756         bt_spinreleasewrite(bt->mgr->redo);
757
758         if( flush )
759           bt_flushlsn (bt);
760
761         return hdr->lsn;
762 }
763
764 //      read page into buffer pool from permanent location in Btree file
765
766 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
767 {
768 off64_t off = page_no << mgr->page_bits;
769
770 #ifdef unix
771         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
772                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
773                 return BTERR_read;
774         }
775 #else
776 OVERLAPPED ovl[1];
777 uint amt[1];
778
779         memset (ovl, 0, sizeof(OVERLAPPED));
780         ovl->Offset = off;
781         ovl->OffsetHigh = off >> 32;
782
783         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
784                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
785                 return BTERR_read;
786         }
787         if( *amt <  mgr->page_size ) {
788                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
789                 return BTERR_read;
790         }
791 #endif
792         return 0;
793 }
794
795 //      write page to permanent location in Btree file
796 //      clear the dirty bit
797
798 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
799 {
800 off64_t off = page_no << mgr->page_bits;
801
802 #ifdef unix
803         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
804                 return BTERR_wrt;
805 #else
806 OVERLAPPED ovl[1];
807 uint amt[1];
808
809         memset (ovl, 0, sizeof(OVERLAPPED));
810         ovl->Offset = off;
811         ovl->OffsetHigh = off >> 32;
812
813         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
814                 return BTERR_wrt;
815
816         if( *amt <  mgr->page_size )
817                 return BTERR_wrt;
818 #endif
819         return 0;
820 }
821
822 //      link latch table entry into head of latch hash table
823
824 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
825 {
826 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
827 BtLatchSet *latch = bt->mgr->latchsets + slot;
828
829         if( latch->next = bt->mgr->hashtable[hashidx].slot )
830                 bt->mgr->latchsets[latch->next].prev = slot;
831
832         bt->mgr->hashtable[hashidx].slot = slot;
833         latch->page_no = page_no;
834         latch->entry = slot;
835         latch->split = 0;
836         latch->prev = 0;
837         latch->pin = 1;
838
839         if( loadit )
840           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
841                 return bt->err;
842           else
843                 bt->reads++;
844
845         return bt->err = 0;
846 }
847
848 //      set CLOCK bit in latch
849 //      decrement pin count
850
851 void bt_unpinlatch (BtLatchSet *latch)
852 {
853 #ifdef unix
854         if( ~latch->pin & CLOCK_bit )
855                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
856         __sync_fetch_and_add(&latch->pin, -1);
857 #else
858         if( ~latch->pin & CLOCK_bit )
859                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
860         _InterlockedDecrement16 (&latch->pin);
861 #endif
862 }
863
864 //  return the btree cached page address
865 //      if page is dirty and has not yet been
866 //      flushed to disk for the current redo
867 //      recovery buffer, write it out.
868
869 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
870 {
871 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
872
873   if( latch->dirty )
874         if( page->lsn < bt->mgr->flushlsn )
875           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
876                 return NULL;
877           else
878                 latch->dirty = 0, bt->writes++;
879
880   return page;
881 }
882
883 //      find existing latchset or inspire new one
884 //      return with latchset pinned
885
886 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
887 {
888 uint hashidx = page_no % bt->mgr->latchhash;
889 BtLatchSet *latch;
890 uint attempts = 0;
891 uint slot, idx;
892 uint lvl, cnt;
893 off64_t off;
894 uint amt[1];
895 BtPage page;
896
897   //  try to find our entry
898
899   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
900
901   if( slot = bt->mgr->hashtable[hashidx].slot ) do
902   {
903         latch = bt->mgr->latchsets + slot;
904         if( page_no == latch->page_no )
905                 break;
906   } while( slot = latch->next );
907
908   //  found our entry
909   //  increment clock
910
911   if( slot ) {
912         latch = bt->mgr->latchsets + slot;
913 #ifdef unix
914         __sync_fetch_and_add(&latch->pin, 1);
915 #else
916         _InterlockedIncrement16 (&latch->pin);
917 #endif
918         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
919         return latch;
920   }
921
922         //  see if there are any unused pool entries
923 #ifdef unix
924         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
925 #else
926         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
927 #endif
928
929         if( slot < bt->mgr->latchtotal ) {
930                 latch = bt->mgr->latchsets + slot;
931                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
932                         return NULL;
933                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
934                 return latch;
935         }
936
937 #ifdef unix
938         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
939 #else
940         _InterlockedDecrement (&bt->mgr->latchdeployed);
941 #endif
942   //  find and reuse previous entry on victim
943
944   while( 1 ) {
945 #ifdef unix
946         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
947 #else
948         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
949 #endif
950         // try to get write lock on hash chain
951         //      skip entry if not obtained
952         //      or has outstanding pins
953
954         slot %= bt->mgr->latchtotal;
955
956         if( !slot )
957                 continue;
958
959         //      only go around two times before
960         //      flushing redo recovery buffer,
961         //      and the buffer pool.
962
963         if( bt->mgr->redopages )
964           if( attempts++ > 2 * bt->mgr->latchtotal ) {
965                 if( bt_dumpredo (bt) )
966                   return NULL;
967                 bt_flushlsn (bt);
968                 attempts = 0;
969           }
970
971         latch = bt->mgr->latchsets + slot;
972         idx = latch->page_no % bt->mgr->latchhash;
973
974         //      see we are on same chain as hashidx
975
976         if( idx == hashidx )
977                 continue;
978
979         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
980                 continue;
981
982         //  skip this slot if it is pinned
983         //      or the CLOCK bit is set
984
985         if( latch->pin ) {
986           if( latch->pin & CLOCK_bit ) {
987 #ifdef unix
988                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
989 #else
990                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
991 #endif
992           }
993           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
994           continue;
995         }
996
997         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
998
999         //      if dirty page has lsn >= last redo recovery buffer
1000         //      then hold page in the buffer pool until redo
1001         //      recovery buffer is written to disk.
1002
1003         if( latch->dirty )
1004           if( page->lsn >= bt->mgr->flushlsn ) {
1005                 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
1006                 continue;
1007           }
1008                 
1009         //  update permanent page area in btree from buffer pool
1010         //      no read-lock is required since page is not pinned.
1011
1012         if( latch->dirty )
1013           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1014                 return NULL;
1015           else
1016                 latch->dirty = 0, bt->writes++;
1017
1018         //  unlink our available slot from its hash chain
1019
1020         if( latch->prev )
1021                 bt->mgr->latchsets[latch->prev].next = latch->next;
1022         else
1023                 bt->mgr->hashtable[idx].slot = latch->next;
1024
1025         if( latch->next )
1026                 bt->mgr->latchsets[latch->next].prev = latch->prev;
1027
1028         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
1029
1030         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
1031                 return NULL;
1032
1033         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
1034         return latch;
1035   }
1036 }
1037
1038 // flush pages
1039
1040 void bt_flushlsn (BtDb *bt)
1041 {
1042 BtLatchSet *latch;
1043 uint hashidx;
1044 uint num = 0;
1045 BtPage page;
1046 uint slot;
1047
1048         //      flush dirty pool pages to the btree that were
1049         //      dirty before the start of this redo recovery buffer
1050
1051         for( slot = 1; slot <= bt->mgr->latchdeployed; slot++ ) {
1052                 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
1053                 latch = bt->mgr->latchsets + slot;
1054 //              hashidx = latch->page_no % bt->mgr->latchhash;
1055
1056 //              if( !bt_spinwritetry (bt->mgr->hashtable[hashidx].latch) )
1057 //                      continue;
1058
1059                 if( latch->dirty ) {
1060                   bt_lockpage(bt, BtLockRead, latch);
1061                   bt_writepage(bt->mgr, page, latch->page_no);
1062                   latch->dirty = 0, bt->writes++;
1063                   bt_unlockpage(bt, BtLockRead, latch);
1064                 }
1065
1066 //          bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
1067         }
1068 }
1069
1070 void bt_mgrclose (BtMgr *mgr)
1071 {
1072 BtLatchSet *latch;
1073 BtLogHdr *eof;
1074 uint num = 0;
1075 BtPage page;
1076 uint slot;
1077
1078         //      flush recovery buffer to disk
1079
1080         if( mgr->redoend ) {
1081                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1082                 memset (eof, 0, sizeof(BtLogHdr));
1083
1084                 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1085         }
1086
1087         //      flush dirty pool pages to the btree
1088
1089         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
1090                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1091                 latch = mgr->latchsets + slot;
1092
1093                 if( latch->dirty ) {
1094                         bt_writepage(mgr, page, latch->page_no);
1095                         latch->dirty = 0, num++;
1096                 }
1097         }
1098
1099         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1100
1101 #ifdef unix
1102         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
1103         munmap (mgr->pagezero, mgr->page_size);
1104 #else
1105         FlushViewOfFile(mgr->pagezero, 0);
1106         UnmapViewOfFile(mgr->pagezero);
1107         UnmapViewOfFile(mgr->hashtable);
1108         CloseHandle(mgr->halloc);
1109         CloseHandle(mgr->hpool);
1110 #endif
1111 #ifdef unix
1112         free (mgr->redobuff);
1113         close (mgr->idx);
1114         free (mgr);
1115 #else
1116         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1117         FlushFileBuffers(mgr->idx);
1118         CloseHandle(mgr->idx);
1119         GlobalFree (mgr);
1120 #endif
1121 }
1122
1123 //      close and release memory
1124
1125 void bt_close (BtDb *bt)
1126 {
1127 #ifdef unix
1128         if( bt->mem )
1129                 free (bt->mem);
1130 #else
1131         if( bt->mem)
1132                 VirtualFree (bt->mem, 0, MEM_RELEASE);
1133 #endif
1134         free (bt);
1135 }
1136
1137 //  open/create new btree buffer manager
1138
1139 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1140 //              size of page pool (e.g. 262144)
1141
1142 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1143 {
1144 uint lvl, attr, last, slot, idx;
1145 uint nlatchpage, latchhash;
1146 unsigned char value[BtId];
1147 int flag, initit = 0;
1148 BtPageZero *pagezero;
1149 off64_t size;
1150 uint amt[1];
1151 BtMgr* mgr;
1152 BtKey* key;
1153 BtVal *val;
1154
1155         // determine sanity of page size and buffer pool
1156
1157         if( bits > BT_maxbits )
1158                 bits = BT_maxbits;
1159         else if( bits < BT_minbits )
1160                 bits = BT_minbits;
1161
1162         if( nodemax < 16 ) {
1163                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
1164                 return NULL;
1165         }
1166
1167 #ifdef unix
1168         mgr = calloc (1, sizeof(BtMgr));
1169
1170         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1171
1172         if( mgr->idx == -1 ) {
1173                 fprintf (stderr, "Unable to open btree file\n");
1174                 return free(mgr), NULL;
1175         }
1176 #else
1177         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1178         attr = FILE_ATTRIBUTE_NORMAL;
1179         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1180
1181         if( mgr->idx == INVALID_HANDLE_VALUE )
1182                 return GlobalFree(mgr), NULL;
1183 #endif
1184
1185 #ifdef unix
1186         pagezero = valloc (BT_maxpage);
1187         *amt = 0;
1188
1189         // read minimum page size to get root info
1190         //      to support raw disk partition files
1191         //      check if bits == 0 on the disk.
1192
1193         if( size = lseek (mgr->idx, 0L, 2) )
1194                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1195                         if( pagezero->alloc->bits )
1196                                 bits = pagezero->alloc->bits;
1197                         else
1198                                 initit = 1;
1199                 else
1200                         return free(mgr), free(pagezero), NULL;
1201         else
1202                 initit = 1;
1203 #else
1204         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1205         size = GetFileSize(mgr->idx, amt);
1206
1207         if( size || *amt ) {
1208                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1209                         return bt_mgrclose (mgr), NULL;
1210                 bits = pagezero->alloc->bits;
1211         } else
1212                 initit = 1;
1213 #endif
1214
1215         mgr->page_size = 1 << bits;
1216         mgr->page_bits = bits;
1217
1218         //  calculate number of latch hash table entries
1219
1220         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1221         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
1222
1223         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1224         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1225         mgr->latchtotal = nodemax;
1226
1227         if( !initit )
1228                 goto mgrlatch;
1229
1230         // initialize an empty b-tree with latch page, root page, page of leaves
1231         // and page(s) of latches and page pool cache
1232
1233         memset (pagezero, 0, 1 << bits);
1234         pagezero->alloc->bits = mgr->page_bits;
1235         bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1236
1237         //  initialize left-most LEAF page in
1238         //      alloc->left.
1239
1240         bt_putid (pagezero->alloc->left, LEAF_page);
1241
1242         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1243                 fprintf (stderr, "Unable to create btree page zero\n");
1244                 return bt_mgrclose (mgr), NULL;
1245         }
1246
1247         memset (pagezero, 0, 1 << bits);
1248         pagezero->alloc->bits = mgr->page_bits;
1249
1250         for( lvl=MIN_lvl; lvl--; ) {
1251                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1252                 key = keyptr(pagezero->alloc, 1);
1253                 key->len = 2;           // create stopper key
1254                 key->key[0] = 0xff;
1255                 key->key[1] = 0xff;
1256
1257                 bt_putid(value, MIN_lvl - lvl + 1);
1258                 val = valptr(pagezero->alloc, 1);
1259                 val->len = lvl ? BtId : 0;
1260                 memcpy (val->value, value, val->len);
1261
1262                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1263                 pagezero->alloc->lvl = lvl;
1264                 pagezero->alloc->cnt = 1;
1265                 pagezero->alloc->act = 1;
1266
1267                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1268                         fprintf (stderr, "Unable to create btree page zero\n");
1269                         return bt_mgrclose (mgr), NULL;
1270                 }
1271         }
1272
1273 mgrlatch:
1274 #ifdef unix
1275         free (pagezero);
1276 #else
1277         VirtualFree (pagezero, 0, MEM_RELEASE);
1278 #endif
1279 #ifdef unix
1280         // mlock the pagezero page
1281
1282         flag = PROT_READ | PROT_WRITE;
1283         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1284         if( mgr->pagezero == MAP_FAILED ) {
1285                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1286                 return bt_mgrclose (mgr), NULL;
1287         }
1288         mlock (mgr->pagezero, mgr->page_size);
1289
1290         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1291         if( mgr->hashtable == MAP_FAILED ) {
1292                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1293                 return bt_mgrclose (mgr), NULL;
1294         }
1295         if( mgr->redopages = redopages )
1296                 mgr->redobuff = valloc (redopages * mgr->page_size);
1297 #else
1298         flag = PAGE_READWRITE;
1299         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1300         if( !mgr->halloc ) {
1301                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1302                 return bt_mgrclose (mgr), NULL;
1303         }
1304
1305         flag = FILE_MAP_WRITE;
1306         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1307         if( !mgr->pagezero ) {
1308                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1309                 return bt_mgrclose (mgr), NULL;
1310         }
1311
1312         flag = PAGE_READWRITE;
1313         size = (uid)mgr->nlatchpage << mgr->page_bits;
1314         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1315         if( !mgr->hpool ) {
1316                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1317                 return bt_mgrclose (mgr), NULL;
1318         }
1319
1320         flag = FILE_MAP_WRITE;
1321         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1322         if( !mgr->hashtable ) {
1323                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1324                 return bt_mgrclose (mgr), NULL;
1325         }
1326         if( mgr->redopages = redopages )
1327                 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1328 #endif
1329
1330         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1331         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1332
1333         return mgr;
1334 }
1335
1336 //      open BTree access method
1337 //      based on buffer manager
1338
1339 BtDb *bt_open (BtMgr *mgr)
1340 {
1341 BtDb *bt = malloc (sizeof(*bt));
1342
1343         memset (bt, 0, sizeof(*bt));
1344         bt->mgr = mgr;
1345 #ifdef unix
1346         bt->mem = valloc (2 *mgr->page_size);
1347 #else
1348         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1349 #endif
1350         bt->frame = (BtPage)bt->mem;
1351         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1352 #ifdef unix
1353         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1354 #else
1355         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1356 #endif
1357         return bt;
1358 }
1359
1360 //  compare two keys, return > 0, = 0, or < 0
1361 //  =0: keys are same
1362 //  -1: key2 > key1
1363 //  +1: key2 < key1
1364 //  as the comparison value
1365
1366 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1367 {
1368 uint len1 = key1->len;
1369 int ans;
1370
1371         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1372                 return ans;
1373
1374         if( len1 > len2 )
1375                 return 1;
1376         if( len1 < len2 )
1377                 return -1;
1378
1379         return 0;
1380 }
1381
1382 // place write, read, or parent lock on requested page_no.
1383
1384 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1385 {
1386         switch( mode ) {
1387         case BtLockRead:
1388                 ReadLock (latch->readwr, bt->thread_no);
1389                 break;
1390         case BtLockWrite:
1391                 WriteLock (latch->readwr, bt->thread_no);
1392                 break;
1393         case BtLockAccess:
1394                 ReadLock (latch->access, bt->thread_no);
1395                 break;
1396         case BtLockDelete:
1397                 WriteLock (latch->access, bt->thread_no);
1398                 break;
1399         case BtLockParent:
1400                 WriteOLock (latch->parent, bt->thread_no);
1401                 break;
1402         case BtLockAtomic:
1403                 WriteOLock (latch->atomic, bt->thread_no);
1404                 break;
1405         case BtLockAtomic | BtLockRead:
1406                 WriteOLock (latch->atomic, bt->thread_no);
1407                 ReadLock (latch->readwr, bt->thread_no);
1408                 break;
1409         }
1410 }
1411
1412 // remove write, read, or parent lock on requested page
1413
1414 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1415 {
1416         switch( mode ) {
1417         case BtLockRead:
1418                 ReadRelease (latch->readwr);
1419                 break;
1420         case BtLockWrite:
1421                 WriteRelease (latch->readwr);
1422                 break;
1423         case BtLockAccess:
1424                 ReadRelease (latch->access);
1425                 break;
1426         case BtLockDelete:
1427                 WriteRelease (latch->access);
1428                 break;
1429         case BtLockParent:
1430                 WriteORelease (latch->parent);
1431                 break;
1432         case BtLockAtomic:
1433                 WriteORelease (latch->atomic);
1434                 break;
1435         case BtLockAtomic | BtLockRead:
1436                 WriteORelease (latch->atomic);
1437                 ReadRelease (latch->readwr);
1438                 break;
1439         }
1440 }
1441
1442 //      allocate a new page
1443 //      return with page latched, but unlocked.
1444
1445 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1446 {
1447 uid page_no;
1448 int blk;
1449
1450         //      lock allocation page
1451
1452         bt_spinwritelock(bt->mgr->lock);
1453
1454         // use empty chain first
1455         // else allocate empty page
1456
1457         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1458                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1459                         set->page = bt_mappage (bt, set->latch);
1460                 else
1461                         return bt->err = BTERR_struct, -1;
1462
1463                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1464                 bt_spinreleasewrite(bt->mgr->lock);
1465
1466                 memcpy (set->page, contents, bt->mgr->page_size);
1467                 set->latch->dirty = 1;
1468                 return 0;
1469         }
1470
1471         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1472         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1473
1474         // unlock allocation latch
1475
1476         bt_spinreleasewrite(bt->mgr->lock);
1477
1478         //      don't load cache from btree page
1479
1480         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1481                 set->page = bt_mappage (bt, set->latch);
1482         else
1483                 return bt->err = BTERR_struct;
1484
1485         memcpy (set->page, contents, bt->mgr->page_size);
1486         set->latch->dirty = 1;
1487         return 0;
1488 }
1489
1490 //  find slot in page for given key at a given level
1491
1492 int bt_findslot (BtPage page, unsigned char *key, uint len)
1493 {
1494 uint diff, higher = page->cnt, low = 1, slot;
1495 uint good = 0;
1496
1497         //        make stopper key an infinite fence value
1498
1499         if( bt_getid (page->right) )
1500                 higher++;
1501         else
1502                 good++;
1503
1504         //      low is the lowest candidate.
1505         //  loop ends when they meet
1506
1507         //  higher is already
1508         //      tested as .ge. the passed key.
1509
1510         while( diff = higher - low ) {
1511                 slot = low + ( diff >> 1 );
1512                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1513                         low = slot + 1;
1514                 else
1515                         higher = slot, good++;
1516         }
1517
1518         //      return zero if key is on right link page
1519
1520         return good ? higher : 0;
1521 }
1522
1523 //  find and load page at given level for given key
1524 //      leave page rd or wr locked as requested
1525
1526 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1527 {
1528 uid page_no = ROOT_page, prevpage = 0;
1529 uint drill = 0xff, slot;
1530 BtLatchSet *prevlatch;
1531 uint mode, prevmode;
1532
1533   //  start at root of btree and drill down
1534
1535   do {
1536         // determine lock mode of drill level
1537         mode = (drill == lvl) ? lock : BtLockRead; 
1538
1539         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1540           return 0;
1541
1542         // obtain access lock using lock chaining with Access mode
1543
1544         if( page_no > ROOT_page )
1545           bt_lockpage(bt, BtLockAccess, set->latch);
1546
1547         set->page = bt_mappage (bt, set->latch);
1548
1549         //      release & unpin parent or left sibling page
1550
1551         if( prevpage ) {
1552           bt_unlockpage(bt, prevmode, prevlatch);
1553           bt_unpinlatch (prevlatch);
1554           prevpage = 0;
1555         }
1556
1557         // obtain mode lock using lock chaining through AccessLock
1558
1559         bt_lockpage(bt, mode, set->latch);
1560
1561         if( set->page->free )
1562                 return bt->err = BTERR_struct, 0;
1563
1564         if( page_no > ROOT_page )
1565           bt_unlockpage(bt, BtLockAccess, set->latch);
1566
1567         // re-read and re-lock root after determining actual level of root
1568
1569         if( set->page->lvl != drill) {
1570                 if( set->latch->page_no != ROOT_page )
1571                         return bt->err = BTERR_struct, 0;
1572                         
1573                 drill = set->page->lvl;
1574
1575                 if( lock != BtLockRead && drill == lvl ) {
1576                   bt_unlockpage(bt, mode, set->latch);
1577                   bt_unpinlatch (set->latch);
1578                   continue;
1579                 }
1580         }
1581
1582         prevpage = set->latch->page_no;
1583         prevlatch = set->latch;
1584         prevmode = mode;
1585
1586         //  find key on page at this level
1587         //  and descend to requested level
1588
1589         if( !set->page->kill )
1590          if( slot = bt_findslot (set->page, key, len) ) {
1591           if( drill == lvl )
1592                 return slot;
1593
1594           // find next non-dead slot -- the fence key if nothing else
1595
1596           while( slotptr(set->page, slot)->dead )
1597                 if( slot++ < set->page->cnt )
1598                   continue;
1599                 else
1600                   return bt->err = BTERR_struct, 0;
1601
1602           page_no = bt_getid(valptr(set->page, slot)->value);
1603           drill--;
1604           continue;
1605          }
1606
1607         //  or slide right into next page
1608
1609         page_no = bt_getid(set->page->right);
1610   } while( page_no );
1611
1612   // return error on end of right chain
1613
1614   bt->err = BTERR_struct;
1615   return 0;     // return error
1616 }
1617
1618 //      return page to free list
1619 //      page must be delete & write locked
1620
1621 void bt_freepage (BtDb *bt, BtPageSet *set)
1622 {
1623         //      lock allocation page
1624
1625         bt_spinwritelock (bt->mgr->lock);
1626
1627         //      store chain
1628
1629         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1630         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1631         set->latch->dirty = 1;
1632         set->page->free = 1;
1633
1634         // unlock released page
1635
1636         bt_unlockpage (bt, BtLockDelete, set->latch);
1637         bt_unlockpage (bt, BtLockWrite, set->latch);
1638         bt_unpinlatch (set->latch);
1639
1640         // unlock allocation page
1641
1642         bt_spinreleasewrite (bt->mgr->lock);
1643 }
1644
1645 //      a fence key was deleted from a page
1646 //      push new fence value upwards
1647
1648 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1649 {
1650 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1651 unsigned char value[BtId];
1652 BtKey* ptr;
1653 uint idx;
1654
1655         //      remove the old fence value
1656
1657         ptr = keyptr(set->page, set->page->cnt);
1658         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1659         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1660         set->latch->dirty = 1;
1661
1662         //  cache new fence value
1663
1664         ptr = keyptr(set->page, set->page->cnt);
1665         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1666
1667         bt_lockpage (bt, BtLockParent, set->latch);
1668         bt_unlockpage (bt, BtLockWrite, set->latch);
1669
1670         //      insert new (now smaller) fence key
1671
1672         bt_putid (value, set->latch->page_no);
1673         ptr = (BtKey*)leftkey;
1674
1675         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1676           return bt->err;
1677
1678         //      now delete old fence key
1679
1680         ptr = (BtKey*)rightkey;
1681
1682         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1683                 return bt->err;
1684
1685         bt_unlockpage (bt, BtLockParent, set->latch);
1686         bt_unpinlatch(set->latch);
1687         return 0;
1688 }
1689
1690 //      root has a single child
1691 //      collapse a level from the tree
1692
1693 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1694 {
1695 BtPageSet child[1];
1696 uid page_no;
1697 uint idx;
1698
1699   // find the child entry and promote as new root contents
1700
1701   do {
1702         for( idx = 0; idx++ < root->page->cnt; )
1703           if( !slotptr(root->page, idx)->dead )
1704                 break;
1705
1706         page_no = bt_getid (valptr(root->page, idx)->value);
1707
1708         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1709                 child->page = bt_mappage (bt, child->latch);
1710         else
1711                 return bt->err;
1712
1713         bt_lockpage (bt, BtLockDelete, child->latch);
1714         bt_lockpage (bt, BtLockWrite, child->latch);
1715
1716         memcpy (root->page, child->page, bt->mgr->page_size);
1717         root->latch->dirty = 1;
1718
1719         bt_freepage (bt, child);
1720
1721   } while( root->page->lvl > 1 && root->page->act == 1 );
1722
1723   bt_unlockpage (bt, BtLockWrite, root->latch);
1724   bt_unpinlatch (root->latch);
1725   return 0;
1726 }
1727
1728 //  delete a page and manage keys
1729 //  call with page writelocked
1730 //      returns with page unpinned
1731
1732 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1733 {
1734 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1735 unsigned char value[BtId];
1736 uint lvl = set->page->lvl;
1737 BtPageSet right[1];
1738 uid page_no;
1739 BtKey *ptr;
1740
1741         //      cache copy of fence key
1742         //      to post in parent
1743
1744         ptr = keyptr(set->page, set->page->cnt);
1745         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1746
1747         //      obtain lock on right page
1748
1749         page_no = bt_getid(set->page->right);
1750
1751         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1752                 right->page = bt_mappage (bt, right->latch);
1753         else
1754                 return 0;
1755
1756         bt_lockpage (bt, BtLockWrite, right->latch);
1757
1758         // cache copy of key to update
1759
1760         ptr = keyptr(right->page, right->page->cnt);
1761         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1762
1763         if( right->page->kill )
1764                 return bt->err = BTERR_struct;
1765
1766         // pull contents of right peer into our empty page
1767
1768         memcpy (set->page, right->page, bt->mgr->page_size);
1769         set->latch->dirty = 1;
1770
1771         // mark right page deleted and point it to left page
1772         //      until we can post parent updates that remove access
1773         //      to the deleted page.
1774
1775         bt_putid (right->page->right, set->latch->page_no);
1776         right->latch->dirty = 1;
1777         right->page->kill = 1;
1778
1779         bt_lockpage (bt, BtLockParent, right->latch);
1780         bt_unlockpage (bt, BtLockWrite, right->latch);
1781
1782         bt_lockpage (bt, BtLockParent, set->latch);
1783         bt_unlockpage (bt, BtLockWrite, set->latch);
1784
1785         // redirect higher key directly to our new node contents
1786
1787         bt_putid (value, set->latch->page_no);
1788         ptr = (BtKey*)higherfence;
1789
1790         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1791           return bt->err;
1792
1793         //      delete old lower key to our node
1794
1795         ptr = (BtKey*)lowerfence;
1796
1797         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1798           return bt->err;
1799
1800         //      obtain delete and write locks to right node
1801
1802         bt_unlockpage (bt, BtLockParent, right->latch);
1803         bt_lockpage (bt, BtLockDelete, right->latch);
1804         bt_lockpage (bt, BtLockWrite, right->latch);
1805         bt_freepage (bt, right);
1806
1807         bt_unlockpage (bt, BtLockParent, set->latch);
1808         bt_unpinlatch (set->latch);
1809         return 0;
1810 }
1811
1812 //  find and delete key on page by marking delete flag bit
1813 //  if page becomes empty, delete it from the btree
1814
1815 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1816 {
1817 uint slot, idx, found, fence;
1818 BtPageSet set[1];
1819 BtKey *ptr;
1820 BtVal *val;
1821
1822         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1823                 ptr = keyptr(set->page, slot);
1824         else
1825                 return bt->err;
1826
1827         // if librarian slot, advance to real slot
1828
1829         if( slotptr(set->page, slot)->type == Librarian )
1830                 ptr = keyptr(set->page, ++slot);
1831
1832         fence = slot == set->page->cnt;
1833
1834         // if key is found delete it, otherwise ignore request
1835
1836         if( found = !keycmp (ptr, key, len) )
1837           if( found = slotptr(set->page, slot)->dead == 0 ) {
1838                 val = valptr(set->page,slot);
1839                 slotptr(set->page, slot)->dead = 1;
1840                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1841                 set->page->act--;
1842
1843                 // collapse empty slots beneath the fence
1844
1845                 while( idx = set->page->cnt - 1 )
1846                   if( slotptr(set->page, idx)->dead ) {
1847                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1848                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1849                   } else
1850                         break;
1851           }
1852
1853         //      did we delete a fence key in an upper level?
1854
1855         if( found && lvl && set->page->act && fence )
1856           if( bt_fixfence (bt, set, lvl) )
1857                 return bt->err;
1858           else
1859                 return 0;
1860
1861         //      do we need to collapse root?
1862
1863         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1864           if( bt_collapseroot (bt, set) )
1865                 return bt->err;
1866           else
1867                 return 0;
1868
1869         //      delete empty page
1870
1871         if( !set->page->act )
1872                 return bt_deletepage (bt, set);
1873
1874         set->latch->dirty = 1;
1875         bt_unlockpage(bt, BtLockWrite, set->latch);
1876         bt_unpinlatch (set->latch);
1877         return 0;
1878 }
1879
1880 BtKey *bt_foundkey (BtDb *bt)
1881 {
1882         return (BtKey*)(bt->key);
1883 }
1884
1885 //      advance to next slot
1886
1887 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1888 {
1889 BtLatchSet *prevlatch;
1890 uid page_no;
1891
1892         if( slot < set->page->cnt )
1893                 return slot + 1;
1894
1895         prevlatch = set->latch;
1896
1897         if( page_no = bt_getid(set->page->right) )
1898           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1899                 set->page = bt_mappage (bt, set->latch);
1900           else
1901                 return 0;
1902         else
1903           return bt->err = BTERR_struct, 0;
1904
1905         // obtain access lock using lock chaining with Access mode
1906
1907         bt_lockpage(bt, BtLockAccess, set->latch);
1908
1909         bt_unlockpage(bt, BtLockRead, prevlatch);
1910         bt_unpinlatch (prevlatch);
1911
1912         bt_lockpage(bt, BtLockRead, set->latch);
1913         bt_unlockpage(bt, BtLockAccess, set->latch);
1914         return 1;
1915 }
1916
1917 //      find unique key or first duplicate key in
1918 //      leaf level and return number of value bytes
1919 //      or (-1) if not found.  Setup key for bt_foundkey
1920
1921 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1922 {
1923 BtPageSet set[1];
1924 uint len, slot;
1925 int ret = -1;
1926 BtKey *ptr;
1927 BtVal *val;
1928
1929   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1930    do {
1931         ptr = keyptr(set->page, slot);
1932
1933         //      skip librarian slot place holder
1934
1935         if( slotptr(set->page, slot)->type == Librarian )
1936                 ptr = keyptr(set->page, ++slot);
1937
1938         //      return actual key found
1939
1940         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1941         len = ptr->len;
1942
1943         if( slotptr(set->page, slot)->type == Duplicate )
1944                 len -= BtId;
1945
1946         //      not there if we reach the stopper key
1947
1948         if( slot == set->page->cnt )
1949           if( !bt_getid (set->page->right) )
1950                 break;
1951
1952         // if key exists, return >= 0 value bytes copied
1953         //      otherwise return (-1)
1954
1955         if( slotptr(set->page, slot)->dead )
1956                 continue;
1957
1958         if( keylen == len )
1959           if( !memcmp (ptr->key, key, len) ) {
1960                 val = valptr (set->page,slot);
1961                 if( valmax > val->len )
1962                         valmax = val->len;
1963                 memcpy (value, val->value, valmax);
1964                 ret = valmax;
1965           }
1966
1967         break;
1968
1969    } while( slot = bt_findnext (bt, set, slot) );
1970
1971   bt_unlockpage (bt, BtLockRead, set->latch);
1972   bt_unpinlatch (set->latch);
1973   return ret;
1974 }
1975
1976 //      check page for space available,
1977 //      clean if necessary and return
1978 //      0 - page needs splitting
1979 //      >0  new slot value
1980
1981 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1982 {
1983 uint nxt = bt->mgr->page_size;
1984 BtPage page = set->page;
1985 uint cnt = 0, idx = 0;
1986 uint max = page->cnt;
1987 uint newslot = max;
1988 BtKey *key;
1989 BtVal *val;
1990
1991         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1992                 return slot;
1993
1994         //      skip cleanup and proceed to split
1995         //      if there's not enough garbage
1996         //      to bother with.
1997
1998         if( page->garbage < nxt / 5 )
1999                 return 0;
2000
2001         memcpy (bt->frame, page, bt->mgr->page_size);
2002
2003         // skip page info and set rest of page to zero
2004
2005         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2006         set->latch->dirty = 1;
2007         page->garbage = 0;
2008         page->act = 0;
2009
2010         // clean up page first by
2011         // removing deleted keys
2012
2013         while( cnt++ < max ) {
2014                 if( cnt == slot )
2015                         newslot = idx + 2;
2016
2017                 if( cnt < max || bt->frame->lvl )
2018                   if( slotptr(bt->frame,cnt)->dead )
2019                         continue;
2020
2021                 // copy the value across
2022
2023                 val = valptr(bt->frame, cnt);
2024                 nxt -= val->len + sizeof(BtVal);
2025                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2026
2027                 // copy the key across
2028
2029                 key = keyptr(bt->frame, cnt);
2030                 nxt -= key->len + sizeof(BtKey);
2031                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2032
2033                 // make a librarian slot
2034
2035                 slotptr(page, ++idx)->off = nxt;
2036                 slotptr(page, idx)->type = Librarian;
2037                 slotptr(page, idx)->dead = 1;
2038
2039                 // set up the slot
2040
2041                 slotptr(page, ++idx)->off = nxt;
2042                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2043
2044                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2045                         page->act++;
2046         }
2047
2048         page->min = nxt;
2049         page->cnt = idx;
2050
2051         //      see if page has enough space now, or does it need splitting?
2052
2053         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2054                 return newslot;
2055
2056         return 0;
2057 }
2058
2059 // split the root and raise the height of the btree
2060
2061 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2062 {  
2063 unsigned char leftkey[BT_keyarray];
2064 uint nxt = bt->mgr->page_size;
2065 unsigned char value[BtId];
2066 BtPageSet left[1];
2067 uid left_page_no;
2068 BtKey *ptr;
2069 BtVal *val;
2070
2071         //      save left page fence key for new root
2072
2073         ptr = keyptr(root->page, root->page->cnt);
2074         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2075
2076         //  Obtain an empty page to use, and copy the current
2077         //  root contents into it, e.g. lower keys
2078
2079         if( bt_newpage(bt, left, root->page) )
2080                 return bt->err;
2081
2082         left_page_no = left->latch->page_no;
2083         bt_unpinlatch (left->latch);
2084
2085         // preserve the page info at the bottom
2086         // of higher keys and set rest to zero
2087
2088         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2089
2090         // insert stopper key at top of newroot page
2091         // and increase the root height
2092
2093         nxt -= BtId + sizeof(BtVal);
2094         bt_putid (value, right->page_no);
2095         val = (BtVal *)((unsigned char *)root->page + nxt);
2096         memcpy (val->value, value, BtId);
2097         val->len = BtId;
2098
2099         nxt -= 2 + sizeof(BtKey);
2100         slotptr(root->page, 2)->off = nxt;
2101         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2102         ptr->len = 2;
2103         ptr->key[0] = 0xff;
2104         ptr->key[1] = 0xff;
2105
2106         // insert lower keys page fence key on newroot page as first key
2107
2108         nxt -= BtId + sizeof(BtVal);
2109         bt_putid (value, left_page_no);
2110         val = (BtVal *)((unsigned char *)root->page + nxt);
2111         memcpy (val->value, value, BtId);
2112         val->len = BtId;
2113
2114         ptr = (BtKey *)leftkey;
2115         nxt -= ptr->len + sizeof(BtKey);
2116         slotptr(root->page, 1)->off = nxt;
2117         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2118         
2119         bt_putid(root->page->right, 0);
2120         root->page->min = nxt;          // reset lowest used offset and key count
2121         root->page->cnt = 2;
2122         root->page->act = 2;
2123         root->page->lvl++;
2124
2125         // release and unpin root pages
2126
2127         bt_unlockpage(bt, BtLockWrite, root->latch);
2128         bt_unpinlatch (root->latch);
2129
2130         bt_unpinlatch (right);
2131         return 0;
2132 }
2133
2134 //  split already locked full node
2135 //      leave it locked.
2136 //      return pool entry for new right
2137 //      page, unlocked
2138
2139 uint bt_splitpage (BtDb *bt, BtPageSet *set)
2140 {
2141 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2142 uint lvl = set->page->lvl;
2143 BtPageSet right[1];
2144 BtKey *key, *ptr;
2145 BtVal *val, *src;
2146 uid right2;
2147 uint prev;
2148
2149         //  split higher half of keys to bt->frame
2150
2151         memset (bt->frame, 0, bt->mgr->page_size);
2152         max = set->page->cnt;
2153         cnt = max / 2;
2154         idx = 0;
2155
2156         while( cnt++ < max ) {
2157                 if( cnt < max || set->page->lvl )
2158                   if( slotptr(set->page, cnt)->dead )
2159                         continue;
2160
2161                 src = valptr(set->page, cnt);
2162                 nxt -= src->len + sizeof(BtVal);
2163                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2164
2165                 key = keyptr(set->page, cnt);
2166                 nxt -= key->len + sizeof(BtKey);
2167                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2168                 memcpy (ptr, key, key->len + sizeof(BtKey));
2169
2170                 //      add librarian slot
2171
2172                 slotptr(bt->frame, ++idx)->off = nxt;
2173                 slotptr(bt->frame, idx)->type = Librarian;
2174                 slotptr(bt->frame, idx)->dead = 1;
2175
2176                 //  add actual slot
2177
2178                 slotptr(bt->frame, ++idx)->off = nxt;
2179                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2180
2181                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2182                         bt->frame->act++;
2183         }
2184
2185         bt->frame->bits = bt->mgr->page_bits;
2186         bt->frame->min = nxt;
2187         bt->frame->cnt = idx;
2188         bt->frame->lvl = lvl;
2189
2190         // link right node
2191
2192         if( set->latch->page_no > ROOT_page )
2193                 bt_putid (bt->frame->right, bt_getid (set->page->right));
2194
2195         //      get new free page and write higher keys to it.
2196
2197         if( bt_newpage(bt, right, bt->frame) )
2198                 return 0;
2199
2200         memcpy (bt->frame, set->page, bt->mgr->page_size);
2201         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2202         set->latch->dirty = 1;
2203
2204         nxt = bt->mgr->page_size;
2205         set->page->garbage = 0;
2206         set->page->act = 0;
2207         max /= 2;
2208         cnt = 0;
2209         idx = 0;
2210
2211         if( slotptr(bt->frame, max)->type == Librarian )
2212                 max--;
2213
2214         //  assemble page of smaller keys
2215
2216         while( cnt++ < max ) {
2217                 if( slotptr(bt->frame, cnt)->dead )
2218                         continue;
2219                 val = valptr(bt->frame, cnt);
2220                 nxt -= val->len + sizeof(BtVal);
2221                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2222
2223                 key = keyptr(bt->frame, cnt);
2224                 nxt -= key->len + sizeof(BtKey);
2225                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2226
2227                 //      add librarian slot
2228
2229                 slotptr(set->page, ++idx)->off = nxt;
2230                 slotptr(set->page, idx)->type = Librarian;
2231                 slotptr(set->page, idx)->dead = 1;
2232
2233                 //      add actual slot
2234
2235                 slotptr(set->page, ++idx)->off = nxt;
2236                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2237                 set->page->act++;
2238         }
2239
2240         bt_putid(set->page->right, right->latch->page_no);
2241         set->page->min = nxt;
2242         set->page->cnt = idx;
2243
2244         return right->latch->entry;
2245 }
2246
2247 //      fix keys for newly split page
2248 //      call with page locked, return
2249 //      unlocked
2250
2251 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2252 {
2253 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2254 unsigned char value[BtId];
2255 uint lvl = set->page->lvl;
2256 BtPage page;
2257 BtKey *ptr;
2258
2259         // if current page is the root page, split it
2260
2261         if( set->latch->page_no == ROOT_page )
2262                 return bt_splitroot (bt, set, right);
2263
2264         ptr = keyptr(set->page, set->page->cnt);
2265         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2266
2267         page = bt_mappage (bt, right);
2268
2269         ptr = keyptr(page, page->cnt);
2270         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2271
2272         // insert new fences in their parent pages
2273
2274         bt_lockpage (bt, BtLockParent, right);
2275
2276         bt_lockpage (bt, BtLockParent, set->latch);
2277         bt_unlockpage (bt, BtLockWrite, set->latch);
2278
2279         // insert new fence for reformulated left block of smaller keys
2280
2281         bt_putid (value, set->latch->page_no);
2282         ptr = (BtKey *)leftkey;
2283
2284         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2285                 return bt->err;
2286
2287         // switch fence for right block of larger keys to new right page
2288
2289         bt_putid (value, right->page_no);
2290         ptr = (BtKey *)rightkey;
2291
2292         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2293                 return bt->err;
2294
2295         bt_unlockpage (bt, BtLockParent, set->latch);
2296         bt_unpinlatch (set->latch);
2297
2298         bt_unlockpage (bt, BtLockParent, right);
2299         bt_unpinlatch (right);
2300         return 0;
2301 }
2302
2303 //      install new key and value onto page
2304 //      page must already be checked for
2305 //      adequate space
2306
2307 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2308 {
2309 uint idx, librarian;
2310 BtSlot *node;
2311 BtKey *ptr;
2312 BtVal *val;
2313
2314         //      if found slot > desired slot and previous slot
2315         //      is a librarian slot, use it
2316
2317         if( slot > 1 )
2318           if( slotptr(set->page, slot-1)->type == Librarian )
2319                 slot--;
2320
2321         // copy value onto page
2322
2323         set->page->min -= vallen + sizeof(BtVal);
2324         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2325         memcpy (val->value, value, vallen);
2326         val->len = vallen;
2327
2328         // copy key onto page
2329
2330         set->page->min -= keylen + sizeof(BtKey);
2331         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2332         memcpy (ptr->key, key, keylen);
2333         ptr->len = keylen;
2334         
2335         //      find first empty slot
2336
2337         for( idx = slot; idx < set->page->cnt; idx++ )
2338           if( slotptr(set->page, idx)->dead )
2339                 break;
2340
2341         // now insert key into array before slot
2342
2343         if( idx == set->page->cnt )
2344                 idx += 2, set->page->cnt += 2, librarian = 2;
2345         else
2346                 librarian = 1;
2347
2348         set->latch->dirty = 1;
2349         set->page->act++;
2350
2351         while( idx > slot + librarian - 1 )
2352                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2353
2354         //      add librarian slot
2355
2356         if( librarian > 1 ) {
2357                 node = slotptr(set->page, slot++);
2358                 node->off = set->page->min;
2359                 node->type = Librarian;
2360                 node->dead = 1;
2361         }
2362
2363         //      fill in new slot
2364
2365         node = slotptr(set->page, slot);
2366         node->off = set->page->min;
2367         node->type = type;
2368         node->dead = 0;
2369
2370         if( release ) {
2371                 bt_unlockpage (bt, BtLockWrite, set->latch);
2372                 bt_unpinlatch (set->latch);
2373         }
2374
2375         return 0;
2376 }
2377
2378 //  Insert new key into the btree at given level.
2379 //      either add a new key or update/add an existing one
2380
2381 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2382 {
2383 unsigned char newkey[BT_keyarray];
2384 uint slot, idx, len, entry;
2385 BtPageSet set[1];
2386 BtKey *ptr, *ins;
2387 uid sequence;
2388 BtVal *val;
2389 uint type;
2390
2391   // set up the key we're working on
2392
2393   ins = (BtKey*)newkey;
2394   memcpy (ins->key, key, keylen);
2395   ins->len = keylen;
2396
2397   // is this a non-unique index value?
2398
2399   if( unique )
2400         type = Unique;
2401   else {
2402         type = Duplicate;
2403         sequence = bt_newdup (bt);
2404         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2405         ins->len += BtId;
2406   }
2407
2408   while( 1 ) { // find the page and slot for the current key
2409         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2410                 ptr = keyptr(set->page, slot);
2411         else {
2412                 if( !bt->err )
2413                         bt->err = BTERR_ovflw;
2414                 return bt->err;
2415         }
2416
2417         // if librarian slot == found slot, advance to real slot
2418
2419         if( slotptr(set->page, slot)->type == Librarian )
2420           if( !keycmp (ptr, key, keylen) )
2421                 ptr = keyptr(set->page, ++slot);
2422
2423         len = ptr->len;
2424
2425         if( slotptr(set->page, slot)->type == Duplicate )
2426                 len -= BtId;
2427
2428         //  if inserting a duplicate key or unique key
2429         //      check for adequate space on the page
2430         //      and insert the new key before slot.
2431
2432         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2433           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2434                 if( !(entry = bt_splitpage (bt, set)) )
2435                   return bt->err;
2436                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2437                   return bt->err;
2438                 else
2439                   continue;
2440
2441           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2442         }
2443
2444         // if key already exists, update value and return
2445
2446         val = valptr(set->page, slot);
2447
2448         if( val->len >= vallen ) {
2449                 if( slotptr(set->page, slot)->dead )
2450                         set->page->act++;
2451                 set->page->garbage += val->len - vallen;
2452                 set->latch->dirty = 1;
2453                 slotptr(set->page, slot)->dead = 0;
2454                 val->len = vallen;
2455                 memcpy (val->value, value, vallen);
2456                 bt_unlockpage(bt, BtLockWrite, set->latch);
2457                 bt_unpinlatch (set->latch);
2458                 return 0;
2459         }
2460
2461         //      new update value doesn't fit in existing value area
2462
2463         if( !slotptr(set->page, slot)->dead )
2464                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2465         else {
2466                 slotptr(set->page, slot)->dead = 0;
2467                 set->page->act++;
2468         }
2469
2470         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2471           if( !(entry = bt_splitpage (bt, set)) )
2472                 return bt->err;
2473           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2474                 return bt->err;
2475           else
2476                 continue;
2477
2478         set->page->min -= vallen + sizeof(BtVal);
2479         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2480         memcpy (val->value, value, vallen);
2481         val->len = vallen;
2482
2483         set->latch->dirty = 1;
2484         set->page->min -= keylen + sizeof(BtKey);
2485         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2486         memcpy (ptr->key, key, keylen);
2487         ptr->len = keylen;
2488         
2489         slotptr(set->page, slot)->off = set->page->min;
2490         bt_unlockpage(bt, BtLockWrite, set->latch);
2491         bt_unpinlatch (set->latch);
2492         return 0;
2493   }
2494   return 0;
2495 }
2496
2497 typedef struct {
2498         logseqno lsn;           // redo log sequence number
2499         uint entry;                     // latch table entry number
2500         uint slot:31;           // page slot number
2501         uint reuse:1;           // reused previous page
2502 } AtomicTxn;
2503
2504 typedef struct {
2505         uid page_no;            // page number for split leaf
2506         void *next;                     // next key to insert
2507         uint entry:29;          // latch table entry number
2508         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2509         uint nounlock:1;        // don't unlock ParentModification
2510         unsigned char leafkey[BT_keyarray];
2511 } AtomicKey;
2512
2513 //      determine actual page where key is located
2514 //  return slot number
2515
2516 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2517 {
2518 BtKey *key = keyptr(source,src);
2519 uint slot = locks[src].slot;
2520 uint entry;
2521
2522         if( src > 1 && locks[src].reuse )
2523           entry = locks[src-1].entry, slot = 0;
2524         else
2525           entry = locks[src].entry;
2526
2527         if( slot ) {
2528                 set->latch = bt->mgr->latchsets + entry;
2529                 set->page = bt_mappage (bt, set->latch);
2530                 return slot;
2531         }
2532
2533         //      is locks->reuse set? or was slot zeroed?
2534         //      if so, find where our key is located 
2535         //      on current page or pages split on
2536         //      same page txn operations.
2537
2538         do {
2539                 set->latch = bt->mgr->latchsets + entry;
2540                 set->page = bt_mappage (bt, set->latch);
2541
2542                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2543                   if( slotptr(set->page, slot)->type == Librarian )
2544                         slot++;
2545                   if( locks[src].reuse )
2546                         locks[src].entry = entry;
2547                   return slot;
2548                 }
2549         } while( entry = set->latch->split );
2550
2551         bt->err = BTERR_atomic;
2552         return 0;
2553 }
2554
2555 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2556 {
2557 BtKey *key = keyptr(source, src);
2558 BtVal *val = valptr(source, src);
2559 BtLatchSet *latch;
2560 BtPageSet set[1];
2561 uint entry, slot;
2562
2563   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2564         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
2565           if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2566                 return bt->err;
2567           set->page->lsn = locks[src].lsn;
2568           return 0;
2569         }
2570
2571         if( entry = bt_splitpage (bt, set) )
2572           latch = bt->mgr->latchsets + entry;
2573         else
2574           return bt->err;
2575
2576         //      splice right page into split chain
2577         //      and WriteLock it.
2578
2579         bt_lockpage(bt, BtLockWrite, latch);
2580         latch->split = set->latch->split;
2581         set->latch->split = entry;
2582         locks[src].slot = 0;
2583   }
2584
2585   return bt->err = BTERR_atomic;
2586 }
2587
2588 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2589 {
2590 BtKey *key = keyptr(source, src);
2591 uint idx, entry, slot;
2592 BtPageSet set[1];
2593 BtKey *ptr;
2594 BtVal *val;
2595
2596         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2597           ptr = keyptr(set->page, slot);
2598         else
2599           return bt->err = BTERR_struct;
2600
2601         if( !keycmp (ptr, key->key, key->len) )
2602           if( !slotptr(set->page, slot)->dead )
2603                 slotptr(set->page, slot)->dead = 1;
2604           else
2605                 return 0;
2606         else
2607                 return 0;
2608
2609         val = valptr(set->page, slot);
2610         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2611         set->page->lsn = locks[src].lsn;
2612         set->latch->dirty = 1;
2613         set->page->act--;
2614         bt->found++;
2615         return 0;
2616 }
2617
2618 //      delete an empty master page for a transaction
2619
2620 //      note that the far right page never empties because
2621 //      it always contains (at least) the infinite stopper key
2622 //      and that all pages that don't contain any keys are
2623 //      deleted, or are being held under Atomic lock.
2624
2625 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2626 {
2627 BtPageSet right[1], temp[1];
2628 unsigned char value[BtId];
2629 uid right_page_no;
2630 BtKey *ptr;
2631
2632         bt_lockpage(bt, BtLockWrite, prev->latch);
2633
2634         //      grab the right sibling
2635
2636         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2637                 right->page = bt_mappage (bt, right->latch);
2638         else
2639                 return bt->err;
2640
2641         bt_lockpage(bt, BtLockAtomic, right->latch);
2642         bt_lockpage(bt, BtLockWrite, right->latch);
2643
2644         //      and pull contents over empty page
2645         //      while preserving master's left link
2646
2647         memcpy (right->page->left, prev->page->left, BtId);
2648         memcpy (prev->page, right->page, bt->mgr->page_size);
2649
2650         //      forward seekers to old right sibling
2651         //      to new page location in set
2652
2653         bt_putid (right->page->right, prev->latch->page_no);
2654         right->latch->dirty = 1;
2655         right->page->kill = 1;
2656
2657         //      remove pointer to right page for searchers by
2658         //      changing right fence key to point to the master page
2659
2660         ptr = keyptr(right->page,right->page->cnt);
2661         bt_putid (value, prev->latch->page_no);
2662
2663         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2664                 return bt->err;
2665
2666         //  now that master page is in good shape we can
2667         //      remove its locks.
2668
2669         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2670         bt_unlockpage (bt, BtLockWrite, prev->latch);
2671
2672         //  fix master's right sibling's left pointer
2673         //      to remove scanner's poiner to the right page
2674
2675         if( right_page_no = bt_getid (prev->page->right) ) {
2676           if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2677                 temp->page = bt_mappage (bt, temp->latch);
2678
2679           bt_lockpage (bt, BtLockWrite, temp->latch);
2680           bt_putid (temp->page->left, prev->latch->page_no);
2681           temp->latch->dirty = 1;
2682
2683           bt_unlockpage (bt, BtLockWrite, temp->latch);
2684           bt_unpinlatch (temp->latch);
2685         } else {        // master is now the far right page
2686           bt_spinwritelock (bt->mgr->lock);
2687           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2688           bt_spinreleasewrite(bt->mgr->lock);
2689         }
2690
2691         //      now that there are no pointers to the right page
2692         //      we can delete it after the last read access occurs
2693
2694         bt_unlockpage (bt, BtLockWrite, right->latch);
2695         bt_unlockpage (bt, BtLockAtomic, right->latch);
2696         bt_lockpage (bt, BtLockDelete, right->latch);
2697         bt_lockpage (bt, BtLockWrite, right->latch);
2698         bt_freepage (bt, right);
2699         return 0;
2700 }
2701
2702 //      atomic modification of a batch of keys.
2703
2704 //      return -1 if BTERR is set
2705 //      otherwise return slot number
2706 //      causing the key constraint violation
2707 //      or zero on successful completion.
2708
2709 int bt_atomictxn (BtDb *bt, BtPage source)
2710 {
2711 uint src, idx, slot, samepage, entry;
2712 AtomicKey *head, *tail, *leaf;
2713 BtPageSet set[1], prev[1];
2714 unsigned char value[BtId];
2715 BtKey *key, *ptr, *key2;
2716 BtLatchSet *latch;
2717 AtomicTxn *locks;
2718 int result = 0;
2719 BtSlot temp[1];
2720 BtPage page;
2721 BtVal *val;
2722 uid right;
2723 int type;
2724
2725   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2726   head = NULL;
2727   tail = NULL;
2728
2729   // stable sort the list of keys into order to
2730   //    prevent deadlocks between threads.
2731
2732   for( src = 1; src++ < source->cnt; ) {
2733         *temp = *slotptr(source,src);
2734         key = keyptr (source,src);
2735
2736         for( idx = src; --idx; ) {
2737           key2 = keyptr (source,idx);
2738           if( keycmp (key, key2->key, key2->len) < 0 ) {
2739                 *slotptr(source,idx+1) = *slotptr(source,idx);
2740                 *slotptr(source,idx) = *temp;
2741           } else
2742                 break;
2743         }
2744   }
2745
2746   // Load the leaf page for each key
2747   // group same page references with reuse bit
2748   // and determine any constraint violations
2749
2750   for( src = 0; src++ < source->cnt; ) {
2751         key = keyptr(source, src);
2752         slot = 0;
2753
2754         // first determine if this modification falls
2755         // on the same page as the previous modification
2756         //      note that the far right leaf page is a special case
2757
2758         if( samepage = src > 1 )
2759           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2760                 slot = bt_findslot(set->page, key->key, key->len);
2761           else
2762                 bt_unlockpage(bt, BtLockRead, set->latch); 
2763
2764         if( !slot )
2765           if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
2766                 set->latch->split = 0;
2767           else
2768                 goto atomicerr;
2769
2770         if( slotptr(set->page, slot)->type == Librarian )
2771           ptr = keyptr(set->page, ++slot);
2772         else
2773           ptr = keyptr(set->page, slot);
2774
2775         if( !samepage ) {
2776           locks[src].entry = set->latch->entry;
2777           locks[src].slot = slot;
2778           locks[src].reuse = 0;
2779         } else {
2780           locks[src].entry = 0;
2781           locks[src].slot = 0;
2782           locks[src].reuse = 1;
2783         }
2784
2785         switch( slotptr(source, src)->type ) {
2786         case Duplicate:
2787         case Unique:
2788           if( !slotptr(set->page, slot)->dead )
2789            if( slot < set->page->cnt || bt_getid (set->page->right) )
2790             if( !keycmp (ptr, key->key, key->len) ) {
2791
2792                   // return constraint violation if key already exists
2793
2794                   bt_unlockpage(bt, BtLockRead, set->latch);
2795                   result = src;
2796
2797                   while( src ) {
2798                         if( locks[src].entry ) {
2799                           set->latch = bt->mgr->latchsets + locks[src].entry;
2800                           bt_unlockpage(bt, BtLockAtomic, set->latch);
2801                           bt_unpinlatch (set->latch);
2802                         }
2803                         src--;
2804                   }
2805                   free (locks);
2806                   return result;
2807                 }
2808           break;
2809         }
2810   }
2811
2812   //  unlock last loadpage lock
2813
2814   if( source->cnt )
2815         bt_unlockpage(bt, BtLockRead, set->latch);
2816
2817   //  and add entries to redo log
2818
2819   for( src = 0; src++ < source->cnt; ) {
2820         key = keyptr(source, src);
2821         val = valptr(source, src);
2822         switch( slotptr(source, src)->type ) {
2823         case Unique:
2824                 type = BTRM_add;
2825                 break;
2826         case Duplicate:
2827                 type = BTRM_dup;
2828                 break;
2829         case Delete:
2830                 type = BTRM_del;
2831                 break;
2832         case Update:
2833                 type = BTRM_upd;
2834                 break;
2835         }
2836
2837         if( locks[src].lsn = bt_newredo (bt, type, 0, key, val) )
2838                 continue;
2839
2840         goto atomicerr;
2841   }
2842
2843   //  obtain write lock for each master page
2844
2845   for( src = 0; src++ < source->cnt; ) {
2846         if( locks[src].reuse )
2847           continue;
2848         else
2849           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2850   }
2851
2852   // insert or delete each key
2853   // process any splits or merges
2854   // release Write & Atomic latches
2855   // set ParentModifications and build
2856   // queue of keys to insert for split pages
2857   // or delete for deleted pages.
2858
2859   // run through txn list backwards
2860
2861   samepage = source->cnt + 1;
2862
2863   for( src = source->cnt; src; src-- ) {
2864         if( locks[src].reuse )
2865           continue;
2866
2867         //  perform the txn operations
2868         //      from smaller to larger on
2869         //  the same page
2870
2871         for( idx = src; idx < samepage; idx++ )
2872          switch( slotptr(source,idx)->type ) {
2873          case Delete:
2874           if( bt_atomicdelete (bt, source, locks, idx) )
2875                 goto atomicerr;
2876           break;
2877
2878         case Duplicate:
2879         case Unique:
2880           if( bt_atomicinsert (bt, source, locks, idx) )
2881                 goto atomicerr;
2882           break;
2883         }
2884
2885         //      after the same page operations have finished,
2886         //  process master page for splits or deletion.
2887
2888         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2889         prev->page = bt_mappage (bt, prev->latch);
2890         samepage = src;
2891
2892         //  pick-up all splits from master page
2893         //      each one is already WriteLocked.
2894
2895         entry = prev->latch->split;
2896
2897         while( entry ) {
2898           set->latch = bt->mgr->latchsets + entry;
2899           set->page = bt_mappage (bt, set->latch);
2900           entry = set->latch->split;
2901
2902           // delete empty master page by undoing its split
2903           //  (this is potentially another empty page)
2904           //  note that there are no new left pointers yet
2905
2906           if( !prev->page->act ) {
2907                 memcpy (set->page->left, prev->page->left, BtId);
2908                 memcpy (prev->page, set->page, bt->mgr->page_size);
2909                 bt_lockpage (bt, BtLockDelete, set->latch);
2910                 bt_freepage (bt, set);
2911
2912                 prev->latch->split = set->latch->split;
2913                 prev->latch->dirty = 1;
2914                 continue;
2915           }
2916
2917           // remove empty page from the split chain
2918           // and return it to the free list.
2919
2920           if( !set->page->act ) {
2921                 memcpy (prev->page->right, set->page->right, BtId);
2922                 prev->latch->split = set->latch->split;
2923                 bt_lockpage (bt, BtLockDelete, set->latch);
2924                 bt_freepage (bt, set);
2925                 continue;
2926           }
2927
2928           //  schedule prev fence key update
2929
2930           ptr = keyptr(prev->page,prev->page->cnt);
2931           leaf = calloc (sizeof(AtomicKey), 1);
2932
2933           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2934           leaf->page_no = prev->latch->page_no;
2935           leaf->entry = prev->latch->entry;
2936           leaf->type = 0;
2937
2938           if( tail )
2939                 tail->next = leaf;
2940           else
2941                 head = leaf;
2942
2943           tail = leaf;
2944
2945           // splice in the left link into the split page
2946
2947           bt_putid (set->page->left, prev->latch->page_no);
2948           bt_lockpage(bt, BtLockParent, prev->latch);
2949           bt_unlockpage(bt, BtLockWrite, prev->latch);
2950           *prev = *set;
2951         }
2952
2953         //  update left pointer in next right page from last split page
2954         //      (if all splits were reversed, latch->split == 0)
2955
2956         if( latch->split ) {
2957           //  fix left pointer in master's original (now split)
2958           //  far right sibling or set rightmost page in page zero
2959
2960           if( right = bt_getid (prev->page->right) ) {
2961                 if( set->latch = bt_pinlatch (bt, right, 1) )
2962                   set->page = bt_mappage (bt, set->latch);
2963                 else
2964                   goto atomicerr;
2965
2966             bt_lockpage (bt, BtLockWrite, set->latch);
2967             bt_putid (set->page->left, prev->latch->page_no);
2968                 set->latch->dirty = 1;
2969             bt_unlockpage (bt, BtLockWrite, set->latch);
2970                 bt_unpinlatch (set->latch);
2971           } else {      // prev is rightmost page
2972             bt_spinwritelock (bt->mgr->lock);
2973                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2974             bt_spinreleasewrite(bt->mgr->lock);
2975           }
2976
2977           //  Process last page split in chain
2978
2979           ptr = keyptr(prev->page,prev->page->cnt);
2980           leaf = calloc (sizeof(AtomicKey), 1);
2981
2982           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2983           leaf->page_no = prev->latch->page_no;
2984           leaf->entry = prev->latch->entry;
2985           leaf->type = 0;
2986   
2987           if( tail )
2988                 tail->next = leaf;
2989           else
2990                 head = leaf;
2991
2992           tail = leaf;
2993
2994           bt_lockpage(bt, BtLockParent, prev->latch);
2995           bt_unlockpage(bt, BtLockWrite, prev->latch);
2996
2997           //  remove atomic lock on master page
2998
2999           bt_unlockpage(bt, BtLockAtomic, latch);
3000           continue;
3001         }
3002
3003         //  finished if prev page occupied (either master or final split)
3004
3005         if( prev->page->act ) {
3006           bt_unlockpage(bt, BtLockWrite, latch);
3007           bt_unlockpage(bt, BtLockAtomic, latch);
3008           bt_unpinlatch(latch);
3009           continue;
3010         }
3011
3012         // any and all splits were reversed, and the
3013         // master page located in prev is empty, delete it
3014         // by pulling over master's right sibling.
3015
3016         // Remove empty master's fence key
3017
3018         ptr = keyptr(prev->page,prev->page->cnt);
3019
3020         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3021                 goto atomicerr;
3022
3023         //      perform the remainder of the delete
3024         //      from the FIFO queue
3025
3026         leaf = calloc (sizeof(AtomicKey), 1);
3027
3028         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3029         leaf->page_no = prev->latch->page_no;
3030         leaf->entry = prev->latch->entry;
3031         leaf->nounlock = 1;
3032         leaf->type = 2;
3033   
3034         if( tail )
3035           tail->next = leaf;
3036         else
3037           head = leaf;
3038
3039         tail = leaf;
3040
3041         //      leave atomic lock in place until
3042         //      deletion completes in next phase.
3043
3044         bt_unlockpage(bt, BtLockWrite, prev->latch);
3045   }
3046
3047   //  add & delete keys for any pages split or merged during transaction
3048
3049   if( leaf = head )
3050     do {
3051           set->latch = bt->mgr->latchsets + leaf->entry;
3052           set->page = bt_mappage (bt, set->latch);
3053
3054           bt_putid (value, leaf->page_no);
3055           ptr = (BtKey *)leaf->leafkey;
3056
3057           switch( leaf->type ) {
3058           case 0:       // insert key
3059             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3060                   goto atomicerr;
3061
3062                 break;
3063
3064           case 1:       // delete key
3065                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3066                   goto atomicerr;
3067
3068                 break;
3069
3070           case 2:       // free page
3071                 if( bt_atomicfree (bt, set) )
3072                   goto atomicerr;
3073
3074                 break;
3075           }
3076
3077           if( !leaf->nounlock )
3078             bt_unlockpage (bt, BtLockParent, set->latch);
3079
3080           bt_unpinlatch (set->latch);
3081           tail = leaf->next;
3082           free (leaf);
3083         } while( leaf = tail );
3084
3085   // return success
3086
3087   free (locks);
3088   return 0;
3089 atomicerr:
3090   return -1;
3091 }
3092
3093 //      set cursor to highest slot on highest page
3094
3095 uint bt_lastkey (BtDb *bt)
3096 {
3097 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3098 BtPageSet set[1];
3099
3100         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3101                 set->page = bt_mappage (bt, set->latch);
3102         else
3103                 return 0;
3104
3105     bt_lockpage(bt, BtLockRead, set->latch);
3106         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3107     bt_unlockpage(bt, BtLockRead, set->latch);
3108         bt_unpinlatch (set->latch);
3109
3110         bt->cursor_page = page_no;
3111         return bt->cursor->cnt;
3112 }
3113
3114 //      return previous slot on cursor page
3115
3116 uint bt_prevkey (BtDb *bt, uint slot)
3117 {
3118 uid ourright, next, us = bt->cursor_page;
3119 BtPageSet set[1];
3120
3121         if( --slot )
3122                 return slot;
3123
3124         ourright = bt_getid(bt->cursor->right);
3125
3126 goleft:
3127         if( !(next = bt_getid(bt->cursor->left)) )
3128                 return 0;
3129
3130 findourself:
3131         bt->cursor_page = next;
3132
3133         if( set->latch = bt_pinlatch (bt, next, 1) )
3134                 set->page = bt_mappage (bt, set->latch);
3135         else
3136                 return 0;
3137
3138     bt_lockpage(bt, BtLockRead, set->latch);
3139         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3140         bt_unlockpage(bt, BtLockRead, set->latch);
3141         bt_unpinlatch (set->latch);
3142         
3143         next = bt_getid (bt->cursor->right);
3144
3145         if( bt->cursor->kill )
3146                 goto findourself;
3147
3148         if( next != us )
3149           if( next == ourright )
3150                 goto goleft;
3151           else
3152                 goto findourself;
3153
3154         return bt->cursor->cnt;
3155 }
3156
3157 //  return next slot on cursor page
3158 //  or slide cursor right into next page
3159
3160 uint bt_nextkey (BtDb *bt, uint slot)
3161 {
3162 BtPageSet set[1];
3163 uid right;
3164
3165   do {
3166         right = bt_getid(bt->cursor->right);
3167
3168         while( slot++ < bt->cursor->cnt )
3169           if( slotptr(bt->cursor,slot)->dead )
3170                 continue;
3171           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3172                 return slot;
3173           else
3174                 break;
3175
3176         if( !right )
3177                 break;
3178
3179         bt->cursor_page = right;
3180
3181         if( set->latch = bt_pinlatch (bt, right, 1) )
3182                 set->page = bt_mappage (bt, set->latch);
3183         else
3184                 return 0;
3185
3186     bt_lockpage(bt, BtLockRead, set->latch);
3187
3188         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3189
3190         bt_unlockpage(bt, BtLockRead, set->latch);
3191         bt_unpinlatch (set->latch);
3192         slot = 0;
3193
3194   } while( 1 );
3195
3196   return bt->err = 0;
3197 }
3198
3199 //  cache page of keys into cursor and return starting slot for given key
3200
3201 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3202 {
3203 BtPageSet set[1];
3204 uint slot;
3205
3206         // cache page for retrieval
3207
3208         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3209           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3210         else
3211           return 0;
3212
3213         bt->cursor_page = set->latch->page_no;
3214
3215         bt_unlockpage(bt, BtLockRead, set->latch);
3216         bt_unpinlatch (set->latch);
3217         return slot;
3218 }
3219
3220 BtKey *bt_key(BtDb *bt, uint slot)
3221 {
3222         return keyptr(bt->cursor, slot);
3223 }
3224
3225 BtVal *bt_val(BtDb *bt, uint slot)
3226 {
3227         return valptr(bt->cursor,slot);
3228 }
3229
3230 #ifdef STANDALONE
3231
3232 #ifndef unix
3233 double getCpuTime(int type)
3234 {
3235 FILETIME crtime[1];
3236 FILETIME xittime[1];
3237 FILETIME systime[1];
3238 FILETIME usrtime[1];
3239 SYSTEMTIME timeconv[1];
3240 double ans = 0;
3241
3242         memset (timeconv, 0, sizeof(SYSTEMTIME));
3243
3244         switch( type ) {
3245         case 0:
3246                 GetSystemTimeAsFileTime (xittime);
3247                 FileTimeToSystemTime (xittime, timeconv);
3248                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3249                 break;
3250         case 1:
3251                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3252                 FileTimeToSystemTime (usrtime, timeconv);
3253                 break;
3254         case 2:
3255                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3256                 FileTimeToSystemTime (systime, timeconv);
3257                 break;
3258         }
3259
3260         ans += (double)timeconv->wHour * 3600;
3261         ans += (double)timeconv->wMinute * 60;
3262         ans += (double)timeconv->wSecond;
3263         ans += (double)timeconv->wMilliseconds / 1000;
3264         return ans;
3265 }
3266 #else
3267 #include <time.h>
3268 #include <sys/resource.h>
3269
3270 double getCpuTime(int type)
3271 {
3272 struct rusage used[1];
3273 struct timeval tv[1];
3274
3275         switch( type ) {
3276         case 0:
3277                 gettimeofday(tv, NULL);
3278                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3279
3280         case 1:
3281                 getrusage(RUSAGE_SELF, used);
3282                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3283
3284         case 2:
3285                 getrusage(RUSAGE_SELF, used);
3286                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3287         }
3288
3289         return 0;
3290 }
3291 #endif
3292
3293 void bt_poolaudit (BtMgr *mgr)
3294 {
3295 BtLatchSet *latch;
3296 uint slot = 0;
3297
3298         while( slot++ < mgr->latchdeployed ) {
3299                 latch = mgr->latchsets + slot;
3300
3301                 if( *latch->readwr->rin & MASK )
3302                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3303                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3304
3305                 if( *latch->access->rin & MASK )
3306                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3307                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3308
3309                 if( *latch->parent->ticket != *latch->parent->serving )
3310                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3311                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3312
3313                 if( latch->pin & ~CLOCK_bit ) {
3314                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3315                         latch->pin = 0;
3316                 }
3317         }
3318 }
3319
3320 uint bt_latchaudit (BtDb *bt)
3321 {
3322 ushort idx, hashidx;
3323 uid next, page_no;
3324 BtLatchSet *latch;
3325 uint cnt = 0;
3326 BtKey *ptr;
3327
3328         if( *(ushort *)(bt->mgr->lock) )
3329                 fprintf(stderr, "Alloc page locked\n");
3330         *(ushort *)(bt->mgr->lock) = 0;
3331
3332         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3333                 latch = bt->mgr->latchsets + idx;
3334                 if( *latch->readwr->rin & MASK )
3335                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3336                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3337
3338                 if( *latch->access->rin & MASK )
3339                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3340                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3341
3342                 if( *latch->parent->ticket != *latch->parent->serving )
3343                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3344                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3345
3346                 if( latch->pin ) {
3347                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3348                         latch->pin = 0;
3349                 }
3350         }
3351
3352         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3353           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3354                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3355
3356           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3357
3358           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3359                 latch = bt->mgr->latchsets + idx;
3360                 if( latch->pin )
3361                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3362           } while( idx = latch->next );
3363         }
3364
3365         page_no = LEAF_page;
3366
3367         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3368         uid off = page_no << bt->mgr->page_bits;
3369 #ifdef unix
3370           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3371 #else
3372         DWORD amt[1];
3373
3374           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3375
3376           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3377                 return bt->err = BTERR_map;
3378
3379           if( *amt <  bt->mgr->page_size )
3380                 return bt->err = BTERR_map;
3381 #endif
3382                 if( !bt->frame->free && !bt->frame->lvl )
3383                         cnt += bt->frame->act;
3384                 page_no++;
3385         }
3386                 
3387         cnt--;  // remove stopper key
3388         fprintf(stderr, " Total keys read %d\n", cnt);
3389
3390         bt_close (bt);
3391         return 0;
3392 }
3393
3394 typedef struct {
3395         char idx;
3396         char *type;
3397         char *infile;
3398         BtMgr *mgr;
3399         int num;
3400 } ThreadArg;
3401
3402 //  standalone program to index file of keys
3403 //  then list them onto std-out
3404
3405 #ifdef unix
3406 void *index_file (void *arg)
3407 #else
3408 uint __stdcall index_file (void *arg)
3409 #endif
3410 {
3411 int line = 0, found = 0, cnt = 0, idx;
3412 uid next, page_no = LEAF_page;  // start on first page of leaves
3413 int ch, len = 0, slot, type = 0;
3414 unsigned char key[BT_maxkey];
3415 unsigned char txn[65536];
3416 ThreadArg *args = arg;
3417 BtPageSet set[1];
3418 uint nxt = 65536;
3419 BtPage page;
3420 BtKey *ptr;
3421 BtVal *val;
3422 BtDb *bt;
3423 FILE *in;
3424
3425         bt = bt_open (args->mgr);
3426         page = (BtPage)txn;
3427
3428         if( args->idx < strlen (args->type) )
3429                 ch = args->type[args->idx];
3430         else
3431                 ch = args->type[strlen(args->type) - 1];
3432
3433         switch(ch | 0x20)
3434         {
3435         case 'a':
3436                 fprintf(stderr, "started latch mgr audit\n");
3437                 cnt = bt_latchaudit (bt);
3438                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3439                 break;
3440
3441         case 'd':
3442                 type = Delete;
3443
3444         case 'p':
3445                 if( !type )
3446                         type = Unique;
3447
3448                 if( args->num )
3449                  if( type == Delete )
3450                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3451                  else
3452                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3453                 else
3454                  if( type == Delete )
3455                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3456                  else
3457                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3458
3459                 if( in = fopen (args->infile, "rb") )
3460                   while( ch = getc(in), ch != EOF )
3461                         if( ch == '\n' )
3462                         {
3463                           line++;
3464
3465                           if( !args->num ) {
3466                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3467                                   fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3468                             len = 0;
3469                                 continue;
3470                           }
3471
3472                           nxt -= len - 10;
3473                           memcpy (txn + nxt, key + 10, len - 10);
3474                           nxt -= 1;
3475                           txn[nxt] = len - 10;
3476                           nxt -= 10;
3477                           memcpy (txn + nxt, key, 10);
3478                           nxt -= 1;
3479                           txn[nxt] = 10;
3480                           slotptr(page,++cnt)->off  = nxt;
3481                           slotptr(page,cnt)->type = type;
3482                           len = 0;
3483
3484                           if( cnt < args->num )
3485                                 continue;
3486
3487                           page->cnt = cnt;
3488                           page->act = cnt;
3489                           page->min = nxt;
3490
3491                           if( bt_atomictxn (bt, page) )
3492                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3493                           nxt = sizeof(txn);
3494                           cnt = 0;
3495
3496                         }
3497                         else if( len < BT_maxkey )
3498                                 key[len++] = ch;
3499                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3500                 break;
3501
3502         case 'w':
3503                 fprintf(stderr, "started indexing for %s\n", args->infile);
3504                 if( in = fopen (args->infile, "r") )
3505                   while( ch = getc(in), ch != EOF )
3506                         if( ch == '\n' )
3507                         {
3508                           line++;
3509
3510                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3511                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3512                           len = 0;
3513                         }
3514                         else if( len < BT_maxkey )
3515                                 key[len++] = ch;
3516                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3517                 break;
3518
3519         case 'f':
3520                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3521                 if( in = fopen (args->infile, "rb") )
3522                   while( ch = getc(in), ch != EOF )
3523                         if( ch == '\n' )
3524                         {
3525                           line++;
3526                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3527                                 found++;
3528                           else if( bt->err )
3529                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3530                           len = 0;
3531                         }
3532                         else if( len < BT_maxkey )
3533                                 key[len++] = ch;
3534                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3535                 break;
3536
3537         case 's':
3538                 fprintf(stderr, "started scanning\n");
3539                 do {
3540                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3541                                 set->page = bt_mappage (bt, set->latch);
3542                         else
3543                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3544                         bt_lockpage (bt, BtLockRead, set->latch);
3545                         next = bt_getid (set->page->right);
3546
3547                         for( slot = 0; slot++ < set->page->cnt; )
3548                          if( next || slot < set->page->cnt )
3549                           if( !slotptr(set->page, slot)->dead ) {
3550                                 ptr = keyptr(set->page, slot);
3551                                 len = ptr->len;
3552
3553                             if( slotptr(set->page, slot)->type == Duplicate )
3554                                         len -= BtId;
3555
3556                                 fwrite (ptr->key, len, 1, stdout);
3557                                 val = valptr(set->page, slot);
3558                                 fwrite (val->value, val->len, 1, stdout);
3559                                 fputc ('\n', stdout);
3560                                 cnt++;
3561                            }
3562
3563                         bt_unlockpage (bt, BtLockRead, set->latch);
3564                         bt_unpinlatch (set->latch);
3565                 } while( page_no = next );
3566
3567                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3568                 break;
3569
3570         case 'r':
3571                 fprintf(stderr, "started reverse scan\n");
3572                 if( slot = bt_lastkey (bt) )
3573                    while( slot = bt_prevkey (bt, slot) ) {
3574                         if( slotptr(bt->cursor, slot)->dead )
3575                           continue;
3576
3577                         ptr = keyptr(bt->cursor, slot);
3578                         len = ptr->len;
3579
3580                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3581                                 len -= BtId;
3582
3583                         fwrite (ptr->key, len, 1, stdout);
3584                         val = valptr(bt->cursor, slot);
3585                         fwrite (val->value, val->len, 1, stdout);
3586                         fputc ('\n', stdout);
3587                         cnt++;
3588                   }
3589
3590                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3591                 break;
3592
3593         case 'c':
3594 #ifdef unix
3595                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3596 #endif
3597                 fprintf(stderr, "started counting\n");
3598                 page_no = LEAF_page;
3599
3600                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3601                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3602                                 break;
3603
3604                         if( !bt->frame->free && !bt->frame->lvl )
3605                                 cnt += bt->frame->act;
3606
3607                         bt->reads++;
3608                         page_no++;
3609                 }
3610                 
3611                 cnt--;  // remove stopper key
3612                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3613                 break;
3614         }
3615
3616         bt_close (bt);
3617 #ifdef unix
3618         return NULL;
3619 #else
3620         return 0;
3621 #endif
3622 }
3623
3624 typedef struct timeval timer;
3625
3626 int main (int argc, char **argv)
3627 {
3628 int idx, cnt, len, slot, err;
3629 int segsize, bits = 16;
3630 double start, stop;
3631 #ifdef unix
3632 pthread_t *threads;
3633 #else
3634 HANDLE *threads;
3635 #endif
3636 ThreadArg *args;
3637 uint poolsize = 0;
3638 uint recovery = 0;
3639 float elapsed;
3640 int num = 0;
3641 char key[1];
3642 BtMgr *mgr;
3643 BtKey *ptr;
3644 BtDb *bt;
3645
3646         if( argc < 3 ) {
3647                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3648                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3649                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3650                 fprintf (stderr, "  page_bits is the page size in bits\n");
3651                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3652                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3653                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3654                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3655                 exit(0);
3656         }
3657
3658         start = getCpuTime(0);
3659
3660         if( argc > 3 )
3661                 bits = atoi(argv[3]);
3662
3663         if( argc > 4 )
3664                 poolsize = atoi(argv[4]);
3665
3666         if( !poolsize )
3667                 fprintf (stderr, "Warning: no mapped_pool\n");
3668
3669         if( argc > 5 )
3670                 num = atoi(argv[5]);
3671
3672         if( argc > 6 )
3673                 recovery = atoi(argv[6]);
3674
3675         cnt = argc - 7;
3676 #ifdef unix
3677         threads = malloc (cnt * sizeof(pthread_t));
3678 #else
3679         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3680 #endif
3681         args = malloc (cnt * sizeof(ThreadArg));
3682
3683         mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3684
3685         if( !mgr ) {
3686                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3687                 exit (1);
3688         }
3689
3690         //      fire off threads
3691
3692         for( idx = 0; idx < cnt; idx++ ) {
3693                 args[idx].infile = argv[idx + 7];
3694                 args[idx].type = argv[2];
3695                 args[idx].mgr = mgr;
3696                 args[idx].num = num;
3697                 args[idx].idx = idx;
3698 #ifdef unix
3699                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3700                         fprintf(stderr, "Error creating thread %d\n", err);
3701 #else
3702                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3703 #endif
3704         }
3705
3706         //      wait for termination
3707
3708 #ifdef unix
3709         for( idx = 0; idx < cnt; idx++ )
3710                 pthread_join (threads[idx], NULL);
3711 #else
3712         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3713
3714         for( idx = 0; idx < cnt; idx++ )
3715                 CloseHandle(threads[idx]);
3716
3717 #endif
3718         bt_poolaudit(mgr);
3719         bt_mgrclose (mgr);
3720
3721         elapsed = getCpuTime(0) - start;
3722         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3723         elapsed = getCpuTime(1);
3724         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3725         elapsed = getCpuTime(2);
3726         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3727 }
3728
3729 #endif  //STANDALONE