]> pd.if.org Git - btree/blob - threadskv9c.c
Replace futex latches with spin latches
[btree] / threadskv9c.c
1 // btree version threadskv9c sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      and redo log for failure recovery
10
11 // 07 OCT 2014
12
13 // author: karl malbrain, malbrain@cal.berkeley.edu
14
15 /*
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
18
19 The orginal author is Karl Malbrain.
20
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
27 */
28
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
31
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
34
35 #ifdef linux
36 #define _GNU_SOURCE
37 #include <linux/futex.h>
38 #define SYS_futex 202
39 #endif
40
41 #ifdef unix
42 #include <unistd.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/mman.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <limits.h>
51 #else
52 #define WIN32_LEAN_AND_MEAN
53 #include <windows.h>
54 #include <stdio.h>
55 #include <stdlib.h>
56 #include <time.h>
57 #include <fcntl.h>
58 #include <process.h>
59 #include <intrin.h>
60 #endif
61
62 #include <memory.h>
63 #include <string.h>
64 #include <stddef.h>
65
66 typedef unsigned long long      uid;
67 typedef unsigned long long      logseqno;
68
69 #ifndef unix
70 typedef unsigned long long      off64_t;
71 typedef unsigned short          ushort;
72 typedef unsigned int            uint;
73 #endif
74
75 #define BT_ro 0x6f72    // ro
76 #define BT_rw 0x7772    // rw
77
78 #define BT_maxbits              24                                      // maximum page size in bits
79 #define BT_minbits              9                                       // minimum page size in bits
80 #define BT_minpage              (1 << BT_minbits)       // minimum page size
81 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
82
83 //  BTree page number constants
84 #define ALLOC_page              0       // allocation page
85 #define ROOT_page               1       // root of the btree
86 #define LEAF_page               2       // first page of leaves
87 #define REDO_page               3       // first page of redo buffer
88
89 //      Number of levels to create in a new BTree
90
91 #define MIN_lvl                 2
92
93 /*
94 There are six lock types for each node in four independent sets: 
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
101 */
102
103 typedef enum{
104         BtLockAccess = 1,
105         BtLockDelete = 2,
106         BtLockRead   = 4,
107         BtLockWrite  = 8,
108         BtLockParent = 16,
109         BtLockAtomic = 32
110 } BtLock;
111
112 //      definition for phase-fair reader/writer lock implementation
113
114 typedef struct {
115         volatile ushort rin[1];
116         volatile ushort rout[1];
117         volatile ushort ticket[1];
118         volatile ushort serving[1];
119         ushort tid;
120         ushort dup;
121 } RWLock;
122
123 //      write only lock
124
125 typedef struct {
126         volatile ushort ticket[1];
127         volatile ushort serving[1];
128         ushort tid;
129         ushort dup;
130 } WOLock;
131
132 #define PHID 0x1
133 #define PRES 0x2
134 #define MASK 0x3
135 #define RINC 0x4
136
137 //      lite weight mutex
138
139 // exclusive is set for write access
140
141 typedef struct {
142         volatile unsigned char exclusive[1];
143         unsigned char filler;
144 } BtMutexLatch;
145
146 #define XCL 1
147
148 //      mode & definition for lite latch implementation
149
150 enum {
151         QueRd = 1,              // reader queue
152         QueWr = 2               // writer queue
153 } RWQueue;
154
155 //  hash table entries
156
157 typedef struct {
158         uint entry;             // Latch table entry at head of chain
159         BtMutexLatch latch[1];
160 } BtHashEntry;
161
162 //      latch manager table structure
163
164 typedef struct {
165         uid page_no;                                    // latch set page number
166         RWLock readwr[1];                               // read/write page lock
167         RWLock access[1];                               // Access Intent/Page delete
168         WOLock parent[1];                               // Posting of fence key in parent
169         WOLock atomic[1];                               // Atomic update in progress
170         uint split;                                             // right split page atomic insert
171         uint entry;                                             // entry slot in latch table
172         uint next;                                              // next entry in hash table chain
173         uint prev;                                              // prev entry in hash table chain
174         BtMutexLatch modify[1];                 // modify entry lite latch
175         volatile ushort pin;                    // number of accessing threads
176         volatile unsigned char dirty;   // page in cache is dirty (atomic set)
177         volatile unsigned char avail;   // page is an available entry
178 } BtLatchSet;
179
180 //      Define the length of the page record numbers
181
182 #define BtId 6
183
184 //      Page key slot definition.
185
186 //      Keys are marked dead, but remain on the page until
187 //      it cleanup is called. The fence key (highest key) for
188 //      a leaf page is always present, even after cleanup.
189
190 //      Slot types
191
192 //      In addition to the Unique keys that occupy slots
193 //      there are Librarian and Duplicate key
194 //      slots occupying the key slot array.
195
196 //      The Librarian slots are dead keys that
197 //      serve as filler, available to add new Unique
198 //      or Dup slots that are inserted into the B-tree.
199
200 //      The Duplicate slots have had their key bytes extended
201 //      by 6 bytes to contain a binary duplicate key uniqueifier.
202
203 typedef enum {
204         Unique,
205         Librarian,
206         Duplicate,
207         Delete,
208         Update
209 } BtSlotType;
210
211 typedef struct {
212         uint off:BT_maxbits;    // page offset for key start
213         uint type:3;                    // type of slot
214         uint dead:1;                    // set for deleted slot
215 } BtSlot;
216
217 //      The key structure occupies space at the upper end of
218 //      each page.  It's a length byte followed by the key
219 //      bytes.
220
221 typedef struct {
222         unsigned char len;              // this can be changed to a ushort or uint
223         unsigned char key[0];
224 } BtKey;
225
226 //      the value structure also occupies space at the upper
227 //      end of the page. Each key is immediately followed by a value.
228
229 typedef struct {
230         unsigned char len;              // this can be changed to a ushort or uint
231         unsigned char value[0];
232 } BtVal;
233
234 #define BT_maxkey       255             // maximum number of bytes in a key
235 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
236
237 //      The first part of an index page.
238 //      It is immediately followed
239 //      by the BtSlot array of keys.
240
241 //      note that this structure size
242 //      must be a multiple of 8 bytes
243 //      in order to place dups correctly.
244
245 typedef struct BtPage_ {
246         uint cnt;                                       // count of keys in page
247         uint act;                                       // count of active keys
248         uint min;                                       // next key offset
249         uint garbage;                           // page garbage in bytes
250         unsigned char bits:7;           // page size in bits
251         unsigned char free:1;           // page is on free chain
252         unsigned char lvl:7;            // level of page
253         unsigned char kill:1;           // page is being deleted
254         unsigned char right[BtId];      // page number to right
255         unsigned char left[BtId];       // page number to left
256         unsigned char filler[2];        // padding to multiple of 8
257         logseqno lsn;                           // log sequence number applied
258 } *BtPage;
259
260 //  The loadpage interface object
261
262 typedef struct {
263         BtPage page;            // current page pointer
264         BtLatchSet *latch;      // current page latch set
265 } BtPageSet;
266
267 //      structure for latch manager on ALLOC_page
268
269 typedef struct {
270         struct BtPage_ alloc[1];        // next page_no in right ptr
271         unsigned long long dups[1];     // global duplicate key uniqueifier
272         unsigned char chain[BtId];      // head of free page_nos chain
273 } BtPageZero;
274
275 //      The object structure for Btree access
276
277 typedef struct {
278         uint page_size;                         // page size    
279         uint page_bits;                         // page size in bits    
280 #ifdef unix
281         int idx;
282 #else
283         HANDLE idx;
284 #endif
285         BtPageZero *pagezero;           // mapped allocation page
286         BtHashEntry *hashtable;         // the buffer pool hash table entries
287         BtLatchSet *latchsets;          // mapped latch set from buffer pool
288         unsigned char *pagepool;        // mapped to the buffer pool pages
289         unsigned char *redobuff;        // mapped recovery buffer pointer
290         logseqno lsn, flushlsn;         // current & first lsn flushed
291         BtMutexLatch dump[1];           // redo dump lite latch
292         BtMutexLatch redo[1];           // redo area lite latch
293         BtMutexLatch lock[1];           // allocation area lite latch
294         ushort thread_no[1];            // next thread number
295         uint nlatchpage;                        // number of latch pages at BT_latch
296         uint latchtotal;                        // number of page latch entries
297         uint latchhash;                         // number of latch hash table slots
298         uint latchvictim;                       // next latch entry to examine
299         uint latchavail;                        // next available latch entry
300         uint availlock[1];                      // latch available chain commitments
301         uint available;                         // size of latch available chain
302         uint redopages;                         // size of recovery buff in pages
303         uint redoend;                           // eof/end element in recovery buff
304 #ifndef unix
305         HANDLE halloc;                          // allocation handle
306         HANDLE hpool;                           // buffer pool handle
307 #endif
308 } BtMgr;
309
310 typedef struct {
311         BtMgr *mgr;                             // buffer manager for thread
312         BtPage cursor;                  // cached frame for start/next (never mapped)
313         BtPage frame;                   // spare frame for the page split (never mapped)
314         uid cursor_page;                                // current cursor page number   
315         unsigned char *mem;                             // frame, cursor, page memory buffer
316         unsigned char key[BT_keyarray]; // last found complete key
317         int found;                              // last delete or insert was found
318         int err;                                // last error
319         int line;                               // last error line no
320         int reads, writes;              // number of reads and writes from the btree
321         ushort thread_no;               // thread number
322 } BtDb;
323
324 //      Catastrophic errors
325
326 typedef enum {
327         BTERR_ok = 0,
328         BTERR_struct,
329         BTERR_ovflw,
330         BTERR_lock,
331         BTERR_map,
332         BTERR_read,
333         BTERR_wrt,
334         BTERR_atomic,
335         BTERR_recovery,
336         BTERR_avail
337 } BTERR;
338
339 #define CLOCK_bit 0x8000
340
341 // recovery manager entry types
342
343 typedef enum {
344         BTRM_eof = 0,   // rest of buffer is emtpy
345         BTRM_add,               // add a unique key-value to btree
346         BTRM_dup,               // add a duplicate key-value to btree
347         BTRM_del,               // delete a key-value from btree
348         BTRM_upd,               // update a key with a new value
349         BTRM_new,               // allocate a new empty page
350         BTRM_old                // reuse an old empty page
351 } BTRM;
352
353 // recovery manager entry
354 //      structure followed by BtKey & BtVal
355
356 typedef struct {
357         logseqno reqlsn;        // log sequence number required
358         logseqno lsn;           // log sequence number for entry
359         uint len;                       // length of entry
360         unsigned char type;     // type of entry
361         unsigned char lvl;      // level of btree entry pertains to
362 } BtLogHdr;
363
364 // B-Tree functions
365
366 extern void bt_close (BtDb *bt);
367 extern BtDb *bt_open (BtMgr *mgr);
368 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
369 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
370 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
371 extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
372 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
373 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
374 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
375 extern BtKey *bt_foundkey (BtDb *bt);
376 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
377 extern uint bt_nextkey   (BtDb *bt, uint slot);
378
379 //      manager functions
380 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
381 extern void bt_mgrclose (BtMgr *mgr);
382 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
383 extern logseqno bt_txnredo (BtDb *bt, BtPage page);
384
385 //  Helper functions to return slot values
386 //      from the cursor page.
387
388 extern BtKey *bt_key (BtDb *bt, uint slot);
389 extern BtVal *bt_val (BtDb *bt, uint slot);
390
391 //  The page is allocated from low and hi ends.
392 //  The key slots are allocated from the bottom,
393 //      while the text and value of the key
394 //  are allocated from the top.  When the two
395 //  areas meet, the page is split into two.
396
397 //  A key consists of a length byte, two bytes of
398 //  index number (0 - 65535), and up to 253 bytes
399 //  of key value.
400
401 //  Associated with each key is a value byte string
402 //      containing any value desired.
403
404 //  The b-tree root is always located at page 1.
405 //      The first leaf page of level zero is always
406 //      located on page 2.
407
408 //      The b-tree pages are linked with next
409 //      pointers to facilitate enumerators,
410 //      and provide for concurrency.
411
412 //      When to root page fills, it is split in two and
413 //      the tree height is raised by a new root at page
414 //      one with two keys.
415
416 //      Deleted keys are marked with a dead bit until
417 //      page cleanup. The fence key for a leaf node is
418 //      always present
419
420 //  To achieve maximum concurrency one page is locked at a time
421 //  as the tree is traversed to find leaf key in question. The right
422 //  page numbers are used in cases where the page is being split,
423 //      or consolidated.
424
425 //  Page 0 is dedicated to lock for new page extensions,
426 //      and chains empty pages together for reuse. It also
427 //      contains the latch manager hash table.
428
429 //      The ParentModification lock on a node is obtained to serialize posting
430 //      or changing the fence key for a node.
431
432 //      Empty pages are chained together through the ALLOC page and reused.
433
434 //      Access macros to address slot and key values from the page
435 //      Page slots use 1 based indexing.
436
437 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
438 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
439 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
440
441 void bt_putid(unsigned char *dest, uid id)
442 {
443 int i = BtId;
444
445         while( i-- )
446                 dest[i] = (unsigned char)id, id >>= 8;
447 }
448
449 uid bt_getid(unsigned char *src)
450 {
451 uid id = 0;
452 int i;
453
454         for( i = 0; i < BtId; i++ )
455                 id <<= 8, id |= *src++; 
456
457         return id;
458 }
459
460 uid bt_newdup (BtDb *bt)
461 {
462 #ifdef unix
463         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
464 #else
465         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
466 #endif
467 }
468
469 //      Write-Only Queue Lock
470
471 void WriteOLock (WOLock *lock, ushort tid)
472 {
473 ushort tix;
474
475         if( lock->tid == tid ) {
476                 lock->dup++;
477                 return;
478         }
479 #ifdef unix
480         tix = __sync_fetch_and_add (lock->ticket, 1);
481 #else
482         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
483 #endif
484         // wait for our ticket to come up
485
486         while( tix != lock->serving[0] )
487 #ifdef unix
488                 sched_yield();
489 #else
490                 SwitchToThread ();
491 #endif
492         lock->tid = tid;
493 }
494
495 void WriteORelease (WOLock *lock)
496 {
497         if( lock->dup ) {
498                 lock->dup--;
499                 return;
500         }
501
502         lock->tid = 0;
503         lock->serving[0]++;
504 }
505
506 //      Phase-Fair reader/writer lock implementation
507
508 void WriteLock (RWLock *lock, ushort tid)
509 {
510 ushort w, r, tix;
511
512         if( lock->tid == tid ) {
513                 lock->dup++;
514                 return;
515         }
516 #ifdef unix
517         tix = __sync_fetch_and_add (lock->ticket, 1);
518 #else
519         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
520 #endif
521         // wait for our ticket to come up
522
523         while( tix != lock->serving[0] )
524 #ifdef unix
525                 sched_yield();
526 #else
527                 SwitchToThread ();
528 #endif
529
530         w = PRES | (tix & PHID);
531 #ifdef  unix
532         r = __sync_fetch_and_add (lock->rin, w);
533 #else
534         r = _InterlockedExchangeAdd16 (lock->rin, w);
535 #endif
536         while( r != *lock->rout )
537 #ifdef unix
538                 sched_yield();
539 #else
540                 SwitchToThread();
541 #endif
542         lock->tid = tid;
543 }
544
545 void WriteRelease (RWLock *lock)
546 {
547         if( lock->dup ) {
548                 lock->dup--;
549                 return;
550         }
551
552         lock->tid = 0;
553 #ifdef unix
554         __sync_fetch_and_and (lock->rin, ~MASK);
555 #else
556         _InterlockedAnd16 (lock->rin, ~MASK);
557 #endif
558         lock->serving[0]++;
559 }
560
561 //      try to obtain read lock
562 //  return 1 if successful
563
564 int ReadTry (RWLock *lock, ushort tid)
565 {
566 ushort w;
567
568         //  OK if write lock already held by same thread
569
570         if( lock->tid == tid ) {
571                 lock->dup++;
572                 return 1;
573         }
574 #ifdef unix
575         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
576 #else
577         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
578 #endif
579         if( w )
580           if( w == (*lock->rin & MASK) ) {
581 #ifdef unix
582                 __sync_fetch_and_add (lock->rin, -RINC);
583 #else
584                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
585 #endif
586                 return 0;
587           }
588
589         return 1;
590 }
591
592 void ReadLock (RWLock *lock, ushort tid)
593 {
594 ushort w;
595         if( lock->tid == tid ) {
596                 lock->dup++;
597                 return;
598         }
599 #ifdef unix
600         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
601 #else
602         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
603 #endif
604         if( w )
605           while( w == (*lock->rin & MASK) )
606 #ifdef unix
607                 sched_yield ();
608 #else
609                 SwitchToThread ();
610 #endif
611 }
612
613 void ReadRelease (RWLock *lock)
614 {
615         if( lock->dup ) {
616                 lock->dup--;
617                 return;
618         }
619
620 #ifdef unix
621         __sync_fetch_and_add (lock->rout, RINC);
622 #else
623         _InterlockedExchangeAdd16 (lock->rout, RINC);
624 #endif
625 }
626
627 //      lite weight spin lock Latch Manager
628
629 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
630 {
631         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
632 }
633
634 void bt_mutexlock(BtMutexLatch *latch)
635 {
636 unsigned char prev;
637
638   do {
639 #ifdef  unix
640         prev = __sync_fetch_and_or(latch->exclusive, XCL);
641 #else
642         prev = _InterlockedOr8(latch->exclusive, XCL);
643 #endif
644         if( !(prev & XCL) )
645                 return;
646 #ifdef  unix
647   } while( sched_yield(), 1 );
648 #else
649   } while( SwitchToThread(), 1 );
650 #endif
651 }
652
653 //      try to obtain write lock
654
655 //      return 1 if obtained,
656 //              0 otherwise
657
658 int bt_mutextry(BtMutexLatch *latch)
659 {
660 uint prev;
661
662 #ifdef  unix
663         prev = __sync_fetch_and_or(latch->exclusive, XCL);
664 #else
665         prev = _InterlockedOr8(latch->exclusive, XCL);
666 #endif
667         //      take write access if exclusive bit is clear
668
669         return !(prev & XCL);
670 }
671
672 //      clear write mode
673
674 void bt_releasemutex(BtMutexLatch *latch)
675 {
676         *latch->exclusive = 0;
677 }
678
679 //      recovery manager -- flush dirty pages
680
681 void bt_flushlsn (BtDb *bt)
682 {
683 uint cnt3 = 0, cnt2 = 0, cnt = 0;
684 BtLatchSet *latch;
685 BtPage page;
686 uint entry;
687
688   //    flush dirty pool pages to the btree that were
689   //    dirty before the start of this redo recovery buffer
690 fprintf(stderr, "Start flushlsn\n");
691   for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
692         page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
693         latch = bt->mgr->latchsets + entry;
694         bt_mutexlock (latch->modify);
695         bt_lockpage(bt, BtLockRead, latch);
696
697         if( latch->dirty ) {
698           bt_writepage(bt->mgr, page, latch->page_no);
699           latch->dirty = 0, bt->writes++, cnt++;
700         }
701 if( latch->avail )
702 cnt3++;
703 if( latch->pin & ~CLOCK_bit )
704 cnt2++;
705         bt_unlockpage(bt, BtLockRead, latch);
706         bt_releasemutex (latch->modify);
707   }
708 fprintf(stderr, "End flushlsn %d pages  %d pinned  %d available\n", cnt, cnt2, cnt3);
709 }
710
711 //      recovery manager -- process current recovery buff on startup
712 //      this won't do much if previous session was properly closed.
713
714 BTERR bt_recoveryredo (BtMgr *mgr)
715 {
716 BtLogHdr *hdr, *eof;
717 uint offset = 0;
718 BtKey *key;
719 BtVal *val;
720
721   pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
722
723   hdr = (BtLogHdr *)mgr->redobuff;
724   mgr->flushlsn = hdr->lsn;
725
726   while( 1 ) {
727         hdr = (BtLogHdr *)(mgr->redobuff + offset);
728         switch( hdr->type ) {
729         case BTRM_eof:
730                 mgr->lsn = hdr->lsn;
731                 return 0;
732         case BTRM_add:          // add a unique key-value to btree
733         
734         case BTRM_dup:          // add a duplicate key-value to btree
735         case BTRM_del:          // delete a key-value from btree
736         case BTRM_upd:          // update a key with a new value
737         case BTRM_new:          // allocate a new empty page
738         case BTRM_old:          // reuse an old empty page
739                 return 0;
740         }
741   }
742 }
743
744 //      recovery manager -- dump current recovery buff & flush dirty pages
745 //              in preparation for next recovery buffer.
746
747 BTERR bt_dumpredo (BtDb *bt)
748 {
749 BtLogHdr *eof;
750 fprintf(stderr, "Flush pages ");
751
752         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
753         memset (eof, 0, sizeof(BtLogHdr));
754
755         //  flush pages written at beginning of this redo buffer
756         //      then write the redo buffer out to disk
757
758         fdatasync (bt->mgr->idx);
759
760 fprintf(stderr, "Dump ReDo: %d bytes\n", bt->mgr->redoend);
761         pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
762
763         sync_file_range (bt->mgr->idx, REDO_page << bt->mgr->page_bits, bt->mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
764
765         bt->mgr->flushlsn = bt->mgr->lsn;
766         bt->mgr->redoend = 0;
767
768         eof = (BtLogHdr *)(bt->mgr->redobuff);
769         memset (eof, 0, sizeof(BtLogHdr));
770         eof->lsn = bt->mgr->lsn;
771         return 0;
772 }
773
774 //      recovery manager -- append new entry to recovery log
775 //      flush to disk when it overflows.
776
777 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
778 {
779 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
780 uint amt = sizeof(BtLogHdr);
781 BtLogHdr *hdr, *eof;
782 uint flush;
783
784         bt_mutexlock (bt->mgr->redo);
785
786         if( key )
787           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
788
789         //      see if new entry fits in buffer
790         //      flush and reset if it doesn't
791
792         if( flush = amt > size - bt->mgr->redoend ) {
793           bt_mutexlock (bt->mgr->dump);
794
795           if( bt_dumpredo (bt) )
796                 return 0;
797         }
798
799         //      fill in new entry & either eof or end block
800
801         hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
802
803         hdr->len = amt;
804         hdr->type = type;
805         hdr->lvl = lvl;
806         hdr->lsn = ++bt->mgr->lsn;
807
808         bt->mgr->redoend += amt;
809
810         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
811         memset (eof, 0, sizeof(BtLogHdr));
812
813         //  fill in key and value
814
815         if( key ) {
816           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
817           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
818         }
819
820         bt_releasemutex(bt->mgr->redo);
821
822         if( flush ) {
823           bt_flushlsn (bt);
824           bt_releasemutex(bt->mgr->dump);
825         }
826
827         return hdr->lsn;
828 }
829
830 //      recovery manager -- append transaction to recovery log
831 //      flush to disk when it overflows.
832
833 logseqno bt_txnredo (BtDb *bt, BtPage source)
834 {
835 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
836 uint amt = 0, src, type;
837 BtLogHdr *hdr, *eof;
838 logseqno lsn;
839 uint flush;
840 BtKey *key;
841 BtVal *val;
842
843         //      determine amount of redo recovery log space required
844
845         for( src = 0; src++ < source->cnt; ) {
846           key = keyptr(source,src);
847           val = valptr(source,src);
848           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
849           amt += sizeof(BtLogHdr);
850         }
851
852         bt_mutexlock (bt->mgr->redo);
853
854         //      see if new entry fits in buffer
855         //      flush and reset if it doesn't
856
857         if( flush = amt > size - bt->mgr->redoend ) {
858           bt_mutexlock (bt->mgr->dump);
859
860           if( bt_dumpredo (bt) )
861                 return 0;
862         }
863
864         //      assign new lsn to transaction
865
866         lsn = ++bt->mgr->lsn;
867
868         //      fill in new entries
869
870         for( src = 0; src++ < source->cnt; ) {
871           key = keyptr(source, src);
872           val = valptr(source, src);
873
874           switch( slotptr(source, src)->type ) {
875           case Unique:
876                 type = BTRM_add;
877                 break;
878           case Duplicate:
879                 type = BTRM_dup;
880                 break;
881           case Delete:
882                 type = BTRM_del;
883                 break;
884           case Update:
885                 type = BTRM_upd;
886                 break;
887           }
888
889           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
890           amt += sizeof(BtLogHdr);
891
892           hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
893           hdr->len = amt;
894           hdr->type = type;
895           hdr->lsn = lsn;
896           hdr->lvl = 0;
897
898           //  fill in key and value
899
900           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
901           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
902
903           bt->mgr->redoend += amt;
904         }
905
906         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
907         memset (eof, 0, sizeof(BtLogHdr));
908
909         bt_releasemutex(bt->mgr->redo);
910
911         if( flush ) {
912           bt_flushlsn (bt);
913           bt_releasemutex(bt->mgr->dump);
914         }
915
916         return lsn;
917 }
918
919 //      read page into buffer pool from permanent location in Btree file
920
921 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
922 {
923 off64_t off = page_no << mgr->page_bits;
924
925 #ifdef unix
926         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
927                 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
928                 return BTERR_read;
929         }
930 #else
931 OVERLAPPED ovl[1];
932 uint amt[1];
933
934         memset (ovl, 0, sizeof(OVERLAPPED));
935         ovl->Offset = off;
936         ovl->OffsetHigh = off >> 32;
937
938         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
939                 fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
940                 return BTERR_read;
941         }
942         if( *amt <  mgr->page_size ) {
943                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
944                 return BTERR_read;
945         }
946 #endif
947         return 0;
948 }
949
950 //      write page to permanent location in Btree file
951 //      clear the dirty bit
952
953 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
954 {
955 off64_t off = page_no << mgr->page_bits;
956
957 #ifdef unix
958         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
959                 return BTERR_wrt;
960 #else
961 OVERLAPPED ovl[1];
962 uint amt[1];
963
964         memset (ovl, 0, sizeof(OVERLAPPED));
965         ovl->Offset = off;
966         ovl->OffsetHigh = off >> 32;
967
968         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
969                 return BTERR_wrt;
970
971         if( *amt <  mgr->page_size )
972                 return BTERR_wrt;
973 #endif
974         return 0;
975 }
976
977 //      set CLOCK bit in latch
978 //      decrement pin count
979
980 void bt_unpinlatch (BtLatchSet *latch)
981 {
982         bt_mutexlock(latch->modify);
983         latch->pin |= CLOCK_bit;
984         latch->pin--;
985         bt_releasemutex(latch->modify);
986 }
987
988 //  return the btree cached page address
989 //      if page is dirty and has not yet been
990 //      flushed to disk for the current redo
991 //      recovery buffer, write it out.
992
993 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
994 {
995 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
996
997   return page;
998 }
999
1000 //  return next available latch entry
1001 //        and with latch entry locked
1002
1003 uint bt_availnext (BtDb *bt)
1004 {
1005 BtLatchSet *latch;
1006 uint entry;
1007
1008   while( 1 ) {
1009 #ifdef unix
1010         entry = __sync_fetch_and_add (&bt->mgr->latchavail, 1) + 1;
1011 #else
1012         entry = _InterlockedIncrement (&bt->mgr->latchavail);
1013 #endif
1014         entry %= bt->mgr->latchtotal;
1015
1016         if( !entry )
1017                 continue;
1018
1019         latch = bt->mgr->latchsets + entry;
1020
1021         if( !latch->avail )
1022                 continue;
1023
1024         bt_mutexlock(latch->modify);
1025
1026         if( !latch->avail ) {
1027                 bt_releasemutex(latch->modify);
1028                 continue;
1029         }
1030
1031         return entry;
1032   }
1033 }
1034
1035 //      find and add the next available latch entry
1036 //      to the queue
1037
1038 BTERR bt_availlatch (BtDb *bt)
1039 {
1040 BtLatchSet *latch;
1041 uint startattempt;
1042 uint cnt, entry;
1043 uint hashidx;
1044 BtPage page;
1045
1046   //  find and reuse previous entry on victim
1047
1048   startattempt = bt->mgr->latchvictim;
1049
1050   while( 1 ) {
1051 #ifdef unix
1052         entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
1053 #else
1054         entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
1055 #endif
1056         //      skip entry if it has outstanding pins
1057
1058         entry %= bt->mgr->latchtotal;
1059
1060         if( !entry )
1061                 continue;
1062
1063         //      only go around one time before
1064         //      flushing redo recovery buffer,
1065         //      and the buffer pool to free up entries.
1066
1067         if( bt->mgr->redopages )
1068           if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
1069             if( bt_mutextry (bt->mgr->dump) ) {
1070                  if( bt_dumpredo (bt) )
1071                   return bt->err;
1072                  bt_flushlsn (bt);
1073                 // synchronize the various threads running into this condition
1074                 //      so that only one thread does the dump and flush
1075                 } else
1076                         bt_mutexlock(bt->mgr->dump);
1077
1078                 startattempt = bt->mgr->latchvictim;
1079                 bt_releasemutex(bt->mgr->dump);
1080           }
1081
1082         latch = bt->mgr->latchsets + entry;
1083
1084         if( latch->avail )
1085           continue;
1086
1087         bt_mutexlock(latch->modify);
1088
1089         //      skip if already an available entry
1090
1091         if( latch->avail ) {
1092           bt_releasemutex(latch->modify);
1093           continue;
1094         }
1095
1096         //  skip this entry if it is pinned
1097         //      if the CLOCK bit is set
1098         //      reset it to zero.
1099
1100         if( latch->pin ) {
1101           latch->pin &= ~CLOCK_bit;
1102           bt_releasemutex(latch->modify);
1103           continue;
1104         }
1105
1106         page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1107
1108         //      if dirty page has lsn >= last redo recovery buffer
1109         //      then hold page in the buffer pool until next redo
1110         //      recovery buffer is being written to disk.
1111
1112         if( latch->dirty )
1113           if( page->lsn >= bt->mgr->flushlsn ) {
1114             bt_releasemutex(latch->modify);
1115                 continue;
1116           }
1117
1118         //      entry is available
1119 #ifdef unix
1120         __sync_fetch_and_add (&bt->mgr->available, 1);
1121 #else
1122         _InterlockedIncrement(&bt->mgr->available);
1123 #endif
1124         latch->avail = 1;
1125         bt_releasemutex(latch->modify);
1126         return 0;
1127   }
1128 }
1129
1130 //      release available latch requests
1131
1132 void bt_availrelease (BtDb *bt, uint count)
1133 {
1134 #ifdef unix
1135         __sync_fetch_and_add(bt->mgr->availlock, -count);
1136 #else
1137         _InterlockedAdd(bt->mgr->availlock, -count);
1138 #endif
1139 }
1140
1141 //      commit available chain entries
1142 //      find available entries as required
1143
1144 void bt_availrequest (BtDb *bt, uint count)
1145 {
1146 #ifdef unix
1147         __sync_fetch_and_add(bt->mgr->availlock, count);
1148 #else
1149         _InterlockedAdd(bt->mgr->availlock, count);
1150 #endif
1151
1152         while( *bt->mgr->availlock > bt->mgr->available )
1153                 bt_availlatch (bt);
1154 }
1155
1156 //      find available latchset
1157 //      return with latchset pinned
1158
1159 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, BtPage loadit)
1160 {
1161 uint hashidx = page_no % bt->mgr->latchhash;
1162 BtLatchSet *latch;
1163 uint entry, idx;
1164 BtPage page;
1165
1166   //  try to find our entry
1167
1168   bt_mutexlock(bt->mgr->hashtable[hashidx].latch);
1169
1170   if( entry = bt->mgr->hashtable[hashidx].entry ) do
1171   {
1172         latch = bt->mgr->latchsets + entry;
1173         if( page_no == latch->page_no )
1174                 break;
1175   } while( entry = latch->next );
1176
1177   //  found our entry: increment pin
1178   //  remove from available status
1179
1180   if( entry ) {
1181         latch = bt->mgr->latchsets + entry;
1182         bt_mutexlock(latch->modify);
1183         if( latch->avail )
1184 #ifdef unix
1185           __sync_fetch_and_add (&bt->mgr->available, -1);
1186 #else
1187           _InterlockedDecrement(&bt->mgr->available);
1188 #endif
1189         latch->avail = 0;
1190         latch->pin |= CLOCK_bit;
1191         latch->pin++;
1192
1193         bt_releasemutex(latch->modify);
1194         bt_releasemutex(bt->mgr->hashtable[hashidx].latch);
1195         return latch;
1196   }
1197
1198   //  find and reuse entry from available set
1199
1200 trynext:
1201
1202   if( entry = bt_availnext (bt) )
1203         latch = bt->mgr->latchsets + entry;
1204   else
1205         return bt->line = __LINE__, bt->err = BTERR_avail, NULL;
1206
1207   idx = latch->page_no % bt->mgr->latchhash;
1208
1209   //  if latch is on a different hash chain
1210   //    unlink from the old page_no chain
1211
1212   if( latch->page_no )
1213    if( idx != hashidx ) {
1214
1215         //  skip over this entry if latch not available
1216
1217     if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
1218           bt_releasemutex(latch->modify);
1219           goto trynext;
1220         }
1221
1222         if( latch->prev )
1223           bt->mgr->latchsets[latch->prev].next = latch->next;
1224         else
1225           bt->mgr->hashtable[idx].entry = latch->next;
1226
1227         if( latch->next )
1228           bt->mgr->latchsets[latch->next].prev = latch->prev;
1229
1230     bt_releasemutex (bt->mgr->hashtable[idx].latch);
1231    }
1232
1233   //  remove available status
1234
1235   latch->avail = 0;
1236 #ifdef unix
1237   __sync_fetch_and_add (&bt->mgr->available, -1);
1238 #else
1239   _InterlockedDecrement(&bt->mgr->available);
1240 #endif
1241   page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1242
1243   if( latch->dirty )
1244         if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1245           return bt->line = __LINE__, NULL;
1246         else
1247           latch->dirty = 0, bt->writes++;
1248
1249   if( loadit ) {
1250         memcpy (page, loadit, bt->mgr->page_size);
1251         latch->dirty = 1;
1252   } else
1253         if( bt->err = bt_readpage (bt->mgr, page, page_no) )
1254           return bt->line = __LINE__, NULL;
1255         else
1256           bt->reads++;
1257
1258   //  link page as head of hash table chain
1259   //  if this is a never before used entry,
1260   //  or it was previously on a different
1261   //  hash table chain. Otherwise, just
1262   //  leave it in its current hash table
1263   //  chain position.
1264
1265   if( !latch->page_no || hashidx != idx ) {
1266         if( latch->next = bt->mgr->hashtable[hashidx].entry )
1267           bt->mgr->latchsets[latch->next].prev = entry;
1268
1269         bt->mgr->hashtable[hashidx].entry = entry;
1270     latch->prev = 0;
1271   }
1272
1273   //  fill in latch structure
1274
1275   latch->pin = CLOCK_bit | 1;
1276   latch->page_no = page_no;
1277   latch->entry = entry;
1278   latch->split = 0;
1279
1280   bt_releasemutex (latch->modify);
1281   bt_releasemutex (bt->mgr->hashtable[hashidx].latch);
1282   return latch;
1283 }
1284   
1285 void bt_mgrclose (BtMgr *mgr)
1286 {
1287 BtLatchSet *latch;
1288 BtLogHdr *eof;
1289 uint num = 0;
1290 BtPage page;
1291 uint slot;
1292
1293         //      flush previously written dirty pages
1294         //      and write recovery buffer to disk
1295
1296         fdatasync (mgr->idx);
1297
1298         if( mgr->redoend ) {
1299                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1300                 memset (eof, 0, sizeof(BtLogHdr));
1301
1302                 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1303         }
1304
1305         //      write remaining dirty pool pages to the btree
1306
1307         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1308                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1309                 latch = mgr->latchsets + slot;
1310
1311                 if( latch->dirty ) {
1312                         bt_writepage(mgr, page, latch->page_no);
1313                         latch->dirty = 0, num++;
1314                 }
1315         }
1316
1317         //      flush last batch to disk and clear
1318         //      redo recovery buffer on disk.
1319
1320         fdatasync (mgr->idx);
1321
1322         if( mgr->redopages ) {
1323                 eof = (BtLogHdr *)mgr->redobuff;
1324                 memset (eof, 0, sizeof(BtLogHdr));
1325                 eof->lsn = mgr->lsn;
1326
1327                 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1328
1329                 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1330         }
1331
1332         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1333
1334 #ifdef unix
1335         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1336         munmap (mgr->pagezero, mgr->page_size);
1337 #else
1338         FlushViewOfFile(mgr->pagezero, 0);
1339         UnmapViewOfFile(mgr->pagezero);
1340         UnmapViewOfFile(mgr->pagepool);
1341         CloseHandle(mgr->halloc);
1342         CloseHandle(mgr->hpool);
1343 #endif
1344 #ifdef unix
1345         if( mgr->redopages )
1346                 free (mgr->redobuff);
1347         close (mgr->idx);
1348         free (mgr);
1349 #else
1350         if( mgr->redopages )
1351                 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1352         FlushFileBuffers(mgr->idx);
1353         CloseHandle(mgr->idx);
1354         GlobalFree (mgr);
1355 #endif
1356 }
1357
1358 //      close and release memory
1359
1360 void bt_close (BtDb *bt)
1361 {
1362 #ifdef unix
1363         if( bt->mem )
1364                 free (bt->mem);
1365 #else
1366         if( bt->mem)
1367                 VirtualFree (bt->mem, 0, MEM_RELEASE);
1368 #endif
1369         free (bt);
1370 }
1371
1372 //  open/create new btree buffer manager
1373
1374 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1375 //              size of page pool (e.g. 262144)
1376
1377 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1378 {
1379 uint lvl, attr, last, slot, idx;
1380 uint nlatchpage, latchhash;
1381 unsigned char value[BtId];
1382 int flag, initit = 0;
1383 BtPageZero *pagezero;
1384 BtLatchSet *latch;
1385 off64_t size;
1386 uint amt[1];
1387 BtMgr* mgr;
1388 BtKey* key;
1389 BtVal *val;
1390
1391         // determine sanity of page size and buffer pool
1392
1393         if( bits > BT_maxbits )
1394                 bits = BT_maxbits;
1395         else if( bits < BT_minbits )
1396                 bits = BT_minbits;
1397
1398         if( nodemax < 16 ) {
1399                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
1400                 return NULL;
1401         }
1402
1403 #ifdef unix
1404         mgr = calloc (1, sizeof(BtMgr));
1405
1406         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1407
1408         if( mgr->idx == -1 ) {
1409                 fprintf (stderr, "Unable to open btree file\n");
1410                 return free(mgr), NULL;
1411         }
1412 #else
1413         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1414         attr = FILE_ATTRIBUTE_NORMAL;
1415         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1416
1417         if( mgr->idx == INVALID_HANDLE_VALUE )
1418                 return GlobalFree(mgr), NULL;
1419 #endif
1420
1421 #ifdef unix
1422         pagezero = valloc (BT_maxpage);
1423         *amt = 0;
1424
1425         // read minimum page size to get root info
1426         //      to support raw disk partition files
1427         //      check if bits == 0 on the disk.
1428
1429         if( size = lseek (mgr->idx, 0L, 2) )
1430                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1431                         if( pagezero->alloc->bits )
1432                                 bits = pagezero->alloc->bits;
1433                         else
1434                                 initit = 1;
1435                 else
1436                         return free(mgr), free(pagezero), NULL;
1437         else
1438                 initit = 1;
1439 #else
1440         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1441         size = GetFileSize(mgr->idx, amt);
1442
1443         if( size || *amt ) {
1444                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1445                         return bt_mgrclose (mgr), NULL;
1446                 bits = pagezero->alloc->bits;
1447         } else
1448                 initit = 1;
1449 #endif
1450
1451         mgr->page_size = 1 << bits;
1452         mgr->page_bits = bits;
1453
1454         //  calculate number of latch hash table entries
1455
1456         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1457
1458         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1459         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1460         mgr->latchtotal = nodemax;
1461
1462         if( !initit )
1463                 goto mgrlatch;
1464
1465         // initialize an empty b-tree with latch page, root page, page of leaves
1466         // and page(s) of latches and page pool cache
1467
1468         memset (pagezero, 0, 1 << bits);
1469         pagezero->alloc->bits = mgr->page_bits;
1470         bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1471
1472         //  initialize left-most LEAF page in
1473         //      alloc->left.
1474
1475         bt_putid (pagezero->alloc->left, LEAF_page);
1476
1477         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1478                 fprintf (stderr, "Unable to create btree page zero\n");
1479                 return bt_mgrclose (mgr), NULL;
1480         }
1481
1482         memset (pagezero, 0, 1 << bits);
1483         pagezero->alloc->bits = mgr->page_bits;
1484
1485         for( lvl=MIN_lvl; lvl--; ) {
1486                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1487                 key = keyptr(pagezero->alloc, 1);
1488                 key->len = 2;           // create stopper key
1489                 key->key[0] = 0xff;
1490                 key->key[1] = 0xff;
1491
1492                 bt_putid(value, MIN_lvl - lvl + 1);
1493                 val = valptr(pagezero->alloc, 1);
1494                 val->len = lvl ? BtId : 0;
1495                 memcpy (val->value, value, val->len);
1496
1497                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1498                 pagezero->alloc->lvl = lvl;
1499                 pagezero->alloc->cnt = 1;
1500                 pagezero->alloc->act = 1;
1501
1502                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1503                         fprintf (stderr, "Unable to create btree page zero\n");
1504                         return bt_mgrclose (mgr), NULL;
1505                 }
1506         }
1507
1508 mgrlatch:
1509 #ifdef unix
1510         free (pagezero);
1511 #else
1512         VirtualFree (pagezero, 0, MEM_RELEASE);
1513 #endif
1514 #ifdef unix
1515         // mlock the pagezero page
1516
1517         flag = PROT_READ | PROT_WRITE;
1518         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1519         if( mgr->pagezero == MAP_FAILED ) {
1520                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1521                 return bt_mgrclose (mgr), NULL;
1522         }
1523         mlock (mgr->pagezero, mgr->page_size);
1524
1525         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1526         if( mgr->pagepool == MAP_FAILED ) {
1527                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1528                 return bt_mgrclose (mgr), NULL;
1529         }
1530         if( mgr->redopages = redopages )
1531                 mgr->redobuff = valloc (redopages * mgr->page_size);
1532 #else
1533         flag = PAGE_READWRITE;
1534         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1535         if( !mgr->halloc ) {
1536                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1537                 return bt_mgrclose (mgr), NULL;
1538         }
1539
1540         flag = FILE_MAP_WRITE;
1541         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1542         if( !mgr->pagezero ) {
1543                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1544                 return bt_mgrclose (mgr), NULL;
1545         }
1546
1547         flag = PAGE_READWRITE;
1548         size = (uid)mgr->nlatchpage << mgr->page_bits;
1549         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1550         if( !mgr->hpool ) {
1551                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1552                 return bt_mgrclose (mgr), NULL;
1553         }
1554
1555         flag = FILE_MAP_WRITE;
1556         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1557         if( !mgr->pagepool ) {
1558                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1559                 return bt_mgrclose (mgr), NULL;
1560         }
1561         if( mgr->redopages = redopages )
1562                 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1563 #endif
1564
1565         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1566         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1567         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1568
1569         //      mark all pool entries as available
1570
1571         for( idx = 1; idx < mgr->latchtotal; idx++ ) {
1572                 latch = mgr->latchsets + idx;
1573                 latch->avail = 1;
1574                 mgr->available++;
1575         }
1576
1577         return mgr;
1578 }
1579
1580 //      open BTree access method
1581 //      based on buffer manager
1582
1583 BtDb *bt_open (BtMgr *mgr)
1584 {
1585 BtDb *bt = malloc (sizeof(*bt));
1586
1587         memset (bt, 0, sizeof(*bt));
1588         bt->mgr = mgr;
1589 #ifdef unix
1590         bt->mem = valloc (2 *mgr->page_size);
1591 #else
1592         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1593 #endif
1594         bt->frame = (BtPage)bt->mem;
1595         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1596 #ifdef unix
1597         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1598 #else
1599         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1600 #endif
1601         return bt;
1602 }
1603
1604 //  compare two keys, return > 0, = 0, or < 0
1605 //  =0: keys are same
1606 //  -1: key2 > key1
1607 //  +1: key2 < key1
1608 //  as the comparison value
1609
1610 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1611 {
1612 uint len1 = key1->len;
1613 int ans;
1614
1615         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1616                 return ans;
1617
1618         if( len1 > len2 )
1619                 return 1;
1620         if( len1 < len2 )
1621                 return -1;
1622
1623         return 0;
1624 }
1625
1626 // place write, read, or parent lock on requested page_no.
1627
1628 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1629 {
1630         switch( mode ) {
1631         case BtLockRead:
1632                 ReadLock (latch->readwr, bt->thread_no);
1633                 break;
1634         case BtLockWrite:
1635                 WriteLock (latch->readwr, bt->thread_no);
1636                 break;
1637         case BtLockAccess:
1638                 ReadLock (latch->access, bt->thread_no);
1639                 break;
1640         case BtLockDelete:
1641                 WriteLock (latch->access, bt->thread_no);
1642                 break;
1643         case BtLockParent:
1644                 WriteOLock (latch->parent, bt->thread_no);
1645                 break;
1646         case BtLockAtomic:
1647                 WriteOLock (latch->atomic, bt->thread_no);
1648                 break;
1649         case BtLockAtomic | BtLockRead:
1650                 WriteOLock (latch->atomic, bt->thread_no);
1651                 ReadLock (latch->readwr, bt->thread_no);
1652                 break;
1653         }
1654 }
1655
1656 // remove write, read, or parent lock on requested page
1657
1658 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1659 {
1660         switch( mode ) {
1661         case BtLockRead:
1662                 ReadRelease (latch->readwr);
1663                 break;
1664         case BtLockWrite:
1665                 WriteRelease (latch->readwr);
1666                 break;
1667         case BtLockAccess:
1668                 ReadRelease (latch->access);
1669                 break;
1670         case BtLockDelete:
1671                 WriteRelease (latch->access);
1672                 break;
1673         case BtLockParent:
1674                 WriteORelease (latch->parent);
1675                 break;
1676         case BtLockAtomic:
1677                 WriteORelease (latch->atomic);
1678                 break;
1679         case BtLockAtomic | BtLockRead:
1680                 WriteORelease (latch->atomic);
1681                 ReadRelease (latch->readwr);
1682                 break;
1683         }
1684 }
1685
1686 //      allocate a new page
1687 //      return with page latched, but unlocked.
1688
1689 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1690 {
1691 uid page_no;
1692 int blk;
1693
1694         //      lock allocation page
1695
1696         bt_mutexlock(bt->mgr->lock);
1697
1698         // use empty chain first
1699         // else allocate empty page
1700
1701         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1702                 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
1703                         set->page = bt_mappage (bt, set->latch);
1704                 else
1705                         return bt->err = BTERR_struct, bt->line = __LINE__, -1;
1706
1707                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1708                 bt_releasemutex(bt->mgr->lock);
1709
1710                 memcpy (set->page, contents, bt->mgr->page_size);
1711                 set->latch->dirty = 1;
1712                 return 0;
1713         }
1714
1715         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1716         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1717
1718         // unlock allocation latch and
1719         //      extend file into new page.
1720
1721         bt_releasemutex(bt->mgr->lock);
1722
1723         //      don't load cache from btree page
1724
1725         if( set->latch = bt_pinlatch (bt, page_no, contents) )
1726                 set->page = bt_mappage (bt, set->latch);
1727         else
1728                 return bt->err;
1729
1730         set->latch->dirty = 1;
1731         return 0;
1732 }
1733
1734 //  find slot in page for given key at a given level
1735
1736 int bt_findslot (BtPage page, unsigned char *key, uint len)
1737 {
1738 uint diff, higher = page->cnt, low = 1, slot;
1739 uint good = 0;
1740
1741         //        make stopper key an infinite fence value
1742
1743         if( bt_getid (page->right) )
1744                 higher++;
1745         else
1746                 good++;
1747
1748         //      low is the lowest candidate.
1749         //  loop ends when they meet
1750
1751         //  higher is already
1752         //      tested as .ge. the passed key.
1753
1754         while( diff = higher - low ) {
1755                 slot = low + ( diff >> 1 );
1756                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1757                         low = slot + 1;
1758                 else
1759                         higher = slot, good++;
1760         }
1761
1762         //      return zero if key is on right link page
1763
1764         return good ? higher : 0;
1765 }
1766
1767 //  find and load page at given level for given key
1768 //      leave page rd or wr locked as requested
1769
1770 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1771 {
1772 uid page_no = ROOT_page, prevpage = 0;
1773 uint drill = 0xff, slot;
1774 BtLatchSet *prevlatch;
1775 uint mode, prevmode;
1776 BtVal *val;
1777
1778   //  start at root of btree and drill down
1779
1780   do {
1781         // determine lock mode of drill level
1782         mode = (drill == lvl) ? lock : BtLockRead; 
1783
1784         if( !(set->latch = bt_pinlatch (bt, page_no, NULL)) )
1785           return 0;
1786
1787         // obtain access lock using lock chaining with Access mode
1788
1789         if( page_no > ROOT_page )
1790           bt_lockpage(bt, BtLockAccess, set->latch);
1791
1792         set->page = bt_mappage (bt, set->latch);
1793
1794         //      release & unpin parent or left sibling page
1795
1796         if( prevpage ) {
1797           bt_unlockpage(bt, prevmode, prevlatch);
1798           bt_unpinlatch (prevlatch);
1799           prevpage = 0;
1800         }
1801
1802         // obtain mode lock using lock chaining through AccessLock
1803
1804         bt_lockpage(bt, mode, set->latch);
1805
1806         if( set->page->free )
1807                 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1808
1809         if( page_no > ROOT_page )
1810           bt_unlockpage(bt, BtLockAccess, set->latch);
1811
1812         // re-read and re-lock root after determining actual level of root
1813
1814         if( set->page->lvl != drill) {
1815                 if( set->latch->page_no != ROOT_page )
1816                         return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1817                         
1818                 drill = set->page->lvl;
1819
1820                 if( lock != BtLockRead && drill == lvl ) {
1821                   bt_unlockpage(bt, mode, set->latch);
1822                   bt_unpinlatch (set->latch);
1823                   continue;
1824                 }
1825         }
1826
1827         prevpage = set->latch->page_no;
1828         prevlatch = set->latch;
1829         prevmode = mode;
1830
1831         //  find key on page at this level
1832         //  and descend to requested level
1833
1834         if( !set->page->kill )
1835          if( slot = bt_findslot (set->page, key, len) ) {
1836           if( drill == lvl )
1837                 return slot;
1838
1839           // find next non-dead slot -- the fence key if nothing else
1840
1841           while( slotptr(set->page, slot)->dead )
1842                 if( slot++ < set->page->cnt )
1843                   continue;
1844                 else
1845                   return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1846
1847           val = valptr(set->page, slot);
1848
1849           if( val->len == BtId )
1850                 page_no = bt_getid(valptr(set->page, slot)->value);
1851           else
1852                 return bt->line = __LINE__, bt->err = BTERR_struct, 0;
1853
1854           drill--;
1855           continue;
1856          }
1857
1858         //  slide right into next page
1859
1860         page_no = bt_getid(set->page->right);
1861   } while( page_no );
1862
1863   // return error on end of right chain
1864
1865   bt->line = __LINE__, bt->err = BTERR_struct;
1866   return 0;     // return error
1867 }
1868
1869 //      return page to free list
1870 //      page must be delete & write locked
1871
1872 void bt_freepage (BtDb *bt, BtPageSet *set)
1873 {
1874         //      lock allocation page
1875
1876         bt_mutexlock (bt->mgr->lock);
1877
1878         //      store chain
1879
1880         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1881         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1882         set->latch->dirty = 1;
1883         set->page->free = 1;
1884
1885         // unlock released page
1886
1887         bt_unlockpage (bt, BtLockDelete, set->latch);
1888         bt_unlockpage (bt, BtLockWrite, set->latch);
1889         bt_unpinlatch (set->latch);
1890
1891         // unlock allocation page
1892
1893         bt_releasemutex (bt->mgr->lock);
1894 }
1895
1896 //      a fence key was deleted from a page
1897 //      push new fence value upwards
1898
1899 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1900 {
1901 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1902 unsigned char value[BtId];
1903 BtKey* ptr;
1904 uint idx;
1905
1906         //      remove the old fence value
1907
1908         ptr = keyptr(set->page, set->page->cnt);
1909         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1910         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1911         set->latch->dirty = 1;
1912
1913         //  cache new fence value
1914
1915         ptr = keyptr(set->page, set->page->cnt);
1916         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1917
1918         bt_lockpage (bt, BtLockParent, set->latch);
1919         bt_unlockpage (bt, BtLockWrite, set->latch);
1920
1921         //      insert new (now smaller) fence key
1922
1923         bt_putid (value, set->latch->page_no);
1924         ptr = (BtKey*)leftkey;
1925
1926         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1927           return bt->err;
1928
1929         //      now delete old fence key
1930
1931         ptr = (BtKey*)rightkey;
1932
1933         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1934                 return bt->err;
1935
1936         bt_unlockpage (bt, BtLockParent, set->latch);
1937         bt_unpinlatch(set->latch);
1938         return 0;
1939 }
1940
1941 //      root has a single child
1942 //      collapse a level from the tree
1943
1944 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1945 {
1946 BtPageSet child[1];
1947 uid page_no;
1948 BtVal *val;
1949 uint idx;
1950
1951   // find the child entry and promote as new root contents
1952
1953   do {
1954         for( idx = 0; idx++ < root->page->cnt; )
1955           if( !slotptr(root->page, idx)->dead )
1956                 break;
1957
1958         val = valptr(root->page, idx);
1959
1960         if( val->len == BtId )
1961                 page_no = bt_getid (valptr(root->page, idx)->value);
1962         else
1963                 return bt->line = __LINE__, bt->err = BTERR_struct;
1964
1965         if( child->latch = bt_pinlatch (bt, page_no, NULL) )
1966                 child->page = bt_mappage (bt, child->latch);
1967         else
1968                 return bt->err;
1969
1970         bt_lockpage (bt, BtLockDelete, child->latch);
1971         bt_lockpage (bt, BtLockWrite, child->latch);
1972
1973         memcpy (root->page, child->page, bt->mgr->page_size);
1974         root->latch->dirty = 1;
1975
1976         bt_freepage (bt, child);
1977
1978   } while( root->page->lvl > 1 && root->page->act == 1 );
1979
1980   bt_unlockpage (bt, BtLockWrite, root->latch);
1981   bt_unpinlatch (root->latch);
1982   return 0;
1983 }
1984
1985 //  delete a page and manage keys
1986 //  call with page writelocked
1987 //      returns with page unpinned
1988
1989 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1990 {
1991 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1992 unsigned char value[BtId];
1993 uint lvl = set->page->lvl;
1994 BtPageSet right[1];
1995 uid page_no;
1996 BtKey *ptr;
1997
1998         //      cache copy of fence key
1999         //      to post in parent
2000
2001         ptr = keyptr(set->page, set->page->cnt);
2002         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2003
2004         //      obtain lock on right page
2005
2006         page_no = bt_getid(set->page->right);
2007
2008         if( right->latch = bt_pinlatch (bt, page_no, NULL) )
2009                 right->page = bt_mappage (bt, right->latch);
2010         else
2011                 return 0;
2012
2013         bt_lockpage (bt, BtLockWrite, right->latch);
2014
2015         // cache copy of key to update
2016
2017         ptr = keyptr(right->page, right->page->cnt);
2018         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
2019
2020         if( right->page->kill )
2021                 return bt->line = __LINE__, bt->err = BTERR_struct;
2022
2023         // pull contents of right peer into our empty page
2024
2025         memcpy (set->page, right->page, bt->mgr->page_size);
2026         set->latch->dirty = 1;
2027
2028         // mark right page deleted and point it to left page
2029         //      until we can post parent updates that remove access
2030         //      to the deleted page.
2031
2032         bt_putid (right->page->right, set->latch->page_no);
2033         right->latch->dirty = 1;
2034         right->page->kill = 1;
2035
2036         bt_lockpage (bt, BtLockParent, right->latch);
2037         bt_unlockpage (bt, BtLockWrite, right->latch);
2038
2039         bt_lockpage (bt, BtLockParent, set->latch);
2040         bt_unlockpage (bt, BtLockWrite, set->latch);
2041
2042         // redirect higher key directly to our new node contents
2043
2044         bt_putid (value, set->latch->page_no);
2045         ptr = (BtKey*)higherfence;
2046
2047         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2048           return bt->err;
2049
2050         //      delete old lower key to our node
2051
2052         ptr = (BtKey*)lowerfence;
2053
2054         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
2055           return bt->err;
2056
2057         //      obtain delete and write locks to right node
2058
2059         bt_unlockpage (bt, BtLockParent, right->latch);
2060         bt_lockpage (bt, BtLockDelete, right->latch);
2061         bt_lockpage (bt, BtLockWrite, right->latch);
2062         bt_freepage (bt, right);
2063
2064         bt_unlockpage (bt, BtLockParent, set->latch);
2065         bt_unpinlatch (set->latch);
2066         return 0;
2067 }
2068
2069 //  find and delete key on page by marking delete flag bit
2070 //  if page becomes empty, delete it from the btree
2071
2072 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
2073 {
2074 uint slot, idx, found, fence;
2075 BtPageSet set[1];
2076 BtKey *ptr;
2077 BtVal *val;
2078
2079         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2080                 ptr = keyptr(set->page, slot);
2081         else
2082                 return bt->err;
2083
2084         // if librarian slot, advance to real slot
2085
2086         if( slotptr(set->page, slot)->type == Librarian )
2087                 ptr = keyptr(set->page, ++slot);
2088
2089         fence = slot == set->page->cnt;
2090
2091         // if key is found delete it, otherwise ignore request
2092
2093         if( found = !keycmp (ptr, key, len) )
2094           if( found = slotptr(set->page, slot)->dead == 0 ) {
2095                 val = valptr(set->page,slot);
2096                 slotptr(set->page, slot)->dead = 1;
2097                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2098                 set->page->act--;
2099
2100                 // collapse empty slots beneath the fence
2101
2102                 while( idx = set->page->cnt - 1 )
2103                   if( slotptr(set->page, idx)->dead ) {
2104                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2105                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2106                   } else
2107                         break;
2108           }
2109
2110         //      did we delete a fence key in an upper level?
2111
2112         if( found && lvl && set->page->act && fence )
2113           if( bt_fixfence (bt, set, lvl) )
2114                 return bt->err;
2115           else
2116                 return 0;
2117
2118         //      do we need to collapse root?
2119
2120         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2121           if( bt_collapseroot (bt, set) )
2122                 return bt->err;
2123           else
2124                 return 0;
2125
2126         //      delete empty page
2127
2128         if( !set->page->act )
2129                 return bt_deletepage (bt, set);
2130
2131         set->latch->dirty = 1;
2132         bt_unlockpage(bt, BtLockWrite, set->latch);
2133         bt_unpinlatch (set->latch);
2134         return 0;
2135 }
2136
2137 BtKey *bt_foundkey (BtDb *bt)
2138 {
2139         return (BtKey*)(bt->key);
2140 }
2141
2142 //      advance to next slot
2143
2144 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2145 {
2146 BtLatchSet *prevlatch;
2147 uid page_no;
2148
2149         if( slot < set->page->cnt )
2150                 return slot + 1;
2151
2152         prevlatch = set->latch;
2153
2154         if( page_no = bt_getid(set->page->right) )
2155           if( set->latch = bt_pinlatch (bt, page_no, NULL) )
2156                 set->page = bt_mappage (bt, set->latch);
2157           else
2158                 return 0;
2159         else
2160           return bt->err = BTERR_struct, bt->line = __LINE__, 0;
2161
2162         // obtain access lock using lock chaining with Access mode
2163
2164         bt_lockpage(bt, BtLockAccess, set->latch);
2165
2166         bt_unlockpage(bt, BtLockRead, prevlatch);
2167         bt_unpinlatch (prevlatch);
2168
2169         bt_lockpage(bt, BtLockRead, set->latch);
2170         bt_unlockpage(bt, BtLockAccess, set->latch);
2171         return 1;
2172 }
2173
2174 //      find unique key or first duplicate key in
2175 //      leaf level and return number of value bytes
2176 //      or (-1) if not found.  Setup key for bt_foundkey
2177
2178 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2179 {
2180 BtPageSet set[1];
2181 uint len, slot;
2182 int ret = -1;
2183 BtKey *ptr;
2184 BtVal *val;
2185
2186   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
2187    do {
2188         ptr = keyptr(set->page, slot);
2189
2190         //      skip librarian slot place holder
2191
2192         if( slotptr(set->page, slot)->type == Librarian )
2193                 ptr = keyptr(set->page, ++slot);
2194
2195         //      return actual key found
2196
2197         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2198         len = ptr->len;
2199
2200         if( slotptr(set->page, slot)->type == Duplicate )
2201                 len -= BtId;
2202
2203         //      not there if we reach the stopper key
2204
2205         if( slot == set->page->cnt )
2206           if( !bt_getid (set->page->right) )
2207                 break;
2208
2209         // if key exists, return >= 0 value bytes copied
2210         //      otherwise return (-1)
2211
2212         if( slotptr(set->page, slot)->dead )
2213                 continue;
2214
2215         if( keylen == len )
2216           if( !memcmp (ptr->key, key, len) ) {
2217                 val = valptr (set->page,slot);
2218                 if( valmax > val->len )
2219                         valmax = val->len;
2220                 memcpy (value, val->value, valmax);
2221                 ret = valmax;
2222           }
2223
2224         break;
2225
2226    } while( slot = bt_findnext (bt, set, slot) );
2227
2228   bt_unlockpage (bt, BtLockRead, set->latch);
2229   bt_unpinlatch (set->latch);
2230   return ret;
2231 }
2232
2233 //      check page for space available,
2234 //      clean if necessary and return
2235 //      0 - page needs splitting
2236 //      >0  new slot value
2237
2238 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
2239 {
2240 uint nxt = bt->mgr->page_size;
2241 BtPage page = set->page;
2242 uint cnt = 0, idx = 0;
2243 uint max = page->cnt;
2244 uint newslot = max;
2245 BtKey *key;
2246 BtVal *val;
2247
2248         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2249                 return slot;
2250
2251         //      skip cleanup and proceed to split
2252         //      if there's not enough garbage
2253         //      to bother with.
2254
2255         if( page->garbage < nxt / 5 )
2256                 return 0;
2257
2258         memcpy (bt->frame, page, bt->mgr->page_size);
2259
2260         // skip page info and set rest of page to zero
2261
2262         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2263         set->latch->dirty = 1;
2264
2265         page->garbage = 0;
2266         page->act = 0;
2267
2268         // clean up page first by
2269         // removing deleted keys
2270
2271         while( cnt++ < max ) {
2272                 if( cnt == slot )
2273                         newslot = idx + 2;
2274
2275                 if( cnt < max || bt->frame->lvl )
2276                   if( slotptr(bt->frame,cnt)->dead )
2277                         continue;
2278
2279                 // copy the value across
2280
2281                 val = valptr(bt->frame, cnt);
2282                 nxt -= val->len + sizeof(BtVal);
2283                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2284
2285                 // copy the key across
2286
2287                 key = keyptr(bt->frame, cnt);
2288                 nxt -= key->len + sizeof(BtKey);
2289                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2290
2291                 // make a librarian slot
2292
2293                 slotptr(page, ++idx)->off = nxt;
2294                 slotptr(page, idx)->type = Librarian;
2295                 slotptr(page, idx)->dead = 1;
2296
2297                 // set up the slot
2298
2299                 slotptr(page, ++idx)->off = nxt;
2300                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2301
2302                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2303                         page->act++;
2304         }
2305
2306         page->min = nxt;
2307         page->cnt = idx;
2308
2309         //      see if page has enough space now, or does it need splitting?
2310
2311         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2312                 return newslot;
2313
2314         return 0;
2315 }
2316
2317 // split the root and raise the height of the btree
2318
2319 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2320 {  
2321 unsigned char leftkey[BT_keyarray];
2322 uint nxt = bt->mgr->page_size;
2323 unsigned char value[BtId];
2324 BtPageSet left[1];
2325 uid left_page_no;
2326 BtKey *ptr;
2327 BtVal *val;
2328
2329         //      save left page fence key for new root
2330
2331         ptr = keyptr(root->page, root->page->cnt);
2332         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2333
2334         //  Obtain an empty page to use, and copy the current
2335         //  root contents into it, e.g. lower keys
2336
2337         if( bt_newpage(bt, left, root->page) )
2338                 return bt->err;
2339
2340         left_page_no = left->latch->page_no;
2341         bt_unpinlatch (left->latch);
2342
2343         // preserve the page info at the bottom
2344         // of higher keys and set rest to zero
2345
2346         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2347
2348         // insert stopper key at top of newroot page
2349         // and increase the root height
2350
2351         nxt -= BtId + sizeof(BtVal);
2352         bt_putid (value, right->page_no);
2353         val = (BtVal *)((unsigned char *)root->page + nxt);
2354         memcpy (val->value, value, BtId);
2355         val->len = BtId;
2356
2357         nxt -= 2 + sizeof(BtKey);
2358         slotptr(root->page, 2)->off = nxt;
2359         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2360         ptr->len = 2;
2361         ptr->key[0] = 0xff;
2362         ptr->key[1] = 0xff;
2363
2364         // insert lower keys page fence key on newroot page as first key
2365
2366         nxt -= BtId + sizeof(BtVal);
2367         bt_putid (value, left_page_no);
2368         val = (BtVal *)((unsigned char *)root->page + nxt);
2369         memcpy (val->value, value, BtId);
2370         val->len = BtId;
2371
2372         ptr = (BtKey *)leftkey;
2373         nxt -= ptr->len + sizeof(BtKey);
2374         slotptr(root->page, 1)->off = nxt;
2375         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2376         
2377         bt_putid(root->page->right, 0);
2378         root->page->min = nxt;          // reset lowest used offset and key count
2379         root->page->cnt = 2;
2380         root->page->act = 2;
2381         root->page->lvl++;
2382
2383         // release and unpin root pages
2384
2385         bt_unlockpage(bt, BtLockWrite, root->latch);
2386         bt_unpinlatch (root->latch);
2387
2388         bt_unpinlatch (right);
2389         return 0;
2390 }
2391
2392 //  split already locked full node
2393 //      leave it locked.
2394 //      return pool entry for new right
2395 //      page, unlocked
2396
2397 uint bt_splitpage (BtDb *bt, BtPageSet *set)
2398 {
2399 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2400 uint lvl = set->page->lvl;
2401 BtPageSet right[1];
2402 BtKey *key, *ptr;
2403 BtVal *val, *src;
2404 uid right2;
2405 uint prev;
2406
2407         //  split higher half of keys to bt->frame
2408
2409         memset (bt->frame, 0, bt->mgr->page_size);
2410         max = set->page->cnt;
2411         cnt = max / 2;
2412         idx = 0;
2413
2414         while( cnt++ < max ) {
2415                 if( cnt < max || set->page->lvl )
2416                   if( slotptr(set->page, cnt)->dead )
2417                         continue;
2418
2419                 src = valptr(set->page, cnt);
2420                 nxt -= src->len + sizeof(BtVal);
2421                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2422
2423                 key = keyptr(set->page, cnt);
2424                 nxt -= key->len + sizeof(BtKey);
2425                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2426                 memcpy (ptr, key, key->len + sizeof(BtKey));
2427
2428                 //      add librarian slot
2429
2430                 slotptr(bt->frame, ++idx)->off = nxt;
2431                 slotptr(bt->frame, idx)->type = Librarian;
2432                 slotptr(bt->frame, idx)->dead = 1;
2433
2434                 //  add actual slot
2435
2436                 slotptr(bt->frame, ++idx)->off = nxt;
2437                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2438
2439                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2440                         bt->frame->act++;
2441         }
2442
2443         bt->frame->bits = bt->mgr->page_bits;
2444         bt->frame->min = nxt;
2445         bt->frame->cnt = idx;
2446         bt->frame->lvl = lvl;
2447
2448         // link right node
2449
2450         if( set->latch->page_no > ROOT_page )
2451                 bt_putid (bt->frame->right, bt_getid (set->page->right));
2452
2453         //      get new free page and write higher keys to it.
2454
2455         if( bt_newpage(bt, right, bt->frame) )
2456                 return 0;
2457
2458         // process lower keys
2459
2460         memcpy (bt->frame, set->page, bt->mgr->page_size);
2461         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2462         set->latch->dirty = 1;
2463
2464         nxt = bt->mgr->page_size;
2465         set->page->garbage = 0;
2466         set->page->act = 0;
2467         max /= 2;
2468         cnt = 0;
2469         idx = 0;
2470
2471         if( slotptr(bt->frame, max)->type == Librarian )
2472                 max--;
2473
2474         //  assemble page of smaller keys
2475
2476         while( cnt++ < max ) {
2477                 if( slotptr(bt->frame, cnt)->dead )
2478                         continue;
2479                 val = valptr(bt->frame, cnt);
2480                 nxt -= val->len + sizeof(BtVal);
2481                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2482
2483                 key = keyptr(bt->frame, cnt);
2484                 nxt -= key->len + sizeof(BtKey);
2485                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2486
2487                 //      add librarian slot
2488
2489                 slotptr(set->page, ++idx)->off = nxt;
2490                 slotptr(set->page, idx)->type = Librarian;
2491                 slotptr(set->page, idx)->dead = 1;
2492
2493                 //      add actual slot
2494
2495                 slotptr(set->page, ++idx)->off = nxt;
2496                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2497                 set->page->act++;
2498         }
2499
2500         bt_putid(set->page->right, right->latch->page_no);
2501         set->page->min = nxt;
2502         set->page->cnt = idx;
2503
2504         return right->latch->entry;
2505 }
2506
2507 //      fix keys for newly split page
2508 //      call with page locked, return
2509 //      unlocked
2510
2511 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2512 {
2513 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2514 unsigned char value[BtId];
2515 uint lvl = set->page->lvl;
2516 BtPage page;
2517 BtKey *ptr;
2518
2519         // if current page is the root page, split it
2520
2521         if( set->latch->page_no == ROOT_page )
2522                 return bt_splitroot (bt, set, right);
2523
2524         ptr = keyptr(set->page, set->page->cnt);
2525         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2526
2527         page = bt_mappage (bt, right);
2528
2529         ptr = keyptr(page, page->cnt);
2530         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2531
2532         // insert new fences in their parent pages
2533
2534         bt_lockpage (bt, BtLockParent, right);
2535
2536         bt_lockpage (bt, BtLockParent, set->latch);
2537         bt_unlockpage (bt, BtLockWrite, set->latch);
2538
2539         // insert new fence for reformulated left block of smaller keys
2540
2541         bt_putid (value, set->latch->page_no);
2542         ptr = (BtKey *)leftkey;
2543
2544         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2545                 return bt->err;
2546
2547         // switch fence for right block of larger keys to new right page
2548
2549         bt_putid (value, right->page_no);
2550         ptr = (BtKey *)rightkey;
2551
2552         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2553                 return bt->err;
2554
2555         bt_unlockpage (bt, BtLockParent, set->latch);
2556         bt_unpinlatch (set->latch);
2557
2558         bt_unlockpage (bt, BtLockParent, right);
2559         bt_unpinlatch (right);
2560         return 0;
2561 }
2562
2563 //      install new key and value onto page
2564 //      page must already be checked for
2565 //      adequate space
2566
2567 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2568 {
2569 uint idx, librarian;
2570 BtSlot *node;
2571 BtKey *ptr;
2572 BtVal *val;
2573
2574         //      if found slot > desired slot and previous slot
2575         //      is a librarian slot, use it
2576
2577         if( slot > 1 )
2578           if( slotptr(set->page, slot-1)->type == Librarian )
2579                 slot--;
2580
2581         // copy value onto page
2582
2583         set->page->min -= vallen + sizeof(BtVal);
2584         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2585         memcpy (val->value, value, vallen);
2586         val->len = vallen;
2587
2588         // copy key onto page
2589
2590         set->page->min -= keylen + sizeof(BtKey);
2591         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2592         memcpy (ptr->key, key, keylen);
2593         ptr->len = keylen;
2594         
2595         //      find first empty slot
2596
2597         for( idx = slot; idx < set->page->cnt; idx++ )
2598           if( slotptr(set->page, idx)->dead )
2599                 break;
2600
2601         // now insert key into array before slot
2602
2603         if( idx == set->page->cnt )
2604                 idx += 2, set->page->cnt += 2, librarian = 2;
2605         else
2606                 librarian = 1;
2607
2608         set->latch->dirty = 1;
2609         set->page->act++;
2610
2611         while( idx > slot + librarian - 1 )
2612                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2613
2614         //      add librarian slot
2615
2616         if( librarian > 1 ) {
2617                 node = slotptr(set->page, slot++);
2618                 node->off = set->page->min;
2619                 node->type = Librarian;
2620                 node->dead = 1;
2621         }
2622
2623         //      fill in new slot
2624
2625         node = slotptr(set->page, slot);
2626         node->off = set->page->min;
2627         node->type = type;
2628         node->dead = 0;
2629
2630         if( release ) {
2631                 bt_unlockpage (bt, BtLockWrite, set->latch);
2632                 bt_unpinlatch (set->latch);
2633         }
2634
2635         return 0;
2636 }
2637
2638 //  Insert new key into the btree at given level.
2639 //      either add a new key or update/add an existing one
2640
2641 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2642 {
2643 unsigned char newkey[BT_keyarray];
2644 uint slot, idx, len, entry;
2645 BtPageSet set[1];
2646 BtKey *ptr, *ins;
2647 uid sequence;
2648 BtVal *val;
2649 uint type;
2650
2651   // set up the key we're working on
2652
2653   ins = (BtKey*)newkey;
2654   memcpy (ins->key, key, keylen);
2655   ins->len = keylen;
2656
2657   // is this a non-unique index value?
2658
2659   if( unique )
2660         type = Unique;
2661   else {
2662         type = Duplicate;
2663         sequence = bt_newdup (bt);
2664         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2665         ins->len += BtId;
2666   }
2667
2668   while( 1 ) { // find the page and slot for the current key
2669         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2670                 ptr = keyptr(set->page, slot);
2671         else {
2672                 if( !bt->err )
2673                         bt->line = __LINE__, bt->err = BTERR_ovflw;
2674                 return bt->err;
2675         }
2676
2677         // if librarian slot == found slot, advance to real slot
2678
2679         if( slotptr(set->page, slot)->type == Librarian )
2680           if( !keycmp (ptr, key, keylen) )
2681                 ptr = keyptr(set->page, ++slot);
2682
2683         len = ptr->len;
2684
2685         if( slotptr(set->page, slot)->type == Duplicate )
2686                 len -= BtId;
2687
2688         //  if inserting a duplicate key or unique key
2689         //      check for adequate space on the page
2690         //      and insert the new key before slot.
2691
2692         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2693           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2694                 if( !(entry = bt_splitpage (bt, set)) )
2695                   return bt->err;
2696                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2697                   return bt->err;
2698                 else
2699                   continue;
2700
2701           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2702         }
2703
2704         // if key already exists, update value and return
2705
2706         val = valptr(set->page, slot);
2707
2708         if( val->len >= vallen ) {
2709                 if( slotptr(set->page, slot)->dead )
2710                         set->page->act++;
2711                 set->page->garbage += val->len - vallen;
2712                 set->latch->dirty = 1;
2713                 slotptr(set->page, slot)->dead = 0;
2714                 val->len = vallen;
2715                 memcpy (val->value, value, vallen);
2716                 bt_unlockpage(bt, BtLockWrite, set->latch);
2717                 bt_unpinlatch (set->latch);
2718                 return 0;
2719         }
2720
2721         //      new update value doesn't fit in existing value area
2722
2723         if( !slotptr(set->page, slot)->dead )
2724                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2725         else {
2726                 slotptr(set->page, slot)->dead = 0;
2727                 set->page->act++;
2728         }
2729
2730         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2731           if( !(entry = bt_splitpage (bt, set)) )
2732                 return bt->err;
2733           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2734                 return bt->err;
2735           else
2736                 continue;
2737
2738         set->page->min -= vallen + sizeof(BtVal);
2739         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2740         memcpy (val->value, value, vallen);
2741         val->len = vallen;
2742
2743         set->latch->dirty = 1;
2744         set->page->min -= keylen + sizeof(BtKey);
2745         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2746         memcpy (ptr->key, key, keylen);
2747         ptr->len = keylen;
2748         
2749         slotptr(set->page, slot)->off = set->page->min;
2750         bt_unlockpage(bt, BtLockWrite, set->latch);
2751         bt_unpinlatch (set->latch);
2752         return 0;
2753   }
2754   return 0;
2755 }
2756
2757 typedef struct {
2758         logseqno reqlsn;        // redo log seq no required
2759         logseqno lsn;           // redo log sequence number
2760         uint entry;                     // latch table entry number
2761         uint slot:31;           // page slot number
2762         uint reuse:1;           // reused previous page
2763 } AtomicTxn;
2764
2765 typedef struct {
2766         uid page_no;            // page number for split leaf
2767         void *next;                     // next key to insert
2768         uint entry:29;          // latch table entry number
2769         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2770         uint nounlock:1;        // don't unlock ParentModification
2771         unsigned char leafkey[BT_keyarray];
2772 } AtomicKey;
2773
2774 //      determine actual page where key is located
2775 //  return slot number
2776
2777 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2778 {
2779 BtKey *key = keyptr(source,src);
2780 uint slot = locks[src].slot;
2781 uint entry;
2782
2783         if( src > 1 && locks[src].reuse )
2784           entry = locks[src-1].entry, slot = 0;
2785         else
2786           entry = locks[src].entry;
2787
2788         if( slot ) {
2789                 set->latch = bt->mgr->latchsets + entry;
2790                 set->page = bt_mappage (bt, set->latch);
2791                 return slot;
2792         }
2793
2794         //      is locks->reuse set? or was slot zeroed?
2795         //      if so, find where our key is located 
2796         //      on current page or pages split on
2797         //      same page txn operations.
2798
2799         do {
2800                 set->latch = bt->mgr->latchsets + entry;
2801                 set->page = bt_mappage (bt, set->latch);
2802
2803                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2804                   if( slotptr(set->page, slot)->type == Librarian )
2805                         slot++;
2806                   if( locks[src].reuse )
2807                         locks[src].entry = entry;
2808                   return slot;
2809                 }
2810         } while( entry = set->latch->split );
2811
2812         bt->line = __LINE__, bt->err = BTERR_atomic;
2813         return 0;
2814 }
2815
2816 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2817 {
2818 BtKey *key = keyptr(source, src);
2819 BtVal *val = valptr(source, src);
2820 BtLatchSet *latch;
2821 BtPageSet set[1];
2822 uint entry, slot;
2823
2824   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2825         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
2826           if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2827                 return bt->err;
2828           set->page->lsn = locks[src].lsn;
2829           return 0;
2830         }
2831
2832         if( entry = bt_splitpage (bt, set) )
2833           latch = bt->mgr->latchsets + entry;
2834         else
2835           return bt->err;
2836
2837         //      splice right page into split chain
2838         //      and WriteLock it.
2839
2840         bt_lockpage(bt, BtLockWrite, latch);
2841         latch->split = set->latch->split;
2842         set->latch->split = entry;
2843         locks[src].slot = 0;
2844   }
2845
2846   return bt->line = __LINE__, bt->err = BTERR_atomic;
2847 }
2848
2849 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2850 {
2851 BtKey *key = keyptr(source, src);
2852 BtPageSet set[1];
2853 uint idx, slot;
2854 BtKey *ptr;
2855 BtVal *val;
2856
2857         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2858           ptr = keyptr(set->page, slot);
2859         else
2860           return bt->line = __LINE__, bt->err = BTERR_struct;
2861
2862         if( !keycmp (ptr, key->key, key->len) )
2863           if( !slotptr(set->page, slot)->dead )
2864                 slotptr(set->page, slot)->dead = 1;
2865           else
2866                 return 0;
2867         else
2868                 return 0;
2869
2870         val = valptr(set->page, slot);
2871         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2872         set->latch->dirty = 1;
2873         set->page->lsn = locks[src].lsn;
2874         set->page->act--;
2875         bt->found++;
2876         return 0;
2877 }
2878
2879 //      delete an empty master page for a transaction
2880
2881 //      note that the far right page never empties because
2882 //      it always contains (at least) the infinite stopper key
2883 //      and that all pages that don't contain any keys are
2884 //      deleted, or are being held under Atomic lock.
2885
2886 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2887 {
2888 BtPageSet right[1], temp[1];
2889 unsigned char value[BtId];
2890 uid right_page_no;
2891 BtKey *ptr;
2892
2893         bt_lockpage(bt, BtLockWrite, prev->latch);
2894
2895         //      grab the right sibling
2896
2897         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), NULL) )
2898                 right->page = bt_mappage (bt, right->latch);
2899         else
2900                 return bt->err;
2901
2902         bt_lockpage(bt, BtLockAtomic, right->latch);
2903         bt_lockpage(bt, BtLockWrite, right->latch);
2904
2905         //      and pull contents over empty page
2906         //      while preserving master's left link
2907
2908         memcpy (right->page->left, prev->page->left, BtId);
2909         memcpy (prev->page, right->page, bt->mgr->page_size);
2910
2911         //      forward seekers to old right sibling
2912         //      to new page location in set
2913
2914         bt_putid (right->page->right, prev->latch->page_no);
2915         right->latch->dirty = 1;
2916         right->page->kill = 1;
2917
2918         //      remove pointer to right page for searchers by
2919         //      changing right fence key to point to the master page
2920
2921         ptr = keyptr(right->page,right->page->cnt);
2922         bt_putid (value, prev->latch->page_no);
2923
2924         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2925                 return bt->err;
2926
2927         //  now that master page is in good shape we can
2928         //      remove its locks.
2929
2930         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2931         bt_unlockpage (bt, BtLockWrite, prev->latch);
2932
2933         //  fix master's right sibling's left pointer
2934         //      to remove scanner's poiner to the right page
2935
2936         if( right_page_no = bt_getid (prev->page->right) ) {
2937           if( temp->latch = bt_pinlatch (bt, right_page_no, NULL) )
2938                 temp->page = bt_mappage (bt, temp->latch);
2939
2940           bt_lockpage (bt, BtLockWrite, temp->latch);
2941           bt_putid (temp->page->left, prev->latch->page_no);
2942           temp->latch->dirty = 1;
2943
2944           bt_unlockpage (bt, BtLockWrite, temp->latch);
2945           bt_unpinlatch (temp->latch);
2946         } else {        // master is now the far right page
2947           bt_mutexlock (bt->mgr->lock);
2948           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2949           bt_releasemutex(bt->mgr->lock);
2950         }
2951
2952         //      now that there are no pointers to the right page
2953         //      we can delete it after the last read access occurs
2954
2955         bt_unlockpage (bt, BtLockWrite, right->latch);
2956         bt_unlockpage (bt, BtLockAtomic, right->latch);
2957         bt_lockpage (bt, BtLockDelete, right->latch);
2958         bt_lockpage (bt, BtLockWrite, right->latch);
2959         bt_freepage (bt, right);
2960         return 0;
2961 }
2962
2963 //      atomic modification of a batch of keys.
2964
2965 //      return -1 if BTERR is set
2966 //      otherwise return slot number
2967 //      causing the key constraint violation
2968 //      or zero on successful completion.
2969
2970 int bt_atomictxn (BtDb *bt, BtPage source)
2971 {
2972 uint src, idx, slot, samepage, entry, avail, que = 0;
2973 AtomicKey *head, *tail, *leaf;
2974 BtPageSet set[1], prev[1];
2975 unsigned char value[BtId];
2976 BtKey *key, *ptr, *key2;
2977 BtLatchSet *latch;
2978 AtomicTxn *locks;
2979 int result = 0;
2980 BtSlot temp[1];
2981 logseqno lsn;
2982 BtPage page;
2983 BtVal *val;
2984 uid right;
2985 int type;
2986
2987   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2988   head = NULL;
2989   tail = NULL;
2990
2991   // stable sort the list of keys into order to
2992   //    prevent deadlocks between threads.
2993
2994   for( src = 1; src++ < source->cnt; ) {
2995         *temp = *slotptr(source,src);
2996         key = keyptr (source,src);
2997
2998         for( idx = src; --idx; ) {
2999           key2 = keyptr (source,idx);
3000           if( keycmp (key, key2->key, key2->len) < 0 ) {
3001                 *slotptr(source,idx+1) = *slotptr(source,idx);
3002                 *slotptr(source,idx) = *temp;
3003           } else
3004                 break;
3005         }
3006   }
3007
3008   // reserve enough buffer pool entries
3009
3010   avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
3011   bt_availrequest (bt, avail);
3012
3013   // Load the leaf page for each key
3014   // group same page references with reuse bit
3015   // and determine any constraint violations
3016
3017   for( src = 0; src++ < source->cnt; ) {
3018         key = keyptr(source, src);
3019         slot = 0;
3020
3021         // first determine if this modification falls
3022         // on the same page as the previous modification
3023         //      note that the far right leaf page is a special case
3024
3025         if( samepage = src > 1 )
3026           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3027                 slot = bt_findslot(set->page, key->key, key->len);
3028           else
3029                 bt_unlockpage(bt, BtLockRead, set->latch); 
3030
3031         if( !slot )
3032           if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
3033                 set->latch->split = 0;
3034           else
3035                 goto atomicerr;
3036
3037         if( slotptr(set->page, slot)->type == Librarian )
3038           ptr = keyptr(set->page, ++slot);
3039         else
3040           ptr = keyptr(set->page, slot);
3041
3042         if( !samepage ) {
3043           locks[src].entry = set->latch->entry;
3044           locks[src].slot = slot;
3045           locks[src].reuse = 0;
3046         } else {
3047           locks[src].entry = 0;
3048           locks[src].slot = 0;
3049           locks[src].reuse = 1;
3050         }
3051
3052         //      capture current lsn for master page
3053
3054         locks[src].reqlsn = set->page->lsn;
3055
3056         //      perform constraint checks
3057
3058         switch( slotptr(source, src)->type ) {
3059         case Duplicate:
3060         case Unique:
3061           if( !slotptr(set->page, slot)->dead )
3062            if( slot < set->page->cnt || bt_getid (set->page->right) )
3063             if( !keycmp (ptr, key->key, key->len) ) {
3064
3065                   // return constraint violation if key already exists
3066
3067                   bt_unlockpage(bt, BtLockRead, set->latch);
3068                   result = src;
3069
3070                   while( src ) {
3071                         if( locks[src].entry ) {
3072                           set->latch = bt->mgr->latchsets + locks[src].entry;
3073                           bt_unlockpage(bt, BtLockAtomic, set->latch);
3074                           bt_unpinlatch (set->latch);
3075                         }
3076                         src--;
3077                   }
3078                   free (locks);
3079                   return result;
3080                 }
3081           break;
3082         }
3083   }
3084
3085   //  unlock last loadpage lock
3086
3087   if( source->cnt )
3088         bt_unlockpage(bt, BtLockRead, set->latch);
3089
3090   //  and add entries to redo log
3091
3092   if( bt->mgr->redopages )
3093    if( lsn = bt_txnredo (bt, source) )
3094     for( src = 0; src++ < source->cnt; )
3095          locks[src].lsn = lsn;
3096     else
3097          goto atomicerr;
3098
3099   //  obtain write lock for each master page
3100
3101   for( src = 0; src++ < source->cnt; ) {
3102         if( locks[src].reuse )
3103           continue;
3104         else
3105           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
3106   }
3107
3108   // insert or delete each key
3109   // process any splits or merges
3110   // release Write & Atomic latches
3111   // set ParentModifications and build
3112   // queue of keys to insert for split pages
3113   // or delete for deleted pages.
3114
3115   // run through txn list backwards
3116
3117   samepage = source->cnt + 1;
3118
3119   for( src = source->cnt; src; src-- ) {
3120         if( locks[src].reuse )
3121           continue;
3122
3123         //  perform the txn operations
3124         //      from smaller to larger on
3125         //  the same page
3126
3127         for( idx = src; idx < samepage; idx++ )
3128          switch( slotptr(source,idx)->type ) {
3129          case Delete:
3130           if( bt_atomicdelete (bt, source, locks, idx) )
3131                 goto atomicerr;
3132           break;
3133
3134         case Duplicate:
3135         case Unique:
3136           if( bt_atomicinsert (bt, source, locks, idx) )
3137                 goto atomicerr;
3138           break;
3139         }
3140
3141         //      after the same page operations have finished,
3142         //  process master page for splits or deletion.
3143
3144         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3145         prev->page = bt_mappage (bt, prev->latch);
3146         samepage = src;
3147
3148         //  pick-up all splits from master page
3149         //      each one is already WriteLocked.
3150
3151         entry = prev->latch->split;
3152
3153         while( entry ) {
3154           set->latch = bt->mgr->latchsets + entry;
3155           set->page = bt_mappage (bt, set->latch);
3156           entry = set->latch->split;
3157
3158           // delete empty master page by undoing its split
3159           //  (this is potentially another empty page)
3160           //  note that there are no new left pointers yet
3161
3162           if( !prev->page->act ) {
3163                 memcpy (set->page->left, prev->page->left, BtId);
3164                 memcpy (prev->page, set->page, bt->mgr->page_size);
3165                 bt_lockpage (bt, BtLockDelete, set->latch);
3166                 bt_freepage (bt, set);
3167
3168                 prev->latch->split = set->latch->split;
3169                 prev->latch->dirty = 1;
3170                 continue;
3171           }
3172
3173           // remove empty page from the split chain
3174           // and return it to the free list.
3175
3176           if( !set->page->act ) {
3177                 memcpy (prev->page->right, set->page->right, BtId);
3178                 prev->latch->split = set->latch->split;
3179                 bt_lockpage (bt, BtLockDelete, set->latch);
3180                 bt_freepage (bt, set);
3181                 continue;
3182           }
3183
3184           //  schedule prev fence key update
3185
3186           ptr = keyptr(prev->page,prev->page->cnt);
3187           leaf = calloc (sizeof(AtomicKey), 1), que++;
3188
3189           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3190           leaf->page_no = prev->latch->page_no;
3191           leaf->entry = prev->latch->entry;
3192           leaf->type = 0;
3193
3194           if( tail )
3195                 tail->next = leaf;
3196           else
3197                 head = leaf;
3198
3199           tail = leaf;
3200
3201           // splice in the left link into the split page
3202
3203           bt_putid (set->page->left, prev->latch->page_no);
3204           bt_lockpage(bt, BtLockParent, prev->latch);
3205           bt_unlockpage(bt, BtLockWrite, prev->latch);
3206           *prev = *set;
3207         }
3208
3209         //  update left pointer in next right page from last split page
3210         //      (if all splits were reversed, latch->split == 0)
3211
3212         if( latch->split ) {
3213           //  fix left pointer in master's original (now split)
3214           //  far right sibling or set rightmost page in page zero
3215
3216           if( right = bt_getid (prev->page->right) ) {
3217                 if( set->latch = bt_pinlatch (bt, right, NULL) )
3218                   set->page = bt_mappage (bt, set->latch);
3219                 else
3220                   goto atomicerr;
3221
3222             bt_lockpage (bt, BtLockWrite, set->latch);
3223             bt_putid (set->page->left, prev->latch->page_no);
3224                 set->latch->dirty = 1;
3225             bt_unlockpage (bt, BtLockWrite, set->latch);
3226                 bt_unpinlatch (set->latch);
3227           } else {      // prev is rightmost page
3228             bt_mutexlock (bt->mgr->lock);
3229                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3230             bt_releasemutex(bt->mgr->lock);
3231           }
3232
3233           //  Process last page split in chain
3234
3235           ptr = keyptr(prev->page,prev->page->cnt);
3236           leaf = calloc (sizeof(AtomicKey), 1), que++;
3237
3238           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3239           leaf->page_no = prev->latch->page_no;
3240           leaf->entry = prev->latch->entry;
3241           leaf->type = 0;
3242   
3243           if( tail )
3244                 tail->next = leaf;
3245           else
3246                 head = leaf;
3247
3248           tail = leaf;
3249
3250           bt_lockpage(bt, BtLockParent, prev->latch);
3251           bt_unlockpage(bt, BtLockWrite, prev->latch);
3252
3253           //  remove atomic lock on master page
3254
3255           bt_unlockpage(bt, BtLockAtomic, latch);
3256           continue;
3257         }
3258
3259         //  finished if prev page occupied (either master or final split)
3260
3261         if( prev->page->act ) {
3262           bt_unlockpage(bt, BtLockWrite, latch);
3263           bt_unlockpage(bt, BtLockAtomic, latch);
3264           bt_unpinlatch(latch);
3265           continue;
3266         }
3267
3268         // any and all splits were reversed, and the
3269         // master page located in prev is empty, delete it
3270         // by pulling over master's right sibling.
3271
3272         // Remove empty master's fence key
3273
3274         ptr = keyptr(prev->page,prev->page->cnt);
3275
3276         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3277                 goto atomicerr;
3278
3279         //      perform the remainder of the delete
3280         //      from the FIFO queue
3281
3282         leaf = calloc (sizeof(AtomicKey), 1), que++;
3283
3284         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3285         leaf->page_no = prev->latch->page_no;
3286         leaf->entry = prev->latch->entry;
3287         leaf->nounlock = 1;
3288         leaf->type = 2;
3289   
3290         if( tail )
3291           tail->next = leaf;
3292         else
3293           head = leaf;
3294
3295         tail = leaf;
3296
3297         //      leave atomic lock in place until
3298         //      deletion completes in next phase.
3299
3300         bt_unlockpage(bt, BtLockWrite, prev->latch);
3301   }
3302
3303   bt_availrelease (bt, avail);
3304
3305   que *= bt->mgr->pagezero->alloc->lvl;
3306   bt_availrequest (bt, que);
3307
3308   //  add & delete keys for any pages split or merged during transaction
3309
3310   if( leaf = head )
3311     do {
3312           set->latch = bt->mgr->latchsets + leaf->entry;
3313           set->page = bt_mappage (bt, set->latch);
3314
3315           bt_putid (value, leaf->page_no);
3316           ptr = (BtKey *)leaf->leafkey;
3317
3318           switch( leaf->type ) {
3319           case 0:       // insert key
3320             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3321                   goto atomicerr;
3322
3323                 break;
3324
3325           case 1:       // delete key
3326                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3327                   goto atomicerr;
3328
3329                 break;
3330
3331           case 2:       // free page
3332                 if( bt_atomicfree (bt, set) )
3333                   goto atomicerr;
3334
3335                 break;
3336           }
3337
3338           if( !leaf->nounlock )
3339             bt_unlockpage (bt, BtLockParent, set->latch);
3340
3341           bt_unpinlatch (set->latch);
3342           tail = leaf->next;
3343           free (leaf);
3344         } while( leaf = tail );
3345
3346   bt_availrelease (bt, que);
3347
3348   // return success
3349
3350   free (locks);
3351   return 0;
3352 atomicerr:
3353   return -1;
3354 }
3355
3356 //      set cursor to highest slot on highest page
3357
3358 uint bt_lastkey (BtDb *bt)
3359 {
3360 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3361 BtPageSet set[1];
3362
3363         if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3364                 set->page = bt_mappage (bt, set->latch);
3365         else
3366                 return 0;
3367
3368     bt_lockpage(bt, BtLockRead, set->latch);
3369         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3370     bt_unlockpage(bt, BtLockRead, set->latch);
3371         bt_unpinlatch (set->latch);
3372
3373         bt->cursor_page = page_no;
3374         return bt->cursor->cnt;
3375 }
3376
3377 //      return previous slot on cursor page
3378
3379 uint bt_prevkey (BtDb *bt, uint slot)
3380 {
3381 uid ourright, next, us = bt->cursor_page;
3382 BtPageSet set[1];
3383
3384         if( --slot )
3385                 return slot;
3386
3387         ourright = bt_getid(bt->cursor->right);
3388
3389 goleft:
3390         if( !(next = bt_getid(bt->cursor->left)) )
3391                 return 0;
3392
3393 findourself:
3394         bt->cursor_page = next;
3395
3396         if( set->latch = bt_pinlatch (bt, next, NULL) )
3397                 set->page = bt_mappage (bt, set->latch);
3398         else
3399                 return 0;
3400
3401     bt_lockpage(bt, BtLockRead, set->latch);
3402         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3403         bt_unlockpage(bt, BtLockRead, set->latch);
3404         bt_unpinlatch (set->latch);
3405         
3406         next = bt_getid (bt->cursor->right);
3407
3408         if( bt->cursor->kill )
3409                 goto findourself;
3410
3411         if( next != us )
3412           if( next == ourright )
3413                 goto goleft;
3414           else
3415                 goto findourself;
3416
3417         return bt->cursor->cnt;
3418 }
3419
3420 //  return next slot on cursor page
3421 //  or slide cursor right into next page
3422
3423 uint bt_nextkey (BtDb *bt, uint slot)
3424 {
3425 BtPageSet set[1];
3426 uid right;
3427
3428   do {
3429         right = bt_getid(bt->cursor->right);
3430
3431         while( slot++ < bt->cursor->cnt )
3432           if( slotptr(bt->cursor,slot)->dead )
3433                 continue;
3434           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3435                 return slot;
3436           else
3437                 break;
3438
3439         if( !right )
3440                 break;
3441
3442         bt->cursor_page = right;
3443
3444         if( set->latch = bt_pinlatch (bt, right, NULL) )
3445                 set->page = bt_mappage (bt, set->latch);
3446         else
3447                 return 0;
3448
3449     bt_lockpage(bt, BtLockRead, set->latch);
3450
3451         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3452
3453         bt_unlockpage(bt, BtLockRead, set->latch);
3454         bt_unpinlatch (set->latch);
3455         slot = 0;
3456
3457   } while( 1 );
3458
3459   return bt->err = 0;
3460 }
3461
3462 //  cache page of keys into cursor and return starting slot for given key
3463
3464 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3465 {
3466 BtPageSet set[1];
3467 uint slot;
3468
3469         // cache page for retrieval
3470
3471         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3472           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3473         else
3474           return 0;
3475
3476         bt->cursor_page = set->latch->page_no;
3477
3478         bt_unlockpage(bt, BtLockRead, set->latch);
3479         bt_unpinlatch (set->latch);
3480         return slot;
3481 }
3482
3483 BtKey *bt_key(BtDb *bt, uint slot)
3484 {
3485         return keyptr(bt->cursor, slot);
3486 }
3487
3488 BtVal *bt_val(BtDb *bt, uint slot)
3489 {
3490         return valptr(bt->cursor,slot);
3491 }
3492
3493 #ifdef STANDALONE
3494
3495 #ifndef unix
3496 double getCpuTime(int type)
3497 {
3498 FILETIME crtime[1];
3499 FILETIME xittime[1];
3500 FILETIME systime[1];
3501 FILETIME usrtime[1];
3502 SYSTEMTIME timeconv[1];
3503 double ans = 0;
3504
3505         memset (timeconv, 0, sizeof(SYSTEMTIME));
3506
3507         switch( type ) {
3508         case 0:
3509                 GetSystemTimeAsFileTime (xittime);
3510                 FileTimeToSystemTime (xittime, timeconv);
3511                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3512                 break;
3513         case 1:
3514                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3515                 FileTimeToSystemTime (usrtime, timeconv);
3516                 break;
3517         case 2:
3518                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3519                 FileTimeToSystemTime (systime, timeconv);
3520                 break;
3521         }
3522
3523         ans += (double)timeconv->wHour * 3600;
3524         ans += (double)timeconv->wMinute * 60;
3525         ans += (double)timeconv->wSecond;
3526         ans += (double)timeconv->wMilliseconds / 1000;
3527         return ans;
3528 }
3529 #else
3530 #include <time.h>
3531 #include <sys/resource.h>
3532
3533 double getCpuTime(int type)
3534 {
3535 struct rusage used[1];
3536 struct timeval tv[1];
3537
3538         switch( type ) {
3539         case 0:
3540                 gettimeofday(tv, NULL);
3541                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3542
3543         case 1:
3544                 getrusage(RUSAGE_SELF, used);
3545                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3546
3547         case 2:
3548                 getrusage(RUSAGE_SELF, used);
3549                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3550         }
3551
3552         return 0;
3553 }
3554 #endif
3555
3556 void bt_poolaudit (BtMgr *mgr)
3557 {
3558 BtLatchSet *latch;
3559 uint entry = 0;
3560
3561         while( ++entry < mgr->latchtotal ) {
3562                 latch = mgr->latchsets + entry;
3563
3564                 if( *latch->readwr->rin & MASK )
3565                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3566
3567                 if( *latch->access->rin & MASK )
3568                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3569
3570                 if( *latch->parent->ticket != *latch->parent->serving )
3571                         fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3572
3573                 if( *latch->atomic->ticket != *latch->atomic->serving )
3574                         fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3575
3576                 if( *latch->modify->exclusive )
3577                         fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3578
3579                 if( latch->pin & ~CLOCK_bit )
3580                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3581         }
3582 }
3583
3584 typedef struct {
3585         char idx;
3586         char *type;
3587         char *infile;
3588         BtMgr *mgr;
3589         int num;
3590 } ThreadArg;
3591
3592 //  standalone program to index file of keys
3593 //  then list them onto std-out
3594
3595 #ifdef unix
3596 void *index_file (void *arg)
3597 #else
3598 uint __stdcall index_file (void *arg)
3599 #endif
3600 {
3601 int line = 0, found = 0, cnt = 0, idx;
3602 uid next, page_no = LEAF_page;  // start on first page of leaves
3603 int ch, len = 0, slot, type = 0;
3604 unsigned char key[BT_maxkey];
3605 unsigned char txn[65536];
3606 ThreadArg *args = arg;
3607 BtPageSet set[1];
3608 uint nxt = 65536;
3609 BtPage page;
3610 BtKey *ptr;
3611 BtVal *val;
3612 BtDb *bt;
3613 FILE *in;
3614
3615         bt = bt_open (args->mgr);
3616         page = (BtPage)txn;
3617
3618         if( args->idx < strlen (args->type) )
3619                 ch = args->type[args->idx];
3620         else
3621                 ch = args->type[strlen(args->type) - 1];
3622
3623         switch(ch | 0x20)
3624         {
3625         case 'd':
3626                 type = Delete;
3627
3628         case 'p':
3629                 if( !type )
3630                         type = Unique;
3631
3632                 if( args->num )
3633                  if( type == Delete )
3634                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3635                  else
3636                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3637                 else
3638                  if( type == Delete )
3639                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3640                  else
3641                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3642
3643                 if( in = fopen (args->infile, "rb") )
3644                   while( ch = getc(in), ch != EOF )
3645                         if( ch == '\n' )
3646                         {
3647                           line++;
3648
3649                           if( !args->num ) {
3650                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3651                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3652                             len = 0;
3653                                 continue;
3654                           }
3655
3656                           nxt -= len - 10;
3657                           memcpy (txn + nxt, key + 10, len - 10);
3658                           nxt -= 1;
3659                           txn[nxt] = len - 10;
3660                           nxt -= 10;
3661                           memcpy (txn + nxt, key, 10);
3662                           nxt -= 1;
3663                           txn[nxt] = 10;
3664                           slotptr(page,++cnt)->off  = nxt;
3665                           slotptr(page,cnt)->type = type;
3666                           len = 0;
3667
3668                           if( cnt < args->num )
3669                                 continue;
3670
3671                           page->cnt = cnt;
3672                           page->act = cnt;
3673                           page->min = nxt;
3674
3675                           if( bt_atomictxn (bt, page) )
3676                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3677                           nxt = sizeof(txn);
3678                           cnt = 0;
3679
3680                         }
3681                         else if( len < BT_maxkey )
3682                                 key[len++] = ch;
3683                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3684                 break;
3685
3686         case 'w':
3687                 fprintf(stderr, "started indexing for %s\n", args->infile);
3688                 if( in = fopen (args->infile, "r") )
3689                   while( ch = getc(in), ch != EOF )
3690                         if( ch == '\n' )
3691                         {
3692                           line++;
3693
3694                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3695                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3696                           len = 0;
3697                         }
3698                         else if( len < BT_maxkey )
3699                                 key[len++] = ch;
3700                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3701                 break;
3702
3703         case 'f':
3704                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3705                 if( in = fopen (args->infile, "rb") )
3706                   while( ch = getc(in), ch != EOF )
3707                         if( ch == '\n' )
3708                         {
3709                           line++;
3710                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3711                                 found++;
3712                           else if( bt->err )
3713                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
3714                           len = 0;
3715                         }
3716                         else if( len < BT_maxkey )
3717                                 key[len++] = ch;
3718                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3719                 break;
3720
3721         case 's':
3722                 fprintf(stderr, "started scanning\n");
3723                 
3724                 do {
3725                         if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3726                                 set->page = bt_mappage (bt, set->latch);
3727                         else
3728                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3729                         bt_lockpage (bt, BtLockRead, set->latch);
3730                         next = bt_getid (set->page->right);
3731
3732                         for( slot = 0; slot++ < set->page->cnt; )
3733                          if( next || slot < set->page->cnt )
3734                           if( !slotptr(set->page, slot)->dead ) {
3735                                 ptr = keyptr(set->page, slot);
3736                                 len = ptr->len;
3737
3738                             if( slotptr(set->page, slot)->type == Duplicate )
3739                                         len -= BtId;
3740
3741                                 fwrite (ptr->key, len, 1, stdout);
3742                                 val = valptr(set->page, slot);
3743                                 fwrite (val->value, val->len, 1, stdout);
3744                                 fputc ('\n', stdout);
3745                                 cnt++;
3746                            }
3747
3748                         set->latch->avail = 1;
3749                         bt_unlockpage (bt, BtLockRead, set->latch);
3750                         bt_unpinlatch (set->latch);
3751                 } while( page_no = next );
3752
3753                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3754                 break;
3755
3756         case 'r':
3757                 fprintf(stderr, "started reverse scan\n");
3758                 if( slot = bt_lastkey (bt) )
3759                    while( slot = bt_prevkey (bt, slot) ) {
3760                         if( slotptr(bt->cursor, slot)->dead )
3761                           continue;
3762
3763                         ptr = keyptr(bt->cursor, slot);
3764                         len = ptr->len;
3765
3766                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3767                                 len -= BtId;
3768
3769                         fwrite (ptr->key, len, 1, stdout);
3770                         val = valptr(bt->cursor, slot);
3771                         fwrite (val->value, val->len, 1, stdout);
3772                         fputc ('\n', stdout);
3773                         cnt++;
3774                   }
3775
3776                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3777                 break;
3778
3779         case 'c':
3780 #ifdef unix
3781                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3782 #endif
3783                 fprintf(stderr, "started counting\n");
3784                 next = LEAF_page + bt->mgr->redopages + 1;
3785
3786                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3787                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3788                                 break;
3789
3790                         if( !bt->frame->free && !bt->frame->lvl )
3791                                 cnt += bt->frame->act;
3792
3793                         bt->reads++;
3794                         page_no = next++;
3795                 }
3796                 
3797                 cnt--;  // remove stopper key
3798                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3799                 break;
3800         }
3801
3802         bt_close (bt);
3803 #ifdef unix
3804         return NULL;
3805 #else
3806         return 0;
3807 #endif
3808 }
3809
3810 typedef struct timeval timer;
3811
3812 int main (int argc, char **argv)
3813 {
3814 int idx, cnt, len, slot, err;
3815 int segsize, bits = 16;
3816 double start, stop;
3817 #ifdef unix
3818 pthread_t *threads;
3819 #else
3820 HANDLE *threads;
3821 #endif
3822 ThreadArg *args;
3823 uint poolsize = 0;
3824 uint recovery = 0;
3825 float elapsed;
3826 int num = 0;
3827 char key[1];
3828 BtMgr *mgr;
3829 BtKey *ptr;
3830 BtDb *bt;
3831
3832         if( argc < 3 ) {
3833                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3834                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3835                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3836                 fprintf (stderr, "  page_bits is the page size in bits\n");
3837                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3838                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3839                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3840                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3841                 exit(0);
3842         }
3843
3844         start = getCpuTime(0);
3845
3846         if( argc > 3 )
3847                 bits = atoi(argv[3]);
3848
3849         if( argc > 4 )
3850                 poolsize = atoi(argv[4]);
3851
3852         if( !poolsize )
3853                 fprintf (stderr, "Warning: no mapped_pool\n");
3854
3855         if( argc > 5 )
3856                 num = atoi(argv[5]);
3857
3858         if( argc > 6 )
3859                 recovery = atoi(argv[6]);
3860
3861         cnt = argc - 7;
3862 #ifdef unix
3863         threads = malloc (cnt * sizeof(pthread_t));
3864 #else
3865         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3866 #endif
3867         args = malloc (cnt * sizeof(ThreadArg));
3868
3869         mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3870
3871         if( !mgr ) {
3872                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3873                 exit (1);
3874         }
3875
3876         //      fire off threads
3877
3878         for( idx = 0; idx < cnt; idx++ ) {
3879                 args[idx].infile = argv[idx + 7];
3880                 args[idx].type = argv[2];
3881                 args[idx].mgr = mgr;
3882                 args[idx].num = num;
3883                 args[idx].idx = idx;
3884 #ifdef unix
3885                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3886                         fprintf(stderr, "Error creating thread %d\n", err);
3887 #else
3888                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3889 #endif
3890         }
3891
3892         //      wait for termination
3893
3894 #ifdef unix
3895         for( idx = 0; idx < cnt; idx++ )
3896                 pthread_join (threads[idx], NULL);
3897 #else
3898         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3899
3900         for( idx = 0; idx < cnt; idx++ )
3901                 CloseHandle(threads[idx]);
3902
3903 #endif
3904         bt_poolaudit(mgr);
3905         bt_mgrclose (mgr);
3906
3907         elapsed = getCpuTime(0) - start;
3908         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3909         elapsed = getCpuTime(1);
3910         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3911         elapsed = getCpuTime(2);
3912         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3913 }
3914
3915 #endif  //STANDALONE