]> pd.if.org Git - btree/blob - threadskv10.c
Progress made on LSM B-tree
[btree] / threadskv10.c
1 // btree version threadskv10 futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      and LSM B-trees for write optimization
11
12 // 11 OCT 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <linux/futex.h>
39 #define SYS_futex 202
40 #endif
41
42 #ifdef unix
43 #include <unistd.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <fcntl.h>
47 #include <sys/time.h>
48 #include <sys/mman.h>
49 #include <errno.h>
50 #include <pthread.h>
51 #include <limits.h>
52 #else
53 #define WIN32_LEAN_AND_MEAN
54 #include <windows.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <time.h>
58 #include <fcntl.h>
59 #include <process.h>
60 #include <intrin.h>
61 #endif
62
63 #include <memory.h>
64 #include <string.h>
65 #include <stddef.h>
66
67 typedef unsigned long long      uid;
68 typedef unsigned long long      logseqno;
69
70 #ifndef unix
71 typedef unsigned long long      off64_t;
72 typedef unsigned short          ushort;
73 typedef unsigned int            uint;
74 #endif
75
76 #define BT_ro 0x6f72    // ro
77 #define BT_rw 0x7772    // rw
78
79 #define BT_maxbits              26                                      // maximum page size in bits
80 #define BT_minbits              9                                       // minimum page size in bits
81 #define BT_minpage              (1 << BT_minbits)       // minimum page size
82 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
83
84 //  BTree page number constants
85 #define ALLOC_page              0       // allocation page
86 #define ROOT_page               1       // root of the btree
87 #define LEAF_page               2       // first page of leaves
88 #define REDO_page               3       // first page of redo buffer
89
90 //      Number of levels to create in a new BTree
91
92 #define MIN_lvl                 2
93
94 /*
95 There are six lock types for each node in four independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
102 */
103
104 typedef enum{
105         BtLockAccess = 1,
106         BtLockDelete = 2,
107         BtLockRead   = 4,
108         BtLockWrite  = 8,
109         BtLockParent = 16,
110         BtLockAtomic = 32
111 } BtLock;
112
113 //      lite weight spin latch
114
115 volatile typedef struct {
116         ushort exclusive:1;
117         ushort pending:1;
118         ushort share:14;
119 } BtMutexLatch;
120
121 #define XCL 1
122 #define PEND 2
123 #define BOTH 3
124 #define SHARE 4
125
126 /*
127 // exclusive is set for write access
128
129 volatile typedef struct {
130         unsigned char exclusive[1];
131         unsigned char filler;
132 } BtMutexLatch;
133 */
134 /*
135 typedef struct {
136   union {
137         struct {
138           volatile uint xlock:1;        // one writer has exclusive lock
139           volatile uint wrt:31;         // count of other writers waiting
140         } bits[1];
141         uint value[1];
142   };
143 } BtMutexLatch;
144 */
145 #define XCL 1
146 #define WRT 2
147
148 //      definition for phase-fair reader/writer lock implementation
149
150 typedef struct {
151         volatile ushort rin[1];
152         volatile ushort rout[1];
153         volatile ushort ticket[1];
154         volatile ushort serving[1];
155         volatile ushort tid[1];
156         volatile ushort dup[1];
157 } RWLock;
158
159 //      write only lock
160
161 typedef struct {
162         BtMutexLatch xcl[1];
163         volatile ushort tid[1];
164         volatile ushort dup[1];
165 } WOLock;
166
167 #define PHID 0x1
168 #define PRES 0x2
169 #define MASK 0x3
170 #define RINC 0x4
171
172 //      mode & definition for lite latch implementation
173
174 enum {
175         QueRd = 1,              // reader queue
176         QueWr = 2               // writer queue
177 } RWQueue;
178
179 //  hash table entries
180
181 typedef struct {
182         uint entry;             // Latch table entry at head of chain
183         BtMutexLatch latch[1];
184 } BtHashEntry;
185
186 //      latch manager table structure
187
188 typedef struct {
189         uid page_no;                    // latch set page number
190         RWLock readwr[1];               // read/write page lock
191         RWLock access[1];               // Access Intent/Page delete
192         WOLock parent[1];               // Posting of fence key in parent
193         WOLock atomic[1];               // Atomic update in progress
194         uint split;                             // right split page atomic insert
195         uint entry;                             // entry slot in latch table
196         uint next;                              // next entry in hash table chain
197         uint prev;                              // prev entry in hash table chain
198         ushort pin;                             // number of accessing threads
199         unsigned char dirty;    // page in cache is dirty (atomic set)
200         BtMutexLatch modify[1]; // modify entry lite latch
201 } BtLatchSet;
202
203 //      Define the length of the page record numbers
204
205 #define BtId 6
206
207 //      Page key slot definition.
208
209 //      Keys are marked dead, but remain on the page until
210 //      it cleanup is called. The fence key (highest key) for
211 //      a leaf page is always present, even after cleanup.
212
213 //      Slot types
214
215 //      In addition to the Unique keys that occupy slots
216 //      there are Librarian and Duplicate key
217 //      slots occupying the key slot array.
218
219 //      The Librarian slots are dead keys that
220 //      serve as filler, available to add new Unique
221 //      or Dup slots that are inserted into the B-tree.
222
223 //      The Duplicate slots have had their key bytes extended
224 //      by 6 bytes to contain a binary duplicate key uniqueifier.
225
226 typedef enum {
227         Unique,
228         Librarian,
229         Duplicate,
230         Delete
231 } BtSlotType;
232
233 typedef struct {
234         uint off:BT_maxbits;    // page offset for key start
235         uint type:3;                    // type of slot
236         uint dead:1;                    // set for deleted slot
237 } BtSlot;
238
239 //      The key structure occupies space at the upper end of
240 //      each page.  It's a length byte followed by the key
241 //      bytes.
242
243 typedef struct {
244         unsigned char len;              // this can be changed to a ushort or uint
245         unsigned char key[0];
246 } BtKey;
247
248 //      the value structure also occupies space at the upper
249 //      end of the page. Each key is immediately followed by a value.
250
251 typedef struct {
252         unsigned char len;              // this can be changed to a ushort or uint
253         unsigned char value[0];
254 } BtVal;
255
256 #define BT_maxkey       255             // maximum number of bytes in a key
257 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
258
259 //      The first part of an index page.
260 //      It is immediately followed
261 //      by the BtSlot array of keys.
262
263 //      note that this structure size
264 //      must be a multiple of 8 bytes
265 //      in order to place dups correctly.
266
267 typedef struct BtPage_ {
268         uint cnt;                                       // count of keys in page
269         uint act;                                       // count of active keys
270         uint min;                                       // next key offset
271         uint garbage;                           // page garbage in bytes
272         unsigned char bits:7;           // page size in bits
273         unsigned char free:1;           // page is on free chain
274         unsigned char lvl:7;            // level of page
275         unsigned char kill:1;           // page is being deleted
276         unsigned char right[BtId];      // page number to right
277         unsigned char left[BtId];       // page number to left
278         unsigned char filler[2];        // padding to multiple of 8
279         logseqno lsn;                           // log sequence number applied
280         uid page_no;                            // this page number
281 } *BtPage;
282
283 //  The loadpage interface object
284
285 typedef struct {
286         BtPage page;            // current page pointer
287         BtLatchSet *latch;      // current page latch set
288 } BtPageSet;
289
290 //      structure for latch manager on ALLOC_page
291
292 typedef struct {
293         struct BtPage_ alloc[1];                // next page_no in right ptr
294         unsigned long long dups[1];             // global duplicate key uniqueifier
295         unsigned char freechain[BtId];  // head of free page_nos chain
296         unsigned long long activepages; // number of active pages pages
297 } BtPageZero;
298
299 //      The object structure for Btree access
300
301 typedef struct {
302         uint page_size;                         // page size    
303         uint page_bits;                         // page size in bits    
304 #ifdef unix
305         int idx;
306 #else
307         HANDLE idx;
308 #endif
309         BtPageZero *pagezero;           // mapped allocation page
310         BtHashEntry *hashtable;         // the buffer pool hash table entries
311         BtLatchSet *latchsets;          // mapped latch set from buffer pool
312         unsigned char *pagepool;        // mapped to the buffer pool pages
313         unsigned char *redobuff;        // mapped recovery buffer pointer
314         logseqno lsn, flushlsn;         // current & first lsn flushed
315         BtMutexLatch redo[1];           // redo area lite latch
316         BtMutexLatch lock[1];           // allocation area lite latch
317         BtMutexLatch maps[1];           // mapping segments lite latch
318         ushort thread_no[1];            // next thread number
319         uint nlatchpage;                        // number of latch pages at BT_latch
320         uint latchtotal;                        // number of page latch entries
321         uint latchhash;                         // number of latch hash table slots
322         uint latchvictim;                       // next latch entry to examine
323         uint latchpromote;                      // next latch entry to promote
324         uint redopages;                         // size of recovery buff in pages
325         uint redolast;                          // last msync size of recovery buff
326         uint redoend;                           // eof/end element in recovery buff
327         int err;                                        // last error
328         int line;                                       // last error line no
329         int found;                                      // number of keys found by delete
330         int reads, writes;                      // number of reads and writes
331 #ifndef unix
332         HANDLE halloc;                          // allocation handle
333         HANDLE hpool;                           // buffer pool handle
334 #endif
335         uint segments;                          // number of memory mapped segments
336         unsigned char *pages[64000];// memory mapped segments of b-tree
337 } BtMgr;
338
339 typedef struct {
340         BtMgr *mgr;                                     // buffer manager for entire process
341         BtMgr *main;                            // buffer manager for main btree
342         BtPage cursor;                          // cached page frame for start/next
343         ushort thread_no;                       // thread number
344         unsigned char key[BT_keyarray]; // last found complete key
345 } BtDb;
346
347 //      Catastrophic errors
348
349 typedef enum {
350         BTERR_ok = 0,
351         BTERR_struct,
352         BTERR_ovflw,
353         BTERR_lock,
354         BTERR_map,
355         BTERR_read,
356         BTERR_wrt,
357         BTERR_atomic,
358         BTERR_recovery
359 } BTERR;
360
361 #define CLOCK_bit 0x8000
362
363 // recovery manager entry types
364
365 typedef enum {
366         BTRM_eof = 0,   // rest of buffer is emtpy
367         BTRM_add,               // add a unique key-value to btree
368         BTRM_dup,               // add a duplicate key-value to btree
369         BTRM_del,               // delete a key-value from btree
370         BTRM_upd,               // update a key with a new value
371         BTRM_new,               // allocate a new empty page
372         BTRM_old                // reuse an old empty page
373 } BTRM;
374
375 // recovery manager entry
376 //      structure followed by BtKey & BtVal
377
378 typedef struct {
379         logseqno reqlsn;        // log sequence number required
380         logseqno lsn;           // log sequence number for entry
381         uint len;                       // length of entry
382         unsigned char type;     // type of entry
383         unsigned char lvl;      // level of btree entry pertains to
384 } BtLogHdr;
385
386 // B-Tree functions
387
388 extern void bt_close (BtDb *bt);
389 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
390 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
391 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
392 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
393 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
394 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
395 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
396
397 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
398
399 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
400 extern uint bt_nextkey   (BtDb *bt, uint slot);
401 extern uint bt_prevkey (BtDb *db, uint slot);
402 extern uint bt_lastkey (BtDb *db);
403
404 //      manager functions
405 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
406 extern void bt_mgrclose (BtMgr *mgr);
407 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
408 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
409
410 //      transaction functions
411 BTERR bt_txnpromote (BtDb *bt);
412
413 //  The page is allocated from low and hi ends.
414 //  The key slots are allocated from the bottom,
415 //      while the text and value of the key
416 //  are allocated from the top.  When the two
417 //  areas meet, the page is split into two.
418
419 //  A key consists of a length byte, two bytes of
420 //  index number (0 - 65535), and up to 253 bytes
421 //  of key value.
422
423 //  Associated with each key is a value byte string
424 //      containing any value desired.
425
426 //  The b-tree root is always located at page 1.
427 //      The first leaf page of level zero is always
428 //      located on page 2.
429
430 //      The b-tree pages are linked with next
431 //      pointers to facilitate enumerators,
432 //      and provide for concurrency.
433
434 //      When to root page fills, it is split in two and
435 //      the tree height is raised by a new root at page
436 //      one with two keys.
437
438 //      Deleted keys are marked with a dead bit until
439 //      page cleanup. The fence key for a leaf node is
440 //      always present
441
442 //  To achieve maximum concurrency one page is locked at a time
443 //  as the tree is traversed to find leaf key in question. The right
444 //  page numbers are used in cases where the page is being split,
445 //      or consolidated.
446
447 //  Page 0 is dedicated to lock for new page extensions,
448 //      and chains empty pages together for reuse. It also
449 //      contains the latch manager hash table.
450
451 //      The ParentModification lock on a node is obtained to serialize posting
452 //      or changing the fence key for a node.
453
454 //      Empty pages are chained together through the ALLOC page and reused.
455
456 //      Access macros to address slot and key values from the page
457 //      Page slots use 1 based indexing.
458
459 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
460 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
461 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
462
463 void bt_putid(unsigned char *dest, uid id)
464 {
465 int i = BtId;
466
467         while( i-- )
468                 dest[i] = (unsigned char)id, id >>= 8;
469 }
470
471 uid bt_getid(unsigned char *src)
472 {
473 uid id = 0;
474 int i;
475
476         for( i = 0; i < BtId; i++ )
477                 id <<= 8, id |= *src++; 
478
479         return id;
480 }
481
482 uid bt_newdup (BtMgr *mgr)
483 {
484 #ifdef unix
485         return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
486 #else
487         return _InterlockedIncrement64(mgr->pagezero->dups, 1);
488 #endif
489 }
490
491 //      lite weight spin lock Latch Manager
492
493 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
494 {
495         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
496 }
497
498 void bt_mutexlock(BtMutexLatch *latch)
499 {
500 ushort prev;
501
502   do {
503 #ifdef  unix
504         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
505 #else
506         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
507 #endif
508         if( !(prev & XCL) )
509           if( !(prev & ~BOTH) )
510                 return;
511           else
512 #ifdef unix
513                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
514 #else
515                 _InterlockedAnd16((ushort *)latch, ~XCL);
516 #endif
517 #ifdef  unix
518   } while( sched_yield(), 1 );
519 #else
520   } while( SwitchToThread(), 1 );
521 #endif
522 /*
523 unsigned char prev;
524
525   do {
526 #ifdef  unix
527         prev = __sync_fetch_and_or(latch->exclusive, XCL);
528 #else
529         prev = _InterlockedOr8(latch->exclusive, XCL);
530 #endif
531         if( !(prev & XCL) )
532                 return;
533 #ifdef  unix
534   } while( sched_yield(), 1 );
535 #else
536   } while( SwitchToThread(), 1 );
537 #endif
538 */
539 /*
540 BtMutexLatch prev[1];
541 uint slept = 0;
542
543   while( 1 ) {
544         *prev->value = __sync_fetch_and_or(latch->value, XCL);
545
546         if( !prev->bits->xlock ) {                      // did we set XCL bit?
547             if( slept )
548                   __sync_fetch_and_sub(latch->value, WRT);
549                 return;
550         }
551
552         if( !slept ) {
553                 prev->bits->wrt++;
554                 __sync_fetch_and_add(latch->value, WRT);
555         }
556
557         sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
558         slept = 1;
559   } */
560 }
561
562 //      try to obtain write lock
563
564 //      return 1 if obtained,
565 //              0 otherwise
566
567 int bt_mutextry(BtMutexLatch *latch)
568 {
569 ushort prev;
570
571 #ifdef  unix
572         prev = __sync_fetch_and_or((ushort *)latch, XCL);
573 #else
574         prev = _InterlockedOr16((ushort *)latch, XCL);
575 #endif
576         //      take write access if all bits are clear
577
578         if( !(prev & XCL) )
579           if( !(prev & ~BOTH) )
580                 return 1;
581           else
582 #ifdef unix
583                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
584 #else
585                 _InterlockedAnd16((ushort *)latch, ~XCL);
586 #endif
587         return 0;
588 /*
589 unsigned char prev;
590
591 #ifdef  unix
592         prev = __sync_fetch_and_or(latch->exclusive, XCL);
593 #else
594         prev = _InterlockedOr8(latch->exclusive, XCL);
595 #endif
596         //      take write access if all bits are clear
597
598         return !(prev & XCL);
599 */
600 /*
601 BtMutexLatch prev[1];
602
603         *prev->value = __sync_fetch_and_or(latch->value, XCL);
604
605         //      take write access if exclusive bit is clear
606
607         return !prev->bits->xlock;
608 */
609 }
610
611 //      clear write mode
612
613 void bt_releasemutex(BtMutexLatch *latch)
614 {
615 #ifdef unix
616         __sync_fetch_and_and((ushort *)latch, ~BOTH);
617 #else
618         _InterlockedAnd16((ushort *)latch, ~BOTH);
619 #endif
620 /*
621         *latch->exclusive = 0;
622 */
623 /*
624 BtMutexLatch prev[1];
625
626         *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
627
628         if( prev->bits->wrt )
629           sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
630 */
631 }
632
633 //      Write-Only Queue Lock
634
635 void WriteOLock (WOLock *lock, ushort tid)
636 {
637         if( *lock->tid == tid ) {
638                 *lock->dup += 1;
639                 return;
640         }
641
642         bt_mutexlock(lock->xcl);
643         *lock->tid = tid;
644 }
645
646 void WriteORelease (WOLock *lock)
647 {
648         if( *lock->dup ) {
649                 *lock->dup -= 1;
650                 return;
651         }
652
653         *lock->tid = 0;
654         bt_releasemutex(lock->xcl);
655 }
656
657 //      Phase-Fair reader/writer lock implementation
658
659 void WriteLock (RWLock *lock, ushort tid)
660 {
661 ushort w, r, tix;
662
663         if( *lock->tid == tid ) {
664                 *lock->dup += 1;
665                 return;
666         }
667 #ifdef unix
668         tix = __sync_fetch_and_add (lock->ticket, 1);
669 #else
670         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
671 #endif
672         // wait for our ticket to come up
673
674         while( tix != lock->serving[0] )
675 #ifdef unix
676                 sched_yield();
677 #else
678                 SwitchToThread ();
679 #endif
680
681         w = PRES | (tix & PHID);
682 #ifdef  unix
683         r = __sync_fetch_and_add (lock->rin, w);
684 #else
685         r = _InterlockedExchangeAdd16 (lock->rin, w);
686 #endif
687         while( r != *lock->rout )
688 #ifdef unix
689                 sched_yield();
690 #else
691                 SwitchToThread();
692 #endif
693         *lock->tid = tid;
694 }
695
696 void WriteRelease (RWLock *lock)
697 {
698         if( *lock->dup ) {
699                 *lock->dup -= 1;
700                 return;
701         }
702
703         *lock->tid = 0;
704 #ifdef unix
705         __sync_fetch_and_and (lock->rin, ~MASK);
706 #else
707         _InterlockedAnd16 (lock->rin, ~MASK);
708 #endif
709         lock->serving[0]++;
710 }
711
712 //      try to obtain read lock
713 //  return 1 if successful
714
715 int ReadTry (RWLock *lock, ushort tid)
716 {
717 ushort w;
718
719         //  OK if write lock already held by same thread
720
721         if( *lock->tid == tid ) {
722                 *lock->dup += 1;
723                 return 1;
724         }
725 #ifdef unix
726         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
727 #else
728         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
729 #endif
730         if( w )
731           if( w == (*lock->rin & MASK) ) {
732 #ifdef unix
733                 __sync_fetch_and_add (lock->rin, -RINC);
734 #else
735                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
736 #endif
737                 return 0;
738           }
739
740         return 1;
741 }
742
743 void ReadLock (RWLock *lock, ushort tid)
744 {
745 ushort w;
746
747         if( *lock->tid == tid ) {
748                 *lock->dup += 1;
749                 return;
750         }
751 #ifdef unix
752         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
753 #else
754         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
755 #endif
756         if( w )
757           while( w == (*lock->rin & MASK) )
758 #ifdef unix
759                 sched_yield ();
760 #else
761                 SwitchToThread ();
762 #endif
763 }
764
765 void ReadRelease (RWLock *lock)
766 {
767         if( *lock->dup ) {
768                 *lock->dup -= 1;
769                 return;
770         }
771
772 #ifdef unix
773         __sync_fetch_and_add (lock->rout, RINC);
774 #else
775         _InterlockedExchangeAdd16 (lock->rout, RINC);
776 #endif
777 }
778
779 //      recovery manager -- flush dirty pages
780
781 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
782 {
783 uint cnt3 = 0, cnt2 = 0, cnt = 0;
784 BtLatchSet *latch;
785 BtPage page;
786 uint entry;
787
788   //    flush dirty pool pages to the btree
789
790 fprintf(stderr, "Start flushlsn  ");
791   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
792         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
793         latch = mgr->latchsets + entry;
794         bt_mutexlock (latch->modify);
795         bt_lockpage(BtLockRead, latch, thread_no);
796
797         if( latch->dirty ) {
798           bt_writepage(mgr, page, latch->page_no, 0);
799           latch->dirty = 0, cnt++;
800         }
801 if( latch->pin & ~CLOCK_bit )
802 cnt2++;
803         bt_unlockpage(BtLockRead, latch);
804         bt_releasemutex (latch->modify);
805   }
806 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
807 fprintf(stderr, "begin sync");
808         sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
809 fprintf(stderr, " end sync\n");
810 }
811
812 //      recovery manager -- process current recovery buff on startup
813 //      this won't do much if previous session was properly closed.
814
815 BTERR bt_recoveryredo (BtMgr *mgr)
816 {
817 BtLogHdr *hdr, *eof;
818 uint offset = 0;
819 BtKey *key;
820 BtVal *val;
821
822   pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
823
824   hdr = (BtLogHdr *)mgr->redobuff;
825   mgr->flushlsn = hdr->lsn;
826
827   while( 1 ) {
828         hdr = (BtLogHdr *)(mgr->redobuff + offset);
829         switch( hdr->type ) {
830         case BTRM_eof:
831                 mgr->lsn = hdr->lsn;
832                 return 0;
833         case BTRM_add:          // add a unique key-value to btree
834         
835         case BTRM_dup:          // add a duplicate key-value to btree
836         case BTRM_del:          // delete a key-value from btree
837         case BTRM_upd:          // update a key with a new value
838         case BTRM_new:          // allocate a new empty page
839         case BTRM_old:          // reuse an old empty page
840                 return 0;
841         }
842   }
843 }
844
845 //      recovery manager -- append new entry to recovery log
846 //      flush to disk when it overflows.
847
848 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
849 {
850 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
851 uint amt = sizeof(BtLogHdr);
852 BtLogHdr *hdr, *eof;
853 uint last, end;
854
855         bt_mutexlock (mgr->redo);
856
857         if( key )
858           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
859
860         //      see if new entry fits in buffer
861         //      flush and reset if it doesn't
862
863         if( amt > size - mgr->redoend ) {
864           mgr->flushlsn = mgr->lsn;
865           msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
866           mgr->redolast = 0;
867           mgr->redoend = 0;
868           bt_flushlsn(mgr, thread_no);
869         }
870
871         //      fill in new entry & either eof or end block
872
873         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
874
875         hdr->len = amt;
876         hdr->type = type;
877         hdr->lvl = lvl;
878         hdr->lsn = ++mgr->lsn;
879
880         mgr->redoend += amt;
881
882         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
883         memset (eof, 0, sizeof(BtLogHdr));
884
885         //  fill in key and value
886
887         if( key ) {
888           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
889           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
890         }
891
892         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
893         memset (eof, 0, sizeof(BtLogHdr));
894         eof->lsn = mgr->lsn;
895
896         last = mgr->redolast & 0xfff;
897         end = mgr->redoend;
898         mgr->redolast = end;
899
900         bt_releasemutex(mgr->redo);
901
902         msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
903         return hdr->lsn;
904 }
905
906 //      recovery manager -- append transaction to recovery log
907 //      flush to disk when it overflows.
908
909 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
910 {
911 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
912 uint amt = 0, src, type;
913 BtLogHdr *hdr, *eof;
914 uint last, end;
915 logseqno lsn;
916 BtKey *key;
917 BtVal *val;
918
919         //      determine amount of redo recovery log space required
920
921         for( src = 0; src++ < source->cnt; ) {
922           key = keyptr(source,src);
923           val = valptr(source,src);
924           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
925           amt += sizeof(BtLogHdr);
926         }
927
928         bt_mutexlock (mgr->redo);
929
930         //      see if new entry fits in buffer
931         //      flush and reset if it doesn't
932
933         if( amt > size - mgr->redoend ) {
934           mgr->flushlsn = mgr->lsn;
935           msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
936           mgr->redolast = 0;
937           mgr->redoend = 0;
938           bt_flushlsn (mgr, thread_no);
939         }
940
941         //      assign new lsn to transaction
942
943         lsn = ++mgr->lsn;
944
945         //      fill in new entries
946
947         for( src = 0; src++ < source->cnt; ) {
948           key = keyptr(source, src);
949           val = valptr(source, src);
950
951           switch( slotptr(source, src)->type ) {
952           case Unique:
953                 type = BTRM_add;
954                 break;
955           case Duplicate:
956                 type = BTRM_dup;
957                 break;
958           case Delete:
959                 type = BTRM_del;
960                 break;
961           }
962
963           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
964           amt += sizeof(BtLogHdr);
965
966           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
967           hdr->len = amt;
968           hdr->type = type;
969           hdr->lsn = lsn;
970           hdr->lvl = 0;
971
972           //  fill in key and value
973
974           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
975           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
976
977           mgr->redoend += amt;
978         }
979
980         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
981         memset (eof, 0, sizeof(BtLogHdr));
982         eof->lsn = lsn;
983
984         last = mgr->redolast & 0xfff;
985         end = mgr->redoend;
986         mgr->redolast = end;
987         bt_releasemutex(mgr->redo);
988
989         msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
990         bt_releasemutex(mgr->redo);
991         return lsn;
992 }
993
994 //      read page into buffer pool from permanent location in Btree file
995
996 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
997 {
998 off64_t off = page_no << mgr->page_bits;
999
1000         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
1001                 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
1002                 return BTERR_read;
1003         }
1004 if( page->page_no != page_no )
1005 abort();
1006         mgr->reads++;
1007         return 0;
1008 /*
1009 int flag = PROT_READ | PROT_WRITE;
1010 uint segment = page_no >> 32;
1011 unsigned char *perm;
1012
1013   while( 1 ) {
1014         if( segment < mgr->segments ) {
1015           perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
1016                 memcpy (page, perm, mgr->page_size);
1017 if( page->page_no != page_no )
1018 abort();
1019                 mgr->reads++;
1020                 return 0;
1021         }
1022
1023         bt_mutexlock (mgr->maps);
1024
1025         if( segment < mgr->segments ) {
1026                 bt_releasemutex (mgr->maps);
1027                 continue;
1028         }
1029
1030         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
1031         mgr->segments++;
1032
1033         bt_releasemutex (mgr->maps);
1034   }  */
1035 }
1036
1037 //      write page to permanent location in Btree file
1038 //      clear the dirty bit
1039
1040 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
1041 {
1042 off64_t off = page_no << mgr->page_bits;
1043
1044         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size ) {
1045                 fprintf (stderr, "Unable to write page %d errno = %d\n", page_no, errno);
1046                 return BTERR_wrt;
1047         }
1048         mgr->writes++;
1049         return 0;
1050 /*
1051 int flag = PROT_READ | PROT_WRITE;
1052 uint segment = page_no >> 32;
1053 unsigned char *perm;
1054
1055   while( 1 ) {
1056         if( segment < mgr->segments ) {
1057           perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
1058                 memcpy (perm, page, mgr->page_size);
1059                 if( syncit )
1060                         msync (perm, mgr->page_size, MS_SYNC);
1061                 mgr->writes++;
1062                 return 0;
1063         }
1064
1065         bt_mutexlock (mgr->maps);
1066
1067         if( segment < mgr->segments ) {
1068                 bt_releasemutex (mgr->maps);
1069                 continue;
1070         }
1071
1072         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
1073         bt_releasemutex (mgr->maps);
1074         mgr->segments++;
1075   } */
1076 }
1077
1078 //      set CLOCK bit in latch
1079 //      decrement pin count
1080
1081 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
1082 {
1083         bt_mutexlock(latch->modify);
1084         latch->pin |= CLOCK_bit;
1085         latch->pin--;
1086
1087         bt_releasemutex(latch->modify);
1088 }
1089
1090 //  return the btree cached page address
1091
1092 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
1093 {
1094 BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
1095
1096   return page;
1097 }
1098
1099 //  return next available latch entry
1100 //        and with latch entry locked
1101
1102 uint bt_availnext (BtMgr *mgr)
1103 {
1104 BtLatchSet *latch;
1105 uint entry;
1106
1107   while( 1 ) {
1108 #ifdef unix
1109         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1110 #else
1111         entry = _InterlockedIncrement (&mgr->latchvictim);
1112 #endif
1113         entry %= mgr->latchtotal;
1114
1115         if( !entry )
1116                 continue;
1117
1118         latch = mgr->latchsets + entry;
1119
1120         if( !bt_mutextry(latch->modify) )
1121                 continue;
1122
1123         //  return this entry if it is not pinned
1124
1125         if( !latch->pin )
1126                 return entry;
1127
1128         //      if the CLOCK bit is set
1129         //      reset it to zero.
1130
1131         latch->pin &= ~CLOCK_bit;
1132         bt_releasemutex(latch->modify);
1133   }
1134 }
1135
1136 //      pin page in buffer pool
1137 //      return with latchset pinned
1138
1139 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1140 {
1141 uint hashidx = page_no % mgr->latchhash;
1142 BtLatchSet *latch;
1143 uint entry, idx;
1144 BtPage page;
1145
1146   //  try to find our entry
1147
1148   bt_mutexlock(mgr->hashtable[hashidx].latch);
1149
1150   if( entry = mgr->hashtable[hashidx].entry ) do
1151   {
1152         latch = mgr->latchsets + entry;
1153         if( page_no == latch->page_no )
1154                 break;
1155   } while( entry = latch->next );
1156
1157   //  found our entry: increment pin
1158
1159   if( entry ) {
1160         latch = mgr->latchsets + entry;
1161         bt_mutexlock(latch->modify);
1162         latch->pin |= CLOCK_bit;
1163         latch->pin++;
1164 if(contents)
1165 abort();
1166         bt_releasemutex(latch->modify);
1167         bt_releasemutex(mgr->hashtable[hashidx].latch);
1168         return latch;
1169   }
1170
1171   //  find and reuse unpinned entry
1172
1173 trynext:
1174
1175   entry = bt_availnext (mgr);
1176   latch = mgr->latchsets + entry;
1177
1178   idx = latch->page_no % mgr->latchhash;
1179
1180   //  if latch is on a different hash chain
1181   //    unlink from the old page_no chain
1182
1183   if( latch->page_no )
1184    if( idx != hashidx ) {
1185
1186         //  skip over this entry if latch not available
1187
1188     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1189           bt_releasemutex(latch->modify);
1190           goto trynext;
1191         }
1192
1193         if( latch->prev )
1194           mgr->latchsets[latch->prev].next = latch->next;
1195         else
1196           mgr->hashtable[idx].entry = latch->next;
1197
1198         if( latch->next )
1199           mgr->latchsets[latch->next].prev = latch->prev;
1200
1201     bt_releasemutex (mgr->hashtable[idx].latch);
1202    }
1203
1204   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1205
1206   //  update permanent page area in btree from buffer pool
1207   //    no read-lock is required since page is not pinned.
1208
1209   if( latch->dirty )
1210         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1211           return mgr->line = __LINE__, NULL;
1212         else
1213           latch->dirty = 0;
1214
1215   if( contents ) {
1216         memcpy (page, contents, mgr->page_size);
1217         latch->dirty = 1;
1218   } else if( bt_readpage (mgr, page, page_no) )
1219           return mgr->line = __LINE__, NULL;
1220
1221   //  link page as head of hash table chain
1222   //  if this is a never before used entry,
1223   //  or it was previously on a different
1224   //  hash table chain. Otherwise, just
1225   //  leave it in its current hash table
1226   //  chain position.
1227
1228   if( !latch->page_no || hashidx != idx ) {
1229         if( latch->next = mgr->hashtable[hashidx].entry )
1230           mgr->latchsets[latch->next].prev = entry;
1231
1232         mgr->hashtable[hashidx].entry = entry;
1233     latch->prev = 0;
1234   }
1235
1236   //  fill in latch structure
1237
1238   latch->pin = CLOCK_bit | 1;
1239   latch->page_no = page_no;
1240   latch->entry = entry;
1241   latch->split = 0;
1242
1243   bt_releasemutex (latch->modify);
1244   bt_releasemutex (mgr->hashtable[hashidx].latch);
1245   return latch;
1246 }
1247   
1248 void bt_mgrclose (BtMgr *mgr)
1249 {
1250 BtLatchSet *latch;
1251 BtLogHdr *eof;
1252 uint num = 0;
1253 BtPage page;
1254 uint slot;
1255
1256         //      flush previously written dirty pages
1257         //      and write recovery buffer to disk
1258
1259         fdatasync (mgr->idx);
1260
1261         if( mgr->redoend ) {
1262                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1263                 memset (eof, 0, sizeof(BtLogHdr));
1264
1265                 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1266         }
1267
1268         //      write remaining dirty pool pages to the btree
1269
1270         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1271                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1272                 latch = mgr->latchsets + slot;
1273
1274                 if( latch->dirty ) {
1275                         bt_writepage(mgr, page, latch->page_no, 0);
1276                         latch->dirty = 0, num++;
1277                 }
1278         }
1279
1280         //      flush last batch to disk and clear
1281         //      redo recovery buffer on disk.
1282
1283         fdatasync (mgr->idx);
1284
1285         if( mgr->redopages ) {
1286                 eof = (BtLogHdr *)mgr->redobuff;
1287                 memset (eof, 0, sizeof(BtLogHdr));
1288                 eof->lsn = mgr->lsn;
1289
1290                 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1291
1292                 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1293         }
1294
1295         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1296
1297 #ifdef unix
1298         while( mgr->segments )
1299                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1300
1301         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1302         munmap (mgr->pagezero, mgr->page_size);
1303 #else
1304         FlushViewOfFile(mgr->pagezero, 0);
1305         UnmapViewOfFile(mgr->pagezero);
1306         UnmapViewOfFile(mgr->pagepool);
1307         CloseHandle(mgr->halloc);
1308         CloseHandle(mgr->hpool);
1309 #endif
1310 #ifdef unix
1311         free (mgr->redobuff);
1312         close (mgr->idx);
1313         free (mgr);
1314 #else
1315         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1316         FlushFileBuffers(mgr->idx);
1317         CloseHandle(mgr->idx);
1318         GlobalFree (mgr);
1319 #endif
1320 }
1321
1322 //      close and release memory
1323
1324 void bt_close (BtDb *bt)
1325 {
1326 #ifdef unix
1327         if( bt->cursor )
1328                 free (bt->cursor);
1329 #else
1330         if( bt->cursor)
1331                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1332 #endif
1333         free (bt);
1334 }
1335
1336 //  open/create new btree buffer manager
1337
1338 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1339 //              size of page pool (e.g. 262144)
1340
1341 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1342 {
1343 uint lvl, attr, last, slot, idx;
1344 uint nlatchpage, latchhash;
1345 unsigned char value[BtId];
1346 int flag, initit = 0;
1347 BtPageZero *pagezero;
1348 BtLatchSet *latch;
1349 off64_t size;
1350 uint amt[1];
1351 BtMgr* mgr;
1352 BtKey* key;
1353 BtVal *val;
1354
1355         // determine sanity of page size and buffer pool
1356
1357         if( bits > BT_maxbits )
1358                 bits = BT_maxbits;
1359         else if( bits < BT_minbits )
1360                 bits = BT_minbits;
1361 #ifdef unix
1362         mgr = calloc (1, sizeof(BtMgr));
1363
1364         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1365
1366         if( mgr->idx == -1 ) {
1367                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1368                 return free(mgr), NULL;
1369         }
1370 #else
1371         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1372         attr = FILE_ATTRIBUTE_NORMAL;
1373         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1374
1375         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1376                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1377                 return GlobalFree(mgr), NULL;
1378         }
1379 #endif
1380
1381 #ifdef unix
1382         pagezero = valloc (BT_maxpage);
1383         *amt = 0;
1384
1385         // read minimum page size to get root info
1386         //      to support raw disk partition files
1387         //      check if bits == 0 on the disk.
1388
1389         if( size = lseek (mgr->idx, 0L, 2) )
1390                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1391                         if( pagezero->alloc->bits )
1392                                 bits = pagezero->alloc->bits;
1393                         else
1394                                 initit = 1;
1395                 else
1396                         return free(mgr), free(pagezero), NULL;
1397         else
1398                 initit = 1;
1399 #else
1400         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1401         size = GetFileSize(mgr->idx, amt);
1402
1403         if( size || *amt ) {
1404                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1405                         return bt_mgrclose (mgr), NULL;
1406                 bits = pagezero->alloc->bits;
1407         } else
1408                 initit = 1;
1409 #endif
1410
1411         mgr->page_size = 1 << bits;
1412         mgr->page_bits = bits;
1413
1414         //  calculate number of latch hash table entries
1415
1416         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1417
1418         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1419         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1420         mgr->latchtotal = nodemax;
1421
1422         if( !initit )
1423                 goto mgrlatch;
1424
1425         // initialize an empty b-tree with latch page, root page, page of leaves
1426         // and page(s) of latches and page pool cache
1427
1428         memset (pagezero, 0, 1 << bits);
1429         pagezero->alloc->lvl = MIN_lvl - 1;
1430         pagezero->alloc->bits = mgr->page_bits;
1431         bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1432         pagezero->activepages = 2;
1433
1434         //  initialize left-most LEAF page in
1435         //      alloc->left and count of active leaf pages.
1436
1437         bt_putid (pagezero->alloc->left, LEAF_page);
1438
1439         ftruncate (mgr->idx, REDO_page << mgr->page_bits);
1440
1441         if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
1442                 fprintf (stderr, "Unable to create btree page zero\n");
1443                 return bt_mgrclose (mgr), NULL;
1444         }
1445
1446         memset (pagezero, 0, 1 << bits);
1447         pagezero->alloc->bits = mgr->page_bits;
1448
1449         for( lvl=MIN_lvl; lvl--; ) {
1450         BtSlot *node = slotptr(pagezero->alloc, 1);
1451                 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1452                 key = keyptr(pagezero->alloc, 1);
1453                 key->len = 2;           // create stopper key
1454                 key->key[0] = 0xff;
1455                 key->key[1] = 0xff;
1456
1457                 bt_putid(value, MIN_lvl - lvl + 1);
1458                 val = valptr(pagezero->alloc, 1);
1459                 val->len = lvl ? BtId : 0;
1460                 memcpy (val->value, value, val->len);
1461
1462                 pagezero->alloc->min = node->off;
1463                 pagezero->alloc->lvl = lvl;
1464                 pagezero->alloc->cnt = 1;
1465                 pagezero->alloc->act = 1;
1466                 pagezero->alloc->page_no = MIN_lvl - lvl;
1467
1468                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
1469                         fprintf (stderr, "Unable to create btree page\n");
1470                         return bt_mgrclose (mgr), NULL;
1471                 }
1472         }
1473
1474 mgrlatch:
1475 #ifdef unix
1476         free (pagezero);
1477 #else
1478         VirtualFree (pagezero, 0, MEM_RELEASE);
1479 #endif
1480 #ifdef unix
1481         // mlock the pagezero page
1482
1483         flag = PROT_READ | PROT_WRITE;
1484         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1485         if( mgr->pagezero == MAP_FAILED ) {
1486                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1487                 return bt_mgrclose (mgr), NULL;
1488         }
1489         mlock (mgr->pagezero, mgr->page_size);
1490
1491         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1492         if( mgr->pagepool == MAP_FAILED ) {
1493                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1494                 return bt_mgrclose (mgr), NULL;
1495         }
1496         if( mgr->redopages = redopages ) {
1497                 ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
1498                 mgr->redobuff = valloc (redopages * mgr->page_size);
1499         }
1500 #else
1501         flag = PAGE_READWRITE;
1502         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1503         if( !mgr->halloc ) {
1504                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1505                 return bt_mgrclose (mgr), NULL;
1506         }
1507
1508         flag = FILE_MAP_WRITE;
1509         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1510         if( !mgr->pagezero ) {
1511                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1512                 return bt_mgrclose (mgr), NULL;
1513         }
1514
1515         flag = PAGE_READWRITE;
1516         size = (uid)mgr->nlatchpage << mgr->page_bits;
1517         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1518         if( !mgr->hpool ) {
1519                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1520                 return bt_mgrclose (mgr), NULL;
1521         }
1522
1523         flag = FILE_MAP_WRITE;
1524         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1525         if( !mgr->pagepool ) {
1526                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1527                 return bt_mgrclose (mgr), NULL;
1528         }
1529         if( mgr->redopages = redopages )
1530                 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1531 #endif
1532
1533         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1534         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1535         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1536
1537         return mgr;
1538 }
1539
1540 //      open BTree access method
1541 //      based on buffer manager
1542
1543 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1544 {
1545 BtDb *bt = malloc (sizeof(*bt));
1546
1547         memset (bt, 0, sizeof(*bt));
1548         bt->main = main;
1549         bt->mgr = mgr;
1550 #ifdef unix
1551         bt->cursor = valloc (mgr->page_size);
1552 #else
1553         bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1554 #endif
1555 #ifdef unix
1556         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1557 #else
1558         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1559 #endif
1560         return bt;
1561 }
1562
1563 //  compare two keys, return > 0, = 0, or < 0
1564 //  =0: keys are same
1565 //  -1: key2 > key1
1566 //  +1: key2 < key1
1567 //  as the comparison value
1568
1569 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1570 {
1571 uint len1 = key1->len;
1572 int ans;
1573
1574         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1575                 return ans;
1576
1577         if( len1 > len2 )
1578                 return 1;
1579         if( len1 < len2 )
1580                 return -1;
1581
1582         return 0;
1583 }
1584
1585 // place write, read, or parent lock on requested page_no.
1586
1587 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1588 {
1589         switch( mode ) {
1590         case BtLockRead:
1591                 ReadLock (latch->readwr, thread_no);
1592                 break;
1593         case BtLockWrite:
1594                 WriteLock (latch->readwr, thread_no);
1595                 break;
1596         case BtLockAccess:
1597                 ReadLock (latch->access, thread_no);
1598                 break;
1599         case BtLockDelete:
1600                 WriteLock (latch->access, thread_no);
1601                 break;
1602         case BtLockParent:
1603                 WriteOLock (latch->parent, thread_no);
1604                 break;
1605         case BtLockAtomic:
1606                 WriteOLock (latch->atomic, thread_no);
1607                 break;
1608         case BtLockAtomic | BtLockRead:
1609                 WriteOLock (latch->atomic, thread_no);
1610                 ReadLock (latch->readwr, thread_no);
1611                 break;
1612         }
1613 }
1614
1615 // remove write, read, or parent lock on requested page
1616
1617 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1618 {
1619         switch( mode ) {
1620         case BtLockRead:
1621                 ReadRelease (latch->readwr);
1622                 break;
1623         case BtLockWrite:
1624                 WriteRelease (latch->readwr);
1625                 break;
1626         case BtLockAccess:
1627                 ReadRelease (latch->access);
1628                 break;
1629         case BtLockDelete:
1630                 WriteRelease (latch->access);
1631                 break;
1632         case BtLockParent:
1633                 WriteORelease (latch->parent);
1634                 break;
1635         case BtLockAtomic:
1636                 WriteORelease (latch->atomic);
1637                 break;
1638         case BtLockAtomic | BtLockRead:
1639                 WriteORelease (latch->atomic);
1640                 ReadRelease (latch->readwr);
1641                 break;
1642         }
1643 }
1644
1645 //      allocate a new page
1646 //      return with page latched, but unlocked.
1647
1648 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1649 {
1650 uid page_no;
1651 int blk;
1652
1653         //      lock allocation page
1654
1655         bt_mutexlock(mgr->lock);
1656
1657         // use empty chain first
1658         // else allocate empty page
1659
1660         if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1661                 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1662                         set->page = bt_mappage (mgr, set->latch);
1663                 else
1664                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1665
1666                 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1667                 mgr->pagezero->activepages++;
1668
1669                 bt_releasemutex(mgr->lock);
1670
1671                 memcpy (set->page, contents, mgr->page_size);
1672                 set->latch->dirty = 1;
1673                 return 0;
1674         }
1675
1676         page_no = bt_getid(mgr->pagezero->alloc->right);
1677         bt_putid(mgr->pagezero->alloc->right, page_no+1);
1678
1679         // unlock allocation latch and
1680         //      extend file into new page.
1681
1682         mgr->pagezero->activepages++;
1683
1684         ftruncate (mgr->idx, (uid)(page_no + 1) << mgr->page_bits);
1685         bt_releasemutex(mgr->lock);
1686
1687         //      don't load cache from btree page, load it from contents
1688
1689         contents->page_no = page_no;
1690
1691         if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1692                 set->page = bt_mappage (mgr, set->latch);
1693         else
1694                 return mgr->err;
1695
1696         return 0;
1697 }
1698
1699 //  find slot in page for given key at a given level
1700
1701 int bt_findslot (BtPage page, unsigned char *key, uint len)
1702 {
1703 uint diff, higher = page->cnt, low = 1, slot;
1704 uint good = 0;
1705
1706         //        make stopper key an infinite fence value
1707
1708         if( bt_getid (page->right) )
1709                 higher++;
1710         else
1711                 good++;
1712
1713         //      low is the lowest candidate.
1714         //  loop ends when they meet
1715
1716         //  higher is already
1717         //      tested as .ge. the passed key.
1718
1719         while( diff = higher - low ) {
1720                 slot = low + ( diff >> 1 );
1721                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1722                         low = slot + 1;
1723                 else
1724                         higher = slot, good++;
1725         }
1726
1727         //      return zero if key is on right link page
1728
1729         return good ? higher : 0;
1730 }
1731
1732 //  find and load page at given level for given key
1733 //      leave page rd or wr locked as requested
1734
1735 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1736 {
1737 uid page_no = ROOT_page, prevpage = 0;
1738 uint drill = 0xff, slot;
1739 BtLatchSet *prevlatch;
1740 uint mode, prevmode;
1741 BtVal *val;
1742
1743   //  start at root of btree and drill down
1744
1745   do {
1746         // determine lock mode of drill level
1747         mode = (drill == lvl) ? lock : BtLockRead; 
1748
1749         if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1750           return 0;
1751
1752         // obtain access lock using lock chaining with Access mode
1753
1754         if( page_no > ROOT_page )
1755           bt_lockpage(BtLockAccess, set->latch, thread_no);
1756
1757         set->page = bt_mappage (mgr, set->latch);
1758
1759         //      release & unpin parent or left sibling page
1760
1761         if( prevpage ) {
1762           bt_unlockpage(prevmode, prevlatch);
1763           bt_unpinlatch (mgr, prevlatch);
1764           prevpage = 0;
1765         }
1766
1767         // obtain mode lock using lock chaining through AccessLock
1768
1769         bt_lockpage(mode, set->latch, thread_no);
1770
1771         if( set->page->free )
1772                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1773
1774         if( page_no > ROOT_page )
1775           bt_unlockpage(BtLockAccess, set->latch);
1776
1777         // re-read and re-lock root after determining actual level of root
1778
1779         if( set->page->lvl != drill) {
1780                 if( set->latch->page_no != ROOT_page )
1781                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1782                         
1783                 drill = set->page->lvl;
1784
1785                 if( lock != BtLockRead && drill == lvl ) {
1786                   bt_unlockpage(mode, set->latch);
1787                   bt_unpinlatch (mgr, set->latch);
1788                   continue;
1789                 }
1790         }
1791
1792         prevpage = set->latch->page_no;
1793         prevlatch = set->latch;
1794         prevmode = mode;
1795
1796         //  find key on page at this level
1797         //  and descend to requested level
1798
1799         if( !set->page->kill )
1800          if( slot = bt_findslot (set->page, key, len) ) {
1801           if( drill == lvl )
1802                 return slot;
1803
1804           // find next non-dead slot -- the fence key if nothing else
1805
1806           while( slotptr(set->page, slot)->dead )
1807                 if( slot++ < set->page->cnt )
1808                   continue;
1809                 else
1810                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1811
1812           val = valptr(set->page, slot);
1813
1814           if( val->len == BtId )
1815                 page_no = bt_getid(valptr(set->page, slot)->value);
1816           else
1817                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1818
1819           drill--;
1820           continue;
1821          }
1822
1823         //  slide right into next page
1824
1825         page_no = bt_getid(set->page->right);
1826   } while( page_no );
1827
1828   // return error on end of right chain
1829
1830   mgr->line = __LINE__, mgr->err = BTERR_struct;
1831   return 0;     // return error
1832 }
1833
1834 //      return page to free list
1835 //      page must be delete & write locked
1836
1837 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1838 {
1839         //      lock allocation page
1840
1841         bt_mutexlock (mgr->lock);
1842
1843         //      store chain
1844
1845         memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1846         bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1847         set->latch->dirty = 1;
1848         set->page->free = 1;
1849
1850         // decrement active page count
1851         // and unlock allocation page
1852
1853         mgr->pagezero->activepages--;
1854         bt_releasemutex (mgr->lock);
1855
1856         // unlock released page
1857
1858         bt_unlockpage (BtLockDelete, set->latch);
1859         bt_unlockpage (BtLockWrite, set->latch);
1860         bt_unpinlatch (mgr, set->latch);
1861 }
1862
1863 //      a fence key was deleted from a page
1864 //      push new fence value upwards
1865
1866 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1867 {
1868 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1869 unsigned char value[BtId];
1870 BtKey* ptr;
1871 uint idx;
1872
1873         //      remove the old fence value
1874
1875         ptr = keyptr(set->page, set->page->cnt);
1876         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1877         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1878         set->latch->dirty = 1;
1879
1880         //  cache new fence value
1881
1882         ptr = keyptr(set->page, set->page->cnt);
1883         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1884
1885         bt_lockpage (BtLockParent, set->latch, thread_no);
1886         bt_unlockpage (BtLockWrite, set->latch);
1887
1888         //      insert new (now smaller) fence key
1889
1890         bt_putid (value, set->latch->page_no);
1891         ptr = (BtKey*)leftkey;
1892
1893         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1894           return mgr->err;
1895
1896         //      now delete old fence key
1897
1898         ptr = (BtKey*)rightkey;
1899
1900         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1901                 return mgr->err;
1902
1903         bt_unlockpage (BtLockParent, set->latch);
1904         bt_unpinlatch(mgr, set->latch);
1905         return 0;
1906 }
1907
1908 //      root has a single child
1909 //      collapse a level from the tree
1910
1911 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1912 {
1913 BtPageSet child[1];
1914 uid page_no;
1915 BtVal *val;
1916 uint idx;
1917
1918   // find the child entry and promote as new root contents
1919
1920   do {
1921         for( idx = 0; idx++ < root->page->cnt; )
1922           if( !slotptr(root->page, idx)->dead )
1923                 break;
1924
1925         val = valptr(root->page, idx);
1926
1927         if( val->len == BtId )
1928                 page_no = bt_getid (valptr(root->page, idx)->value);
1929         else
1930                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1931
1932         if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1933                 child->page = bt_mappage (mgr, child->latch);
1934         else
1935                 return mgr->err;
1936
1937         bt_lockpage (BtLockDelete, child->latch, thread_no);
1938         bt_lockpage (BtLockWrite, child->latch, thread_no);
1939
1940         memcpy (root->page, child->page, mgr->page_size);
1941         root->latch->dirty = 1;
1942
1943         bt_freepage (mgr, child);
1944
1945   } while( root->page->lvl > 1 && root->page->act == 1 );
1946
1947   bt_unlockpage (BtLockWrite, root->latch);
1948   bt_unpinlatch (mgr, root->latch);
1949   return 0;
1950 }
1951
1952 //  delete a page and manage keys
1953 //  call with page writelocked
1954
1955 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1956 {
1957 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1958 unsigned char value[BtId];
1959 uint lvl = set->page->lvl;
1960 BtPageSet right[1];
1961 uid page_no;
1962 BtKey *ptr;
1963
1964         //      cache copy of fence key
1965         //      to remove in parent
1966
1967         ptr = keyptr(set->page, set->page->cnt);
1968         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1969
1970         //      obtain lock on right page
1971
1972         page_no = bt_getid(set->page->right);
1973
1974         if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1975                 right->page = bt_mappage (mgr, right->latch);
1976         else
1977                 return 0;
1978
1979         bt_lockpage (BtLockWrite, right->latch, thread_no);
1980
1981         // cache copy of key to update
1982
1983         ptr = keyptr(right->page, right->page->cnt);
1984         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1985
1986         if( right->page->kill )
1987                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1988
1989         // pull contents of right peer into our empty page
1990
1991         memcpy (set->page, right->page, mgr->page_size);
1992         set->latch->dirty = 1;
1993
1994         // mark right page deleted and point it to left page
1995         //      until we can post parent updates that remove access
1996         //      to the deleted page.
1997
1998         bt_putid (right->page->right, set->latch->page_no);
1999         right->latch->dirty = 1;
2000         right->page->kill = 1;
2001
2002         bt_lockpage (BtLockParent, right->latch, thread_no);
2003         bt_unlockpage (BtLockWrite, right->latch);
2004
2005         bt_lockpage (BtLockParent, set->latch, thread_no);
2006         bt_unlockpage (BtLockWrite, set->latch);
2007
2008         // redirect higher key directly to our new node contents
2009
2010         bt_putid (value, set->latch->page_no);
2011         ptr = (BtKey*)higherfence;
2012
2013         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2014           return mgr->err;
2015
2016         //      delete old lower key to our node
2017
2018         ptr = (BtKey*)lowerfence;
2019
2020         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2021           return mgr->err;
2022
2023         //      obtain delete and write locks to right node
2024
2025         bt_unlockpage (BtLockParent, right->latch);
2026         bt_lockpage (BtLockDelete, right->latch, thread_no);
2027         bt_lockpage (BtLockWrite, right->latch, thread_no);
2028         bt_freepage (mgr, right);
2029
2030         bt_unlockpage (BtLockParent, set->latch);
2031         return 0;
2032 }
2033
2034 //  find and delete key on page by marking delete flag bit
2035 //  if page becomes empty, delete it from the btree
2036
2037 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2038 {
2039 uint slot, idx, found, fence;
2040 BtPageSet set[1];
2041 BtSlot *node;
2042 BtKey *ptr;
2043 BtVal *val;
2044
2045         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2046                 node = slotptr(set->page, slot);
2047                 ptr = keyptr(set->page, slot);
2048         } else
2049                 return mgr->err;
2050
2051         // if librarian slot, advance to real slot
2052
2053         if( node->type == Librarian ) {
2054                 ptr = keyptr(set->page, ++slot);
2055                 node = slotptr(set->page, slot);
2056         }
2057
2058         fence = slot == set->page->cnt;
2059
2060         // delete the key, ignore request if already dead
2061
2062         if( found = !keycmp (ptr, key, len) )
2063           if( found = node->dead == 0 ) {
2064                 val = valptr(set->page,slot);
2065                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2066                 set->page->act--;
2067
2068                 //  mark node type as delete
2069
2070                 node->type = Delete;
2071                 node->dead = 1;
2072
2073                 // collapse empty slots beneath the fence
2074                 // on interiour nodes
2075
2076                 if( lvl )
2077                  while( idx = set->page->cnt - 1 )
2078                   if( slotptr(set->page, idx)->dead ) {
2079                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2080                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2081                   } else
2082                         break;
2083           }
2084
2085         //      did we delete a fence key in an upper level?
2086
2087         if( found && lvl && set->page->act && fence )
2088           if( bt_fixfence (mgr, set, lvl, thread_no) )
2089                 return mgr->err;
2090           else
2091                 return 0;
2092
2093         //      do we need to collapse root?
2094
2095         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2096           if( bt_collapseroot (mgr, set, thread_no) )
2097                 return mgr->err;
2098           else
2099                 return 0;
2100
2101         //      delete empty page
2102
2103         if( !set->page->act ) {
2104           if( bt_deletepage (mgr, set, thread_no) )
2105                 return mgr->err;
2106         } else {
2107           set->latch->dirty = 1;
2108           bt_unlockpage(BtLockWrite, set->latch);
2109         }
2110
2111         bt_unpinlatch (mgr, set->latch);
2112         return 0;
2113 }
2114
2115 //      advance to next slot
2116
2117 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2118 {
2119 BtLatchSet *prevlatch;
2120 uid page_no;
2121
2122         if( slot < set->page->cnt )
2123                 return slot + 1;
2124
2125         prevlatch = set->latch;
2126
2127         if( page_no = bt_getid(set->page->right) )
2128           if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2129                 set->page = bt_mappage (bt->mgr, set->latch);
2130           else
2131                 return 0;
2132         else
2133           return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2134
2135         // obtain access lock using lock chaining with Access mode
2136
2137         bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2138
2139         bt_unlockpage(BtLockRead, prevlatch);
2140         bt_unpinlatch (bt->mgr, prevlatch);
2141
2142         bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2143         bt_unlockpage(BtLockAccess, set->latch);
2144         return 1;
2145 }
2146
2147 //      find unique key == given key, or first duplicate key in
2148 //      leaf level and return number of value bytes
2149 //      or (-1) if not found.
2150
2151 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2152 {
2153 BtPageSet set[1];
2154 uint len, slot;
2155 int ret = -1;
2156 BtKey *ptr;
2157 BtVal *val;
2158
2159   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2160    do {
2161         ptr = keyptr(set->page, slot);
2162
2163         //      skip librarian slot place holder
2164
2165         if( slotptr(set->page, slot)->type == Librarian )
2166                 ptr = keyptr(set->page, ++slot);
2167
2168         //      return actual key found
2169
2170         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2171         len = ptr->len;
2172
2173         if( slotptr(set->page, slot)->type == Duplicate )
2174                 len -= BtId;
2175
2176         //      not there if we reach the stopper key
2177
2178         if( slot == set->page->cnt )
2179           if( !bt_getid (set->page->right) )
2180                 break;
2181
2182         // if key exists, return >= 0 value bytes copied
2183         //      otherwise return (-1)
2184
2185         if( slotptr(set->page, slot)->dead )
2186                 continue;
2187
2188         if( keylen == len )
2189           if( !memcmp (ptr->key, key, len) ) {
2190                 val = valptr (set->page,slot);
2191                 if( valmax > val->len )
2192                         valmax = val->len;
2193                 memcpy (value, val->value, valmax);
2194                 ret = valmax;
2195           }
2196
2197         break;
2198
2199    } while( slot = bt_findnext (bt, set, slot) );
2200
2201   bt_unlockpage (BtLockRead, set->latch);
2202   bt_unpinlatch (bt->mgr, set->latch);
2203   return ret;
2204 }
2205
2206 //      check page for space available,
2207 //      clean if necessary and return
2208 //      0 - page needs splitting
2209 //      >0  new slot value
2210
2211 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2212 {
2213 BtPage page = set->page, frame;
2214 uint nxt = mgr->page_size;
2215 uint cnt = 0, idx = 0;
2216 uint max = page->cnt;
2217 uint newslot = max;
2218 BtKey *key;
2219 BtVal *val;
2220
2221         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2222                 return slot;
2223
2224         //      skip cleanup and proceed to split
2225         //      if there's not enough garbage
2226         //      to bother with.
2227
2228         if( page->garbage < nxt / 5 )
2229                 return 0;
2230
2231         frame = malloc (mgr->page_size);
2232         memcpy (frame, page, mgr->page_size);
2233
2234         // skip page info and set rest of page to zero
2235
2236         memset (page+1, 0, mgr->page_size - sizeof(*page));
2237         set->latch->dirty = 1;
2238
2239         page->garbage = 0;
2240         page->act = 0;
2241
2242         // clean up page first by
2243         // removing deleted keys
2244
2245         while( cnt++ < max ) {
2246                 if( cnt == slot )
2247                         newslot = idx + 2;
2248
2249                 if( cnt < max || frame->lvl )
2250                   if( slotptr(frame,cnt)->dead )
2251                         continue;
2252
2253                 // copy the value across
2254
2255                 val = valptr(frame, cnt);
2256                 nxt -= val->len + sizeof(BtVal);
2257                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2258
2259                 // copy the key across
2260
2261                 key = keyptr(frame, cnt);
2262                 nxt -= key->len + sizeof(BtKey);
2263                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2264
2265                 // make a librarian slot
2266
2267                 slotptr(page, ++idx)->off = nxt;
2268                 slotptr(page, idx)->type = Librarian;
2269                 slotptr(page, idx)->dead = 1;
2270
2271                 // set up the slot
2272
2273                 slotptr(page, ++idx)->off = nxt;
2274                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2275
2276                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2277                         page->act++;
2278         }
2279
2280         page->min = nxt;
2281         page->cnt = idx;
2282         free (frame);
2283
2284         //      see if page has enough space now, or does it need splitting?
2285
2286         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2287                 return newslot;
2288
2289         return 0;
2290 }
2291
2292 // split the root and raise the height of the btree
2293
2294 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2295 {  
2296 unsigned char leftkey[BT_keyarray];
2297 uint nxt = mgr->page_size;
2298 unsigned char value[BtId];
2299 BtPageSet left[1];
2300 uid left_page_no;
2301 BtKey *ptr;
2302 BtVal *val;
2303
2304         //      save left page fence key for new root
2305
2306         ptr = keyptr(root->page, root->page->cnt);
2307         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2308
2309         //  Obtain an empty page to use, and copy the current
2310         //  root contents into it, e.g. lower keys
2311
2312         if( bt_newpage(mgr, left, root->page, page_no) )
2313                 return mgr->err;
2314
2315         left_page_no = left->latch->page_no;
2316         bt_unpinlatch (mgr, left->latch);
2317
2318         // preserve the page info at the bottom
2319         // of higher keys and set rest to zero
2320
2321         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2322
2323         // insert stopper key at top of newroot page
2324         // and increase the root height
2325
2326         nxt -= BtId + sizeof(BtVal);
2327         bt_putid (value, right->page_no);
2328         val = (BtVal *)((unsigned char *)root->page + nxt);
2329         memcpy (val->value, value, BtId);
2330         val->len = BtId;
2331
2332         nxt -= 2 + sizeof(BtKey);
2333         slotptr(root->page, 2)->off = nxt;
2334         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2335         ptr->len = 2;
2336         ptr->key[0] = 0xff;
2337         ptr->key[1] = 0xff;
2338
2339         // insert lower keys page fence key on newroot page as first key
2340
2341         nxt -= BtId + sizeof(BtVal);
2342         bt_putid (value, left_page_no);
2343         val = (BtVal *)((unsigned char *)root->page + nxt);
2344         memcpy (val->value, value, BtId);
2345         val->len = BtId;
2346
2347         ptr = (BtKey *)leftkey;
2348         nxt -= ptr->len + sizeof(BtKey);
2349         slotptr(root->page, 1)->off = nxt;
2350         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2351         
2352         bt_putid(root->page->right, 0);
2353         root->page->min = nxt;          // reset lowest used offset and key count
2354         root->page->cnt = 2;
2355         root->page->act = 2;
2356         root->page->lvl++;
2357
2358         mgr->pagezero->alloc->lvl = root->page->lvl;
2359
2360         // release and unpin root pages
2361
2362         bt_unlockpage(BtLockWrite, root->latch);
2363         bt_unpinlatch (mgr, root->latch);
2364
2365         bt_unpinlatch (mgr, right);
2366         return 0;
2367 }
2368
2369 //  split already locked full node
2370 //      leave it locked.
2371 //      return pool entry for new right
2372 //      page, unlocked
2373
2374 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2375 {
2376 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2377 BtPage frame = malloc (mgr->page_size);
2378 uint lvl = set->page->lvl;
2379 BtPageSet right[1];
2380 BtKey *key, *ptr;
2381 BtVal *val, *src;
2382 uid right2;
2383 uint prev;
2384
2385         //  split higher half of keys to frame
2386
2387         memset (frame, 0, mgr->page_size);
2388         max = set->page->cnt;
2389         cnt = max / 2;
2390         idx = 0;
2391
2392         while( cnt++ < max ) {
2393                 if( cnt < max || set->page->lvl )
2394                   if( slotptr(set->page, cnt)->dead )
2395                         continue;
2396
2397                 src = valptr(set->page, cnt);
2398                 nxt -= src->len + sizeof(BtVal);
2399                 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2400
2401                 key = keyptr(set->page, cnt);
2402                 nxt -= key->len + sizeof(BtKey);
2403                 ptr = (BtKey*)((unsigned char *)frame + nxt);
2404                 memcpy (ptr, key, key->len + sizeof(BtKey));
2405
2406                 //      add librarian slot
2407
2408                 slotptr(frame, ++idx)->off = nxt;
2409                 slotptr(frame, idx)->type = Librarian;
2410                 slotptr(frame, idx)->dead = 1;
2411
2412                 //  add actual slot
2413
2414                 slotptr(frame, ++idx)->off = nxt;
2415                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2416
2417                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2418                         frame->act++;
2419         }
2420
2421         frame->bits = mgr->page_bits;
2422         frame->min = nxt;
2423         frame->cnt = idx;
2424         frame->lvl = lvl;
2425
2426         // link right node
2427
2428         if( set->latch->page_no > ROOT_page )
2429                 bt_putid (frame->right, bt_getid (set->page->right));
2430
2431         //      get new free page and write higher keys to it.
2432
2433         if( bt_newpage(mgr, right, frame, thread_no) )
2434                 return 0;
2435
2436         // process lower keys
2437
2438         memcpy (frame, set->page, mgr->page_size);
2439         memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2440         set->latch->dirty = 1;
2441
2442         nxt = mgr->page_size;
2443         set->page->garbage = 0;
2444         set->page->act = 0;
2445         max /= 2;
2446         cnt = 0;
2447         idx = 0;
2448
2449         if( slotptr(frame, max)->type == Librarian )
2450                 max--;
2451
2452         //  assemble page of smaller keys
2453
2454         while( cnt++ < max ) {
2455                 if( slotptr(frame, cnt)->dead )
2456                         continue;
2457                 val = valptr(frame, cnt);
2458                 nxt -= val->len + sizeof(BtVal);
2459                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2460
2461                 key = keyptr(frame, cnt);
2462                 nxt -= key->len + sizeof(BtKey);
2463                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2464
2465                 //      add librarian slot
2466
2467                 slotptr(set->page, ++idx)->off = nxt;
2468                 slotptr(set->page, idx)->type = Librarian;
2469                 slotptr(set->page, idx)->dead = 1;
2470
2471                 //      add actual slot
2472
2473                 slotptr(set->page, ++idx)->off = nxt;
2474                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2475                 set->page->act++;
2476         }
2477
2478         bt_putid(set->page->right, right->latch->page_no);
2479         set->page->min = nxt;
2480         set->page->cnt = idx;
2481
2482         return right->latch->entry;
2483 }
2484
2485 //      fix keys for newly split page
2486 //      call with page locked, return
2487 //      unlocked
2488
2489 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2490 {
2491 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2492 unsigned char value[BtId];
2493 uint lvl = set->page->lvl;
2494 BtPage page;
2495 BtKey *ptr;
2496
2497         // if current page is the root page, split it
2498
2499         if( set->latch->page_no == ROOT_page )
2500                 return bt_splitroot (mgr, set, right, thread_no);
2501
2502         ptr = keyptr(set->page, set->page->cnt);
2503         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2504
2505         page = bt_mappage (mgr, right);
2506
2507         ptr = keyptr(page, page->cnt);
2508         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2509
2510         // insert new fences in their parent pages
2511
2512         bt_lockpage (BtLockParent, right, thread_no);
2513
2514         bt_lockpage (BtLockParent, set->latch, thread_no);
2515         bt_unlockpage (BtLockWrite, set->latch);
2516
2517         // insert new fence for reformulated left block of smaller keys
2518
2519         bt_putid (value, set->latch->page_no);
2520         ptr = (BtKey *)leftkey;
2521
2522         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2523                 return mgr->err;
2524
2525         // switch fence for right block of larger keys to new right page
2526
2527         bt_putid (value, right->page_no);
2528         ptr = (BtKey *)rightkey;
2529
2530         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2531                 return mgr->err;
2532
2533         bt_unlockpage (BtLockParent, set->latch);
2534         bt_unpinlatch (mgr, set->latch);
2535
2536         bt_unlockpage (BtLockParent, right);
2537         bt_unpinlatch (mgr, right);
2538         return 0;
2539 }
2540
2541 //      install new key and value onto page
2542 //      page must already be checked for
2543 //      adequate space
2544
2545 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2546 {
2547 uint idx, librarian;
2548 BtSlot *node;
2549 BtKey *ptr;
2550 BtVal *val;
2551
2552         //      if found slot > desired slot and previous slot
2553         //      is a librarian slot, use it
2554
2555         if( slot > 1 )
2556           if( slotptr(set->page, slot-1)->type == Librarian )
2557                 slot--;
2558
2559         // copy value onto page
2560
2561         set->page->min -= vallen + sizeof(BtVal);
2562         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2563         memcpy (val->value, value, vallen);
2564         val->len = vallen;
2565
2566         // copy key onto page
2567
2568         set->page->min -= keylen + sizeof(BtKey);
2569         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2570         memcpy (ptr->key, key, keylen);
2571         ptr->len = keylen;
2572         
2573         //      find first empty slot
2574
2575         for( idx = slot; idx < set->page->cnt; idx++ )
2576           if( slotptr(set->page, idx)->dead )
2577                 break;
2578
2579         // now insert key into array before slot
2580
2581         if( idx == set->page->cnt )
2582                 idx += 2, set->page->cnt += 2, librarian = 2;
2583         else
2584                 librarian = 1;
2585
2586         set->latch->dirty = 1;
2587         set->page->act++;
2588
2589         while( idx > slot + librarian - 1 )
2590                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2591
2592         //      add librarian slot
2593
2594         if( librarian > 1 ) {
2595                 node = slotptr(set->page, slot++);
2596                 node->off = set->page->min;
2597                 node->type = Librarian;
2598                 node->dead = 1;
2599         }
2600
2601         //      fill in new slot
2602
2603         node = slotptr(set->page, slot);
2604         node->off = set->page->min;
2605         node->type = type;
2606         node->dead = 0;
2607
2608         if( release ) {
2609                 bt_unlockpage (BtLockWrite, set->latch);
2610                 bt_unpinlatch (mgr, set->latch);
2611         }
2612
2613         return 0;
2614 }
2615
2616 //  Insert new key into the btree at given level.
2617 //      either add a new key or update/add an existing one
2618
2619 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2620 {
2621 uint slot, idx, len, entry;
2622 BtPageSet set[1];
2623 BtSlot *node;
2624 BtKey *ptr;
2625 BtVal *val;
2626
2627   while( 1 ) { // find the page and slot for the current key
2628         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2629                 node = slotptr(set->page, slot);
2630                 ptr = keyptr(set->page, slot);
2631         } else {
2632                 if( !mgr->err )
2633                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2634                 return mgr->err;
2635         }
2636
2637         // if librarian slot == found slot, advance to real slot
2638
2639         if( node->type == Librarian )
2640           if( !keycmp (ptr, key, keylen) ) {
2641                 ptr = keyptr(set->page, ++slot);
2642                 node = slotptr(set->page, slot);
2643           }
2644
2645         //  if inserting a duplicate key or unique
2646         //      key that doesn't exist on the page,
2647         //      check for adequate space on the page
2648         //      and insert the new key before slot.
2649
2650         switch( type ) {
2651         case Unique:
2652         case Duplicate:
2653           if( keycmp (ptr, key, keylen) )
2654            if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2655             return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2656            else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2657                 return mgr->err;
2658            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2659                 return mgr->err;
2660            else
2661                 continue;
2662
2663           // if key already exists, update value and return
2664
2665           val = valptr(set->page, slot);
2666
2667           if( val->len >= vallen ) {
2668                 if( slotptr(set->page, slot)->dead )
2669                         set->page->act++;
2670                 node->type = type;
2671                 node->dead = 0;
2672
2673                 set->page->garbage += val->len - vallen;
2674                 set->latch->dirty = 1;
2675                 val->len = vallen;
2676                 memcpy (val->value, value, vallen);
2677                 bt_unlockpage(BtLockWrite, set->latch);
2678                 bt_unpinlatch (mgr, set->latch);
2679                 return 0;
2680           }
2681
2682           //  new update value doesn't fit in existing value area
2683           //    make sure page has room
2684
2685           if( !node->dead )
2686                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2687           else
2688                 set->page->act++;
2689
2690           node->type = type;
2691           node->dead = 0;
2692
2693           if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2694            if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2695                 return mgr->err;
2696            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2697                 return mgr->err;
2698            else
2699                 continue;
2700
2701           //  copy key and value onto page and update slot
2702
2703           set->page->min -= vallen + sizeof(BtVal);
2704           val = (BtVal*)((unsigned char *)set->page + set->page->min);
2705           memcpy (val->value, value, vallen);
2706           val->len = vallen;
2707
2708           set->latch->dirty = 1;
2709           set->page->min -= keylen + sizeof(BtKey);
2710           ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2711           memcpy (ptr->key, key, keylen);
2712           ptr->len = keylen;
2713         
2714           node->off = set->page->min;
2715           bt_unlockpage(BtLockWrite, set->latch);
2716           bt_unpinlatch (mgr, set->latch);
2717           return 0;
2718     }
2719   }
2720   return 0;
2721 }
2722
2723 typedef struct {
2724         logseqno reqlsn;        // redo log seq no required
2725         logseqno lsn;           // redo log sequence number
2726         uint entry;                     // latch table entry number
2727         uint slot:31;           // page slot number
2728         uint reuse:1;           // reused previous page
2729 } AtomicTxn;
2730
2731 typedef struct {
2732         uid page_no;            // page number for split leaf
2733         void *next;                     // next key to insert
2734         uint entry:29;          // latch table entry number
2735         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2736         uint nounlock:1;        // don't unlock ParentModification
2737         unsigned char leafkey[BT_keyarray];
2738 } AtomicKey;
2739
2740 //      determine actual page where key is located
2741 //  return slot number
2742
2743 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2744 {
2745 BtKey *key = keyptr(source,src);
2746 uint slot = locks[src].slot;
2747 uint entry;
2748
2749         if( src > 1 && locks[src].reuse )
2750           entry = locks[src-1].entry, slot = 0;
2751         else
2752           entry = locks[src].entry;
2753
2754         if( slot ) {
2755                 set->latch = mgr->latchsets + entry;
2756                 set->page = bt_mappage (mgr, set->latch);
2757                 return slot;
2758         }
2759
2760         //      is locks->reuse set? or was slot zeroed?
2761         //      if so, find where our key is located 
2762         //      on current page or pages split on
2763         //      same page txn operations.
2764
2765         do {
2766                 set->latch = mgr->latchsets + entry;
2767                 set->page = bt_mappage (mgr, set->latch);
2768
2769                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2770                   if( slotptr(set->page, slot)->type == Librarian )
2771                         slot++;
2772                   if( locks[src].reuse )
2773                         locks[src].entry = entry;
2774                   return slot;
2775                 }
2776         } while( entry = set->latch->split );
2777
2778         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2779         return 0;
2780 }
2781
2782 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2783 {
2784 BtKey *key = keyptr(source, src);
2785 BtVal *val = valptr(source, src);
2786 BtLatchSet *latch;
2787 BtPageSet set[1];
2788 uint entry, slot;
2789
2790   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2791         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2792           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2793                 return mgr->err;
2794           set->page->lsn = locks[src].lsn;
2795           return 0;
2796         }
2797
2798         if( entry = bt_splitpage (mgr, set, thread_no) )
2799           latch = mgr->latchsets + entry;
2800         else
2801           return mgr->err;
2802
2803         //      splice right page into split chain
2804         //      and WriteLock it.
2805
2806         bt_lockpage(BtLockWrite, latch, thread_no);
2807         latch->split = set->latch->split;
2808         set->latch->split = entry;
2809         locks[src].slot = 0;
2810   }
2811
2812   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2813 }
2814
2815 //      perform delete from smaller btree
2816 //  insert a delete slot if not found there
2817
2818 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2819 {
2820 BtKey *key = keyptr(source, src);
2821 BtPageSet set[1];
2822 uint idx, slot;
2823 BtSlot *node;
2824 BtKey *ptr;
2825 BtVal *val;
2826
2827         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2828           node = slotptr(set->page, slot);
2829           ptr = keyptr(set->page, slot);
2830           val = valptr(set->page, slot);
2831         } else
2832           return mgr->line = __LINE__, mgr->err = BTERR_struct;
2833
2834         //  if slot is not found, insert a delete slot
2835
2836         if( keycmp (ptr, key->key, key->len) )
2837           return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2838
2839         //      if node is already dead,
2840         //      ignore the request.
2841
2842         if( node->dead )
2843                 return 0;
2844
2845         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2846         set->latch->dirty = 1;
2847         set->page->lsn = locks[src].lsn;
2848         set->page->act--;
2849
2850         node->dead = 0;
2851         mgr->found++;
2852         return 0;
2853 }
2854
2855 //      delete an empty master page for a transaction
2856
2857 //      note that the far right page never empties because
2858 //      it always contains (at least) the infinite stopper key
2859 //      and that all pages that don't contain any keys are
2860 //      deleted, or are being held under Atomic lock.
2861
2862 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2863 {
2864 BtPageSet right[1], temp[1];
2865 unsigned char value[BtId];
2866 uid right_page_no;
2867 BtKey *ptr;
2868
2869         bt_lockpage(BtLockWrite, prev->latch, thread_no);
2870
2871         //      grab the right sibling
2872
2873         if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2874                 right->page = bt_mappage (mgr, right->latch);
2875         else
2876                 return mgr->err;
2877
2878         bt_lockpage(BtLockAtomic, right->latch, thread_no);
2879         bt_lockpage(BtLockWrite, right->latch, thread_no);
2880
2881         //      and pull contents over empty page
2882         //      while preserving master's left link
2883
2884         memcpy (right->page->left, prev->page->left, BtId);
2885         memcpy (prev->page, right->page, mgr->page_size);
2886
2887         //      forward seekers to old right sibling
2888         //      to new page location in set
2889
2890         bt_putid (right->page->right, prev->latch->page_no);
2891         right->latch->dirty = 1;
2892         right->page->kill = 1;
2893
2894         //      remove pointer to right page for searchers by
2895         //      changing right fence key to point to the master page
2896
2897         ptr = keyptr(right->page,right->page->cnt);
2898         bt_putid (value, prev->latch->page_no);
2899
2900         if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
2901                 return mgr->err;
2902
2903         //  now that master page is in good shape we can
2904         //      remove its locks.
2905
2906         bt_unlockpage (BtLockAtomic, prev->latch);
2907         bt_unlockpage (BtLockWrite, prev->latch);
2908
2909         //  fix master's right sibling's left pointer
2910         //      to remove scanner's poiner to the right page
2911
2912         if( right_page_no = bt_getid (prev->page->right) ) {
2913           if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2914                 temp->page = bt_mappage (mgr, temp->latch);
2915           else
2916                 return mgr->err;
2917
2918           bt_lockpage (BtLockWrite, temp->latch, thread_no);
2919           bt_putid (temp->page->left, prev->latch->page_no);
2920           temp->latch->dirty = 1;
2921
2922           bt_unlockpage (BtLockWrite, temp->latch);
2923           bt_unpinlatch (mgr, temp->latch);
2924         } else {        // master is now the far right page
2925           bt_mutexlock (mgr->lock);
2926           bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2927           bt_releasemutex(mgr->lock);
2928         }
2929
2930         //      now that there are no pointers to the right page
2931         //      we can delete it after the last read access occurs
2932
2933         bt_unlockpage (BtLockWrite, right->latch);
2934         bt_unlockpage (BtLockAtomic, right->latch);
2935         bt_lockpage (BtLockDelete, right->latch, thread_no);
2936         bt_lockpage (BtLockWrite, right->latch, thread_no);
2937         bt_freepage (mgr, right);
2938         return 0;
2939 }
2940
2941 //      atomic modification of a batch of keys.
2942
2943 //      return -1 if BTERR is set
2944 //      otherwise return slot number
2945 //      causing the key constraint violation
2946 //      or zero on successful completion.
2947
2948 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2949 {
2950 uint src, idx, slot, samepage, entry, que = 0;
2951 AtomicKey *head, *tail, *leaf;
2952 BtPageSet set[1], prev[1];
2953 unsigned char value[BtId];
2954 BtKey *key, *ptr, *key2;
2955 BtLatchSet *latch;
2956 AtomicTxn *locks;
2957 int result = 0;
2958 BtSlot temp[1];
2959 logseqno lsn;
2960 BtPage page;
2961 BtVal *val;
2962 uid right;
2963 int type;
2964
2965   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2966   head = NULL;
2967   tail = NULL;
2968
2969   // stable sort the list of keys into order to
2970   //    prevent deadlocks between threads.
2971
2972   for( src = 1; src++ < source->cnt; ) {
2973         *temp = *slotptr(source,src);
2974         key = keyptr (source,src);
2975
2976         for( idx = src; --idx; ) {
2977           key2 = keyptr (source,idx);
2978           if( keycmp (key, key2->key, key2->len) < 0 ) {
2979                 *slotptr(source,idx+1) = *slotptr(source,idx);
2980                 *slotptr(source,idx) = *temp;
2981           } else
2982                 break;
2983         }
2984   }
2985
2986   //  add entries to redo log
2987
2988   if( bt->mgr->redopages )
2989    if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
2990     for( src = 0; src++ < source->cnt; )
2991          locks[src].lsn = lsn;
2992     else
2993          return bt->mgr->err;
2994
2995   // Load the leaf page for each key
2996   // group same page references with reuse bit
2997   // and determine any constraint violations
2998
2999   for( src = 0; src++ < source->cnt; ) {
3000         key = keyptr(source, src);
3001         slot = 0;
3002
3003         // first determine if this modification falls
3004         // on the same page as the previous modification
3005         //      note that the far right leaf page is a special case
3006
3007         if( samepage = src > 1 )
3008           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3009                 slot = bt_findslot(set->page, key->key, key->len);
3010           else
3011                 bt_unlockpage(BtLockRead, set->latch); 
3012
3013         if( !slot )
3014           if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
3015                 set->latch->split = 0;
3016           else
3017                 return bt->mgr->err;
3018
3019         if( slotptr(set->page, slot)->type == Librarian )
3020           ptr = keyptr(set->page, ++slot);
3021         else
3022           ptr = keyptr(set->page, slot);
3023
3024         if( !samepage ) {
3025           locks[src].entry = set->latch->entry;
3026           locks[src].slot = slot;
3027           locks[src].reuse = 0;
3028         } else {
3029           locks[src].entry = 0;
3030           locks[src].slot = 0;
3031           locks[src].reuse = 1;
3032         }
3033
3034         //      capture current lsn for master page
3035
3036         locks[src].reqlsn = set->page->lsn;
3037   }
3038
3039   //  unlock last loadpage lock
3040
3041   if( source->cnt )
3042         bt_unlockpage(BtLockRead, set->latch);
3043
3044   //  obtain write lock for each master page
3045   //  sync flushed pages to disk
3046
3047   for( src = 0; src++ < source->cnt; ) {
3048         if( locks[src].reuse )
3049           continue;
3050
3051         set->latch = bt->mgr->latchsets + locks[src].entry;
3052         bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3053   }
3054
3055   // insert or delete each key
3056   // process any splits or merges
3057   // release Write & Atomic latches
3058   // set ParentModifications and build
3059   // queue of keys to insert for split pages
3060   // or delete for deleted pages.
3061
3062   // run through txn list backwards
3063
3064   samepage = source->cnt + 1;
3065
3066   for( src = source->cnt; src; src-- ) {
3067         if( locks[src].reuse )
3068           continue;
3069
3070         //  perform the txn operations
3071         //      from smaller to larger on
3072         //  the same page
3073
3074         for( idx = src; idx < samepage; idx++ )
3075          switch( slotptr(source,idx)->type ) {
3076          case Delete:
3077           if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
3078                 return bt->mgr->err;
3079           break;
3080
3081         case Duplicate:
3082         case Unique:
3083           if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
3084                 return bt->mgr->err;
3085           break;
3086         }
3087
3088         //      after the same page operations have finished,
3089         //  process master page for splits or deletion.
3090
3091         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3092         prev->page = bt_mappage (bt->mgr, prev->latch);
3093         samepage = src;
3094
3095         //  pick-up all splits from master page
3096         //      each one is already WriteLocked.
3097
3098         entry = prev->latch->split;
3099
3100         while( entry ) {
3101           set->latch = bt->mgr->latchsets + entry;
3102           set->page = bt_mappage (bt->mgr, set->latch);
3103           entry = set->latch->split;
3104
3105           // delete empty master page by undoing its split
3106           //  (this is potentially another empty page)
3107           //  note that there are no new left pointers yet
3108
3109           if( !prev->page->act ) {
3110                 memcpy (set->page->left, prev->page->left, BtId);
3111                 memcpy (prev->page, set->page, bt->mgr->page_size);
3112                 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3113                 bt_freepage (bt->mgr, set);
3114
3115                 prev->latch->split = set->latch->split;
3116                 prev->latch->dirty = 1;
3117                 continue;
3118           }
3119
3120           // remove empty page from the split chain
3121           // and return it to the free list.
3122
3123           if( !set->page->act ) {
3124                 memcpy (prev->page->right, set->page->right, BtId);
3125                 prev->latch->split = set->latch->split;
3126                 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3127                 bt_freepage (bt->mgr, set);
3128                 continue;
3129           }
3130
3131           //  schedule prev fence key update
3132
3133           ptr = keyptr(prev->page,prev->page->cnt);
3134           leaf = calloc (sizeof(AtomicKey), 1), que++;
3135
3136           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3137           leaf->page_no = prev->latch->page_no;
3138           leaf->entry = prev->latch->entry;
3139           leaf->type = 0;
3140
3141           if( tail )
3142                 tail->next = leaf;
3143           else
3144                 head = leaf;
3145
3146           tail = leaf;
3147
3148           // splice in the left link into the split page
3149
3150           bt_putid (set->page->left, prev->latch->page_no);
3151           bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3152           bt_unlockpage(BtLockWrite, prev->latch);
3153           *prev = *set;
3154         }
3155
3156         //  update left pointer in next right page from last split page
3157         //      (if all splits were reversed, latch->split == 0)
3158
3159         if( latch->split ) {
3160           //  fix left pointer in master's original (now split)
3161           //  far right sibling or set rightmost page in page zero
3162
3163           if( right = bt_getid (prev->page->right) ) {
3164                 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3165                   set->page = bt_mappage (bt->mgr, set->latch);
3166                 else
3167                   return bt->mgr->err;
3168
3169             bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3170             bt_putid (set->page->left, prev->latch->page_no);
3171                 set->latch->dirty = 1;
3172             bt_unlockpage (BtLockWrite, set->latch);
3173                 bt_unpinlatch (bt->mgr, set->latch);
3174           } else {      // prev is rightmost page
3175             bt_mutexlock (bt->mgr->lock);
3176                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3177             bt_releasemutex(bt->mgr->lock);
3178           }
3179
3180           //  Process last page split in chain
3181
3182           ptr = keyptr(prev->page,prev->page->cnt);
3183           leaf = calloc (sizeof(AtomicKey), 1), que++;
3184
3185           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3186           leaf->page_no = prev->latch->page_no;
3187           leaf->entry = prev->latch->entry;
3188           leaf->type = 0;
3189   
3190           if( tail )
3191                 tail->next = leaf;
3192           else
3193                 head = leaf;
3194
3195           tail = leaf;
3196
3197           bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3198           bt_unlockpage(BtLockWrite, prev->latch);
3199
3200           //  remove atomic lock on master page
3201
3202           bt_unlockpage(BtLockAtomic, latch);
3203           continue;
3204         }
3205
3206         //  finished if prev page occupied (either master or final split)
3207
3208         if( prev->page->act ) {
3209           bt_unlockpage(BtLockWrite, latch);
3210           bt_unlockpage(BtLockAtomic, latch);
3211           bt_unpinlatch(bt->mgr, latch);
3212           continue;
3213         }
3214
3215         // any and all splits were reversed, and the
3216         // master page located in prev is empty, delete it
3217         // by pulling over master's right sibling.
3218
3219         // Remove empty master's fence key
3220
3221         ptr = keyptr(prev->page,prev->page->cnt);
3222
3223         if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3224                 return bt->mgr->err;
3225
3226         //      perform the remainder of the delete
3227         //      from the FIFO queue
3228
3229         leaf = calloc (sizeof(AtomicKey), 1), que++;
3230
3231         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3232         leaf->page_no = prev->latch->page_no;
3233         leaf->entry = prev->latch->entry;
3234         leaf->nounlock = 1;
3235         leaf->type = 2;
3236   
3237         if( tail )
3238           tail->next = leaf;
3239         else
3240           head = leaf;
3241
3242         tail = leaf;
3243
3244         //      leave atomic lock in place until
3245         //      deletion completes in next phase.
3246
3247         bt_unlockpage(BtLockWrite, prev->latch);
3248   }
3249
3250   //  add & delete keys for any pages split or merged during transaction
3251
3252   if( leaf = head )
3253     do {
3254           set->latch = bt->mgr->latchsets + leaf->entry;
3255           set->page = bt_mappage (bt->mgr, set->latch);
3256
3257           bt_putid (value, leaf->page_no);
3258           ptr = (BtKey *)leaf->leafkey;
3259
3260           switch( leaf->type ) {
3261           case 0:       // insert key
3262             if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
3263                   return bt->mgr->err;
3264
3265                 break;
3266
3267           case 1:       // delete key
3268                 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3269                   return bt->mgr->err;
3270
3271                 break;
3272
3273           case 2:       // free page
3274                 if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
3275                   return bt->mgr->err;
3276
3277                 break;
3278           }
3279
3280           if( !leaf->nounlock )
3281             bt_unlockpage (BtLockParent, set->latch);
3282
3283           bt_unpinlatch (bt->mgr, set->latch);
3284           tail = leaf->next;
3285           free (leaf);
3286         } while( leaf = tail );
3287
3288   // if number of active pages
3289   // is greater than the buffer pool
3290   // promote page into larger btree
3291
3292   while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
3293         if( bt_txnpromote (bt) )
3294           return bt->mgr->err;
3295
3296   // return success
3297
3298   free (locks);
3299   return 0;
3300 }
3301
3302 //  promote a page into the larger btree
3303
3304 BTERR bt_txnpromote (BtDb *bt)
3305 {
3306 uint entry, slot, idx;
3307 BtPageSet set[1];
3308 BtSlot *node;
3309 BtKey *ptr;
3310 BtVal *val;
3311
3312   while( 1 ) {
3313 #ifdef unix
3314         entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3315 #else
3316         entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3317 #endif
3318         entry %= bt->mgr->latchtotal;
3319
3320         if( !entry )
3321                 continue;
3322
3323         set->latch = bt->mgr->latchsets + entry;
3324         idx = set->latch->page_no % bt->mgr->latchhash;
3325
3326         if( !bt_mutextry(set->latch->modify) )
3327                 continue;
3328
3329 //    if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
3330 //              bt_releasemutex(set->latch->modify);
3331 //              continue;
3332 //      }
3333
3334         //  skip this entry if it is pinned
3335
3336         if( set->latch->pin & ~CLOCK_bit ) {
3337           bt_releasemutex(set->latch->modify);
3338 //      bt_releasemutex(bt->mgr->hashtable[idx].latch);
3339           continue;
3340         }
3341
3342         set->page = bt_mappage (bt->mgr, set->latch);
3343
3344         // entry never used or has no right sibling
3345
3346         if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3347           bt_releasemutex(set->latch->modify);
3348 //      bt_releasemutex(bt->mgr->hashtable[idx].latch);
3349           continue;
3350         }
3351
3352         bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
3353         bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3354     bt_unlockpage(BtLockAccess, set->latch);
3355
3356         // entry interiour node or being or was killed
3357
3358         if( set->page->lvl || set->page->free || set->page->kill ) {
3359           bt_releasemutex(set->latch->modify);
3360 //      bt_releasemutex(bt->mgr->hashtable[idx].latch);
3361       bt_unlockpage(BtLockWrite, set->latch);
3362           continue;
3363         }
3364
3365         //  pin the page for our useage
3366
3367         set->latch->pin++;
3368         bt_releasemutex(set->latch->modify);
3369 //    bt_releasemutex(bt->mgr->hashtable[idx].latch);
3370
3371         //      if page is dirty, then
3372         //      sync it to the disk first.
3373
3374         if( set->latch->dirty )
3375          if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
3376           return bt->mgr->line = __LINE__, bt->mgr->err;
3377          else
3378           set->latch->dirty = 0;
3379
3380         // transfer slots in our selected page to larger btree
3381 if( !(set->latch->page_no % 100) )
3382 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
3383
3384         for( slot = 0; slot++ < set->page->cnt; ) {
3385                 ptr = keyptr (set->page, slot);
3386                 val = valptr (set->page, slot);
3387                 node = slotptr(set->page, slot);
3388
3389                 switch( node->type ) {
3390                 case Duplicate:
3391                 case Unique:
3392                   if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
3393                         return bt->main->err;
3394
3395                   continue;
3396
3397                 case Delete:
3398                   if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
3399                         return bt->main->err;
3400
3401                   continue;
3402                 }
3403         }
3404
3405         //  now delete the page
3406
3407         if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3408                 return bt->mgr->err;
3409
3410         bt_unpinlatch (bt->mgr, set->latch);
3411         return 0;
3412   }
3413 }
3414
3415 //      set cursor to highest slot on highest page
3416
3417 uint bt_lastkey (BtDb *bt)
3418 {
3419 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3420 BtPageSet set[1];
3421
3422         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3423                 set->page = bt_mappage (bt->mgr, set->latch);
3424         else
3425                 return 0;
3426
3427     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3428         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3429     bt_unlockpage(BtLockRead, set->latch);
3430         bt_unpinlatch (bt->mgr, set->latch);
3431         return bt->cursor->cnt;
3432 }
3433
3434 //      return previous slot on cursor page
3435
3436 uint bt_prevkey (BtDb *bt, uint slot)
3437 {
3438 uid cursor_page = bt->cursor->page_no;
3439 uid ourright, next, us = cursor_page;
3440 BtPageSet set[1];
3441
3442         if( --slot )
3443                 return slot;
3444
3445         ourright = bt_getid(bt->cursor->right);
3446
3447 goleft:
3448         if( !(next = bt_getid(bt->cursor->left)) )
3449                 return 0;
3450
3451 findourself:
3452         cursor_page = next;
3453
3454         if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3455                 set->page = bt_mappage (bt->mgr, set->latch);
3456         else
3457                 return 0;
3458
3459     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3460         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3461         bt_unlockpage(BtLockRead, set->latch);
3462         bt_unpinlatch (bt->mgr, set->latch);
3463         
3464         next = bt_getid (bt->cursor->right);
3465
3466         if( bt->cursor->kill )
3467                 goto findourself;
3468
3469         if( next != us )
3470           if( next == ourright )
3471                 goto goleft;
3472           else
3473                 goto findourself;
3474
3475         return bt->cursor->cnt;
3476 }
3477
3478 //  return next slot on cursor page
3479 //  or slide cursor right into next page
3480
3481 uint bt_nextkey (BtDb *bt, uint slot)
3482 {
3483 BtPageSet set[1];
3484 uid right;
3485
3486   do {
3487         right = bt_getid(bt->cursor->right);
3488
3489         while( slot++ < bt->cursor->cnt )
3490           if( slotptr(bt->cursor,slot)->dead )
3491                 continue;
3492           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3493                 return slot;
3494           else
3495                 break;
3496
3497         if( !right )
3498                 break;
3499
3500         if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3501                 set->page = bt_mappage (bt->mgr, set->latch);
3502         else
3503                 return 0;
3504
3505     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3506         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3507         bt_unlockpage(BtLockRead, set->latch);
3508         bt_unpinlatch (bt->mgr, set->latch);
3509         slot = 0;
3510
3511   } while( 1 );
3512
3513   return bt->mgr->err = 0;
3514 }
3515
3516 //  cache page of keys into cursor and return starting slot for given key
3517
3518 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3519 {
3520 BtPageSet set[1];
3521 uint slot;
3522
3523         // cache page for retrieval
3524
3525         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3526           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3527         else
3528           return 0;
3529
3530         bt_unlockpage(BtLockRead, set->latch);
3531         bt_unpinlatch (bt->mgr, set->latch);
3532         return slot;
3533 }
3534
3535 #ifdef STANDALONE
3536
3537 #ifndef unix
3538 double getCpuTime(int type)
3539 {
3540 FILETIME crtime[1];
3541 FILETIME xittime[1];
3542 FILETIME systime[1];
3543 FILETIME usrtime[1];
3544 SYSTEMTIME timeconv[1];
3545 double ans = 0;
3546
3547         memset (timeconv, 0, sizeof(SYSTEMTIME));
3548
3549         switch( type ) {
3550         case 0:
3551                 GetSystemTimeAsFileTime (xittime);
3552                 FileTimeToSystemTime (xittime, timeconv);
3553                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3554                 break;
3555         case 1:
3556                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3557                 FileTimeToSystemTime (usrtime, timeconv);
3558                 break;
3559         case 2:
3560                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3561                 FileTimeToSystemTime (systime, timeconv);
3562                 break;
3563         }
3564
3565         ans += (double)timeconv->wHour * 3600;
3566         ans += (double)timeconv->wMinute * 60;
3567         ans += (double)timeconv->wSecond;
3568         ans += (double)timeconv->wMilliseconds / 1000;
3569         return ans;
3570 }
3571 #else
3572 #include <time.h>
3573 #include <sys/resource.h>
3574
3575 double getCpuTime(int type)
3576 {
3577 struct rusage used[1];
3578 struct timeval tv[1];
3579
3580         switch( type ) {
3581         case 0:
3582                 gettimeofday(tv, NULL);
3583                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3584
3585         case 1:
3586                 getrusage(RUSAGE_SELF, used);
3587                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3588
3589         case 2:
3590                 getrusage(RUSAGE_SELF, used);
3591                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3592         }
3593
3594         return 0;
3595 }
3596 #endif
3597
3598 void bt_poolaudit (BtMgr *mgr)
3599 {
3600 BtLatchSet *latch;
3601 uint entry = 0;
3602
3603         while( ++entry < mgr->latchtotal ) {
3604                 latch = mgr->latchsets + entry;
3605
3606                 if( *latch->readwr->rin & MASK )
3607                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3608
3609                 if( *latch->access->rin & MASK )
3610                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3611
3612 //              if( *latch->parent->xcl->value )
3613 //                      fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3614
3615 //              if( *latch->atomic->xcl->value )
3616 //                      fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3617
3618 //              if( *latch->modify->value )
3619 //                      fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3620
3621                 if( latch->pin & ~CLOCK_bit )
3622                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3623         }
3624 }
3625
3626 typedef struct {
3627         char idx;
3628         char *type;
3629         char *infile;
3630         BtMgr *main;
3631         BtMgr *mgr;
3632         int num;
3633 } ThreadArg;
3634
3635 //  standalone program to index file of keys
3636 //  then list them onto std-out
3637
3638 #ifdef unix
3639 void *index_file (void *arg)
3640 #else
3641 uint __stdcall index_file (void *arg)
3642 #endif
3643 {
3644 int line = 0, found = 0, cnt = 0, idx;
3645 uid next, page_no = LEAF_page;  // start on first page of leaves
3646 int ch, len = 0, slot, type = 0;
3647 unsigned char key[BT_maxkey];
3648 unsigned char txn[65536];
3649 ThreadArg *args = arg;
3650 BtPage page, frame;
3651 BtPageSet set[1];
3652 uint nxt = 65536;
3653 BtKey *ptr;
3654 BtVal *val;
3655 BtDb *bt;
3656 FILE *in;
3657
3658         bt = bt_open (args->mgr, args->main);
3659         page = (BtPage)txn;
3660
3661         if( args->idx < strlen (args->type) )
3662                 ch = args->type[args->idx];
3663         else
3664                 ch = args->type[strlen(args->type) - 1];
3665
3666         switch(ch | 0x20)
3667         {
3668         case 'd':
3669                 type = Delete;
3670
3671         case 'p':
3672                 if( !type )
3673                         type = Unique;
3674
3675                 if( args->num )
3676                  if( type == Delete )
3677                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3678                  else
3679                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3680                 else
3681                  if( type == Delete )
3682                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3683                  else
3684                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3685
3686                 if( in = fopen (args->infile, "rb") )
3687                   while( ch = getc(in), ch != EOF )
3688                         if( ch == '\n' )
3689                         {
3690                           line++;
3691
3692                           if( !args->num ) {
3693                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3694                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3695                             len = 0;
3696                                 continue;
3697                           }
3698
3699                           nxt -= len - 10;
3700                           memcpy (txn + nxt, key + 10, len - 10);
3701                           nxt -= 1;
3702                           txn[nxt] = len - 10;
3703                           nxt -= 10;
3704                           memcpy (txn + nxt, key, 10);
3705                           nxt -= 1;
3706                           txn[nxt] = 10;
3707                           slotptr(page,++cnt)->off  = nxt;
3708                           slotptr(page,cnt)->type = type;
3709                           len = 0;
3710
3711                           if( cnt < args->num )
3712                                 continue;
3713
3714                           page->cnt = cnt;
3715                           page->act = cnt;
3716                           page->min = nxt;
3717
3718                           if( bt_atomictxn (bt, page) )
3719                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3720                           nxt = sizeof(txn);
3721                           cnt = 0;
3722
3723                         }
3724                         else if( len < BT_maxkey )
3725                                 key[len++] = ch;
3726                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3727                 break;
3728
3729         case 'w':
3730                 fprintf(stderr, "started indexing for %s\n", args->infile);
3731                 if( in = fopen (args->infile, "r") )
3732                   while( ch = getc(in), ch != EOF )
3733                         if( ch == '\n' )
3734                         {
3735                           line++;
3736
3737                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3738                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3739                           len = 0;
3740                         }
3741                         else if( len < BT_maxkey )
3742                                 key[len++] = ch;
3743                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3744                 break;
3745
3746         case 'f':
3747                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3748                 if( in = fopen (args->infile, "rb") )
3749                   while( ch = getc(in), ch != EOF )
3750                         if( ch == '\n' )
3751                         {
3752                           line++;
3753                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3754                                 found++;
3755                           else if( bt->mgr->err )
3756                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3757                           len = 0;
3758                         }
3759                         else if( len < BT_maxkey )
3760                                 key[len++] = ch;
3761                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3762                 break;
3763
3764         case 's':
3765                 fprintf(stderr, "started scanning\n");
3766                 
3767                 do {
3768                         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3769                                 set->page = bt_mappage (bt->mgr, set->latch);
3770                         else
3771                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3772
3773                         bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3774                         next = bt_getid (set->page->right);
3775
3776                         for( slot = 0; slot++ < set->page->cnt; )
3777                          if( next || slot < set->page->cnt )
3778                           if( !slotptr(set->page, slot)->dead ) {
3779                                 ptr = keyptr(set->page, slot);
3780                                 len = ptr->len;
3781
3782                             if( slotptr(set->page, slot)->type == Duplicate )
3783                                         len -= BtId;
3784
3785                                 fwrite (ptr->key, len, 1, stdout);
3786                                 val = valptr(set->page, slot);
3787                                 fwrite (val->value, val->len, 1, stdout);
3788                                 fputc ('\n', stdout);
3789                                 cnt++;
3790                            }
3791
3792                         bt_unlockpage (BtLockRead, set->latch);
3793                         bt_unpinlatch (bt->mgr, set->latch);
3794                 } while( page_no = next );
3795
3796                 fprintf(stderr, " Total keys read %d\n", cnt);
3797                 break;
3798
3799         case 'r':
3800                 fprintf(stderr, "started reverse scan\n");
3801                 if( slot = bt_lastkey (bt) )
3802                    while( slot = bt_prevkey (bt, slot) ) {
3803                         if( slotptr(bt->cursor, slot)->dead )
3804                           continue;
3805
3806                         ptr = keyptr(bt->cursor, slot);
3807                         len = ptr->len;
3808
3809                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3810                                 len -= BtId;
3811
3812                         fwrite (ptr->key, len, 1, stdout);
3813                         val = valptr(bt->cursor, slot);
3814                         fwrite (val->value, val->len, 1, stdout);
3815                         fputc ('\n', stdout);
3816                         cnt++;
3817                   }
3818
3819                 fprintf(stderr, " Total keys read %d\n", cnt);
3820                 break;
3821
3822         case 'c':
3823 #ifdef unix
3824                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3825 #endif
3826                 fprintf(stderr, "started counting\n");
3827                 next = LEAF_page + bt->mgr->redopages + 1;
3828
3829                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3830                         if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3831                                 break;
3832
3833                         if( !bt->cursor->free && !bt->cursor->lvl )
3834                                 cnt += bt->cursor->act;
3835
3836                         bt->mgr->reads++;
3837                         page_no = next++;
3838                 }
3839                 
3840                 cnt--;  // remove stopper key
3841                 fprintf(stderr, " Total keys counted %d\n", cnt);
3842                 break;
3843         }
3844
3845         fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
3846         bt_close (bt);
3847 #ifdef unix
3848         return NULL;
3849 #else
3850         return 0;
3851 #endif
3852 }
3853
3854 typedef struct timeval timer;
3855
3856 int main (int argc, char **argv)
3857 {
3858 int idx, cnt, len, slot, err;
3859 int segsize, bits = 16;
3860 double start, stop;
3861 #ifdef unix
3862 pthread_t *threads;
3863 #else
3864 HANDLE *threads;
3865 #endif
3866 ThreadArg *args;
3867 uint poolsize = 0;
3868 uint recovery = 0;
3869 uint mainpool = 0;
3870 uint mainbits = 0;
3871 float elapsed;
3872 int num = 0;
3873 char key[1];
3874 BtMgr *main;
3875 BtMgr *mgr;
3876 BtKey *ptr;
3877
3878         if( argc < 3 ) {
3879                 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3880                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3881                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3882                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3883                 fprintf (stderr, "  page_bits is the page size in bits for the cache btree\n");
3884                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3885                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3886                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3887                 fprintf (stderr, "  main_bits is the page size of the main btree in bits\n");
3888                 fprintf (stderr, "  main_pool is the number of main pages in the main buffer pool\n");
3889                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3890                 exit(0);
3891         }
3892
3893         start = getCpuTime(0);
3894
3895         if( argc > 4 )
3896                 bits = atoi(argv[4]);
3897
3898         if( argc > 5 )
3899                 poolsize = atoi(argv[5]);
3900
3901         if( !poolsize )
3902                 fprintf (stderr, "Warning: no mapped_pool\n");
3903
3904         if( argc > 6 )
3905                 num = atoi(argv[6]);
3906
3907         if( argc > 7 )
3908                 recovery = atoi(argv[7]);
3909
3910         if( argc > 8 )
3911                 mainbits = atoi(argv[8]);
3912
3913         if( argc > 9 )
3914                 mainpool = atoi(argv[9]);
3915
3916         cnt = argc - 10;
3917 #ifdef unix
3918         threads = malloc (cnt * sizeof(pthread_t));
3919 #else
3920         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3921 #endif
3922         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3923
3924         mgr = bt_mgr (argv[1], bits, poolsize, recovery);
3925
3926         if( !mgr ) {
3927                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3928                 exit (1);
3929         }
3930
3931         main = bt_mgr (argv[2], mainbits, mainpool, 0);
3932
3933         if( !main ) {
3934                 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3935                 exit (1);
3936         }
3937
3938         //      fire off threads
3939
3940         if( cnt )
3941           for( idx = 0; idx < cnt; idx++ ) {
3942                 args[idx].infile = argv[idx + 10];
3943                 args[idx].type = argv[3];
3944                 args[idx].main = main;
3945                 args[idx].mgr = mgr;
3946                 args[idx].num = num;
3947                 args[idx].idx = idx;
3948 #ifdef unix
3949                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3950                         fprintf(stderr, "Error creating thread %d\n", err);
3951 #else
3952                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3953 #endif
3954           }
3955         else {
3956                 args[0].infile = argv[idx + 10];
3957                 args[0].type = argv[3];
3958                 args[0].main = main;
3959                 args[0].mgr = mgr;
3960                 args[0].num = num;
3961                 args[0].idx = 0;
3962                 index_file (args);
3963         }
3964
3965         //      wait for termination
3966
3967 #ifdef unix
3968         for( idx = 0; idx < cnt; idx++ )
3969                 pthread_join (threads[idx], NULL);
3970 #else
3971         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3972
3973         for( idx = 0; idx < cnt; idx++ )
3974                 CloseHandle(threads[idx]);
3975 #endif
3976         bt_poolaudit(mgr);
3977         bt_poolaudit(main);
3978
3979         bt_mgrclose (main);
3980         bt_mgrclose (mgr);
3981
3982         elapsed = getCpuTime(0) - start;
3983         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3984         elapsed = getCpuTime(1);
3985         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3986         elapsed = getCpuTime(2);
3987         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3988 }
3989
3990 #endif  //STANDALONE