]> pd.if.org Git - btree/blob - threadskv9c.c
Remove extra assert traps, finalize mutex lock.
[btree] / threadskv9c.c
1 // btree version threadskv9c sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      and redo log for failure recovery
10
11 // 07 OCT 2014
12
13 // author: karl malbrain, malbrain@cal.berkeley.edu
14
15 /*
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
18
19 The orginal author is Karl Malbrain.
20
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
27 */
28
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
31
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
34
35 #ifdef linux
36 #define _GNU_SOURCE
37 #include <linux/futex.h>
38 #define SYS_futex 202
39 #endif
40
41 #ifdef unix
42 #include <unistd.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/mman.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <limits.h>
51 #else
52 #define WIN32_LEAN_AND_MEAN
53 #include <windows.h>
54 #include <stdio.h>
55 #include <stdlib.h>
56 #include <time.h>
57 #include <fcntl.h>
58 #include <process.h>
59 #include <intrin.h>
60 #endif
61
62 #include <memory.h>
63 #include <string.h>
64 #include <stddef.h>
65
66 typedef unsigned long long      uid;
67 typedef unsigned long long      logseqno;
68
69 #ifndef unix
70 typedef unsigned long long      off64_t;
71 typedef unsigned short          ushort;
72 typedef unsigned int            uint;
73 #endif
74
75 #define BT_ro 0x6f72    // ro
76 #define BT_rw 0x7772    // rw
77
78 #define BT_maxbits              24                                      // maximum page size in bits
79 #define BT_minbits              9                                       // minimum page size in bits
80 #define BT_minpage              (1 << BT_minbits)       // minimum page size
81 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
82
83 //  BTree page number constants
84 #define ALLOC_page              0       // allocation page
85 #define ROOT_page               1       // root of the btree
86 #define LEAF_page               2       // first page of leaves
87 #define REDO_page               3       // first page of redo buffer
88
89 //      Number of levels to create in a new BTree
90
91 #define MIN_lvl                 2
92
93 /*
94 There are six lock types for each node in four independent sets: 
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
101 */
102
103 typedef enum{
104         BtLockAccess = 1,
105         BtLockDelete = 2,
106         BtLockRead   = 4,
107         BtLockWrite  = 8,
108         BtLockParent = 16,
109         BtLockAtomic = 32
110 } BtLock;
111
112 //      lite weight mutex
113
114 typedef struct {
115   union {
116         struct {
117           uint xlock:1;         // one writer has exclusive lock
118           uint wrt:31;          // count of other writers waiting
119         } bits[1];
120         uint value[1];
121   };
122 } BtMutexLatch;
123
124 #define XCL 1
125 #define WRT 2
126
127 //      mode & definition for lite latch implementation
128
129 enum {
130         QueRd = 1,              // reader queue
131         QueWr = 2               // writer queue
132 } RWQueue;
133
134 //      definition for phase-fair reader/writer lock implementation
135
136 typedef struct {
137         volatile ushort rin[1];
138         volatile ushort rout[1];
139         volatile ushort ticket[1];
140         volatile ushort serving[1];
141         ushort tid;
142         ushort dup;
143 } RWLock;
144
145 //      write only lock
146
147 typedef struct {
148         BtMutexLatch xcl[1];
149         ushort tid;
150         ushort dup;
151 } WOLock;
152
153 #define PHID 0x1
154 #define PRES 0x2
155 #define MASK 0x3
156 #define RINC 0x4
157
158 //  hash table entries
159
160 typedef struct {
161         uint entry;             // Latch table entry at head of chain
162         BtMutexLatch latch[1];
163 } BtHashEntry;
164
165 //      latch manager table structure
166
167 typedef struct {
168         uid page_no;                                    // latch set page number
169         RWLock readwr[1];                               // read/write page lock
170         RWLock access[1];                               // Access Intent/Page delete
171         WOLock parent[1];                               // Posting of fence key in parent
172         WOLock atomic[1];                               // Atomic update in progress
173         uint split;                                             // right split page atomic insert
174         uint entry;                                             // entry slot in latch table
175         uint next;                                              // next entry in hash table chain
176         uint prev;                                              // prev entry in hash table chain
177         BtMutexLatch modify[1];                 // modify entry lite latch
178         volatile ushort pin;                    // number of accessing threads
179         volatile unsigned char dirty;   // page in cache is dirty (atomic set)
180         volatile unsigned char avail;   // page is an available entry
181 } BtLatchSet;
182
183 //      Define the length of the page record numbers
184
185 #define BtId 6
186
187 //      Page key slot definition.
188
189 //      Keys are marked dead, but remain on the page until
190 //      it cleanup is called. The fence key (highest key) for
191 //      a leaf page is always present, even after cleanup.
192
193 //      Slot types
194
195 //      In addition to the Unique keys that occupy slots
196 //      there are Librarian and Duplicate key
197 //      slots occupying the key slot array.
198
199 //      The Librarian slots are dead keys that
200 //      serve as filler, available to add new Unique
201 //      or Dup slots that are inserted into the B-tree.
202
203 //      The Duplicate slots have had their key bytes extended
204 //      by 6 bytes to contain a binary duplicate key uniqueifier.
205
206 typedef enum {
207         Unique,
208         Librarian,
209         Duplicate,
210         Delete,
211         Update
212 } BtSlotType;
213
214 typedef struct {
215         uint off:BT_maxbits;    // page offset for key start
216         uint type:3;                    // type of slot
217         uint dead:1;                    // set for deleted slot
218 } BtSlot;
219
220 //      The key structure occupies space at the upper end of
221 //      each page.  It's a length byte followed by the key
222 //      bytes.
223
224 typedef struct {
225         unsigned char len;              // this can be changed to a ushort or uint
226         unsigned char key[0];
227 } BtKey;
228
229 //      the value structure also occupies space at the upper
230 //      end of the page. Each key is immediately followed by a value.
231
232 typedef struct {
233         unsigned char len;              // this can be changed to a ushort or uint
234         unsigned char value[0];
235 } BtVal;
236
237 #define BT_maxkey       255             // maximum number of bytes in a key
238 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
239
240 //      The first part of an index page.
241 //      It is immediately followed
242 //      by the BtSlot array of keys.
243
244 //      note that this structure size
245 //      must be a multiple of 8 bytes
246 //      in order to place dups correctly.
247
248 typedef struct BtPage_ {
249         uint cnt;                                       // count of keys in page
250         uint act;                                       // count of active keys
251         uint min;                                       // next key offset
252         uint garbage;                           // page garbage in bytes
253         unsigned char bits:7;           // page size in bits
254         unsigned char free:1;           // page is on free chain
255         unsigned char lvl:7;            // level of page
256         unsigned char kill:1;           // page is being deleted
257         unsigned char right[BtId];      // page number to right
258         unsigned char left[BtId];       // page number to left
259         unsigned char filler[2];        // padding to multiple of 8
260         logseqno lsn;                           // log sequence number applied
261 } *BtPage;
262
263 //  The loadpage interface object
264
265 typedef struct {
266         BtPage page;            // current page pointer
267         BtLatchSet *latch;      // current page latch set
268 } BtPageSet;
269
270 //      structure for latch manager on ALLOC_page
271
272 typedef struct {
273         struct BtPage_ alloc[1];        // next page_no in right ptr
274         unsigned long long dups[1];     // global duplicate key uniqueifier
275         unsigned char chain[BtId];      // head of free page_nos chain
276 } BtPageZero;
277
278 //      The object structure for Btree access
279
280 typedef struct {
281         uint page_size;                         // page size    
282         uint page_bits;                         // page size in bits    
283 #ifdef unix
284         int idx;
285 #else
286         HANDLE idx;
287 #endif
288         BtPageZero *pagezero;           // mapped allocation page
289         BtHashEntry *hashtable;         // the buffer pool hash table entries
290         BtLatchSet *latchsets;          // mapped latch set from buffer pool
291         unsigned char *pagepool;        // mapped to the buffer pool pages
292         unsigned char *redobuff;        // mapped recovery buffer pointer
293         logseqno lsn, flushlsn;         // current & first lsn flushed
294         BtMutexLatch dump[1];           // redo dump lite latch
295         BtMutexLatch redo[1];           // redo area lite latch
296         BtMutexLatch lock[1];           // allocation area lite latch
297         ushort thread_no[1];            // next thread number
298         uint nlatchpage;                        // number of latch pages at BT_latch
299         uint latchtotal;                        // number of page latch entries
300         uint latchhash;                         // number of latch hash table slots
301         uint latchvictim;                       // next latch entry to examine
302         uint latchavail;                        // next available latch entry
303         uint availlock[1];                      // latch available chain commitments
304         uint available;                         // size of latch available chain
305         uint redopages;                         // size of recovery buff in pages
306         uint redoend;                           // eof/end element in recovery buff
307 #ifndef unix
308         HANDLE halloc;                          // allocation handle
309         HANDLE hpool;                           // buffer pool handle
310 #endif
311 } BtMgr;
312
313 typedef struct {
314         BtMgr *mgr;                             // buffer manager for thread
315         BtPage cursor;                  // cached frame for start/next (never mapped)
316         BtPage frame;                   // spare frame for the page split (never mapped)
317         uid cursor_page;                                // current cursor page number   
318         unsigned char *mem;                             // frame, cursor, page memory buffer
319         unsigned char key[BT_keyarray]; // last found complete key
320         int found;                              // last delete or insert was found
321         int err;                                // last error
322         int line;                               // last error line no
323         int reads, writes;              // number of reads and writes from the btree
324         ushort thread_no;               // thread number
325 } BtDb;
326
327 //      Catastrophic errors
328
329 typedef enum {
330         BTERR_ok = 0,
331         BTERR_struct,
332         BTERR_ovflw,
333         BTERR_lock,
334         BTERR_map,
335         BTERR_read,
336         BTERR_wrt,
337         BTERR_atomic,
338         BTERR_recovery,
339         BTERR_avail
340 } BTERR;
341
342 #define CLOCK_bit 0x8000
343
344 // recovery manager entry types
345
346 typedef enum {
347         BTRM_eof = 0,   // rest of buffer is emtpy
348         BTRM_add,               // add a unique key-value to btree
349         BTRM_dup,               // add a duplicate key-value to btree
350         BTRM_del,               // delete a key-value from btree
351         BTRM_upd,               // update a key with a new value
352         BTRM_new,               // allocate a new empty page
353         BTRM_old                // reuse an old empty page
354 } BTRM;
355
356 // recovery manager entry
357 //      structure followed by BtKey & BtVal
358
359 typedef struct {
360         logseqno reqlsn;        // log sequence number required
361         logseqno lsn;           // log sequence number for entry
362         uint len;                       // length of entry
363         unsigned char type;     // type of entry
364         unsigned char lvl;      // level of btree entry pertains to
365 } BtLogHdr;
366
367 // B-Tree functions
368
369 extern void bt_close (BtDb *bt);
370 extern BtDb *bt_open (BtMgr *mgr);
371 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
372 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
373 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
374 extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
375 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
376 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
377 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
378 extern BtKey *bt_foundkey (BtDb *bt);
379 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
380 extern uint bt_nextkey   (BtDb *bt, uint slot);
381
382 //      manager functions
383 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
384 extern void bt_mgrclose (BtMgr *mgr);
385 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
386 extern logseqno bt_txnredo (BtDb *bt, BtPage page);
387
388 //  Helper functions to return slot values
389 //      from the cursor page.
390
391 extern BtKey *bt_key (BtDb *bt, uint slot);
392 extern BtVal *bt_val (BtDb *bt, uint slot);
393
394 //  The page is allocated from low and hi ends.
395 //  The key slots are allocated from the bottom,
396 //      while the text and value of the key
397 //  are allocated from the top.  When the two
398 //  areas meet, the page is split into two.
399
400 //  A key consists of a length byte, two bytes of
401 //  index number (0 - 65535), and up to 253 bytes
402 //  of key value.
403
404 //  Associated with each key is a value byte string
405 //      containing any value desired.
406
407 //  The b-tree root is always located at page 1.
408 //      The first leaf page of level zero is always
409 //      located on page 2.
410
411 //      The b-tree pages are linked with next
412 //      pointers to facilitate enumerators,
413 //      and provide for concurrency.
414
415 //      When to root page fills, it is split in two and
416 //      the tree height is raised by a new root at page
417 //      one with two keys.
418
419 //      Deleted keys are marked with a dead bit until
420 //      page cleanup. The fence key for a leaf node is
421 //      always present
422
423 //  To achieve maximum concurrency one page is locked at a time
424 //  as the tree is traversed to find leaf key in question. The right
425 //  page numbers are used in cases where the page is being split,
426 //      or consolidated.
427
428 //  Page 0 is dedicated to lock for new page extensions,
429 //      and chains empty pages together for reuse. It also
430 //      contains the latch manager hash table.
431
432 //      The ParentModification lock on a node is obtained to serialize posting
433 //      or changing the fence key for a node.
434
435 //      Empty pages are chained together through the ALLOC page and reused.
436
437 //      Access macros to address slot and key values from the page
438 //      Page slots use 1 based indexing.
439
440 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
441 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
442 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
443
444 void bt_putid(unsigned char *dest, uid id)
445 {
446 int i = BtId;
447
448         while( i-- )
449                 dest[i] = (unsigned char)id, id >>= 8;
450 }
451
452 uid bt_getid(unsigned char *src)
453 {
454 uid id = 0;
455 int i;
456
457         for( i = 0; i < BtId; i++ )
458                 id <<= 8, id |= *src++; 
459
460         return id;
461 }
462
463 uid bt_newdup (BtDb *bt)
464 {
465 #ifdef unix
466         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
467 #else
468         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
469 #endif
470 }
471
472 //      lite weight spin lock Latch Manager
473
474 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
475 {
476         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
477 }
478
479 void bt_mutexlock(BtMutexLatch *latch)
480 {
481 BtMutexLatch prev[1];
482 uint slept = 0;
483
484   while( 1 ) {
485         *prev->value = __sync_fetch_and_or(latch->value, XCL);
486
487         if( !prev->bits->xlock ) {                      // did we set XCL bit?
488             if( slept )
489                   __sync_fetch_and_sub(latch->value, WRT);
490                 return;
491         }
492
493         if( !slept ) {
494                 prev->bits->wrt++;
495                 __sync_fetch_and_add(latch->value, WRT);
496         }
497
498         sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
499         slept = 1;
500   }
501 }
502
503 //      try to obtain write lock
504
505 //      return 1 if obtained,
506 //              0 otherwise
507
508 int bt_mutextry(BtMutexLatch *latch)
509 {
510 BtMutexLatch prev[1];
511
512         *prev->value = __sync_fetch_and_or(latch->value, XCL);
513
514         //      take write access if exclusive bit is clear
515
516         return !prev->bits->xlock;
517 }
518
519 //      clear write mode
520
521 void bt_releasemutex(BtMutexLatch *latch)
522 {
523 BtMutexLatch prev[1];
524
525         *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
526
527         if( prev->bits->wrt )
528           sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
529 }
530
531 //      Write-Only Queue Lock
532
533 void WriteOLock (WOLock *lock, ushort tid)
534 {
535         if( lock->tid == tid ) {
536                 lock->dup++;
537                 return;
538         }
539
540         bt_mutexlock(lock->xcl);
541         lock->tid = tid;
542 }
543
544 void WriteORelease (WOLock *lock)
545 {
546         if( lock->dup ) {
547                 lock->dup--;
548                 return;
549         }
550
551         lock->tid = 0;
552         bt_releasemutex(lock->xcl);
553 }
554
555 //      Phase-Fair reader/writer lock implementation
556
557 void WriteLock (RWLock *lock, ushort tid)
558 {
559 ushort w, r, tix;
560
561         if( lock->tid == tid ) {
562                 lock->dup++;
563                 return;
564         }
565 #ifdef unix
566         tix = __sync_fetch_and_add (lock->ticket, 1);
567 #else
568         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
569 #endif
570         // wait for our ticket to come up
571
572         while( tix != lock->serving[0] )
573 #ifdef unix
574                 sched_yield();
575 #else
576                 SwitchToThread ();
577 #endif
578
579         w = PRES | (tix & PHID);
580 #ifdef  unix
581         r = __sync_fetch_and_add (lock->rin, w);
582 #else
583         r = _InterlockedExchangeAdd16 (lock->rin, w);
584 #endif
585         while( r != *lock->rout )
586 #ifdef unix
587                 sched_yield();
588 #else
589                 SwitchToThread();
590 #endif
591         lock->tid = tid;
592 }
593
594 void WriteRelease (RWLock *lock)
595 {
596         if( lock->dup ) {
597                 lock->dup--;
598                 return;
599         }
600
601         lock->tid = 0;
602 #ifdef unix
603         __sync_fetch_and_and (lock->rin, ~MASK);
604 #else
605         _InterlockedAnd16 (lock->rin, ~MASK);
606 #endif
607         lock->serving[0]++;
608 }
609
610 //      try to obtain read lock
611 //  return 1 if successful
612
613 int ReadTry (RWLock *lock, ushort tid)
614 {
615 ushort w;
616
617         //  OK if write lock already held by same thread
618
619         if( lock->tid == tid ) {
620                 lock->dup++;
621                 return 1;
622         }
623 #ifdef unix
624         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
625 #else
626         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
627 #endif
628         if( w )
629           if( w == (*lock->rin & MASK) ) {
630 #ifdef unix
631                 __sync_fetch_and_add (lock->rin, -RINC);
632 #else
633                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
634 #endif
635                 return 0;
636           }
637
638         return 1;
639 }
640
641 void ReadLock (RWLock *lock, ushort tid)
642 {
643 ushort w;
644         if( lock->tid == tid ) {
645                 lock->dup++;
646                 return;
647         }
648 #ifdef unix
649         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
650 #else
651         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
652 #endif
653         if( w )
654           while( w == (*lock->rin & MASK) )
655 #ifdef unix
656                 sched_yield ();
657 #else
658                 SwitchToThread ();
659 #endif
660 }
661
662 void ReadRelease (RWLock *lock)
663 {
664         if( lock->dup ) {
665                 lock->dup--;
666                 return;
667         }
668
669 #ifdef unix
670         __sync_fetch_and_add (lock->rout, RINC);
671 #else
672         _InterlockedExchangeAdd16 (lock->rout, RINC);
673 #endif
674 }
675
676 void bt_flushlsn (BtDb *bt)
677 {
678 uint cnt3 = 0, cnt2 = 0, cnt = 0;
679 BtLatchSet *latch;
680 BtPage page;
681 uint entry;
682
683   //    flush dirty pool pages to the btree that were
684   //    dirty before the start of this redo recovery buffer
685 fprintf(stderr, "Start flushlsn\n");
686   for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
687         page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
688         latch = bt->mgr->latchsets + entry;
689         bt_mutexlock (latch->modify);
690         bt_lockpage(bt, BtLockRead, latch);
691
692         if( latch->dirty ) {
693           bt_writepage(bt->mgr, page, latch->page_no);
694           latch->dirty = 0, bt->writes++, cnt++;
695         }
696 if( latch->avail )
697 cnt3++;
698 if( latch->pin & ~CLOCK_bit )
699 cnt2++;
700         bt_unlockpage(bt, BtLockRead, latch);
701         bt_releasemutex (latch->modify);
702   }
703 fprintf(stderr, "End flushlsn %d pages  %d pinned  %d available\n", cnt, cnt2, cnt3);
704 }
705
706 //      recovery manager -- process current recovery buff on startup
707 //      this won't do much if previous session was properly closed.
708
709 BTERR bt_recoveryredo (BtMgr *mgr)
710 {
711 BtLogHdr *hdr, *eof;
712 uint offset = 0;
713 BtKey *key;
714 BtVal *val;
715
716   pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
717
718   hdr = (BtLogHdr *)mgr->redobuff;
719   mgr->flushlsn = hdr->lsn;
720
721   while( 1 ) {
722         hdr = (BtLogHdr *)(mgr->redobuff + offset);
723         switch( hdr->type ) {
724         case BTRM_eof:
725                 mgr->lsn = hdr->lsn;
726                 return 0;
727         case BTRM_add:          // add a unique key-value to btree
728         
729         case BTRM_dup:          // add a duplicate key-value to btree
730         case BTRM_del:          // delete a key-value from btree
731         case BTRM_upd:          // update a key with a new value
732         case BTRM_new:          // allocate a new empty page
733         case BTRM_old:          // reuse an old empty page
734                 return 0;
735         }
736   }
737 }
738
739 //      recovery manager -- dump current recovery buff & flush dirty pages
740 //              in preparation for next recovery buffer.
741
742 BTERR bt_dumpredo (BtDb *bt)
743 {
744 BtLogHdr *eof;
745 fprintf(stderr, "Flush pages ");
746
747         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
748         memset (eof, 0, sizeof(BtLogHdr));
749
750         //  flush pages written at beginning of this redo buffer
751         //      then write the redo buffer out to disk
752
753         fdatasync (bt->mgr->idx);
754
755 fprintf(stderr, "Dump ReDo: %d bytes\n", bt->mgr->redoend);
756         pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
757
758         sync_file_range (bt->mgr->idx, REDO_page << bt->mgr->page_bits, bt->mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
759
760         bt->mgr->flushlsn = bt->mgr->lsn;
761         bt->mgr->redoend = 0;
762
763         eof = (BtLogHdr *)(bt->mgr->redobuff);
764         memset (eof, 0, sizeof(BtLogHdr));
765         eof->lsn = bt->mgr->lsn;
766         return 0;
767 }
768
769 //      recovery manager -- append new entry to recovery log
770 //      flush to disk when it overflows.
771
772 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
773 {
774 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
775 uint amt = sizeof(BtLogHdr);
776 BtLogHdr *hdr, *eof;
777 uint flush;
778
779         bt_mutexlock (bt->mgr->redo);
780
781         if( key )
782           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
783
784         //      see if new entry fits in buffer
785         //      flush and reset if it doesn't
786
787         if( flush = amt > size - bt->mgr->redoend ) {
788           bt_mutexlock (bt->mgr->dump);
789
790           if( bt_dumpredo (bt) )
791                 return 0;
792         }
793
794         //      fill in new entry & either eof or end block
795
796         hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
797
798         hdr->len = amt;
799         hdr->type = type;
800         hdr->lvl = lvl;
801         hdr->lsn = ++bt->mgr->lsn;
802
803         bt->mgr->redoend += amt;
804
805         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
806         memset (eof, 0, sizeof(BtLogHdr));
807
808         //  fill in key and value
809
810         if( key ) {
811           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
812           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
813         }
814
815         bt_releasemutex(bt->mgr->redo);
816
817         if( flush ) {
818           bt_flushlsn (bt);
819           bt_releasemutex(bt->mgr->dump);
820         }
821
822         return hdr->lsn;
823 }
824
825 //      recovery manager -- append transaction to recovery log
826 //      flush to disk when it overflows.
827
828 logseqno bt_txnredo (BtDb *bt, BtPage source)
829 {
830 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
831 uint amt = 0, src, type;
832 BtLogHdr *hdr, *eof;
833 logseqno lsn;
834 uint flush;
835 BtKey *key;
836 BtVal *val;
837
838         //      determine amount of redo recovery log space required
839
840         for( src = 0; src++ < source->cnt; ) {
841           key = keyptr(source,src);
842           val = valptr(source,src);
843           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
844           amt += sizeof(BtLogHdr);
845         }
846
847         bt_mutexlock (bt->mgr->redo);
848
849         //      see if new entry fits in buffer
850         //      flush and reset if it doesn't
851
852         if( flush = amt > size - bt->mgr->redoend ) {
853           bt_mutexlock (bt->mgr->dump);
854
855           if( bt_dumpredo (bt) )
856                 return 0;
857         }
858
859         //      assign new lsn to transaction
860
861         lsn = ++bt->mgr->lsn;
862
863         //      fill in new entries
864
865         for( src = 0; src++ < source->cnt; ) {
866           key = keyptr(source, src);
867           val = valptr(source, src);
868
869           switch( slotptr(source, src)->type ) {
870           case Unique:
871                 type = BTRM_add;
872                 break;
873           case Duplicate:
874                 type = BTRM_dup;
875                 break;
876           case Delete:
877                 type = BTRM_del;
878                 break;
879           case Update:
880                 type = BTRM_upd;
881                 break;
882           }
883
884           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
885           amt += sizeof(BtLogHdr);
886
887           hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
888           hdr->len = amt;
889           hdr->type = type;
890           hdr->lsn = lsn;
891           hdr->lvl = 0;
892
893           //  fill in key and value
894
895           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
896           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
897
898           bt->mgr->redoend += amt;
899         }
900
901         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
902         memset (eof, 0, sizeof(BtLogHdr));
903
904         bt_releasemutex(bt->mgr->redo);
905
906         if( flush ) {
907           bt_flushlsn (bt);
908           bt_releasemutex(bt->mgr->dump);
909         }
910
911         return lsn;
912 }
913
914 //      read page into buffer pool from permanent location in Btree file
915
916 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
917 {
918 off64_t off = page_no << mgr->page_bits;
919
920 #ifdef unix
921         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
922                 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
923                 return BTERR_read;
924         }
925 #else
926 OVERLAPPED ovl[1];
927 uint amt[1];
928
929         memset (ovl, 0, sizeof(OVERLAPPED));
930         ovl->Offset = off;
931         ovl->OffsetHigh = off >> 32;
932
933         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
934                 fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
935                 return BTERR_read;
936         }
937         if( *amt <  mgr->page_size ) {
938                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
939                 return BTERR_read;
940         }
941 #endif
942         return 0;
943 }
944
945 //      write page to permanent location in Btree file
946 //      clear the dirty bit
947
948 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
949 {
950 off64_t off = page_no << mgr->page_bits;
951
952 #ifdef unix
953         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
954                 return BTERR_wrt;
955 #else
956 OVERLAPPED ovl[1];
957 uint amt[1];
958
959         memset (ovl, 0, sizeof(OVERLAPPED));
960         ovl->Offset = off;
961         ovl->OffsetHigh = off >> 32;
962
963         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
964                 return BTERR_wrt;
965
966         if( *amt <  mgr->page_size )
967                 return BTERR_wrt;
968 #endif
969         return 0;
970 }
971
972 //      set CLOCK bit in latch
973 //      decrement pin count
974
975 void bt_unpinlatch (BtLatchSet *latch)
976 {
977         bt_mutexlock(latch->modify);
978         latch->pin |= CLOCK_bit;
979         latch->pin--;
980         bt_releasemutex(latch->modify);
981 }
982
983 //  return the btree cached page address
984 //      if page is dirty and has not yet been
985 //      flushed to disk for the current redo
986 //      recovery buffer, write it out.
987
988 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
989 {
990 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
991
992   return page;
993 }
994
995 //  return next available latch entry
996 //        and with latch entry locked
997
998 uint bt_availnext (BtDb *bt)
999 {
1000 BtLatchSet *latch;
1001 uint entry;
1002
1003   while( 1 ) {
1004 #ifdef unix
1005         entry = __sync_fetch_and_add (&bt->mgr->latchavail, 1) + 1;
1006 #else
1007         entry = _InterlockedIncrement (&bt->mgr->latchavail);
1008 #endif
1009         entry %= bt->mgr->latchtotal;
1010
1011         if( !entry )
1012                 continue;
1013
1014         latch = bt->mgr->latchsets + entry;
1015
1016         if( !latch->avail )
1017                 continue;
1018
1019         bt_mutexlock(latch->modify);
1020
1021         if( !latch->avail ) {
1022                 bt_releasemutex(latch->modify);
1023                 continue;
1024         }
1025
1026         return entry;
1027   }
1028 }
1029
1030 //      find and add the next available latch entry
1031 //      to the queue
1032
1033 BTERR bt_availlatch (BtDb *bt)
1034 {
1035 BtLatchSet *latch;
1036 uint startattempt;
1037 uint cnt, entry;
1038 uint hashidx;
1039 BtPage page;
1040
1041   //  find and reuse previous entry on victim
1042
1043   startattempt = bt->mgr->latchvictim;
1044
1045   while( 1 ) {
1046 #ifdef unix
1047         entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
1048 #else
1049         entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
1050 #endif
1051         //      skip entry if it has outstanding pins
1052
1053         entry %= bt->mgr->latchtotal;
1054
1055         if( !entry )
1056                 continue;
1057
1058         //      only go around one time before
1059         //      flushing redo recovery buffer,
1060         //      and the buffer pool to free up entries.
1061
1062         if( bt->mgr->redopages )
1063           if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
1064             if( bt_mutextry (bt->mgr->dump) ) {
1065                  if( bt_dumpredo (bt) )
1066                   return bt->err;
1067                  bt_flushlsn (bt);
1068                 // synchronize the various threads running into this condition
1069                 //      so that only one thread does the dump and flush
1070                 } else
1071                         bt_mutexlock(bt->mgr->dump);
1072
1073                 startattempt = bt->mgr->latchvictim;
1074                 bt_releasemutex(bt->mgr->dump);
1075           }
1076
1077         latch = bt->mgr->latchsets + entry;
1078
1079         if( latch->avail )
1080           continue;
1081
1082         bt_mutexlock(latch->modify);
1083
1084         //      skip if already an available entry
1085
1086         if( latch->avail ) {
1087           bt_releasemutex(latch->modify);
1088           continue;
1089         }
1090
1091         //  skip this entry if it is pinned
1092         //      if the CLOCK bit is set
1093         //      reset it to zero.
1094
1095         if( latch->pin ) {
1096           latch->pin &= ~CLOCK_bit;
1097           bt_releasemutex(latch->modify);
1098           continue;
1099         }
1100
1101         page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1102
1103         //      if dirty page has lsn >= last redo recovery buffer
1104         //      then hold page in the buffer pool until next redo
1105         //      recovery buffer is being written to disk.
1106
1107         if( latch->dirty )
1108           if( page->lsn >= bt->mgr->flushlsn ) {
1109             bt_releasemutex(latch->modify);
1110                 continue;
1111           }
1112
1113         //      entry is available
1114 #ifdef unix
1115         __sync_fetch_and_add (&bt->mgr->available, 1);
1116 #else
1117         _InterlockedIncrement(&bt->mgr->available);
1118 #endif
1119         latch->avail = 1;
1120         bt_releasemutex(latch->modify);
1121         return 0;
1122   }
1123 }
1124
1125 //      release available latch requests
1126
1127 void bt_availrelease (BtDb *bt, uint count)
1128 {
1129 #ifdef unix
1130         __sync_fetch_and_add(bt->mgr->availlock, -count);
1131 #else
1132         _InterlockedAdd(bt->mgr->availlock, -count);
1133 #endif
1134 }
1135
1136 //      commit available chain entries
1137 //      find available entries as required
1138
1139 void bt_availrequest (BtDb *bt, uint count)
1140 {
1141 #ifdef unix
1142         __sync_fetch_and_add(bt->mgr->availlock, count);
1143 #else
1144         _InterlockedAdd(bt->mgr->availlock, count);
1145 #endif
1146
1147         while( *bt->mgr->availlock > bt->mgr->available )
1148                 bt_availlatch (bt);
1149 }
1150
1151 //      find available latchset
1152 //      return with latchset pinned
1153
1154 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, BtPage loadit)
1155 {
1156 uint hashidx = page_no % bt->mgr->latchhash;
1157 BtLatchSet *latch;
1158 uint entry, idx;
1159 BtPage page;
1160
1161   //  try to find our entry
1162
1163   bt_mutexlock(bt->mgr->hashtable[hashidx].latch);
1164
1165   if( entry = bt->mgr->hashtable[hashidx].entry ) do
1166   {
1167         latch = bt->mgr->latchsets + entry;
1168         if( page_no == latch->page_no )
1169                 break;
1170   } while( entry = latch->next );
1171
1172   //  found our entry: increment pin
1173   //  remove from available status
1174
1175   if( entry ) {
1176         latch = bt->mgr->latchsets + entry;
1177         bt_mutexlock(latch->modify);
1178         if( latch->avail )
1179 #ifdef unix
1180           __sync_fetch_and_add (&bt->mgr->available, -1);
1181 #else
1182           _InterlockedDecrement(&bt->mgr->available);
1183 #endif
1184         latch->avail = 0;
1185         latch->pin |= CLOCK_bit;
1186         latch->pin++;
1187
1188         bt_releasemutex(latch->modify);
1189         bt_releasemutex(bt->mgr->hashtable[hashidx].latch);
1190         return latch;
1191   }
1192
1193   //  find and reuse entry from available set
1194
1195 trynext:
1196
1197   if( entry = bt_availnext (bt) )
1198         latch = bt->mgr->latchsets + entry;
1199   else
1200         return bt->line = __LINE__, bt->err = BTERR_avail, NULL;
1201
1202   idx = latch->page_no % bt->mgr->latchhash;
1203
1204   //  if latch is on a different hash chain
1205   //    unlink from the old page_no chain
1206
1207   if( latch->page_no )
1208    if( idx != hashidx ) {
1209
1210         //  skip over this entry if latch not available
1211
1212     if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
1213           bt_releasemutex(latch->modify);
1214           goto trynext;
1215         }
1216
1217         if( latch->prev )
1218           bt->mgr->latchsets[latch->prev].next = latch->next;
1219         else
1220           bt->mgr->hashtable[idx].entry = latch->next;
1221
1222         if( latch->next )
1223           bt->mgr->latchsets[latch->next].prev = latch->prev;
1224
1225     bt_releasemutex (bt->mgr->hashtable[idx].latch);
1226    }
1227
1228   //  remove available status
1229
1230   latch->avail = 0;
1231 #ifdef unix
1232   __sync_fetch_and_add (&bt->mgr->available, -1);
1233 #else
1234   _InterlockedDecrement(&bt->mgr->available);
1235 #endif
1236   page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1237
1238   if( latch->dirty )
1239         if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1240           return bt->line = __LINE__, NULL;
1241         else
1242           latch->dirty = 0, bt->writes++;
1243
1244   if( loadit ) {
1245         memcpy (page, loadit, bt->mgr->page_size);
1246         latch->dirty = 1;
1247   } else
1248         if( bt->err = bt_readpage (bt->mgr, page, page_no) )
1249           return bt->line = __LINE__, NULL;
1250         else
1251           bt->reads++;
1252
1253   //  link page as head of hash table chain
1254   //  if this is a never before used entry,
1255   //  or it was previously on a different
1256   //  hash table chain. Otherwise, just
1257   //  leave it in its current hash table
1258   //  chain position.
1259
1260   if( !latch->page_no || hashidx != idx ) {
1261         if( latch->next = bt->mgr->hashtable[hashidx].entry )
1262           bt->mgr->latchsets[latch->next].prev = entry;
1263
1264         bt->mgr->hashtable[hashidx].entry = entry;
1265     latch->prev = 0;
1266   }
1267
1268   //  fill in latch structure
1269
1270   latch->pin = CLOCK_bit | 1;
1271   latch->page_no = page_no;
1272   latch->entry = entry;
1273   latch->split = 0;
1274
1275   bt_releasemutex (latch->modify);
1276   bt_releasemutex (bt->mgr->hashtable[hashidx].latch);
1277   return latch;
1278 }
1279   
1280 void bt_mgrclose (BtMgr *mgr)
1281 {
1282 BtLatchSet *latch;
1283 BtLogHdr *eof;
1284 uint num = 0;
1285 BtPage page;
1286 uint slot;
1287
1288         //      flush previously written dirty pages
1289         //      and write recovery buffer to disk
1290
1291         fdatasync (mgr->idx);
1292
1293         if( mgr->redoend ) {
1294                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1295                 memset (eof, 0, sizeof(BtLogHdr));
1296
1297                 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1298         }
1299
1300         //      write remaining dirty pool pages to the btree
1301
1302         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1303                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1304                 latch = mgr->latchsets + slot;
1305
1306                 if( latch->dirty ) {
1307                         bt_writepage(mgr, page, latch->page_no);
1308                         latch->dirty = 0, num++;
1309                 }
1310         }
1311
1312         //      flush last batch to disk and clear
1313         //      redo recovery buffer on disk.
1314
1315         fdatasync (mgr->idx);
1316
1317         if( mgr->redopages ) {
1318                 eof = (BtLogHdr *)mgr->redobuff;
1319                 memset (eof, 0, sizeof(BtLogHdr));
1320                 eof->lsn = mgr->lsn;
1321
1322                 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1323
1324                 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1325         }
1326
1327         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1328
1329 #ifdef unix
1330         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1331         munmap (mgr->pagezero, mgr->page_size);
1332 #else
1333         FlushViewOfFile(mgr->pagezero, 0);
1334         UnmapViewOfFile(mgr->pagezero);
1335         UnmapViewOfFile(mgr->pagepool);
1336         CloseHandle(mgr->halloc);
1337         CloseHandle(mgr->hpool);
1338 #endif
1339 #ifdef unix
1340         if( mgr->redopages )
1341                 free (mgr->redobuff);
1342         close (mgr->idx);
1343         free (mgr);
1344 #else
1345         if( mgr->redopages )
1346                 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1347         FlushFileBuffers(mgr->idx);
1348         CloseHandle(mgr->idx);
1349         GlobalFree (mgr);
1350 #endif
1351 }
1352
1353 //      close and release memory
1354
1355 void bt_close (BtDb *bt)
1356 {
1357 #ifdef unix
1358         if( bt->mem )
1359                 free (bt->mem);
1360 #else
1361         if( bt->mem)
1362                 VirtualFree (bt->mem, 0, MEM_RELEASE);
1363 #endif
1364         free (bt);
1365 }
1366
1367 //  open/create new btree buffer manager
1368
1369 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1370 //              size of page pool (e.g. 262144)
1371
1372 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1373 {
1374 uint lvl, attr, last, slot, idx;
1375 uint nlatchpage, latchhash;
1376 unsigned char value[BtId];
1377 int flag, initit = 0;
1378 BtPageZero *pagezero;
1379 BtLatchSet *latch;
1380 off64_t size;
1381 uint amt[1];
1382 BtMgr* mgr;
1383 BtKey* key;
1384 BtVal *val;
1385
1386         // determine sanity of page size and buffer pool
1387
1388         if( bits > BT_maxbits )
1389                 bits = BT_maxbits;
1390         else if( bits < BT_minbits )
1391                 bits = BT_minbits;
1392
1393         if( nodemax < 16 ) {
1394                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
1395                 return NULL;
1396         }
1397
1398 #ifdef unix
1399         mgr = calloc (1, sizeof(BtMgr));
1400
1401         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1402
1403         if( mgr->idx == -1 ) {
1404                 fprintf (stderr, "Unable to open btree file\n");
1405                 return free(mgr), NULL;
1406         }
1407 #else
1408         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1409         attr = FILE_ATTRIBUTE_NORMAL;
1410         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1411
1412         if( mgr->idx == INVALID_HANDLE_VALUE )
1413                 return GlobalFree(mgr), NULL;
1414 #endif
1415
1416 #ifdef unix
1417         pagezero = valloc (BT_maxpage);
1418         *amt = 0;
1419
1420         // read minimum page size to get root info
1421         //      to support raw disk partition files
1422         //      check if bits == 0 on the disk.
1423
1424         if( size = lseek (mgr->idx, 0L, 2) )
1425                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1426                         if( pagezero->alloc->bits )
1427                                 bits = pagezero->alloc->bits;
1428                         else
1429                                 initit = 1;
1430                 else
1431                         return free(mgr), free(pagezero), NULL;
1432         else
1433                 initit = 1;
1434 #else
1435         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1436         size = GetFileSize(mgr->idx, amt);
1437
1438         if( size || *amt ) {
1439                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1440                         return bt_mgrclose (mgr), NULL;
1441                 bits = pagezero->alloc->bits;
1442         } else
1443                 initit = 1;
1444 #endif
1445
1446         mgr->page_size = 1 << bits;
1447         mgr->page_bits = bits;
1448
1449         //  calculate number of latch hash table entries
1450
1451         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1452
1453         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1454         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1455         mgr->latchtotal = nodemax;
1456
1457         if( !initit )
1458                 goto mgrlatch;
1459
1460         // initialize an empty b-tree with latch page, root page, page of leaves
1461         // and page(s) of latches and page pool cache
1462
1463         memset (pagezero, 0, 1 << bits);
1464         pagezero->alloc->bits = mgr->page_bits;
1465         bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1466
1467         //  initialize left-most LEAF page in
1468         //      alloc->left.
1469
1470         bt_putid (pagezero->alloc->left, LEAF_page);
1471
1472         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1473                 fprintf (stderr, "Unable to create btree page zero\n");
1474                 return bt_mgrclose (mgr), NULL;
1475         }
1476
1477         memset (pagezero, 0, 1 << bits);
1478         pagezero->alloc->bits = mgr->page_bits;
1479
1480         for( lvl=MIN_lvl; lvl--; ) {
1481                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1482                 key = keyptr(pagezero->alloc, 1);
1483                 key->len = 2;           // create stopper key
1484                 key->key[0] = 0xff;
1485                 key->key[1] = 0xff;
1486
1487                 bt_putid(value, MIN_lvl - lvl + 1);
1488                 val = valptr(pagezero->alloc, 1);
1489                 val->len = lvl ? BtId : 0;
1490                 memcpy (val->value, value, val->len);
1491
1492                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1493                 pagezero->alloc->lvl = lvl;
1494                 pagezero->alloc->cnt = 1;
1495                 pagezero->alloc->act = 1;
1496
1497                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1498                         fprintf (stderr, "Unable to create btree page zero\n");
1499                         return bt_mgrclose (mgr), NULL;
1500                 }
1501         }
1502
1503 mgrlatch:
1504 #ifdef unix
1505         free (pagezero);
1506 #else
1507         VirtualFree (pagezero, 0, MEM_RELEASE);
1508 #endif
1509 #ifdef unix
1510         // mlock the pagezero page
1511
1512         flag = PROT_READ | PROT_WRITE;
1513         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1514         if( mgr->pagezero == MAP_FAILED ) {
1515                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1516                 return bt_mgrclose (mgr), NULL;
1517         }
1518         mlock (mgr->pagezero, mgr->page_size);
1519
1520         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1521         if( mgr->pagepool == MAP_FAILED ) {
1522                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1523                 return bt_mgrclose (mgr), NULL;
1524         }
1525         if( mgr->redopages = redopages )
1526                 mgr->redobuff = valloc (redopages * mgr->page_size);
1527 #else
1528         flag = PAGE_READWRITE;
1529         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1530         if( !mgr->halloc ) {
1531                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1532                 return bt_mgrclose (mgr), NULL;
1533         }
1534
1535         flag = FILE_MAP_WRITE;
1536         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1537         if( !mgr->pagezero ) {
1538                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1539                 return bt_mgrclose (mgr), NULL;
1540         }
1541
1542         flag = PAGE_READWRITE;
1543         size = (uid)mgr->nlatchpage << mgr->page_bits;
1544         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1545         if( !mgr->hpool ) {
1546                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1547                 return bt_mgrclose (mgr), NULL;
1548         }
1549
1550         flag = FILE_MAP_WRITE;
1551         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1552         if( !mgr->pagepool ) {
1553                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1554                 return bt_mgrclose (mgr), NULL;
1555         }
1556         if( mgr->redopages = redopages )
1557                 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1558 #endif
1559
1560         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1561         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1562         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1563
1564         //      mark all pool entries as available
1565
1566         for( idx = 1; idx < mgr->latchtotal; idx++ ) {
1567                 latch = mgr->latchsets + idx;
1568                 latch->avail = 1;
1569                 mgr->available++;
1570         }
1571
1572         return mgr;
1573 }
1574
1575 //      open BTree access method
1576 //      based on buffer manager
1577
1578 BtDb *bt_open (BtMgr *mgr)
1579 {
1580 BtDb *bt = malloc (sizeof(*bt));
1581
1582         memset (bt, 0, sizeof(*bt));
1583         bt->mgr = mgr;
1584 #ifdef unix
1585         bt->mem = valloc (2 *mgr->page_size);
1586 #else
1587         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1588 #endif
1589         bt->frame = (BtPage)bt->mem;
1590         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1591 #ifdef unix
1592         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1593 #else
1594         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1595 #endif
1596         return bt;
1597 }
1598
1599 //  compare two keys, return > 0, = 0, or < 0
1600 //  =0: keys are same
1601 //  -1: key2 > key1
1602 //  +1: key2 < key1
1603 //  as the comparison value
1604
1605 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1606 {
1607 uint len1 = key1->len;
1608 int ans;
1609
1610         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1611                 return ans;
1612
1613         if( len1 > len2 )
1614                 return 1;
1615         if( len1 < len2 )
1616                 return -1;
1617
1618         return 0;
1619 }
1620
1621 // place write, read, or parent lock on requested page_no.
1622
1623 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1624 {
1625         switch( mode ) {
1626         case BtLockRead:
1627                 ReadLock (latch->readwr, bt->thread_no);
1628                 break;
1629         case BtLockWrite:
1630                 WriteLock (latch->readwr, bt->thread_no);
1631                 break;
1632         case BtLockAccess:
1633                 ReadLock (latch->access, bt->thread_no);
1634                 break;
1635         case BtLockDelete:
1636                 WriteLock (latch->access, bt->thread_no);
1637                 break;
1638         case BtLockParent:
1639                 WriteOLock (latch->parent, bt->thread_no);
1640                 break;
1641         case BtLockAtomic:
1642                 WriteOLock (latch->atomic, bt->thread_no);
1643                 break;
1644         case BtLockAtomic | BtLockRead:
1645                 WriteOLock (latch->atomic, bt->thread_no);
1646                 ReadLock (latch->readwr, bt->thread_no);
1647                 break;
1648         }
1649 }
1650
1651 // remove write, read, or parent lock on requested page
1652
1653 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1654 {
1655         switch( mode ) {
1656         case BtLockRead:
1657                 ReadRelease (latch->readwr);
1658                 break;
1659         case BtLockWrite:
1660                 WriteRelease (latch->readwr);
1661                 break;
1662         case BtLockAccess:
1663                 ReadRelease (latch->access);
1664                 break;
1665         case BtLockDelete:
1666                 WriteRelease (latch->access);
1667                 break;
1668         case BtLockParent:
1669                 WriteORelease (latch->parent);
1670                 break;
1671         case BtLockAtomic:
1672                 WriteORelease (latch->atomic);
1673                 break;
1674         case BtLockAtomic | BtLockRead:
1675                 WriteORelease (latch->atomic);
1676                 ReadRelease (latch->readwr);
1677                 break;
1678         }
1679 }
1680
1681 //      allocate a new page
1682 //      return with page latched, but unlocked.
1683
1684 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1685 {
1686 uid page_no;
1687 int blk;
1688
1689         //      lock allocation page
1690
1691         bt_mutexlock(bt->mgr->lock);
1692
1693         // use empty chain first
1694         // else allocate empty page
1695
1696         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1697                 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
1698                         set->page = bt_mappage (bt, set->latch);
1699                 else
1700                         return bt->err = BTERR_struct, bt->line = __LINE__, -1;
1701
1702                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1703                 bt_releasemutex(bt->mgr->lock);
1704
1705                 memcpy (set->page, contents, bt->mgr->page_size);
1706                 set->latch->dirty = 1;
1707                 return 0;
1708         }
1709
1710         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1711         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1712
1713         // unlock allocation latch and
1714         //      extend file into new page.
1715
1716         bt_releasemutex(bt->mgr->lock);
1717
1718         //      don't load cache from btree page
1719
1720         if( set->latch = bt_pinlatch (bt, page_no, contents) )
1721                 set->page = bt_mappage (bt, set->latch);
1722         else
1723                 return bt->err;
1724
1725         set->latch->dirty = 1;
1726         return 0;
1727 }
1728
1729 //  find slot in page for given key at a given level
1730
1731 int bt_findslot (BtPage page, unsigned char *key, uint len)
1732 {
1733 uint diff, higher = page->cnt, low = 1, slot;
1734 uint good = 0;
1735
1736         //        make stopper key an infinite fence value
1737
1738         if( bt_getid (page->right) )
1739                 higher++;
1740         else
1741                 good++;
1742
1743         //      low is the lowest candidate.
1744         //  loop ends when they meet
1745
1746         //  higher is already
1747         //      tested as .ge. the passed key.
1748
1749         while( diff = higher - low ) {
1750                 slot = low + ( diff >> 1 );
1751                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1752                         low = slot + 1;
1753                 else
1754                         higher = slot, good++;
1755         }
1756
1757         //      return zero if key is on right link page
1758
1759         return good ? higher : 0;
1760 }
1761
1762 //  find and load page at given level for given key
1763 //      leave page rd or wr locked as requested
1764
1765 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1766 {
1767 uid page_no = ROOT_page, prevpage = 0;
1768 uint drill = 0xff, slot;
1769 BtLatchSet *prevlatch;
1770 uint mode, prevmode;
1771 BtVal *val;
1772
1773   //  start at root of btree and drill down
1774
1775   do {
1776         // determine lock mode of drill level
1777         mode = (drill == lvl) ? lock : BtLockRead; 
1778
1779         if( !(set->latch = bt_pinlatch (bt, page_no, NULL)) )
1780           return 0;
1781
1782         // obtain access lock using lock chaining with Access mode
1783
1784         if( page_no > ROOT_page )
1785           bt_lockpage(bt, BtLockAccess, set->latch);
1786
1787         set->page = bt_mappage (bt, set->latch);
1788
1789         //      release & unpin parent or left sibling page
1790
1791         if( prevpage ) {
1792           bt_unlockpage(bt, prevmode, prevlatch);
1793           bt_unpinlatch (prevlatch);
1794           prevpage = 0;
1795         }
1796
1797         // obtain mode lock using lock chaining through AccessLock
1798
1799         bt_lockpage(bt, mode, set->latch);
1800
1801         if( set->page->free )
1802                 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1803
1804         if( page_no > ROOT_page )
1805           bt_unlockpage(bt, BtLockAccess, set->latch);
1806
1807         // re-read and re-lock root after determining actual level of root
1808
1809         if( set->page->lvl != drill) {
1810                 if( set->latch->page_no != ROOT_page )
1811                         return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1812                         
1813                 drill = set->page->lvl;
1814
1815                 if( lock != BtLockRead && drill == lvl ) {
1816                   bt_unlockpage(bt, mode, set->latch);
1817                   bt_unpinlatch (set->latch);
1818                   continue;
1819                 }
1820         }
1821
1822         prevpage = set->latch->page_no;
1823         prevlatch = set->latch;
1824         prevmode = mode;
1825
1826         //  find key on page at this level
1827         //  and descend to requested level
1828
1829         if( !set->page->kill )
1830          if( slot = bt_findslot (set->page, key, len) ) {
1831           if( drill == lvl )
1832                 return slot;
1833
1834           // find next non-dead slot -- the fence key if nothing else
1835
1836           while( slotptr(set->page, slot)->dead )
1837                 if( slot++ < set->page->cnt )
1838                   continue;
1839                 else
1840                   return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1841
1842           val = valptr(set->page, slot);
1843
1844           if( val->len == BtId )
1845                 page_no = bt_getid(valptr(set->page, slot)->value);
1846           else
1847                 return bt->line = __LINE__, bt->err = BTERR_struct, 0;
1848
1849           drill--;
1850           continue;
1851          }
1852
1853         //  slide right into next page
1854
1855         page_no = bt_getid(set->page->right);
1856   } while( page_no );
1857
1858   // return error on end of right chain
1859
1860   bt->line = __LINE__, bt->err = BTERR_struct;
1861   return 0;     // return error
1862 }
1863
1864 //      return page to free list
1865 //      page must be delete & write locked
1866
1867 void bt_freepage (BtDb *bt, BtPageSet *set)
1868 {
1869         //      lock allocation page
1870
1871         bt_mutexlock (bt->mgr->lock);
1872
1873         //      store chain
1874
1875         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1876         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1877         set->latch->dirty = 1;
1878         set->page->free = 1;
1879
1880         // unlock released page
1881
1882         bt_unlockpage (bt, BtLockDelete, set->latch);
1883         bt_unlockpage (bt, BtLockWrite, set->latch);
1884         bt_unpinlatch (set->latch);
1885
1886         // unlock allocation page
1887
1888         bt_releasemutex (bt->mgr->lock);
1889 }
1890
1891 //      a fence key was deleted from a page
1892 //      push new fence value upwards
1893
1894 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1895 {
1896 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1897 unsigned char value[BtId];
1898 BtKey* ptr;
1899 uint idx;
1900
1901         //      remove the old fence value
1902
1903         ptr = keyptr(set->page, set->page->cnt);
1904         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1905         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1906         set->latch->dirty = 1;
1907
1908         //  cache new fence value
1909
1910         ptr = keyptr(set->page, set->page->cnt);
1911         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1912
1913         bt_lockpage (bt, BtLockParent, set->latch);
1914         bt_unlockpage (bt, BtLockWrite, set->latch);
1915
1916         //      insert new (now smaller) fence key
1917
1918         bt_putid (value, set->latch->page_no);
1919         ptr = (BtKey*)leftkey;
1920
1921         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1922           return bt->err;
1923
1924         //      now delete old fence key
1925
1926         ptr = (BtKey*)rightkey;
1927
1928         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1929                 return bt->err;
1930
1931         bt_unlockpage (bt, BtLockParent, set->latch);
1932         bt_unpinlatch(set->latch);
1933         return 0;
1934 }
1935
1936 //      root has a single child
1937 //      collapse a level from the tree
1938
1939 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1940 {
1941 BtPageSet child[1];
1942 uid page_no;
1943 BtVal *val;
1944 uint idx;
1945
1946   // find the child entry and promote as new root contents
1947
1948   do {
1949         for( idx = 0; idx++ < root->page->cnt; )
1950           if( !slotptr(root->page, idx)->dead )
1951                 break;
1952
1953         val = valptr(root->page, idx);
1954
1955         if( val->len == BtId )
1956                 page_no = bt_getid (valptr(root->page, idx)->value);
1957         else
1958                 return bt->line = __LINE__, bt->err = BTERR_struct;
1959
1960         if( child->latch = bt_pinlatch (bt, page_no, NULL) )
1961                 child->page = bt_mappage (bt, child->latch);
1962         else
1963                 return bt->err;
1964
1965         bt_lockpage (bt, BtLockDelete, child->latch);
1966         bt_lockpage (bt, BtLockWrite, child->latch);
1967
1968         memcpy (root->page, child->page, bt->mgr->page_size);
1969         root->latch->dirty = 1;
1970
1971         bt_freepage (bt, child);
1972
1973   } while( root->page->lvl > 1 && root->page->act == 1 );
1974
1975   bt_unlockpage (bt, BtLockWrite, root->latch);
1976   bt_unpinlatch (root->latch);
1977   return 0;
1978 }
1979
1980 //  delete a page and manage keys
1981 //  call with page writelocked
1982 //      returns with page unpinned
1983
1984 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1985 {
1986 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1987 unsigned char value[BtId];
1988 uint lvl = set->page->lvl;
1989 BtPageSet right[1];
1990 uid page_no;
1991 BtKey *ptr;
1992
1993         //      cache copy of fence key
1994         //      to post in parent
1995
1996         ptr = keyptr(set->page, set->page->cnt);
1997         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1998
1999         //      obtain lock on right page
2000
2001         page_no = bt_getid(set->page->right);
2002
2003         if( right->latch = bt_pinlatch (bt, page_no, NULL) )
2004                 right->page = bt_mappage (bt, right->latch);
2005         else
2006                 return 0;
2007
2008         bt_lockpage (bt, BtLockWrite, right->latch);
2009
2010         // cache copy of key to update
2011
2012         ptr = keyptr(right->page, right->page->cnt);
2013         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
2014
2015         if( right->page->kill )
2016                 return bt->line = __LINE__, bt->err = BTERR_struct;
2017
2018         // pull contents of right peer into our empty page
2019
2020         memcpy (set->page, right->page, bt->mgr->page_size);
2021         set->latch->dirty = 1;
2022
2023         // mark right page deleted and point it to left page
2024         //      until we can post parent updates that remove access
2025         //      to the deleted page.
2026
2027         bt_putid (right->page->right, set->latch->page_no);
2028         right->latch->dirty = 1;
2029         right->page->kill = 1;
2030
2031         bt_lockpage (bt, BtLockParent, right->latch);
2032         bt_unlockpage (bt, BtLockWrite, right->latch);
2033
2034         bt_lockpage (bt, BtLockParent, set->latch);
2035         bt_unlockpage (bt, BtLockWrite, set->latch);
2036
2037         // redirect higher key directly to our new node contents
2038
2039         bt_putid (value, set->latch->page_no);
2040         ptr = (BtKey*)higherfence;
2041
2042         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2043           return bt->err;
2044
2045         //      delete old lower key to our node
2046
2047         ptr = (BtKey*)lowerfence;
2048
2049         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
2050           return bt->err;
2051
2052         //      obtain delete and write locks to right node
2053
2054         bt_unlockpage (bt, BtLockParent, right->latch);
2055         bt_lockpage (bt, BtLockDelete, right->latch);
2056         bt_lockpage (bt, BtLockWrite, right->latch);
2057         bt_freepage (bt, right);
2058
2059         bt_unlockpage (bt, BtLockParent, set->latch);
2060         bt_unpinlatch (set->latch);
2061         return 0;
2062 }
2063
2064 //  find and delete key on page by marking delete flag bit
2065 //  if page becomes empty, delete it from the btree
2066
2067 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
2068 {
2069 uint slot, idx, found, fence;
2070 BtPageSet set[1];
2071 BtKey *ptr;
2072 BtVal *val;
2073
2074         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2075                 ptr = keyptr(set->page, slot);
2076         else
2077                 return bt->err;
2078
2079         // if librarian slot, advance to real slot
2080
2081         if( slotptr(set->page, slot)->type == Librarian )
2082                 ptr = keyptr(set->page, ++slot);
2083
2084         fence = slot == set->page->cnt;
2085
2086         // if key is found delete it, otherwise ignore request
2087
2088         if( found = !keycmp (ptr, key, len) )
2089           if( found = slotptr(set->page, slot)->dead == 0 ) {
2090                 val = valptr(set->page,slot);
2091                 slotptr(set->page, slot)->dead = 1;
2092                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2093                 set->page->act--;
2094
2095                 // collapse empty slots beneath the fence
2096
2097                 while( idx = set->page->cnt - 1 )
2098                   if( slotptr(set->page, idx)->dead ) {
2099                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2100                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2101                   } else
2102                         break;
2103           }
2104
2105         //      did we delete a fence key in an upper level?
2106
2107         if( found && lvl && set->page->act && fence )
2108           if( bt_fixfence (bt, set, lvl) )
2109                 return bt->err;
2110           else
2111                 return 0;
2112
2113         //      do we need to collapse root?
2114
2115         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2116           if( bt_collapseroot (bt, set) )
2117                 return bt->err;
2118           else
2119                 return 0;
2120
2121         //      delete empty page
2122
2123         if( !set->page->act )
2124                 return bt_deletepage (bt, set);
2125
2126         set->latch->dirty = 1;
2127         bt_unlockpage(bt, BtLockWrite, set->latch);
2128         bt_unpinlatch (set->latch);
2129         return 0;
2130 }
2131
2132 BtKey *bt_foundkey (BtDb *bt)
2133 {
2134         return (BtKey*)(bt->key);
2135 }
2136
2137 //      advance to next slot
2138
2139 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2140 {
2141 BtLatchSet *prevlatch;
2142 uid page_no;
2143
2144         if( slot < set->page->cnt )
2145                 return slot + 1;
2146
2147         prevlatch = set->latch;
2148
2149         if( page_no = bt_getid(set->page->right) )
2150           if( set->latch = bt_pinlatch (bt, page_no, NULL) )
2151                 set->page = bt_mappage (bt, set->latch);
2152           else
2153                 return 0;
2154         else
2155           return bt->err = BTERR_struct, bt->line = __LINE__, 0;
2156
2157         // obtain access lock using lock chaining with Access mode
2158
2159         bt_lockpage(bt, BtLockAccess, set->latch);
2160
2161         bt_unlockpage(bt, BtLockRead, prevlatch);
2162         bt_unpinlatch (prevlatch);
2163
2164         bt_lockpage(bt, BtLockRead, set->latch);
2165         bt_unlockpage(bt, BtLockAccess, set->latch);
2166         return 1;
2167 }
2168
2169 //      find unique key or first duplicate key in
2170 //      leaf level and return number of value bytes
2171 //      or (-1) if not found.  Setup key for bt_foundkey
2172
2173 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2174 {
2175 BtPageSet set[1];
2176 uint len, slot;
2177 int ret = -1;
2178 BtKey *ptr;
2179 BtVal *val;
2180
2181   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
2182    do {
2183         ptr = keyptr(set->page, slot);
2184
2185         //      skip librarian slot place holder
2186
2187         if( slotptr(set->page, slot)->type == Librarian )
2188                 ptr = keyptr(set->page, ++slot);
2189
2190         //      return actual key found
2191
2192         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2193         len = ptr->len;
2194
2195         if( slotptr(set->page, slot)->type == Duplicate )
2196                 len -= BtId;
2197
2198         //      not there if we reach the stopper key
2199
2200         if( slot == set->page->cnt )
2201           if( !bt_getid (set->page->right) )
2202                 break;
2203
2204         // if key exists, return >= 0 value bytes copied
2205         //      otherwise return (-1)
2206
2207         if( slotptr(set->page, slot)->dead )
2208                 continue;
2209
2210         if( keylen == len )
2211           if( !memcmp (ptr->key, key, len) ) {
2212                 val = valptr (set->page,slot);
2213                 if( valmax > val->len )
2214                         valmax = val->len;
2215                 memcpy (value, val->value, valmax);
2216                 ret = valmax;
2217           }
2218
2219         break;
2220
2221    } while( slot = bt_findnext (bt, set, slot) );
2222
2223   bt_unlockpage (bt, BtLockRead, set->latch);
2224   bt_unpinlatch (set->latch);
2225   return ret;
2226 }
2227
2228 //      check page for space available,
2229 //      clean if necessary and return
2230 //      0 - page needs splitting
2231 //      >0  new slot value
2232
2233 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
2234 {
2235 uint nxt = bt->mgr->page_size;
2236 BtPage page = set->page;
2237 uint cnt = 0, idx = 0;
2238 uint max = page->cnt;
2239 uint newslot = max;
2240 BtKey *key;
2241 BtVal *val;
2242
2243         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2244                 return slot;
2245
2246         //      skip cleanup and proceed to split
2247         //      if there's not enough garbage
2248         //      to bother with.
2249
2250         if( page->garbage < nxt / 5 )
2251                 return 0;
2252
2253         memcpy (bt->frame, page, bt->mgr->page_size);
2254
2255         // skip page info and set rest of page to zero
2256
2257         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2258         set->latch->dirty = 1;
2259
2260         page->garbage = 0;
2261         page->act = 0;
2262
2263         // clean up page first by
2264         // removing deleted keys
2265
2266         while( cnt++ < max ) {
2267                 if( cnt == slot )
2268                         newslot = idx + 2;
2269
2270                 if( cnt < max || bt->frame->lvl )
2271                   if( slotptr(bt->frame,cnt)->dead )
2272                         continue;
2273
2274                 // copy the value across
2275
2276                 val = valptr(bt->frame, cnt);
2277                 nxt -= val->len + sizeof(BtVal);
2278                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2279
2280                 // copy the key across
2281
2282                 key = keyptr(bt->frame, cnt);
2283                 nxt -= key->len + sizeof(BtKey);
2284                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2285
2286                 // make a librarian slot
2287
2288                 slotptr(page, ++idx)->off = nxt;
2289                 slotptr(page, idx)->type = Librarian;
2290                 slotptr(page, idx)->dead = 1;
2291
2292                 // set up the slot
2293
2294                 slotptr(page, ++idx)->off = nxt;
2295                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2296
2297                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2298                         page->act++;
2299         }
2300
2301         page->min = nxt;
2302         page->cnt = idx;
2303
2304         //      see if page has enough space now, or does it need splitting?
2305
2306         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2307                 return newslot;
2308
2309         return 0;
2310 }
2311
2312 // split the root and raise the height of the btree
2313
2314 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2315 {  
2316 unsigned char leftkey[BT_keyarray];
2317 uint nxt = bt->mgr->page_size;
2318 unsigned char value[BtId];
2319 BtPageSet left[1];
2320 uid left_page_no;
2321 BtKey *ptr;
2322 BtVal *val;
2323
2324         //      save left page fence key for new root
2325
2326         ptr = keyptr(root->page, root->page->cnt);
2327         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2328
2329         //  Obtain an empty page to use, and copy the current
2330         //  root contents into it, e.g. lower keys
2331
2332         if( bt_newpage(bt, left, root->page) )
2333                 return bt->err;
2334
2335         left_page_no = left->latch->page_no;
2336         bt_unpinlatch (left->latch);
2337
2338         // preserve the page info at the bottom
2339         // of higher keys and set rest to zero
2340
2341         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2342
2343         // insert stopper key at top of newroot page
2344         // and increase the root height
2345
2346         nxt -= BtId + sizeof(BtVal);
2347         bt_putid (value, right->page_no);
2348         val = (BtVal *)((unsigned char *)root->page + nxt);
2349         memcpy (val->value, value, BtId);
2350         val->len = BtId;
2351
2352         nxt -= 2 + sizeof(BtKey);
2353         slotptr(root->page, 2)->off = nxt;
2354         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2355         ptr->len = 2;
2356         ptr->key[0] = 0xff;
2357         ptr->key[1] = 0xff;
2358
2359         // insert lower keys page fence key on newroot page as first key
2360
2361         nxt -= BtId + sizeof(BtVal);
2362         bt_putid (value, left_page_no);
2363         val = (BtVal *)((unsigned char *)root->page + nxt);
2364         memcpy (val->value, value, BtId);
2365         val->len = BtId;
2366
2367         ptr = (BtKey *)leftkey;
2368         nxt -= ptr->len + sizeof(BtKey);
2369         slotptr(root->page, 1)->off = nxt;
2370         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2371         
2372         bt_putid(root->page->right, 0);
2373         root->page->min = nxt;          // reset lowest used offset and key count
2374         root->page->cnt = 2;
2375         root->page->act = 2;
2376         root->page->lvl++;
2377
2378         // release and unpin root pages
2379
2380         bt_unlockpage(bt, BtLockWrite, root->latch);
2381         bt_unpinlatch (root->latch);
2382
2383         bt_unpinlatch (right);
2384         return 0;
2385 }
2386
2387 //  split already locked full node
2388 //      leave it locked.
2389 //      return pool entry for new right
2390 //      page, unlocked
2391
2392 uint bt_splitpage (BtDb *bt, BtPageSet *set)
2393 {
2394 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2395 uint lvl = set->page->lvl;
2396 BtPageSet right[1];
2397 BtKey *key, *ptr;
2398 BtVal *val, *src;
2399 uid right2;
2400 uint prev;
2401
2402         //  split higher half of keys to bt->frame
2403
2404         memset (bt->frame, 0, bt->mgr->page_size);
2405         max = set->page->cnt;
2406         cnt = max / 2;
2407         idx = 0;
2408
2409         while( cnt++ < max ) {
2410                 if( cnt < max || set->page->lvl )
2411                   if( slotptr(set->page, cnt)->dead )
2412                         continue;
2413
2414                 src = valptr(set->page, cnt);
2415                 nxt -= src->len + sizeof(BtVal);
2416                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2417
2418                 key = keyptr(set->page, cnt);
2419                 nxt -= key->len + sizeof(BtKey);
2420                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2421                 memcpy (ptr, key, key->len + sizeof(BtKey));
2422
2423                 //      add librarian slot
2424
2425                 slotptr(bt->frame, ++idx)->off = nxt;
2426                 slotptr(bt->frame, idx)->type = Librarian;
2427                 slotptr(bt->frame, idx)->dead = 1;
2428
2429                 //  add actual slot
2430
2431                 slotptr(bt->frame, ++idx)->off = nxt;
2432                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2433
2434                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2435                         bt->frame->act++;
2436         }
2437
2438         bt->frame->bits = bt->mgr->page_bits;
2439         bt->frame->min = nxt;
2440         bt->frame->cnt = idx;
2441         bt->frame->lvl = lvl;
2442
2443         // link right node
2444
2445         if( set->latch->page_no > ROOT_page )
2446                 bt_putid (bt->frame->right, bt_getid (set->page->right));
2447
2448         //      get new free page and write higher keys to it.
2449
2450         if( bt_newpage(bt, right, bt->frame) )
2451                 return 0;
2452
2453         // process lower keys
2454
2455         memcpy (bt->frame, set->page, bt->mgr->page_size);
2456         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2457         set->latch->dirty = 1;
2458
2459         nxt = bt->mgr->page_size;
2460         set->page->garbage = 0;
2461         set->page->act = 0;
2462         max /= 2;
2463         cnt = 0;
2464         idx = 0;
2465
2466         if( slotptr(bt->frame, max)->type == Librarian )
2467                 max--;
2468
2469         //  assemble page of smaller keys
2470
2471         while( cnt++ < max ) {
2472                 if( slotptr(bt->frame, cnt)->dead )
2473                         continue;
2474                 val = valptr(bt->frame, cnt);
2475                 nxt -= val->len + sizeof(BtVal);
2476                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2477
2478                 key = keyptr(bt->frame, cnt);
2479                 nxt -= key->len + sizeof(BtKey);
2480                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2481
2482                 //      add librarian slot
2483
2484                 slotptr(set->page, ++idx)->off = nxt;
2485                 slotptr(set->page, idx)->type = Librarian;
2486                 slotptr(set->page, idx)->dead = 1;
2487
2488                 //      add actual slot
2489
2490                 slotptr(set->page, ++idx)->off = nxt;
2491                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2492                 set->page->act++;
2493         }
2494
2495         bt_putid(set->page->right, right->latch->page_no);
2496         set->page->min = nxt;
2497         set->page->cnt = idx;
2498
2499         return right->latch->entry;
2500 }
2501
2502 //      fix keys for newly split page
2503 //      call with page locked, return
2504 //      unlocked
2505
2506 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2507 {
2508 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2509 unsigned char value[BtId];
2510 uint lvl = set->page->lvl;
2511 BtPage page;
2512 BtKey *ptr;
2513
2514         // if current page is the root page, split it
2515
2516         if( set->latch->page_no == ROOT_page )
2517                 return bt_splitroot (bt, set, right);
2518
2519         ptr = keyptr(set->page, set->page->cnt);
2520         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2521
2522         page = bt_mappage (bt, right);
2523
2524         ptr = keyptr(page, page->cnt);
2525         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2526
2527         // insert new fences in their parent pages
2528
2529         bt_lockpage (bt, BtLockParent, right);
2530
2531         bt_lockpage (bt, BtLockParent, set->latch);
2532         bt_unlockpage (bt, BtLockWrite, set->latch);
2533
2534         // insert new fence for reformulated left block of smaller keys
2535
2536         bt_putid (value, set->latch->page_no);
2537         ptr = (BtKey *)leftkey;
2538
2539         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2540                 return bt->err;
2541
2542         // switch fence for right block of larger keys to new right page
2543
2544         bt_putid (value, right->page_no);
2545         ptr = (BtKey *)rightkey;
2546
2547         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2548                 return bt->err;
2549
2550         bt_unlockpage (bt, BtLockParent, set->latch);
2551         bt_unpinlatch (set->latch);
2552
2553         bt_unlockpage (bt, BtLockParent, right);
2554         bt_unpinlatch (right);
2555         return 0;
2556 }
2557
2558 //      install new key and value onto page
2559 //      page must already be checked for
2560 //      adequate space
2561
2562 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2563 {
2564 uint idx, librarian;
2565 BtSlot *node;
2566 BtKey *ptr;
2567 BtVal *val;
2568
2569         //      if found slot > desired slot and previous slot
2570         //      is a librarian slot, use it
2571
2572         if( slot > 1 )
2573           if( slotptr(set->page, slot-1)->type == Librarian )
2574                 slot--;
2575
2576         // copy value onto page
2577
2578         set->page->min -= vallen + sizeof(BtVal);
2579         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2580         memcpy (val->value, value, vallen);
2581         val->len = vallen;
2582
2583         // copy key onto page
2584
2585         set->page->min -= keylen + sizeof(BtKey);
2586         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2587         memcpy (ptr->key, key, keylen);
2588         ptr->len = keylen;
2589         
2590         //      find first empty slot
2591
2592         for( idx = slot; idx < set->page->cnt; idx++ )
2593           if( slotptr(set->page, idx)->dead )
2594                 break;
2595
2596         // now insert key into array before slot
2597
2598         if( idx == set->page->cnt )
2599                 idx += 2, set->page->cnt += 2, librarian = 2;
2600         else
2601                 librarian = 1;
2602
2603         set->latch->dirty = 1;
2604         set->page->act++;
2605
2606         while( idx > slot + librarian - 1 )
2607                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2608
2609         //      add librarian slot
2610
2611         if( librarian > 1 ) {
2612                 node = slotptr(set->page, slot++);
2613                 node->off = set->page->min;
2614                 node->type = Librarian;
2615                 node->dead = 1;
2616         }
2617
2618         //      fill in new slot
2619
2620         node = slotptr(set->page, slot);
2621         node->off = set->page->min;
2622         node->type = type;
2623         node->dead = 0;
2624
2625         if( release ) {
2626                 bt_unlockpage (bt, BtLockWrite, set->latch);
2627                 bt_unpinlatch (set->latch);
2628         }
2629
2630         return 0;
2631 }
2632
2633 //  Insert new key into the btree at given level.
2634 //      either add a new key or update/add an existing one
2635
2636 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2637 {
2638 unsigned char newkey[BT_keyarray];
2639 uint slot, idx, len, entry;
2640 BtPageSet set[1];
2641 BtKey *ptr, *ins;
2642 uid sequence;
2643 BtVal *val;
2644 uint type;
2645
2646   // set up the key we're working on
2647
2648   ins = (BtKey*)newkey;
2649   memcpy (ins->key, key, keylen);
2650   ins->len = keylen;
2651
2652   // is this a non-unique index value?
2653
2654   if( unique )
2655         type = Unique;
2656   else {
2657         type = Duplicate;
2658         sequence = bt_newdup (bt);
2659         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2660         ins->len += BtId;
2661   }
2662
2663   while( 1 ) { // find the page and slot for the current key
2664         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2665                 ptr = keyptr(set->page, slot);
2666         else {
2667                 if( !bt->err )
2668                         bt->line = __LINE__, bt->err = BTERR_ovflw;
2669                 return bt->err;
2670         }
2671
2672         // if librarian slot == found slot, advance to real slot
2673
2674         if( slotptr(set->page, slot)->type == Librarian )
2675           if( !keycmp (ptr, key, keylen) )
2676                 ptr = keyptr(set->page, ++slot);
2677
2678         len = ptr->len;
2679
2680         if( slotptr(set->page, slot)->type == Duplicate )
2681                 len -= BtId;
2682
2683         //  if inserting a duplicate key or unique key
2684         //      check for adequate space on the page
2685         //      and insert the new key before slot.
2686
2687         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2688           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2689                 if( !(entry = bt_splitpage (bt, set)) )
2690                   return bt->err;
2691                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2692                   return bt->err;
2693                 else
2694                   continue;
2695
2696           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2697         }
2698
2699         // if key already exists, update value and return
2700
2701         val = valptr(set->page, slot);
2702
2703         if( val->len >= vallen ) {
2704                 if( slotptr(set->page, slot)->dead )
2705                         set->page->act++;
2706                 set->page->garbage += val->len - vallen;
2707                 set->latch->dirty = 1;
2708                 slotptr(set->page, slot)->dead = 0;
2709                 val->len = vallen;
2710                 memcpy (val->value, value, vallen);
2711                 bt_unlockpage(bt, BtLockWrite, set->latch);
2712                 bt_unpinlatch (set->latch);
2713                 return 0;
2714         }
2715
2716         //      new update value doesn't fit in existing value area
2717
2718         if( !slotptr(set->page, slot)->dead )
2719                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2720         else {
2721                 slotptr(set->page, slot)->dead = 0;
2722                 set->page->act++;
2723         }
2724
2725         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2726           if( !(entry = bt_splitpage (bt, set)) )
2727                 return bt->err;
2728           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2729                 return bt->err;
2730           else
2731                 continue;
2732
2733         set->page->min -= vallen + sizeof(BtVal);
2734         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2735         memcpy (val->value, value, vallen);
2736         val->len = vallen;
2737
2738         set->latch->dirty = 1;
2739         set->page->min -= keylen + sizeof(BtKey);
2740         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2741         memcpy (ptr->key, key, keylen);
2742         ptr->len = keylen;
2743         
2744         slotptr(set->page, slot)->off = set->page->min;
2745         bt_unlockpage(bt, BtLockWrite, set->latch);
2746         bt_unpinlatch (set->latch);
2747         return 0;
2748   }
2749   return 0;
2750 }
2751
2752 typedef struct {
2753         logseqno reqlsn;        // redo log seq no required
2754         logseqno lsn;           // redo log sequence number
2755         uint entry;                     // latch table entry number
2756         uint slot:31;           // page slot number
2757         uint reuse:1;           // reused previous page
2758 } AtomicTxn;
2759
2760 typedef struct {
2761         uid page_no;            // page number for split leaf
2762         void *next;                     // next key to insert
2763         uint entry:29;          // latch table entry number
2764         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2765         uint nounlock:1;        // don't unlock ParentModification
2766         unsigned char leafkey[BT_keyarray];
2767 } AtomicKey;
2768
2769 //      determine actual page where key is located
2770 //  return slot number
2771
2772 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2773 {
2774 BtKey *key = keyptr(source,src);
2775 uint slot = locks[src].slot;
2776 uint entry;
2777
2778         if( src > 1 && locks[src].reuse )
2779           entry = locks[src-1].entry, slot = 0;
2780         else
2781           entry = locks[src].entry;
2782
2783         if( slot ) {
2784                 set->latch = bt->mgr->latchsets + entry;
2785                 set->page = bt_mappage (bt, set->latch);
2786                 return slot;
2787         }
2788
2789         //      is locks->reuse set? or was slot zeroed?
2790         //      if so, find where our key is located 
2791         //      on current page or pages split on
2792         //      same page txn operations.
2793
2794         do {
2795                 set->latch = bt->mgr->latchsets + entry;
2796                 set->page = bt_mappage (bt, set->latch);
2797
2798                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2799                   if( slotptr(set->page, slot)->type == Librarian )
2800                         slot++;
2801                   if( locks[src].reuse )
2802                         locks[src].entry = entry;
2803                   return slot;
2804                 }
2805         } while( entry = set->latch->split );
2806
2807         bt->line = __LINE__, bt->err = BTERR_atomic;
2808         return 0;
2809 }
2810
2811 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2812 {
2813 BtKey *key = keyptr(source, src);
2814 BtVal *val = valptr(source, src);
2815 BtLatchSet *latch;
2816 BtPageSet set[1];
2817 uint entry, slot;
2818
2819   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2820         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
2821           if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2822                 return bt->err;
2823           set->page->lsn = locks[src].lsn;
2824           return 0;
2825         }
2826
2827         if( entry = bt_splitpage (bt, set) )
2828           latch = bt->mgr->latchsets + entry;
2829         else
2830           return bt->err;
2831
2832         //      splice right page into split chain
2833         //      and WriteLock it.
2834
2835         bt_lockpage(bt, BtLockWrite, latch);
2836         latch->split = set->latch->split;
2837         set->latch->split = entry;
2838         locks[src].slot = 0;
2839   }
2840
2841   return bt->line = __LINE__, bt->err = BTERR_atomic;
2842 }
2843
2844 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2845 {
2846 BtKey *key = keyptr(source, src);
2847 BtPageSet set[1];
2848 uint idx, slot;
2849 BtKey *ptr;
2850 BtVal *val;
2851
2852         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2853           ptr = keyptr(set->page, slot);
2854         else
2855           return bt->line = __LINE__, bt->err = BTERR_struct;
2856
2857         if( !keycmp (ptr, key->key, key->len) )
2858           if( !slotptr(set->page, slot)->dead )
2859                 slotptr(set->page, slot)->dead = 1;
2860           else
2861                 return 0;
2862         else
2863                 return 0;
2864
2865         val = valptr(set->page, slot);
2866         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2867         set->latch->dirty = 1;
2868         set->page->lsn = locks[src].lsn;
2869         set->page->act--;
2870         bt->found++;
2871         return 0;
2872 }
2873
2874 //      delete an empty master page for a transaction
2875
2876 //      note that the far right page never empties because
2877 //      it always contains (at least) the infinite stopper key
2878 //      and that all pages that don't contain any keys are
2879 //      deleted, or are being held under Atomic lock.
2880
2881 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2882 {
2883 BtPageSet right[1], temp[1];
2884 unsigned char value[BtId];
2885 uid right_page_no;
2886 BtKey *ptr;
2887
2888         bt_lockpage(bt, BtLockWrite, prev->latch);
2889
2890         //      grab the right sibling
2891
2892         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), NULL) )
2893                 right->page = bt_mappage (bt, right->latch);
2894         else
2895                 return bt->err;
2896
2897         bt_lockpage(bt, BtLockAtomic, right->latch);
2898         bt_lockpage(bt, BtLockWrite, right->latch);
2899
2900         //      and pull contents over empty page
2901         //      while preserving master's left link
2902
2903         memcpy (right->page->left, prev->page->left, BtId);
2904         memcpy (prev->page, right->page, bt->mgr->page_size);
2905
2906         //      forward seekers to old right sibling
2907         //      to new page location in set
2908
2909         bt_putid (right->page->right, prev->latch->page_no);
2910         right->latch->dirty = 1;
2911         right->page->kill = 1;
2912
2913         //      remove pointer to right page for searchers by
2914         //      changing right fence key to point to the master page
2915
2916         ptr = keyptr(right->page,right->page->cnt);
2917         bt_putid (value, prev->latch->page_no);
2918
2919         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2920                 return bt->err;
2921
2922         //  now that master page is in good shape we can
2923         //      remove its locks.
2924
2925         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2926         bt_unlockpage (bt, BtLockWrite, prev->latch);
2927
2928         //  fix master's right sibling's left pointer
2929         //      to remove scanner's poiner to the right page
2930
2931         if( right_page_no = bt_getid (prev->page->right) ) {
2932           if( temp->latch = bt_pinlatch (bt, right_page_no, NULL) )
2933                 temp->page = bt_mappage (bt, temp->latch);
2934
2935           bt_lockpage (bt, BtLockWrite, temp->latch);
2936           bt_putid (temp->page->left, prev->latch->page_no);
2937           temp->latch->dirty = 1;
2938
2939           bt_unlockpage (bt, BtLockWrite, temp->latch);
2940           bt_unpinlatch (temp->latch);
2941         } else {        // master is now the far right page
2942           bt_mutexlock (bt->mgr->lock);
2943           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2944           bt_releasemutex(bt->mgr->lock);
2945         }
2946
2947         //      now that there are no pointers to the right page
2948         //      we can delete it after the last read access occurs
2949
2950         bt_unlockpage (bt, BtLockWrite, right->latch);
2951         bt_unlockpage (bt, BtLockAtomic, right->latch);
2952         bt_lockpage (bt, BtLockDelete, right->latch);
2953         bt_lockpage (bt, BtLockWrite, right->latch);
2954         bt_freepage (bt, right);
2955         return 0;
2956 }
2957
2958 //      atomic modification of a batch of keys.
2959
2960 //      return -1 if BTERR is set
2961 //      otherwise return slot number
2962 //      causing the key constraint violation
2963 //      or zero on successful completion.
2964
2965 int bt_atomictxn (BtDb *bt, BtPage source)
2966 {
2967 uint src, idx, slot, samepage, entry, avail, que = 0;
2968 AtomicKey *head, *tail, *leaf;
2969 BtPageSet set[1], prev[1];
2970 unsigned char value[BtId];
2971 BtKey *key, *ptr, *key2;
2972 BtLatchSet *latch;
2973 AtomicTxn *locks;
2974 int result = 0;
2975 BtSlot temp[1];
2976 logseqno lsn;
2977 BtPage page;
2978 BtVal *val;
2979 uid right;
2980 int type;
2981
2982   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2983   head = NULL;
2984   tail = NULL;
2985
2986   // stable sort the list of keys into order to
2987   //    prevent deadlocks between threads.
2988
2989   for( src = 1; src++ < source->cnt; ) {
2990         *temp = *slotptr(source,src);
2991         key = keyptr (source,src);
2992
2993         for( idx = src; --idx; ) {
2994           key2 = keyptr (source,idx);
2995           if( keycmp (key, key2->key, key2->len) < 0 ) {
2996                 *slotptr(source,idx+1) = *slotptr(source,idx);
2997                 *slotptr(source,idx) = *temp;
2998           } else
2999                 break;
3000         }
3001   }
3002
3003   // reserve enough buffer pool entries
3004
3005   avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
3006   bt_availrequest (bt, avail);
3007
3008   // Load the leaf page for each key
3009   // group same page references with reuse bit
3010   // and determine any constraint violations
3011
3012   for( src = 0; src++ < source->cnt; ) {
3013         key = keyptr(source, src);
3014         slot = 0;
3015
3016         // first determine if this modification falls
3017         // on the same page as the previous modification
3018         //      note that the far right leaf page is a special case
3019
3020         if( samepage = src > 1 )
3021           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3022                 slot = bt_findslot(set->page, key->key, key->len);
3023           else
3024                 bt_unlockpage(bt, BtLockRead, set->latch); 
3025
3026         if( !slot )
3027           if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
3028                 set->latch->split = 0;
3029           else
3030                 goto atomicerr;
3031
3032         if( slotptr(set->page, slot)->type == Librarian )
3033           ptr = keyptr(set->page, ++slot);
3034         else
3035           ptr = keyptr(set->page, slot);
3036
3037         if( !samepage ) {
3038           locks[src].entry = set->latch->entry;
3039           locks[src].slot = slot;
3040           locks[src].reuse = 0;
3041         } else {
3042           locks[src].entry = 0;
3043           locks[src].slot = 0;
3044           locks[src].reuse = 1;
3045         }
3046
3047         //      capture current lsn for master page
3048
3049         locks[src].reqlsn = set->page->lsn;
3050
3051         //      perform constraint checks
3052
3053         switch( slotptr(source, src)->type ) {
3054         case Duplicate:
3055         case Unique:
3056           if( !slotptr(set->page, slot)->dead )
3057            if( slot < set->page->cnt || bt_getid (set->page->right) )
3058             if( !keycmp (ptr, key->key, key->len) ) {
3059
3060                   // return constraint violation if key already exists
3061
3062                   bt_unlockpage(bt, BtLockRead, set->latch);
3063                   result = src;
3064
3065                   while( src ) {
3066                         if( locks[src].entry ) {
3067                           set->latch = bt->mgr->latchsets + locks[src].entry;
3068                           bt_unlockpage(bt, BtLockAtomic, set->latch);
3069                           bt_unpinlatch (set->latch);
3070                         }
3071                         src--;
3072                   }
3073                   free (locks);
3074                   return result;
3075                 }
3076           break;
3077         }
3078   }
3079
3080   //  unlock last loadpage lock
3081
3082   if( source->cnt )
3083         bt_unlockpage(bt, BtLockRead, set->latch);
3084
3085   //  and add entries to redo log
3086
3087   if( bt->mgr->redopages )
3088    if( lsn = bt_txnredo (bt, source) )
3089     for( src = 0; src++ < source->cnt; )
3090          locks[src].lsn = lsn;
3091     else
3092          goto atomicerr;
3093
3094   //  obtain write lock for each master page
3095
3096   for( src = 0; src++ < source->cnt; ) {
3097         if( locks[src].reuse )
3098           continue;
3099         else
3100           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
3101   }
3102
3103   // insert or delete each key
3104   // process any splits or merges
3105   // release Write & Atomic latches
3106   // set ParentModifications and build
3107   // queue of keys to insert for split pages
3108   // or delete for deleted pages.
3109
3110   // run through txn list backwards
3111
3112   samepage = source->cnt + 1;
3113
3114   for( src = source->cnt; src; src-- ) {
3115         if( locks[src].reuse )
3116           continue;
3117
3118         //  perform the txn operations
3119         //      from smaller to larger on
3120         //  the same page
3121
3122         for( idx = src; idx < samepage; idx++ )
3123          switch( slotptr(source,idx)->type ) {
3124          case Delete:
3125           if( bt_atomicdelete (bt, source, locks, idx) )
3126                 goto atomicerr;
3127           break;
3128
3129         case Duplicate:
3130         case Unique:
3131           if( bt_atomicinsert (bt, source, locks, idx) )
3132                 goto atomicerr;
3133           break;
3134         }
3135
3136         //      after the same page operations have finished,
3137         //  process master page for splits or deletion.
3138
3139         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3140         prev->page = bt_mappage (bt, prev->latch);
3141         samepage = src;
3142
3143         //  pick-up all splits from master page
3144         //      each one is already WriteLocked.
3145
3146         entry = prev->latch->split;
3147
3148         while( entry ) {
3149           set->latch = bt->mgr->latchsets + entry;
3150           set->page = bt_mappage (bt, set->latch);
3151           entry = set->latch->split;
3152
3153           // delete empty master page by undoing its split
3154           //  (this is potentially another empty page)
3155           //  note that there are no new left pointers yet
3156
3157           if( !prev->page->act ) {
3158                 memcpy (set->page->left, prev->page->left, BtId);
3159                 memcpy (prev->page, set->page, bt->mgr->page_size);
3160                 bt_lockpage (bt, BtLockDelete, set->latch);
3161                 bt_freepage (bt, set);
3162
3163                 prev->latch->split = set->latch->split;
3164                 prev->latch->dirty = 1;
3165                 continue;
3166           }
3167
3168           // remove empty page from the split chain
3169           // and return it to the free list.
3170
3171           if( !set->page->act ) {
3172                 memcpy (prev->page->right, set->page->right, BtId);
3173                 prev->latch->split = set->latch->split;
3174                 bt_lockpage (bt, BtLockDelete, set->latch);
3175                 bt_freepage (bt, set);
3176                 continue;
3177           }
3178
3179           //  schedule prev fence key update
3180
3181           ptr = keyptr(prev->page,prev->page->cnt);
3182           leaf = calloc (sizeof(AtomicKey), 1), que++;
3183
3184           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3185           leaf->page_no = prev->latch->page_no;
3186           leaf->entry = prev->latch->entry;
3187           leaf->type = 0;
3188
3189           if( tail )
3190                 tail->next = leaf;
3191           else
3192                 head = leaf;
3193
3194           tail = leaf;
3195
3196           // splice in the left link into the split page
3197
3198           bt_putid (set->page->left, prev->latch->page_no);
3199           bt_lockpage(bt, BtLockParent, prev->latch);
3200           bt_unlockpage(bt, BtLockWrite, prev->latch);
3201           *prev = *set;
3202         }
3203
3204         //  update left pointer in next right page from last split page
3205         //      (if all splits were reversed, latch->split == 0)
3206
3207         if( latch->split ) {
3208           //  fix left pointer in master's original (now split)
3209           //  far right sibling or set rightmost page in page zero
3210
3211           if( right = bt_getid (prev->page->right) ) {
3212                 if( set->latch = bt_pinlatch (bt, right, NULL) )
3213                   set->page = bt_mappage (bt, set->latch);
3214                 else
3215                   goto atomicerr;
3216
3217             bt_lockpage (bt, BtLockWrite, set->latch);
3218             bt_putid (set->page->left, prev->latch->page_no);
3219                 set->latch->dirty = 1;
3220             bt_unlockpage (bt, BtLockWrite, set->latch);
3221                 bt_unpinlatch (set->latch);
3222           } else {      // prev is rightmost page
3223             bt_mutexlock (bt->mgr->lock);
3224                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3225             bt_releasemutex(bt->mgr->lock);
3226           }
3227
3228           //  Process last page split in chain
3229
3230           ptr = keyptr(prev->page,prev->page->cnt);
3231           leaf = calloc (sizeof(AtomicKey), 1), que++;
3232
3233           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3234           leaf->page_no = prev->latch->page_no;
3235           leaf->entry = prev->latch->entry;
3236           leaf->type = 0;
3237   
3238           if( tail )
3239                 tail->next = leaf;
3240           else
3241                 head = leaf;
3242
3243           tail = leaf;
3244
3245           bt_lockpage(bt, BtLockParent, prev->latch);
3246           bt_unlockpage(bt, BtLockWrite, prev->latch);
3247
3248           //  remove atomic lock on master page
3249
3250           bt_unlockpage(bt, BtLockAtomic, latch);
3251           continue;
3252         }
3253
3254         //  finished if prev page occupied (either master or final split)
3255
3256         if( prev->page->act ) {
3257           bt_unlockpage(bt, BtLockWrite, latch);
3258           bt_unlockpage(bt, BtLockAtomic, latch);
3259           bt_unpinlatch(latch);
3260           continue;
3261         }
3262
3263         // any and all splits were reversed, and the
3264         // master page located in prev is empty, delete it
3265         // by pulling over master's right sibling.
3266
3267         // Remove empty master's fence key
3268
3269         ptr = keyptr(prev->page,prev->page->cnt);
3270
3271         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3272                 goto atomicerr;
3273
3274         //      perform the remainder of the delete
3275         //      from the FIFO queue
3276
3277         leaf = calloc (sizeof(AtomicKey), 1), que++;
3278
3279         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3280         leaf->page_no = prev->latch->page_no;
3281         leaf->entry = prev->latch->entry;
3282         leaf->nounlock = 1;
3283         leaf->type = 2;
3284   
3285         if( tail )
3286           tail->next = leaf;
3287         else
3288           head = leaf;
3289
3290         tail = leaf;
3291
3292         //      leave atomic lock in place until
3293         //      deletion completes in next phase.
3294
3295         bt_unlockpage(bt, BtLockWrite, prev->latch);
3296   }
3297
3298   bt_availrelease (bt, avail);
3299
3300   que *= bt->mgr->pagezero->alloc->lvl;
3301   bt_availrequest (bt, que);
3302
3303   //  add & delete keys for any pages split or merged during transaction
3304
3305   if( leaf = head )
3306     do {
3307           set->latch = bt->mgr->latchsets + leaf->entry;
3308           set->page = bt_mappage (bt, set->latch);
3309
3310           bt_putid (value, leaf->page_no);
3311           ptr = (BtKey *)leaf->leafkey;
3312
3313           switch( leaf->type ) {
3314           case 0:       // insert key
3315             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3316                   goto atomicerr;
3317
3318                 break;
3319
3320           case 1:       // delete key
3321                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3322                   goto atomicerr;
3323
3324                 break;
3325
3326           case 2:       // free page
3327                 if( bt_atomicfree (bt, set) )
3328                   goto atomicerr;
3329
3330                 break;
3331           }
3332
3333           if( !leaf->nounlock )
3334             bt_unlockpage (bt, BtLockParent, set->latch);
3335
3336           bt_unpinlatch (set->latch);
3337           tail = leaf->next;
3338           free (leaf);
3339         } while( leaf = tail );
3340
3341   bt_availrelease (bt, que);
3342
3343   // return success
3344
3345   free (locks);
3346   return 0;
3347 atomicerr:
3348   return -1;
3349 }
3350
3351 //      set cursor to highest slot on highest page
3352
3353 uint bt_lastkey (BtDb *bt)
3354 {
3355 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3356 BtPageSet set[1];
3357
3358         if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3359                 set->page = bt_mappage (bt, set->latch);
3360         else
3361                 return 0;
3362
3363     bt_lockpage(bt, BtLockRead, set->latch);
3364         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3365     bt_unlockpage(bt, BtLockRead, set->latch);
3366         bt_unpinlatch (set->latch);
3367
3368         bt->cursor_page = page_no;
3369         return bt->cursor->cnt;
3370 }
3371
3372 //      return previous slot on cursor page
3373
3374 uint bt_prevkey (BtDb *bt, uint slot)
3375 {
3376 uid ourright, next, us = bt->cursor_page;
3377 BtPageSet set[1];
3378
3379         if( --slot )
3380                 return slot;
3381
3382         ourright = bt_getid(bt->cursor->right);
3383
3384 goleft:
3385         if( !(next = bt_getid(bt->cursor->left)) )
3386                 return 0;
3387
3388 findourself:
3389         bt->cursor_page = next;
3390
3391         if( set->latch = bt_pinlatch (bt, next, NULL) )
3392                 set->page = bt_mappage (bt, set->latch);
3393         else
3394                 return 0;
3395
3396     bt_lockpage(bt, BtLockRead, set->latch);
3397         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3398         bt_unlockpage(bt, BtLockRead, set->latch);
3399         bt_unpinlatch (set->latch);
3400         
3401         next = bt_getid (bt->cursor->right);
3402
3403         if( bt->cursor->kill )
3404                 goto findourself;
3405
3406         if( next != us )
3407           if( next == ourright )
3408                 goto goleft;
3409           else
3410                 goto findourself;
3411
3412         return bt->cursor->cnt;
3413 }
3414
3415 //  return next slot on cursor page
3416 //  or slide cursor right into next page
3417
3418 uint bt_nextkey (BtDb *bt, uint slot)
3419 {
3420 BtPageSet set[1];
3421 uid right;
3422
3423   do {
3424         right = bt_getid(bt->cursor->right);
3425
3426         while( slot++ < bt->cursor->cnt )
3427           if( slotptr(bt->cursor,slot)->dead )
3428                 continue;
3429           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3430                 return slot;
3431           else
3432                 break;
3433
3434         if( !right )
3435                 break;
3436
3437         bt->cursor_page = right;
3438
3439         if( set->latch = bt_pinlatch (bt, right, NULL) )
3440                 set->page = bt_mappage (bt, set->latch);
3441         else
3442                 return 0;
3443
3444     bt_lockpage(bt, BtLockRead, set->latch);
3445
3446         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3447
3448         bt_unlockpage(bt, BtLockRead, set->latch);
3449         bt_unpinlatch (set->latch);
3450         slot = 0;
3451
3452   } while( 1 );
3453
3454   return bt->err = 0;
3455 }
3456
3457 //  cache page of keys into cursor and return starting slot for given key
3458
3459 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3460 {
3461 BtPageSet set[1];
3462 uint slot;
3463
3464         // cache page for retrieval
3465
3466         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3467           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3468         else
3469           return 0;
3470
3471         bt->cursor_page = set->latch->page_no;
3472
3473         bt_unlockpage(bt, BtLockRead, set->latch);
3474         bt_unpinlatch (set->latch);
3475         return slot;
3476 }
3477
3478 BtKey *bt_key(BtDb *bt, uint slot)
3479 {
3480         return keyptr(bt->cursor, slot);
3481 }
3482
3483 BtVal *bt_val(BtDb *bt, uint slot)
3484 {
3485         return valptr(bt->cursor,slot);
3486 }
3487
3488 #ifdef STANDALONE
3489
3490 #ifndef unix
3491 double getCpuTime(int type)
3492 {
3493 FILETIME crtime[1];
3494 FILETIME xittime[1];
3495 FILETIME systime[1];
3496 FILETIME usrtime[1];
3497 SYSTEMTIME timeconv[1];
3498 double ans = 0;
3499
3500         memset (timeconv, 0, sizeof(SYSTEMTIME));
3501
3502         switch( type ) {
3503         case 0:
3504                 GetSystemTimeAsFileTime (xittime);
3505                 FileTimeToSystemTime (xittime, timeconv);
3506                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3507                 break;
3508         case 1:
3509                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3510                 FileTimeToSystemTime (usrtime, timeconv);
3511                 break;
3512         case 2:
3513                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3514                 FileTimeToSystemTime (systime, timeconv);
3515                 break;
3516         }
3517
3518         ans += (double)timeconv->wHour * 3600;
3519         ans += (double)timeconv->wMinute * 60;
3520         ans += (double)timeconv->wSecond;
3521         ans += (double)timeconv->wMilliseconds / 1000;
3522         return ans;
3523 }
3524 #else
3525 #include <time.h>
3526 #include <sys/resource.h>
3527
3528 double getCpuTime(int type)
3529 {
3530 struct rusage used[1];
3531 struct timeval tv[1];
3532
3533         switch( type ) {
3534         case 0:
3535                 gettimeofday(tv, NULL);
3536                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3537
3538         case 1:
3539                 getrusage(RUSAGE_SELF, used);
3540                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3541
3542         case 2:
3543                 getrusage(RUSAGE_SELF, used);
3544                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3545         }
3546
3547         return 0;
3548 }
3549 #endif
3550
3551 void bt_poolaudit (BtMgr *mgr)
3552 {
3553 BtLatchSet *latch;
3554 uint entry = 0;
3555
3556         while( ++entry < mgr->latchtotal ) {
3557                 latch = mgr->latchsets + entry;
3558
3559                 if( *latch->readwr->rin & MASK )
3560                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3561
3562                 if( *latch->access->rin & MASK )
3563                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3564
3565                 if( *latch->parent->xcl->value )
3566                         fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3567
3568                 if( *latch->atomic->xcl->value )
3569                         fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3570
3571                 if( *latch->modify->value )
3572                         fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3573
3574                 if( latch->pin & ~CLOCK_bit )
3575                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3576         }
3577 }
3578
3579 typedef struct {
3580         char idx;
3581         char *type;
3582         char *infile;
3583         BtMgr *mgr;
3584         int num;
3585 } ThreadArg;
3586
3587 //  standalone program to index file of keys
3588 //  then list them onto std-out
3589
3590 #ifdef unix
3591 void *index_file (void *arg)
3592 #else
3593 uint __stdcall index_file (void *arg)
3594 #endif
3595 {
3596 int line = 0, found = 0, cnt = 0, idx;
3597 uid next, page_no = LEAF_page;  // start on first page of leaves
3598 int ch, len = 0, slot, type = 0;
3599 unsigned char key[BT_maxkey];
3600 unsigned char txn[65536];
3601 ThreadArg *args = arg;
3602 BtPageSet set[1];
3603 uint nxt = 65536;
3604 BtPage page;
3605 BtKey *ptr;
3606 BtVal *val;
3607 BtDb *bt;
3608 FILE *in;
3609
3610         bt = bt_open (args->mgr);
3611         page = (BtPage)txn;
3612
3613         if( args->idx < strlen (args->type) )
3614                 ch = args->type[args->idx];
3615         else
3616                 ch = args->type[strlen(args->type) - 1];
3617
3618         switch(ch | 0x20)
3619         {
3620         case 'd':
3621                 type = Delete;
3622
3623         case 'p':
3624                 if( !type )
3625                         type = Unique;
3626
3627                 if( args->num )
3628                  if( type == Delete )
3629                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3630                  else
3631                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3632                 else
3633                  if( type == Delete )
3634                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3635                  else
3636                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3637
3638                 if( in = fopen (args->infile, "rb") )
3639                   while( ch = getc(in), ch != EOF )
3640                         if( ch == '\n' )
3641                         {
3642                           line++;
3643
3644                           if( !args->num ) {
3645                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3646                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3647                             len = 0;
3648                                 continue;
3649                           }
3650
3651                           nxt -= len - 10;
3652                           memcpy (txn + nxt, key + 10, len - 10);
3653                           nxt -= 1;
3654                           txn[nxt] = len - 10;
3655                           nxt -= 10;
3656                           memcpy (txn + nxt, key, 10);
3657                           nxt -= 1;
3658                           txn[nxt] = 10;
3659                           slotptr(page,++cnt)->off  = nxt;
3660                           slotptr(page,cnt)->type = type;
3661                           len = 0;
3662
3663                           if( cnt < args->num )
3664                                 continue;
3665
3666                           page->cnt = cnt;
3667                           page->act = cnt;
3668                           page->min = nxt;
3669
3670                           if( bt_atomictxn (bt, page) )
3671                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3672                           nxt = sizeof(txn);
3673                           cnt = 0;
3674
3675                         }
3676                         else if( len < BT_maxkey )
3677                                 key[len++] = ch;
3678                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3679                 break;
3680
3681         case 'w':
3682                 fprintf(stderr, "started indexing for %s\n", args->infile);
3683                 if( in = fopen (args->infile, "r") )
3684                   while( ch = getc(in), ch != EOF )
3685                         if( ch == '\n' )
3686                         {
3687                           line++;
3688
3689                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3690                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3691                           len = 0;
3692                         }
3693                         else if( len < BT_maxkey )
3694                                 key[len++] = ch;
3695                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3696                 break;
3697
3698         case 'f':
3699                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3700                 if( in = fopen (args->infile, "rb") )
3701                   while( ch = getc(in), ch != EOF )
3702                         if( ch == '\n' )
3703                         {
3704                           line++;
3705                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3706                                 found++;
3707                           else if( bt->err )
3708                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
3709                           len = 0;
3710                         }
3711                         else if( len < BT_maxkey )
3712                                 key[len++] = ch;
3713                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3714                 break;
3715
3716         case 's':
3717                 fprintf(stderr, "started scanning\n");
3718                 
3719                 do {
3720                         if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3721                                 set->page = bt_mappage (bt, set->latch);
3722                         else
3723                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3724                         bt_lockpage (bt, BtLockRead, set->latch);
3725                         next = bt_getid (set->page->right);
3726
3727                         for( slot = 0; slot++ < set->page->cnt; )
3728                          if( next || slot < set->page->cnt )
3729                           if( !slotptr(set->page, slot)->dead ) {
3730                                 ptr = keyptr(set->page, slot);
3731                                 len = ptr->len;
3732
3733                             if( slotptr(set->page, slot)->type == Duplicate )
3734                                         len -= BtId;
3735
3736                                 fwrite (ptr->key, len, 1, stdout);
3737                                 val = valptr(set->page, slot);
3738                                 fwrite (val->value, val->len, 1, stdout);
3739                                 fputc ('\n', stdout);
3740                                 cnt++;
3741                            }
3742
3743                         set->latch->avail = 1;
3744                         bt_unlockpage (bt, BtLockRead, set->latch);
3745                         bt_unpinlatch (set->latch);
3746                 } while( page_no = next );
3747
3748                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3749                 break;
3750
3751         case 'r':
3752                 fprintf(stderr, "started reverse scan\n");
3753                 if( slot = bt_lastkey (bt) )
3754                    while( slot = bt_prevkey (bt, slot) ) {
3755                         if( slotptr(bt->cursor, slot)->dead )
3756                           continue;
3757
3758                         ptr = keyptr(bt->cursor, slot);
3759                         len = ptr->len;
3760
3761                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3762                                 len -= BtId;
3763
3764                         fwrite (ptr->key, len, 1, stdout);
3765                         val = valptr(bt->cursor, slot);
3766                         fwrite (val->value, val->len, 1, stdout);
3767                         fputc ('\n', stdout);
3768                         cnt++;
3769                   }
3770
3771                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3772                 break;
3773
3774         case 'c':
3775 #ifdef unix
3776                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3777 #endif
3778                 fprintf(stderr, "started counting\n");
3779                 next = LEAF_page + bt->mgr->redopages + 1;
3780
3781                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3782                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3783                                 break;
3784
3785                         if( !bt->frame->free && !bt->frame->lvl )
3786                                 cnt += bt->frame->act;
3787
3788                         bt->reads++;
3789                         page_no = next++;
3790                 }
3791                 
3792                 cnt--;  // remove stopper key
3793                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3794                 break;
3795         }
3796
3797         bt_close (bt);
3798 #ifdef unix
3799         return NULL;
3800 #else
3801         return 0;
3802 #endif
3803 }
3804
3805 typedef struct timeval timer;
3806
3807 int main (int argc, char **argv)
3808 {
3809 int idx, cnt, len, slot, err;
3810 int segsize, bits = 16;
3811 double start, stop;
3812 #ifdef unix
3813 pthread_t *threads;
3814 #else
3815 HANDLE *threads;
3816 #endif
3817 ThreadArg *args;
3818 uint poolsize = 0;
3819 uint recovery = 0;
3820 float elapsed;
3821 int num = 0;
3822 char key[1];
3823 BtMgr *mgr;
3824 BtKey *ptr;
3825 BtDb *bt;
3826
3827         if( argc < 3 ) {
3828                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3829                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3830                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3831                 fprintf (stderr, "  page_bits is the page size in bits\n");
3832                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3833                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3834                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3835                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3836                 exit(0);
3837         }
3838
3839         start = getCpuTime(0);
3840
3841         if( argc > 3 )
3842                 bits = atoi(argv[3]);
3843
3844         if( argc > 4 )
3845                 poolsize = atoi(argv[4]);
3846
3847         if( !poolsize )
3848                 fprintf (stderr, "Warning: no mapped_pool\n");
3849
3850         if( argc > 5 )
3851                 num = atoi(argv[5]);
3852
3853         if( argc > 6 )
3854                 recovery = atoi(argv[6]);
3855
3856         cnt = argc - 7;
3857 #ifdef unix
3858         threads = malloc (cnt * sizeof(pthread_t));
3859 #else
3860         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3861 #endif
3862         args = malloc (cnt * sizeof(ThreadArg));
3863
3864         mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3865
3866         if( !mgr ) {
3867                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3868                 exit (1);
3869         }
3870
3871         //      fire off threads
3872
3873         for( idx = 0; idx < cnt; idx++ ) {
3874                 args[idx].infile = argv[idx + 7];
3875                 args[idx].type = argv[2];
3876                 args[idx].mgr = mgr;
3877                 args[idx].num = num;
3878                 args[idx].idx = idx;
3879 #ifdef unix
3880                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3881                         fprintf(stderr, "Error creating thread %d\n", err);
3882 #else
3883                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3884 #endif
3885         }
3886
3887         //      wait for termination
3888
3889 #ifdef unix
3890         for( idx = 0; idx < cnt; idx++ )
3891                 pthread_join (threads[idx], NULL);
3892 #else
3893         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3894
3895         for( idx = 0; idx < cnt; idx++ )
3896                 CloseHandle(threads[idx]);
3897
3898 #endif
3899         bt_poolaudit(mgr);
3900         bt_mgrclose (mgr);
3901
3902         elapsed = getCpuTime(0) - start;
3903         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3904         elapsed = getCpuTime(1);
3905         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3906         elapsed = getCpuTime(2);
3907         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3908 }
3909
3910 #endif  //STANDALONE