]> pd.if.org Git - btree/blob - threadskv9c.c
Threadskv9c.c version for durability progress made
[btree] / threadskv9c.c
1 // btree version threadskv9c FUTEX version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      and redo log for failure recovery
10
11 // 07 OCT 2014
12
13 // author: karl malbrain, malbrain@cal.berkeley.edu
14
15 /*
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
18
19 The orginal author is Karl Malbrain.
20
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
27 */
28
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
31
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
34
35 #ifdef linux
36 #define _GNU_SOURCE
37 #include <linux/futex.h>
38 #define SYS_futex 202
39 #endif
40
41 #ifdef unix
42 #include <unistd.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/mman.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <limits.h>
51 #else
52 #define WIN32_LEAN_AND_MEAN
53 #include <windows.h>
54 #include <stdio.h>
55 #include <stdlib.h>
56 #include <time.h>
57 #include <fcntl.h>
58 #include <process.h>
59 #include <intrin.h>
60 #endif
61
62 #include <memory.h>
63 #include <string.h>
64 #include <stddef.h>
65
66 typedef unsigned long long      uid;
67 typedef unsigned long long      logseqno;
68
69 #ifndef unix
70 typedef unsigned long long      off64_t;
71 typedef unsigned short          ushort;
72 typedef unsigned int            uint;
73 #endif
74
75 #define BT_ro 0x6f72    // ro
76 #define BT_rw 0x7772    // rw
77
78 #define BT_maxbits              24                                      // maximum page size in bits
79 #define BT_minbits              9                                       // minimum page size in bits
80 #define BT_minpage              (1 << BT_minbits)       // minimum page size
81 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
82
83 //  BTree page number constants
84 #define ALLOC_page              0       // allocation page
85 #define ROOT_page               1       // root of the btree
86 #define LEAF_page               2       // first page of leaves
87 #define REDO_page               3       // first page of redo buffer
88
89 //      Number of levels to create in a new BTree
90
91 #define MIN_lvl                 2
92
93 /*
94 There are six lock types for each node in four independent sets: 
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
101 */
102
103 typedef enum{
104         BtLockAccess = 1,
105         BtLockDelete = 2,
106         BtLockRead   = 4,
107         BtLockWrite  = 8,
108         BtLockParent = 16,
109         BtLockAtomic = 32
110 } BtLock;
111
112 //      definition for phase-fair reader/writer lock implementation
113
114 typedef struct {
115         volatile ushort rin[1];
116         volatile ushort rout[1];
117         volatile ushort ticket[1];
118         volatile ushort serving[1];
119         ushort tid;
120         ushort dup;
121 } RWLock;
122
123 //      write only lock
124
125 typedef struct {
126         volatile uint exclusive[1];
127         ushort tid;
128         ushort dup;
129 } WOLock;
130
131 #define PHID 0x1
132 #define PRES 0x2
133 #define MASK 0x3
134 #define RINC 0x4
135
136 //      lite weight mutex
137
138 // exclusive is set for write access
139
140 typedef struct {
141         volatile uint exclusive[1];
142 } BtMutexLatch;
143
144 #define XCL 1
145
146 //      mode & definition for lite latch implementation
147
148 enum {
149         QueRd = 1,              // reader queue
150         QueWr = 2               // writer queue
151 } RWQueue;
152
153 //  hash table entries
154
155 typedef struct {
156         volatile uint entry;            // Latch table entry at head of chain
157         BtMutexLatch latch[1];
158 } BtHashEntry;
159
160 //      latch manager table structure
161
162 typedef struct {
163         uid page_no;                                    // latch set page number
164         RWLock readwr[1];                               // read/write page lock
165         RWLock access[1];                               // Access Intent/Page delete
166         WOLock parent[1];                               // Posting of fence key in parent
167         WOLock atomic[1];                               // Atomic update in progress
168         uint split;                                             // right split page atomic insert
169         uint entry;                                             // entry slot in latch table
170         uint next;                                              // next entry in hash table chain
171         uint prev;                                              // prev entry in hash table chain
172         BtMutexLatch modify[1];                 // modify entry lite latch
173         volatile ushort pin;                    // number of accessing threads
174         volatile unsigned char dirty;   // page in cache is dirty (atomic set)
175         volatile unsigned char avail;   // page is an available entry
176 } BtLatchSet;
177
178 //      Define the length of the page record numbers
179
180 #define BtId 6
181
182 //      Page key slot definition.
183
184 //      Keys are marked dead, but remain on the page until
185 //      it cleanup is called. The fence key (highest key) for
186 //      a leaf page is always present, even after cleanup.
187
188 //      Slot types
189
190 //      In addition to the Unique keys that occupy slots
191 //      there are Librarian and Duplicate key
192 //      slots occupying the key slot array.
193
194 //      The Librarian slots are dead keys that
195 //      serve as filler, available to add new Unique
196 //      or Dup slots that are inserted into the B-tree.
197
198 //      The Duplicate slots have had their key bytes extended
199 //      by 6 bytes to contain a binary duplicate key uniqueifier.
200
201 typedef enum {
202         Unique,
203         Librarian,
204         Duplicate,
205         Delete,
206         Update
207 } BtSlotType;
208
209 typedef struct {
210         uint off:BT_maxbits;    // page offset for key start
211         uint type:3;                    // type of slot
212         uint dead:1;                    // set for deleted slot
213 } BtSlot;
214
215 //      The key structure occupies space at the upper end of
216 //      each page.  It's a length byte followed by the key
217 //      bytes.
218
219 typedef struct {
220         unsigned char len;              // this can be changed to a ushort or uint
221         unsigned char key[0];
222 } BtKey;
223
224 //      the value structure also occupies space at the upper
225 //      end of the page. Each key is immediately followed by a value.
226
227 typedef struct {
228         unsigned char len;              // this can be changed to a ushort or uint
229         unsigned char value[0];
230 } BtVal;
231
232 #define BT_maxkey       255             // maximum number of bytes in a key
233 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
234
235 //      The first part of an index page.
236 //      It is immediately followed
237 //      by the BtSlot array of keys.
238
239 //      note that this structure size
240 //      must be a multiple of 8 bytes
241 //      in order to place dups correctly.
242
243 typedef struct BtPage_ {
244         uint cnt;                                       // count of keys in page
245         uint act;                                       // count of active keys
246         uint min;                                       // next key offset
247         uint garbage;                           // page garbage in bytes
248         unsigned char bits:7;           // page size in bits
249         unsigned char free:1;           // page is on free chain
250         unsigned char lvl:7;            // level of page
251         unsigned char kill:1;           // page is being deleted
252         unsigned char right[BtId];      // page number to right
253         unsigned char left[BtId];       // page number to left
254         unsigned char filler[2];        // padding to multiple of 8
255         logseqno lsn;                           // log sequence number applied
256 } *BtPage;
257
258 //  The loadpage interface object
259
260 typedef struct {
261         BtPage page;            // current page pointer
262         BtLatchSet *latch;      // current page latch set
263 } BtPageSet;
264
265 //      structure for latch manager on ALLOC_page
266
267 typedef struct {
268         struct BtPage_ alloc[1];        // next page_no in right ptr
269         unsigned long long dups[1];     // global duplicate key uniqueifier
270         unsigned char chain[BtId];      // head of free page_nos chain
271 } BtPageZero;
272
273 //      The object structure for Btree access
274
275 typedef struct {
276         uint page_size;                         // page size    
277         uint page_bits;                         // page size in bits    
278 #ifdef unix
279         int idx;
280 #else
281         HANDLE idx;
282 #endif
283         BtPageZero *pagezero;           // mapped allocation page
284         BtHashEntry *hashtable;         // the buffer pool hash table entries
285         BtLatchSet *latchsets;          // mapped latch set from buffer pool
286         unsigned char *pagepool;        // mapped to the buffer pool pages
287         unsigned char *redobuff;        // mapped recovery buffer pointer
288         logseqno lsn, flushlsn;         // current & first lsn flushed
289         BtMutexLatch dump[1];           // redo dump lite latch
290         BtMutexLatch redo[1];           // redo area lite latch
291         BtMutexLatch lock[1];           // allocation area lite latch
292         ushort thread_no[1];            // next thread number
293         uint nlatchpage;                        // number of latch pages at BT_latch
294         uint latchtotal;                        // number of page latch entries
295         uint latchhash;                         // number of latch hash table slots
296         uint latchvictim;                       // next latch entry to examine
297         uint latchavail;                        // next available latch entry
298         uint availlock[1];                      // latch available chain commitments
299         uint available;                         // size of latch available chain
300         uint redopages;                         // size of recovery buff in pages
301         uint redoend;                           // eof/end element in recovery buff
302 #ifndef unix
303         HANDLE halloc;                          // allocation handle
304         HANDLE hpool;                           // buffer pool handle
305 #endif
306 } BtMgr;
307
308 typedef struct {
309         BtMgr *mgr;                             // buffer manager for thread
310         BtPage cursor;                  // cached frame for start/next (never mapped)
311         BtPage frame;                   // spare frame for the page split (never mapped)
312         uid cursor_page;                                // current cursor page number   
313         unsigned char *mem;                             // frame, cursor, page memory buffer
314         unsigned char key[BT_keyarray]; // last found complete key
315         int found;                              // last delete or insert was found
316         int err;                                // last error
317         int line;                               // last error line no
318         int reads, writes;              // number of reads and writes from the btree
319         ushort thread_no;               // thread number
320 } BtDb;
321
322 //      Catastrophic errors
323
324 typedef enum {
325         BTERR_ok = 0,
326         BTERR_struct,
327         BTERR_ovflw,
328         BTERR_lock,
329         BTERR_map,
330         BTERR_read,
331         BTERR_wrt,
332         BTERR_atomic,
333         BTERR_recovery,
334         BTERR_avail
335 } BTERR;
336
337 #define CLOCK_bit 0x8000
338
339 // recovery manager entry types
340
341 typedef enum {
342         BTRM_eof = 0,   // rest of buffer is emtpy
343         BTRM_add,               // add a unique key-value to btree
344         BTRM_dup,               // add a duplicate key-value to btree
345         BTRM_del,               // delete a key-value from btree
346         BTRM_upd,               // update a key with a new value
347         BTRM_new,               // allocate a new empty page
348         BTRM_old                // reuse an old empty page
349 } BTRM;
350
351 // recovery manager entry
352 //      structure followed by BtKey & BtVal
353
354 typedef struct {
355         logseqno reqlsn;        // log sequence number required
356         logseqno lsn;           // log sequence number for entry
357         uint len;                       // length of entry
358         unsigned char type;     // type of entry
359         unsigned char lvl;      // level of btree entry pertains to
360 } BtLogHdr;
361
362 // B-Tree functions
363
364 extern void bt_close (BtDb *bt);
365 extern BtDb *bt_open (BtMgr *mgr);
366 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
367 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
368 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
369 extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
370 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
371 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
372 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
373 extern BtKey *bt_foundkey (BtDb *bt);
374 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
375 extern uint bt_nextkey   (BtDb *bt, uint slot);
376
377 //      manager functions
378 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
379 extern void bt_mgrclose (BtMgr *mgr);
380 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
381 extern logseqno bt_txnredo (BtDb *bt, BtPage page);
382
383 //  Helper functions to return slot values
384 //      from the cursor page.
385
386 extern BtKey *bt_key (BtDb *bt, uint slot);
387 extern BtVal *bt_val (BtDb *bt, uint slot);
388
389 //  The page is allocated from low and hi ends.
390 //  The key slots are allocated from the bottom,
391 //      while the text and value of the key
392 //  are allocated from the top.  When the two
393 //  areas meet, the page is split into two.
394
395 //  A key consists of a length byte, two bytes of
396 //  index number (0 - 65535), and up to 253 bytes
397 //  of key value.
398
399 //  Associated with each key is a value byte string
400 //      containing any value desired.
401
402 //  The b-tree root is always located at page 1.
403 //      The first leaf page of level zero is always
404 //      located on page 2.
405
406 //      The b-tree pages are linked with next
407 //      pointers to facilitate enumerators,
408 //      and provide for concurrency.
409
410 //      When to root page fills, it is split in two and
411 //      the tree height is raised by a new root at page
412 //      one with two keys.
413
414 //      Deleted keys are marked with a dead bit until
415 //      page cleanup. The fence key for a leaf node is
416 //      always present
417
418 //  To achieve maximum concurrency one page is locked at a time
419 //  as the tree is traversed to find leaf key in question. The right
420 //  page numbers are used in cases where the page is being split,
421 //      or consolidated.
422
423 //  Page 0 is dedicated to lock for new page extensions,
424 //      and chains empty pages together for reuse. It also
425 //      contains the latch manager hash table.
426
427 //      The ParentModification lock on a node is obtained to serialize posting
428 //      or changing the fence key for a node.
429
430 //      Empty pages are chained together through the ALLOC page and reused.
431
432 //      Access macros to address slot and key values from the page
433 //      Page slots use 1 based indexing.
434
435 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
436 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
437 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
438
439 void bt_putid(unsigned char *dest, uid id)
440 {
441 int i = BtId;
442
443         while( i-- )
444                 dest[i] = (unsigned char)id, id >>= 8;
445 }
446
447 uid bt_getid(unsigned char *src)
448 {
449 uid id = 0;
450 int i;
451
452         for( i = 0; i < BtId; i++ )
453                 id <<= 8, id |= *src++; 
454
455         return id;
456 }
457
458 uid bt_newdup (BtDb *bt)
459 {
460 #ifdef unix
461         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
462 #else
463         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
464 #endif
465 }
466
467 //      Write-Only Queue Lock
468
469 void WriteOLock (WOLock *lock, ushort tid)
470 {
471 uint prev;
472
473         if( lock->tid == tid ) {
474                 lock->dup++;
475                 return;
476         }
477
478         while( 1 ) {
479 #ifdef unix
480           prev = __sync_fetch_and_or (lock->exclusive, 1);
481 #else
482           prev = _InterlockedExchangeOr (lock->exclusive, 1);
483 #endif
484           if( !(prev & XCL) ) {
485                 lock->tid = tid;
486                 return;
487           }
488 #ifdef unix
489           sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
490 #else
491           SwitchToThread ();
492 #endif
493         }
494 }
495
496 void WriteORelease (WOLock *lock)
497 {
498         if( lock->dup ) {
499                 lock->dup--;
500                 return;
501         }
502
503         *lock->exclusive = 0;
504         lock->tid = 0;
505 #ifdef linux
506         sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
507 #endif
508 }
509
510 //      Phase-Fair reader/writer lock implementation
511
512 void WriteLock (RWLock *lock, ushort tid)
513 {
514 ushort w, r, tix;
515
516         if( lock->tid == tid ) {
517                 lock->dup++;
518                 return;
519         }
520 #ifdef unix
521         tix = __sync_fetch_and_add (lock->ticket, 1);
522 #else
523         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
524 #endif
525         // wait for our ticket to come up
526
527         while( tix != lock->serving[0] )
528 #ifdef unix
529                 sched_yield();
530 #else
531                 SwitchToThread ();
532 #endif
533
534         w = PRES | (tix & PHID);
535 #ifdef  unix
536         r = __sync_fetch_and_add (lock->rin, w);
537 #else
538         r = _InterlockedExchangeAdd16 (lock->rin, w);
539 #endif
540         while( r != *lock->rout )
541 #ifdef unix
542                 sched_yield();
543 #else
544                 SwitchToThread();
545 #endif
546         lock->tid = tid;
547 }
548
549 void WriteRelease (RWLock *lock)
550 {
551         if( lock->dup ) {
552                 lock->dup--;
553                 return;
554         }
555
556         lock->tid = 0;
557 #ifdef unix
558         __sync_fetch_and_and (lock->rin, ~MASK);
559 #else
560         _InterlockedAnd16 (lock->rin, ~MASK);
561 #endif
562         lock->serving[0]++;
563 }
564
565 //      try to obtain read lock
566 //  return 1 if successful
567
568 int ReadTry (RWLock *lock, ushort tid)
569 {
570 ushort w;
571
572         //  OK if write lock already held by same thread
573
574         if( lock->tid == tid ) {
575                 lock->dup++;
576                 return 1;
577         }
578 #ifdef unix
579         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
580 #else
581         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
582 #endif
583         if( w )
584           if( w == (*lock->rin & MASK) ) {
585 #ifdef unix
586                 __sync_fetch_and_add (lock->rin, -RINC);
587 #else
588                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
589 #endif
590                 return 0;
591           }
592
593         return 1;
594 }
595
596 void ReadLock (RWLock *lock, ushort tid)
597 {
598 ushort w;
599         if( lock->tid == tid ) {
600                 lock->dup++;
601                 return;
602         }
603 #ifdef unix
604         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
605 #else
606         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
607 #endif
608         if( w )
609           while( w == (*lock->rin & MASK) )
610 #ifdef unix
611                 sched_yield ();
612 #else
613                 SwitchToThread ();
614 #endif
615 }
616
617 void ReadRelease (RWLock *lock)
618 {
619         if( lock->dup ) {
620                 lock->dup--;
621                 return;
622         }
623
624 #ifdef unix
625         __sync_fetch_and_add (lock->rout, RINC);
626 #else
627         _InterlockedExchangeAdd16 (lock->rout, RINC);
628 #endif
629 }
630
631 //      lite weight spin lock Latch Manager
632
633 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
634 {
635         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
636 }
637
638 void bt_mutexlock(BtMutexLatch *latch)
639 {
640 uint prev;
641
642   while( 1 ) {
643 #ifdef  unix
644         prev = __sync_fetch_and_or(latch->exclusive, XCL);
645 #else
646         prev = _InterlockedOr(latch->exclusive, XCL);
647 #endif
648         if( !(prev & XCL) )
649                 return;
650 #ifdef  unix
651         sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
652 #else
653         SwitchToThread();
654 #endif
655   }
656 }
657
658 //      try to obtain write lock
659
660 //      return 1 if obtained,
661 //              0 otherwise
662
663 int bt_mutextry(BtMutexLatch *latch)
664 {
665 uint prev;
666
667 #ifdef  unix
668         prev = __sync_fetch_and_or(latch->exclusive, XCL);
669 #else
670         prev = _InterlockedOr(latch->exclusive, XCL);
671 #endif
672         //      take write access if exclusive bit is clear
673
674         return !(prev & XCL);
675 }
676
677 //      clear write mode
678
679 void bt_releasemutex(BtMutexLatch *latch)
680 {
681         *latch->exclusive = 0;
682 #ifdef unix
683         sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
684 #endif
685 }
686
687 //      recovery manager -- flush dirty pages
688
689 void bt_flushlsn (BtDb *bt)
690 {
691 uint cnt3 = 0, cnt2 = 0, cnt = 0;
692 BtLatchSet *latch;
693 BtPage page;
694 uint entry;
695
696   //    flush dirty pool pages to the btree that were
697   //    dirty before the start of this redo recovery buffer
698 fprintf(stderr, "Start flushlsn\n");
699   for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
700         page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
701         latch = bt->mgr->latchsets + entry;
702         bt_mutexlock (latch->modify);
703         bt_lockpage(bt, BtLockRead, latch);
704
705         if( latch->dirty ) {
706           bt_writepage(bt->mgr, page, latch->page_no);
707           latch->dirty = 0, bt->writes++, cnt++;
708         }
709 if( latch->avail )
710 cnt3++;
711 if( latch->pin & ~CLOCK_bit )
712 cnt2++;
713         bt_unlockpage(bt, BtLockRead, latch);
714         bt_releasemutex (latch->modify);
715   }
716 fprintf(stderr, "End flushlsn %d pages  %d pinned  %d available\n", cnt, cnt2, cnt3);
717 }
718
719 //      recovery manager -- process current recovery buff on startup
720 //      this won't do much if previous session was properly closed.
721
722 BTERR bt_recoveryredo (BtMgr *mgr)
723 {
724 BtLogHdr *hdr, *eof;
725 uint offset = 0;
726 BtKey *key;
727 BtVal *val;
728
729   pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
730
731   hdr = (BtLogHdr *)mgr->redobuff;
732   mgr->flushlsn = hdr->lsn;
733
734   while( 1 ) {
735         hdr = (BtLogHdr *)(mgr->redobuff + offset);
736         switch( hdr->type ) {
737         case BTRM_eof:
738                 mgr->lsn = hdr->lsn;
739                 return 0;
740         case BTRM_add:          // add a unique key-value to btree
741         
742         case BTRM_dup:          // add a duplicate key-value to btree
743         case BTRM_del:          // delete a key-value from btree
744         case BTRM_upd:          // update a key with a new value
745         case BTRM_new:          // allocate a new empty page
746         case BTRM_old:          // reuse an old empty page
747                 return 0;
748         }
749   }
750 }
751
752 //      recovery manager -- dump current recovery buff & flush dirty pages
753 //              in preparation for next recovery buffer.
754
755 BTERR bt_dumpredo (BtDb *bt)
756 {
757 BtLogHdr *eof;
758 fprintf(stderr, "Flush pages ");
759
760         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
761         memset (eof, 0, sizeof(BtLogHdr));
762
763         //  flush pages written at beginning of this redo buffer
764         //      then write the redo buffer out to disk
765
766         fdatasync (bt->mgr->idx);
767
768 fprintf(stderr, "Dump ReDo: %d bytes\n", bt->mgr->redoend);
769         pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
770
771         sync_file_range (bt->mgr->idx, REDO_page << bt->mgr->page_bits, bt->mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
772
773         bt->mgr->flushlsn = bt->mgr->lsn;
774         bt->mgr->redoend = 0;
775
776         eof = (BtLogHdr *)(bt->mgr->redobuff);
777         memset (eof, 0, sizeof(BtLogHdr));
778         eof->lsn = bt->mgr->lsn;
779         return 0;
780 }
781
782 //      recovery manager -- append new entry to recovery log
783 //      flush to disk when it overflows.
784
785 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
786 {
787 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
788 uint amt = sizeof(BtLogHdr);
789 BtLogHdr *hdr, *eof;
790 uint flush;
791
792         bt_mutexlock (bt->mgr->redo);
793
794         if( key )
795           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
796
797         //      see if new entry fits in buffer
798         //      flush and reset if it doesn't
799
800         if( flush = amt > size - bt->mgr->redoend ) {
801           bt_mutexlock (bt->mgr->dump);
802
803           if( bt_dumpredo (bt) )
804                 return 0;
805         }
806
807         //      fill in new entry & either eof or end block
808
809         hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
810
811         hdr->len = amt;
812         hdr->type = type;
813         hdr->lvl = lvl;
814         hdr->lsn = ++bt->mgr->lsn;
815
816         bt->mgr->redoend += amt;
817
818         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
819         memset (eof, 0, sizeof(BtLogHdr));
820
821         //  fill in key and value
822
823         if( key ) {
824           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
825           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
826         }
827
828         bt_releasemutex(bt->mgr->redo);
829
830         if( flush ) {
831           bt_flushlsn (bt);
832           bt_releasemutex(bt->mgr->dump);
833         }
834
835         return hdr->lsn;
836 }
837
838 //      recovery manager -- append transaction to recovery log
839 //      flush to disk when it overflows.
840
841 logseqno bt_txnredo (BtDb *bt, BtPage source)
842 {
843 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
844 uint amt = 0, src, type;
845 BtLogHdr *hdr, *eof;
846 logseqno lsn;
847 uint flush;
848 BtKey *key;
849 BtVal *val;
850
851         //      determine amount of redo recovery log space required
852
853         for( src = 0; src++ < source->cnt; ) {
854           key = keyptr(source,src);
855           val = valptr(source,src);
856           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
857           amt += sizeof(BtLogHdr);
858         }
859
860         bt_mutexlock (bt->mgr->redo);
861
862         //      see if new entry fits in buffer
863         //      flush and reset if it doesn't
864
865         if( flush = amt > size - bt->mgr->redoend ) {
866           bt_mutexlock (bt->mgr->dump);
867
868           if( bt_dumpredo (bt) )
869                 return 0;
870         }
871
872         //      assign new lsn to transaction
873
874         lsn = ++bt->mgr->lsn;
875
876         //      fill in new entries
877
878         for( src = 0; src++ < source->cnt; ) {
879           key = keyptr(source, src);
880           val = valptr(source, src);
881
882           switch( slotptr(source, src)->type ) {
883           case Unique:
884                 type = BTRM_add;
885                 break;
886           case Duplicate:
887                 type = BTRM_dup;
888                 break;
889           case Delete:
890                 type = BTRM_del;
891                 break;
892           case Update:
893                 type = BTRM_upd;
894                 break;
895           }
896
897           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
898           amt += sizeof(BtLogHdr);
899
900           hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
901           hdr->len = amt;
902           hdr->type = type;
903           hdr->lsn = lsn;
904           hdr->lvl = 0;
905
906           //  fill in key and value
907
908           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
909           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
910
911           bt->mgr->redoend += amt;
912         }
913
914         eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
915         memset (eof, 0, sizeof(BtLogHdr));
916
917         bt_releasemutex(bt->mgr->redo);
918
919         if( flush ) {
920           bt_flushlsn (bt);
921           bt_releasemutex(bt->mgr->dump);
922         }
923
924         return lsn;
925 }
926
927 //      read page into buffer pool from permanent location in Btree file
928
929 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
930 {
931 off64_t off = page_no << mgr->page_bits;
932
933 #ifdef unix
934         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
935                 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
936                 return BTERR_read;
937         }
938 #else
939 OVERLAPPED ovl[1];
940 uint amt[1];
941
942         memset (ovl, 0, sizeof(OVERLAPPED));
943         ovl->Offset = off;
944         ovl->OffsetHigh = off >> 32;
945
946         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
947                 fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
948                 return BTERR_read;
949         }
950         if( *amt <  mgr->page_size ) {
951                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
952                 return BTERR_read;
953         }
954 #endif
955         return 0;
956 }
957
958 //      write page to permanent location in Btree file
959 //      clear the dirty bit
960
961 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
962 {
963 off64_t off = page_no << mgr->page_bits;
964
965 #ifdef unix
966         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
967                 return BTERR_wrt;
968 #else
969 OVERLAPPED ovl[1];
970 uint amt[1];
971
972         memset (ovl, 0, sizeof(OVERLAPPED));
973         ovl->Offset = off;
974         ovl->OffsetHigh = off >> 32;
975
976         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
977                 return BTERR_wrt;
978
979         if( *amt <  mgr->page_size )
980                 return BTERR_wrt;
981 #endif
982         return 0;
983 }
984
985 //      set CLOCK bit in latch
986 //      decrement pin count
987
988 void bt_unpinlatch (BtLatchSet *latch)
989 {
990         bt_mutexlock(latch->modify);
991         latch->pin |= CLOCK_bit;
992         latch->pin--;
993         bt_releasemutex(latch->modify);
994 }
995
996 //  return the btree cached page address
997 //      if page is dirty and has not yet been
998 //      flushed to disk for the current redo
999 //      recovery buffer, write it out.
1000
1001 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1002 {
1003 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1004
1005   return page;
1006 }
1007
1008 //  return next available latch entry
1009 //        and with latch entry locked
1010
1011 uint bt_availnext (BtDb *bt)
1012 {
1013 BtLatchSet *latch;
1014 uint entry;
1015
1016   while( 1 ) {
1017 #ifdef unix
1018         entry = __sync_fetch_and_add (&bt->mgr->latchavail, 1) + 1;
1019 #else
1020         entry = _InterlockedIncrement (&bt->mgr->latchavail);
1021 #endif
1022         entry %= bt->mgr->latchtotal;
1023
1024         if( !entry )
1025                 continue;
1026
1027         latch = bt->mgr->latchsets + entry;
1028
1029         if( !latch->avail )
1030                 continue;
1031
1032         bt_mutexlock(latch->modify);
1033
1034         if( !latch->avail ) {
1035                 bt_releasemutex(latch->modify);
1036                 continue;
1037         }
1038
1039         return entry;
1040   }
1041 }
1042
1043 //      find and add the next available latch entry
1044 //      to the queue
1045
1046 BTERR bt_availlatch (BtDb *bt)
1047 {
1048 BtLatchSet *latch;
1049 uint startattempt;
1050 uint cnt, entry;
1051 uint hashidx;
1052 BtPage page;
1053
1054   //  find and reuse previous entry on victim
1055
1056   startattempt = bt->mgr->latchvictim;
1057
1058   while( 1 ) {
1059 #ifdef unix
1060         entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
1061 #else
1062         entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
1063 #endif
1064         //      skip entry if it has outstanding pins
1065
1066         entry %= bt->mgr->latchtotal;
1067
1068         if( !entry )
1069                 continue;
1070
1071         //      only go around one time before
1072         //      flushing redo recovery buffer,
1073         //      and the buffer pool to free up entries.
1074
1075         if( bt->mgr->redopages )
1076           if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
1077             if( bt_mutextry (bt->mgr->dump) ) {
1078                  if( bt_dumpredo (bt) )
1079                   return bt->err;
1080                  bt_flushlsn (bt);
1081                 // synchronize the various threads running into this condition
1082                 //      so that only one thread does the dump and flush
1083                 } else
1084                         bt_mutexlock(bt->mgr->dump);
1085
1086                 startattempt = bt->mgr->latchvictim;
1087                 bt_releasemutex(bt->mgr->dump);
1088           }
1089
1090         latch = bt->mgr->latchsets + entry;
1091
1092         if( latch->avail )
1093           continue;
1094
1095         bt_mutexlock(latch->modify);
1096
1097         //      skip if already an available entry
1098
1099         if( latch->avail ) {
1100           bt_releasemutex(latch->modify);
1101           continue;
1102         }
1103
1104         //  skip this entry if it is pinned
1105         //      if the CLOCK bit is set
1106         //      reset it to zero.
1107
1108         if( latch->pin ) {
1109           latch->pin &= ~CLOCK_bit;
1110           bt_releasemutex(latch->modify);
1111           continue;
1112         }
1113
1114         page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1115
1116         //      if dirty page has lsn >= last redo recovery buffer
1117         //      then hold page in the buffer pool until next redo
1118         //      recovery buffer is being written to disk.
1119
1120         if( latch->dirty )
1121           if( page->lsn >= bt->mgr->flushlsn ) {
1122             bt_releasemutex(latch->modify);
1123                 continue;
1124           }
1125
1126         //      entry is available
1127 #ifdef unix
1128         __sync_fetch_and_add (&bt->mgr->available, 1);
1129 #else
1130         _InterlockedIncrement(&bt->mgr->available);
1131 #endif
1132         latch->avail = 1;
1133         bt_releasemutex(latch->modify);
1134         return 0;
1135   }
1136 }
1137
1138 //      release available latch requests
1139
1140 void bt_availrelease (BtDb *bt, uint count)
1141 {
1142 #ifdef unix
1143         __sync_fetch_and_add(bt->mgr->availlock, -count);
1144 #else
1145         _InterlockedAdd(bt->mgr->availlock, -count);
1146 #endif
1147 }
1148
1149 //      commit available chain entries
1150 //      find available entries as required
1151
1152 void bt_availrequest (BtDb *bt, uint count)
1153 {
1154 #ifdef unix
1155         __sync_fetch_and_add(bt->mgr->availlock, count);
1156 #else
1157         _InterlockedAdd(bt->mgr->availlock, count);
1158 #endif
1159
1160         while( *bt->mgr->availlock > bt->mgr->available )
1161                 bt_availlatch (bt);
1162 }
1163
1164 //      find available latchset
1165 //      return with latchset pinned
1166
1167 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, BtPage loadit)
1168 {
1169 uint hashidx = page_no % bt->mgr->latchhash;
1170 BtLatchSet *latch;
1171 uint entry, idx;
1172 BtPage page;
1173
1174   //  try to find our entry
1175
1176   bt_mutexlock(bt->mgr->hashtable[hashidx].latch);
1177
1178   if( entry = bt->mgr->hashtable[hashidx].entry ) do
1179   {
1180         latch = bt->mgr->latchsets + entry;
1181         if( page_no == latch->page_no )
1182                 break;
1183   } while( entry = latch->next );
1184
1185   //  found our entry: increment pin
1186   //  remove from available status
1187
1188   if( entry ) {
1189         latch = bt->mgr->latchsets + entry;
1190         bt_mutexlock(latch->modify);
1191         if( latch->avail )
1192 #ifdef unix
1193           __sync_fetch_and_add (&bt->mgr->available, -1);
1194 #else
1195           _InterlockedDecrement(&bt->mgr->available);
1196 #endif
1197         latch->avail = 0;
1198         latch->pin |= CLOCK_bit;
1199         latch->pin++;
1200
1201         bt_releasemutex(latch->modify);
1202         bt_releasemutex(bt->mgr->hashtable[hashidx].latch);
1203         return latch;
1204   }
1205
1206   //  find and reuse entry from available set
1207
1208 trynext:
1209
1210   if( entry = bt_availnext (bt) )
1211         latch = bt->mgr->latchsets + entry;
1212   else
1213         return bt->line = __LINE__, bt->err = BTERR_avail, NULL;
1214
1215   idx = latch->page_no % bt->mgr->latchhash;
1216
1217   //  if latch is on a different hash chain
1218   //    unlink from the old page_no chain
1219
1220   if( latch->page_no )
1221    if( idx != hashidx ) {
1222
1223         //  skip over this entry if latch not available
1224
1225     if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
1226           bt_releasemutex(latch->modify);
1227           goto trynext;
1228         }
1229
1230         if( latch->prev )
1231           bt->mgr->latchsets[latch->prev].next = latch->next;
1232         else
1233           bt->mgr->hashtable[idx].entry = latch->next;
1234
1235         if( latch->next )
1236           bt->mgr->latchsets[latch->next].prev = latch->prev;
1237
1238     bt_releasemutex (bt->mgr->hashtable[idx].latch);
1239    }
1240
1241   //  remove available status
1242
1243   latch->avail = 0;
1244 #ifdef unix
1245   __sync_fetch_and_add (&bt->mgr->available, -1);
1246 #else
1247   _InterlockedDecrement(&bt->mgr->available);
1248 #endif
1249   page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1250
1251   if( latch->dirty )
1252         if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1253           return bt->line = __LINE__, NULL;
1254         else
1255           latch->dirty = 0, bt->writes++;
1256
1257   if( loadit ) {
1258         memcpy (page, loadit, bt->mgr->page_size);
1259         latch->dirty = 1;
1260   } else
1261         if( bt->err = bt_readpage (bt->mgr, page, page_no) )
1262           return bt->line = __LINE__, NULL;
1263         else
1264           bt->reads++;
1265
1266   //  link page as head of hash table chain
1267   //  if this is a never before used entry,
1268   //  or it was previously on a different
1269   //  hash table chain. Otherwise, just
1270   //  leave it in its current hash table
1271   //  chain position.
1272
1273   if( !latch->page_no || hashidx != idx ) {
1274         if( latch->next = bt->mgr->hashtable[hashidx].entry )
1275           bt->mgr->latchsets[latch->next].prev = entry;
1276
1277         bt->mgr->hashtable[hashidx].entry = entry;
1278     latch->prev = 0;
1279   }
1280
1281   //  fill in latch structure
1282
1283   latch->pin = CLOCK_bit | 1;
1284   latch->page_no = page_no;
1285   latch->entry = entry;
1286   latch->split = 0;
1287
1288   bt_releasemutex (latch->modify);
1289   bt_releasemutex (bt->mgr->hashtable[hashidx].latch);
1290   return latch;
1291 }
1292   
1293 void bt_mgrclose (BtMgr *mgr)
1294 {
1295 BtLatchSet *latch;
1296 BtLogHdr *eof;
1297 uint num = 0;
1298 BtPage page;
1299 uint slot;
1300
1301         //      flush previously written dirty pages
1302         //      and write recovery buffer to disk
1303
1304         fdatasync (mgr->idx);
1305
1306         if( mgr->redoend ) {
1307                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1308                 memset (eof, 0, sizeof(BtLogHdr));
1309
1310                 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1311         }
1312
1313         //      write remaining dirty pool pages to the btree
1314
1315         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1316                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1317                 latch = mgr->latchsets + slot;
1318
1319                 if( latch->dirty ) {
1320                         bt_writepage(mgr, page, latch->page_no);
1321                         latch->dirty = 0, num++;
1322                 }
1323         }
1324
1325         //      flush last batch to disk and clear
1326         //      redo recovery buffer on disk.
1327
1328         fdatasync (mgr->idx);
1329
1330         eof = (BtLogHdr *)mgr->redobuff;
1331         memset (eof, 0, sizeof(BtLogHdr));
1332         eof->lsn = mgr->lsn;
1333
1334         pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1335
1336         sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1337
1338         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1339
1340 #ifdef unix
1341         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1342         munmap (mgr->pagezero, mgr->page_size);
1343 #else
1344         FlushViewOfFile(mgr->pagezero, 0);
1345         UnmapViewOfFile(mgr->pagezero);
1346         UnmapViewOfFile(mgr->pagepool);
1347         CloseHandle(mgr->halloc);
1348         CloseHandle(mgr->hpool);
1349 #endif
1350 #ifdef unix
1351         free (mgr->redobuff);
1352         close (mgr->idx);
1353         free (mgr);
1354 #else
1355         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1356         FlushFileBuffers(mgr->idx);
1357         CloseHandle(mgr->idx);
1358         GlobalFree (mgr);
1359 #endif
1360 }
1361
1362 //      close and release memory
1363
1364 void bt_close (BtDb *bt)
1365 {
1366 #ifdef unix
1367         if( bt->mem )
1368                 free (bt->mem);
1369 #else
1370         if( bt->mem)
1371                 VirtualFree (bt->mem, 0, MEM_RELEASE);
1372 #endif
1373         free (bt);
1374 }
1375
1376 //  open/create new btree buffer manager
1377
1378 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1379 //              size of page pool (e.g. 262144)
1380
1381 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1382 {
1383 uint lvl, attr, last, slot, idx;
1384 uint nlatchpage, latchhash;
1385 unsigned char value[BtId];
1386 int flag, initit = 0;
1387 BtPageZero *pagezero;
1388 BtLatchSet *latch;
1389 off64_t size;
1390 uint amt[1];
1391 BtMgr* mgr;
1392 BtKey* key;
1393 BtVal *val;
1394
1395         // determine sanity of page size and buffer pool
1396
1397         if( bits > BT_maxbits )
1398                 bits = BT_maxbits;
1399         else if( bits < BT_minbits )
1400                 bits = BT_minbits;
1401
1402         if( nodemax < 16 ) {
1403                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
1404                 return NULL;
1405         }
1406
1407 #ifdef unix
1408         mgr = calloc (1, sizeof(BtMgr));
1409
1410         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1411
1412         if( mgr->idx == -1 ) {
1413                 fprintf (stderr, "Unable to open btree file\n");
1414                 return free(mgr), NULL;
1415         }
1416 #else
1417         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1418         attr = FILE_ATTRIBUTE_NORMAL;
1419         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1420
1421         if( mgr->idx == INVALID_HANDLE_VALUE )
1422                 return GlobalFree(mgr), NULL;
1423 #endif
1424
1425 #ifdef unix
1426         pagezero = valloc (BT_maxpage);
1427         *amt = 0;
1428
1429         // read minimum page size to get root info
1430         //      to support raw disk partition files
1431         //      check if bits == 0 on the disk.
1432
1433         if( size = lseek (mgr->idx, 0L, 2) )
1434                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1435                         if( pagezero->alloc->bits )
1436                                 bits = pagezero->alloc->bits;
1437                         else
1438                                 initit = 1;
1439                 else
1440                         return free(mgr), free(pagezero), NULL;
1441         else
1442                 initit = 1;
1443 #else
1444         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1445         size = GetFileSize(mgr->idx, amt);
1446
1447         if( size || *amt ) {
1448                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1449                         return bt_mgrclose (mgr), NULL;
1450                 bits = pagezero->alloc->bits;
1451         } else
1452                 initit = 1;
1453 #endif
1454
1455         mgr->page_size = 1 << bits;
1456         mgr->page_bits = bits;
1457
1458         //  calculate number of latch hash table entries
1459
1460         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1461
1462         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1463         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1464         mgr->latchtotal = nodemax;
1465
1466         if( !initit )
1467                 goto mgrlatch;
1468
1469         // initialize an empty b-tree with latch page, root page, page of leaves
1470         // and page(s) of latches and page pool cache
1471
1472         memset (pagezero, 0, 1 << bits);
1473         pagezero->alloc->bits = mgr->page_bits;
1474         bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1475
1476         //  initialize left-most LEAF page in
1477         //      alloc->left.
1478
1479         bt_putid (pagezero->alloc->left, LEAF_page);
1480
1481         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1482                 fprintf (stderr, "Unable to create btree page zero\n");
1483                 return bt_mgrclose (mgr), NULL;
1484         }
1485
1486         memset (pagezero, 0, 1 << bits);
1487         pagezero->alloc->bits = mgr->page_bits;
1488
1489         for( lvl=MIN_lvl; lvl--; ) {
1490                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1491                 key = keyptr(pagezero->alloc, 1);
1492                 key->len = 2;           // create stopper key
1493                 key->key[0] = 0xff;
1494                 key->key[1] = 0xff;
1495
1496                 bt_putid(value, MIN_lvl - lvl + 1);
1497                 val = valptr(pagezero->alloc, 1);
1498                 val->len = lvl ? BtId : 0;
1499                 memcpy (val->value, value, val->len);
1500
1501                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1502                 pagezero->alloc->lvl = lvl;
1503                 pagezero->alloc->cnt = 1;
1504                 pagezero->alloc->act = 1;
1505
1506                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1507                         fprintf (stderr, "Unable to create btree page zero\n");
1508                         return bt_mgrclose (mgr), NULL;
1509                 }
1510         }
1511
1512 mgrlatch:
1513 #ifdef unix
1514         free (pagezero);
1515 #else
1516         VirtualFree (pagezero, 0, MEM_RELEASE);
1517 #endif
1518 #ifdef unix
1519         // mlock the pagezero page
1520
1521         flag = PROT_READ | PROT_WRITE;
1522         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1523         if( mgr->pagezero == MAP_FAILED ) {
1524                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1525                 return bt_mgrclose (mgr), NULL;
1526         }
1527         mlock (mgr->pagezero, mgr->page_size);
1528
1529         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1530         if( mgr->pagepool == MAP_FAILED ) {
1531                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1532                 return bt_mgrclose (mgr), NULL;
1533         }
1534         if( mgr->redopages = redopages )
1535                 mgr->redobuff = valloc (redopages * mgr->page_size);
1536 #else
1537         flag = PAGE_READWRITE;
1538         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1539         if( !mgr->halloc ) {
1540                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1541                 return bt_mgrclose (mgr), NULL;
1542         }
1543
1544         flag = FILE_MAP_WRITE;
1545         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1546         if( !mgr->pagezero ) {
1547                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1548                 return bt_mgrclose (mgr), NULL;
1549         }
1550
1551         flag = PAGE_READWRITE;
1552         size = (uid)mgr->nlatchpage << mgr->page_bits;
1553         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1554         if( !mgr->hpool ) {
1555                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1556                 return bt_mgrclose (mgr), NULL;
1557         }
1558
1559         flag = FILE_MAP_WRITE;
1560         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1561         if( !mgr->pagepool ) {
1562                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1563                 return bt_mgrclose (mgr), NULL;
1564         }
1565         if( mgr->redopages = redopages )
1566                 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1567 #endif
1568
1569         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1570         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1571         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1572
1573         //      mark all pool entries as available
1574
1575         for( idx = 1; idx < mgr->latchtotal; idx++ ) {
1576                 latch = mgr->latchsets + idx;
1577                 latch->avail = 1;
1578                 mgr->available++;
1579         }
1580
1581         return mgr;
1582 }
1583
1584 //      open BTree access method
1585 //      based on buffer manager
1586
1587 BtDb *bt_open (BtMgr *mgr)
1588 {
1589 BtDb *bt = malloc (sizeof(*bt));
1590
1591         memset (bt, 0, sizeof(*bt));
1592         bt->mgr = mgr;
1593 #ifdef unix
1594         bt->mem = valloc (2 *mgr->page_size);
1595 #else
1596         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1597 #endif
1598         bt->frame = (BtPage)bt->mem;
1599         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1600 #ifdef unix
1601         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1602 #else
1603         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1604 #endif
1605         return bt;
1606 }
1607
1608 //  compare two keys, return > 0, = 0, or < 0
1609 //  =0: keys are same
1610 //  -1: key2 > key1
1611 //  +1: key2 < key1
1612 //  as the comparison value
1613
1614 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1615 {
1616 uint len1 = key1->len;
1617 int ans;
1618
1619         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1620                 return ans;
1621
1622         if( len1 > len2 )
1623                 return 1;
1624         if( len1 < len2 )
1625                 return -1;
1626
1627         return 0;
1628 }
1629
1630 // place write, read, or parent lock on requested page_no.
1631
1632 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1633 {
1634         switch( mode ) {
1635         case BtLockRead:
1636                 ReadLock (latch->readwr, bt->thread_no);
1637                 break;
1638         case BtLockWrite:
1639                 WriteLock (latch->readwr, bt->thread_no);
1640                 break;
1641         case BtLockAccess:
1642                 ReadLock (latch->access, bt->thread_no);
1643                 break;
1644         case BtLockDelete:
1645                 WriteLock (latch->access, bt->thread_no);
1646                 break;
1647         case BtLockParent:
1648                 WriteOLock (latch->parent, bt->thread_no);
1649                 break;
1650         case BtLockAtomic:
1651                 WriteOLock (latch->atomic, bt->thread_no);
1652                 break;
1653         case BtLockAtomic | BtLockRead:
1654                 WriteOLock (latch->atomic, bt->thread_no);
1655                 ReadLock (latch->readwr, bt->thread_no);
1656                 break;
1657         }
1658 }
1659
1660 // remove write, read, or parent lock on requested page
1661
1662 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1663 {
1664         switch( mode ) {
1665         case BtLockRead:
1666                 ReadRelease (latch->readwr);
1667                 break;
1668         case BtLockWrite:
1669                 WriteRelease (latch->readwr);
1670                 break;
1671         case BtLockAccess:
1672                 ReadRelease (latch->access);
1673                 break;
1674         case BtLockDelete:
1675                 WriteRelease (latch->access);
1676                 break;
1677         case BtLockParent:
1678                 WriteORelease (latch->parent);
1679                 break;
1680         case BtLockAtomic:
1681                 WriteORelease (latch->atomic);
1682                 break;
1683         case BtLockAtomic | BtLockRead:
1684                 WriteORelease (latch->atomic);
1685                 ReadRelease (latch->readwr);
1686                 break;
1687         }
1688 }
1689
1690 //      allocate a new page
1691 //      return with page latched, but unlocked.
1692
1693 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1694 {
1695 uid page_no;
1696 int blk;
1697
1698         //      lock allocation page
1699
1700         bt_mutexlock(bt->mgr->lock);
1701
1702         // use empty chain first
1703         // else allocate empty page
1704
1705         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1706                 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
1707                         set->page = bt_mappage (bt, set->latch);
1708                 else
1709                         return bt->err = BTERR_struct, bt->line = __LINE__, -1;
1710
1711                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1712                 bt_releasemutex(bt->mgr->lock);
1713
1714                 memcpy (set->page, contents, bt->mgr->page_size);
1715                 set->latch->dirty = 1;
1716                 return 0;
1717         }
1718
1719         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1720         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1721
1722         // unlock allocation latch and
1723         //      extend file into new page.
1724
1725         bt_releasemutex(bt->mgr->lock);
1726
1727         //      don't load cache from btree page
1728
1729         if( set->latch = bt_pinlatch (bt, page_no, contents) )
1730                 set->page = bt_mappage (bt, set->latch);
1731         else
1732                 return bt->err;
1733
1734         set->latch->dirty = 1;
1735         return 0;
1736 }
1737
1738 //  find slot in page for given key at a given level
1739
1740 int bt_findslot (BtPage page, unsigned char *key, uint len)
1741 {
1742 uint diff, higher = page->cnt, low = 1, slot;
1743 uint good = 0;
1744
1745         //        make stopper key an infinite fence value
1746
1747         if( bt_getid (page->right) )
1748                 higher++;
1749         else
1750                 good++;
1751
1752         //      low is the lowest candidate.
1753         //  loop ends when they meet
1754
1755         //  higher is already
1756         //      tested as .ge. the passed key.
1757
1758         while( diff = higher - low ) {
1759                 slot = low + ( diff >> 1 );
1760                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1761                         low = slot + 1;
1762                 else
1763                         higher = slot, good++;
1764         }
1765
1766         //      return zero if key is on right link page
1767
1768         return good ? higher : 0;
1769 }
1770
1771 //  find and load page at given level for given key
1772 //      leave page rd or wr locked as requested
1773
1774 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1775 {
1776 uid page_no = ROOT_page, prevpage = 0;
1777 uint drill = 0xff, slot;
1778 BtLatchSet *prevlatch;
1779 uint mode, prevmode;
1780 BtVal *val;
1781
1782   //  start at root of btree and drill down
1783
1784   do {
1785         // determine lock mode of drill level
1786         mode = (drill == lvl) ? lock : BtLockRead; 
1787
1788         if( !(set->latch = bt_pinlatch (bt, page_no, NULL)) )
1789           return 0;
1790
1791         // obtain access lock using lock chaining with Access mode
1792
1793         if( page_no > ROOT_page )
1794           bt_lockpage(bt, BtLockAccess, set->latch);
1795
1796         set->page = bt_mappage (bt, set->latch);
1797
1798         //      release & unpin parent or left sibling page
1799
1800         if( prevpage ) {
1801           bt_unlockpage(bt, prevmode, prevlatch);
1802           bt_unpinlatch (prevlatch);
1803           prevpage = 0;
1804         }
1805
1806         // obtain mode lock using lock chaining through AccessLock
1807
1808         bt_lockpage(bt, mode, set->latch);
1809
1810         if( set->page->free )
1811                 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1812
1813         if( page_no > ROOT_page )
1814           bt_unlockpage(bt, BtLockAccess, set->latch);
1815
1816         // re-read and re-lock root after determining actual level of root
1817
1818         if( set->page->lvl != drill) {
1819                 if( set->latch->page_no != ROOT_page )
1820                         return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1821                         
1822                 drill = set->page->lvl;
1823
1824                 if( lock != BtLockRead && drill == lvl ) {
1825                   bt_unlockpage(bt, mode, set->latch);
1826                   bt_unpinlatch (set->latch);
1827                   continue;
1828                 }
1829         }
1830
1831         prevpage = set->latch->page_no;
1832         prevlatch = set->latch;
1833         prevmode = mode;
1834
1835         //  find key on page at this level
1836         //  and descend to requested level
1837
1838         if( !set->page->kill )
1839          if( slot = bt_findslot (set->page, key, len) ) {
1840           if( drill == lvl )
1841                 return slot;
1842
1843           // find next non-dead slot -- the fence key if nothing else
1844
1845           while( slotptr(set->page, slot)->dead )
1846                 if( slot++ < set->page->cnt )
1847                   continue;
1848                 else
1849                   return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1850
1851           val = valptr(set->page, slot);
1852
1853           if( val->len == BtId )
1854                 page_no = bt_getid(valptr(set->page, slot)->value);
1855           else
1856                 return bt->line = __LINE__, bt->err = BTERR_struct, 0;
1857
1858           drill--;
1859           continue;
1860          }
1861
1862         //  slide right into next page
1863
1864         page_no = bt_getid(set->page->right);
1865   } while( page_no );
1866
1867   // return error on end of right chain
1868
1869   bt->line = __LINE__, bt->err = BTERR_struct;
1870   return 0;     // return error
1871 }
1872
1873 //      return page to free list
1874 //      page must be delete & write locked
1875
1876 void bt_freepage (BtDb *bt, BtPageSet *set)
1877 {
1878         //      lock allocation page
1879
1880         bt_mutexlock (bt->mgr->lock);
1881
1882         //      store chain
1883
1884         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1885         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1886         set->latch->dirty = 1;
1887         set->page->free = 1;
1888
1889         // unlock released page
1890
1891         bt_unlockpage (bt, BtLockDelete, set->latch);
1892         bt_unlockpage (bt, BtLockWrite, set->latch);
1893         bt_unpinlatch (set->latch);
1894
1895         // unlock allocation page
1896
1897         bt_releasemutex (bt->mgr->lock);
1898 }
1899
1900 //      a fence key was deleted from a page
1901 //      push new fence value upwards
1902
1903 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1904 {
1905 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1906 unsigned char value[BtId];
1907 BtKey* ptr;
1908 uint idx;
1909
1910         //      remove the old fence value
1911
1912         ptr = keyptr(set->page, set->page->cnt);
1913         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1914         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1915         set->latch->dirty = 1;
1916
1917         //  cache new fence value
1918
1919         ptr = keyptr(set->page, set->page->cnt);
1920         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1921
1922         bt_lockpage (bt, BtLockParent, set->latch);
1923         bt_unlockpage (bt, BtLockWrite, set->latch);
1924
1925         //      insert new (now smaller) fence key
1926
1927         bt_putid (value, set->latch->page_no);
1928         ptr = (BtKey*)leftkey;
1929
1930         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1931           return bt->err;
1932
1933         //      now delete old fence key
1934
1935         ptr = (BtKey*)rightkey;
1936
1937         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1938                 return bt->err;
1939
1940         bt_unlockpage (bt, BtLockParent, set->latch);
1941         bt_unpinlatch(set->latch);
1942         return 0;
1943 }
1944
1945 //      root has a single child
1946 //      collapse a level from the tree
1947
1948 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1949 {
1950 BtPageSet child[1];
1951 uid page_no;
1952 BtVal *val;
1953 uint idx;
1954
1955   // find the child entry and promote as new root contents
1956
1957   do {
1958         for( idx = 0; idx++ < root->page->cnt; )
1959           if( !slotptr(root->page, idx)->dead )
1960                 break;
1961
1962         val = valptr(root->page, idx);
1963
1964         if( val->len == BtId )
1965                 page_no = bt_getid (valptr(root->page, idx)->value);
1966         else
1967                 return bt->line = __LINE__, bt->err = BTERR_struct;
1968
1969         if( child->latch = bt_pinlatch (bt, page_no, NULL) )
1970                 child->page = bt_mappage (bt, child->latch);
1971         else
1972                 return bt->err;
1973
1974         bt_lockpage (bt, BtLockDelete, child->latch);
1975         bt_lockpage (bt, BtLockWrite, child->latch);
1976
1977         memcpy (root->page, child->page, bt->mgr->page_size);
1978         root->latch->dirty = 1;
1979
1980         bt_freepage (bt, child);
1981
1982   } while( root->page->lvl > 1 && root->page->act == 1 );
1983
1984   bt_unlockpage (bt, BtLockWrite, root->latch);
1985   bt_unpinlatch (root->latch);
1986   return 0;
1987 }
1988
1989 //  delete a page and manage keys
1990 //  call with page writelocked
1991 //      returns with page unpinned
1992
1993 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1994 {
1995 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1996 unsigned char value[BtId];
1997 uint lvl = set->page->lvl;
1998 BtPageSet right[1];
1999 uid page_no;
2000 BtKey *ptr;
2001
2002         //      cache copy of fence key
2003         //      to post in parent
2004
2005         ptr = keyptr(set->page, set->page->cnt);
2006         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2007
2008         //      obtain lock on right page
2009
2010         page_no = bt_getid(set->page->right);
2011
2012         if( right->latch = bt_pinlatch (bt, page_no, NULL) )
2013                 right->page = bt_mappage (bt, right->latch);
2014         else
2015                 return 0;
2016
2017         bt_lockpage (bt, BtLockWrite, right->latch);
2018
2019         // cache copy of key to update
2020
2021         ptr = keyptr(right->page, right->page->cnt);
2022         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
2023
2024         if( right->page->kill )
2025                 return bt->line = __LINE__, bt->err = BTERR_struct;
2026
2027         // pull contents of right peer into our empty page
2028
2029         memcpy (set->page, right->page, bt->mgr->page_size);
2030         set->latch->dirty = 1;
2031
2032         // mark right page deleted and point it to left page
2033         //      until we can post parent updates that remove access
2034         //      to the deleted page.
2035
2036         bt_putid (right->page->right, set->latch->page_no);
2037         right->latch->dirty = 1;
2038         right->page->kill = 1;
2039
2040         bt_lockpage (bt, BtLockParent, right->latch);
2041         bt_unlockpage (bt, BtLockWrite, right->latch);
2042
2043         bt_lockpage (bt, BtLockParent, set->latch);
2044         bt_unlockpage (bt, BtLockWrite, set->latch);
2045
2046         // redirect higher key directly to our new node contents
2047
2048         bt_putid (value, set->latch->page_no);
2049         ptr = (BtKey*)higherfence;
2050
2051         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2052           return bt->err;
2053
2054         //      delete old lower key to our node
2055
2056         ptr = (BtKey*)lowerfence;
2057
2058         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
2059           return bt->err;
2060
2061         //      obtain delete and write locks to right node
2062
2063         bt_unlockpage (bt, BtLockParent, right->latch);
2064         bt_lockpage (bt, BtLockDelete, right->latch);
2065         bt_lockpage (bt, BtLockWrite, right->latch);
2066         bt_freepage (bt, right);
2067
2068         bt_unlockpage (bt, BtLockParent, set->latch);
2069         bt_unpinlatch (set->latch);
2070         return 0;
2071 }
2072
2073 //  find and delete key on page by marking delete flag bit
2074 //  if page becomes empty, delete it from the btree
2075
2076 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
2077 {
2078 uint slot, idx, found, fence;
2079 BtPageSet set[1];
2080 BtKey *ptr;
2081 BtVal *val;
2082
2083         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2084                 ptr = keyptr(set->page, slot);
2085         else
2086                 return bt->err;
2087
2088         // if librarian slot, advance to real slot
2089
2090         if( slotptr(set->page, slot)->type == Librarian )
2091                 ptr = keyptr(set->page, ++slot);
2092
2093         fence = slot == set->page->cnt;
2094
2095         // if key is found delete it, otherwise ignore request
2096
2097         if( found = !keycmp (ptr, key, len) )
2098           if( found = slotptr(set->page, slot)->dead == 0 ) {
2099                 val = valptr(set->page,slot);
2100                 slotptr(set->page, slot)->dead = 1;
2101                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2102                 set->page->act--;
2103
2104                 // collapse empty slots beneath the fence
2105
2106                 while( idx = set->page->cnt - 1 )
2107                   if( slotptr(set->page, idx)->dead ) {
2108                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2109                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2110                   } else
2111                         break;
2112           }
2113
2114         //      did we delete a fence key in an upper level?
2115
2116         if( found && lvl && set->page->act && fence )
2117           if( bt_fixfence (bt, set, lvl) )
2118                 return bt->err;
2119           else
2120                 return 0;
2121
2122         //      do we need to collapse root?
2123
2124         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2125           if( bt_collapseroot (bt, set) )
2126                 return bt->err;
2127           else
2128                 return 0;
2129
2130         //      delete empty page
2131
2132         if( !set->page->act )
2133                 return bt_deletepage (bt, set);
2134
2135         set->latch->dirty = 1;
2136         bt_unlockpage(bt, BtLockWrite, set->latch);
2137         bt_unpinlatch (set->latch);
2138         return 0;
2139 }
2140
2141 BtKey *bt_foundkey (BtDb *bt)
2142 {
2143         return (BtKey*)(bt->key);
2144 }
2145
2146 //      advance to next slot
2147
2148 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2149 {
2150 BtLatchSet *prevlatch;
2151 uid page_no;
2152
2153         if( slot < set->page->cnt )
2154                 return slot + 1;
2155
2156         prevlatch = set->latch;
2157
2158         if( page_no = bt_getid(set->page->right) )
2159           if( set->latch = bt_pinlatch (bt, page_no, NULL) )
2160                 set->page = bt_mappage (bt, set->latch);
2161           else
2162                 return 0;
2163         else
2164           return bt->err = BTERR_struct, bt->line = __LINE__, 0;
2165
2166         // obtain access lock using lock chaining with Access mode
2167
2168         bt_lockpage(bt, BtLockAccess, set->latch);
2169
2170         bt_unlockpage(bt, BtLockRead, prevlatch);
2171         bt_unpinlatch (prevlatch);
2172
2173         bt_lockpage(bt, BtLockRead, set->latch);
2174         bt_unlockpage(bt, BtLockAccess, set->latch);
2175         return 1;
2176 }
2177
2178 //      find unique key or first duplicate key in
2179 //      leaf level and return number of value bytes
2180 //      or (-1) if not found.  Setup key for bt_foundkey
2181
2182 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2183 {
2184 BtPageSet set[1];
2185 uint len, slot;
2186 int ret = -1;
2187 BtKey *ptr;
2188 BtVal *val;
2189
2190   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
2191    do {
2192         ptr = keyptr(set->page, slot);
2193
2194         //      skip librarian slot place holder
2195
2196         if( slotptr(set->page, slot)->type == Librarian )
2197                 ptr = keyptr(set->page, ++slot);
2198
2199         //      return actual key found
2200
2201         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2202         len = ptr->len;
2203
2204         if( slotptr(set->page, slot)->type == Duplicate )
2205                 len -= BtId;
2206
2207         //      not there if we reach the stopper key
2208
2209         if( slot == set->page->cnt )
2210           if( !bt_getid (set->page->right) )
2211                 break;
2212
2213         // if key exists, return >= 0 value bytes copied
2214         //      otherwise return (-1)
2215
2216         if( slotptr(set->page, slot)->dead )
2217                 continue;
2218
2219         if( keylen == len )
2220           if( !memcmp (ptr->key, key, len) ) {
2221                 val = valptr (set->page,slot);
2222                 if( valmax > val->len )
2223                         valmax = val->len;
2224                 memcpy (value, val->value, valmax);
2225                 ret = valmax;
2226           }
2227
2228         break;
2229
2230    } while( slot = bt_findnext (bt, set, slot) );
2231
2232   bt_unlockpage (bt, BtLockRead, set->latch);
2233   bt_unpinlatch (set->latch);
2234   return ret;
2235 }
2236
2237 //      check page for space available,
2238 //      clean if necessary and return
2239 //      0 - page needs splitting
2240 //      >0  new slot value
2241
2242 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
2243 {
2244 uint nxt = bt->mgr->page_size;
2245 BtPage page = set->page;
2246 uint cnt = 0, idx = 0;
2247 uint max = page->cnt;
2248 uint newslot = max;
2249 BtKey *key;
2250 BtVal *val;
2251
2252         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2253                 return slot;
2254
2255         //      skip cleanup and proceed to split
2256         //      if there's not enough garbage
2257         //      to bother with.
2258
2259         if( page->garbage < nxt / 5 )
2260                 return 0;
2261
2262         memcpy (bt->frame, page, bt->mgr->page_size);
2263
2264         // skip page info and set rest of page to zero
2265
2266         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2267         set->latch->dirty = 1;
2268
2269         page->garbage = 0;
2270         page->act = 0;
2271
2272         // clean up page first by
2273         // removing deleted keys
2274
2275         while( cnt++ < max ) {
2276                 if( cnt == slot )
2277                         newslot = idx + 2;
2278
2279                 if( cnt < max || bt->frame->lvl )
2280                   if( slotptr(bt->frame,cnt)->dead )
2281                         continue;
2282
2283                 // copy the value across
2284
2285                 val = valptr(bt->frame, cnt);
2286                 nxt -= val->len + sizeof(BtVal);
2287                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2288
2289                 // copy the key across
2290
2291                 key = keyptr(bt->frame, cnt);
2292                 nxt -= key->len + sizeof(BtKey);
2293                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2294
2295                 // make a librarian slot
2296
2297                 slotptr(page, ++idx)->off = nxt;
2298                 slotptr(page, idx)->type = Librarian;
2299                 slotptr(page, idx)->dead = 1;
2300
2301                 // set up the slot
2302
2303                 slotptr(page, ++idx)->off = nxt;
2304                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2305
2306                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2307                         page->act++;
2308         }
2309
2310         page->min = nxt;
2311         page->cnt = idx;
2312
2313         //      see if page has enough space now, or does it need splitting?
2314
2315         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2316                 return newslot;
2317
2318         return 0;
2319 }
2320
2321 // split the root and raise the height of the btree
2322
2323 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2324 {  
2325 unsigned char leftkey[BT_keyarray];
2326 uint nxt = bt->mgr->page_size;
2327 unsigned char value[BtId];
2328 BtPageSet left[1];
2329 uid left_page_no;
2330 BtKey *ptr;
2331 BtVal *val;
2332
2333         //      save left page fence key for new root
2334
2335         ptr = keyptr(root->page, root->page->cnt);
2336         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2337
2338         //  Obtain an empty page to use, and copy the current
2339         //  root contents into it, e.g. lower keys
2340
2341         if( bt_newpage(bt, left, root->page) )
2342                 return bt->err;
2343
2344         left_page_no = left->latch->page_no;
2345         bt_unpinlatch (left->latch);
2346
2347         // preserve the page info at the bottom
2348         // of higher keys and set rest to zero
2349
2350         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2351
2352         // insert stopper key at top of newroot page
2353         // and increase the root height
2354
2355         nxt -= BtId + sizeof(BtVal);
2356         bt_putid (value, right->page_no);
2357         val = (BtVal *)((unsigned char *)root->page + nxt);
2358         memcpy (val->value, value, BtId);
2359         val->len = BtId;
2360
2361         nxt -= 2 + sizeof(BtKey);
2362         slotptr(root->page, 2)->off = nxt;
2363         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2364         ptr->len = 2;
2365         ptr->key[0] = 0xff;
2366         ptr->key[1] = 0xff;
2367
2368         // insert lower keys page fence key on newroot page as first key
2369
2370         nxt -= BtId + sizeof(BtVal);
2371         bt_putid (value, left_page_no);
2372         val = (BtVal *)((unsigned char *)root->page + nxt);
2373         memcpy (val->value, value, BtId);
2374         val->len = BtId;
2375
2376         ptr = (BtKey *)leftkey;
2377         nxt -= ptr->len + sizeof(BtKey);
2378         slotptr(root->page, 1)->off = nxt;
2379         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2380         
2381         bt_putid(root->page->right, 0);
2382         root->page->min = nxt;          // reset lowest used offset and key count
2383         root->page->cnt = 2;
2384         root->page->act = 2;
2385         root->page->lvl++;
2386
2387         // release and unpin root pages
2388
2389         bt_unlockpage(bt, BtLockWrite, root->latch);
2390         bt_unpinlatch (root->latch);
2391
2392         bt_unpinlatch (right);
2393         return 0;
2394 }
2395
2396 //  split already locked full node
2397 //      leave it locked.
2398 //      return pool entry for new right
2399 //      page, unlocked
2400
2401 uint bt_splitpage (BtDb *bt, BtPageSet *set)
2402 {
2403 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2404 uint lvl = set->page->lvl;
2405 BtPageSet right[1];
2406 BtKey *key, *ptr;
2407 BtVal *val, *src;
2408 uid right2;
2409 uint prev;
2410
2411         //  split higher half of keys to bt->frame
2412
2413         memset (bt->frame, 0, bt->mgr->page_size);
2414         max = set->page->cnt;
2415         cnt = max / 2;
2416         idx = 0;
2417
2418         while( cnt++ < max ) {
2419                 if( cnt < max || set->page->lvl )
2420                   if( slotptr(set->page, cnt)->dead )
2421                         continue;
2422
2423                 src = valptr(set->page, cnt);
2424                 nxt -= src->len + sizeof(BtVal);
2425                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2426
2427                 key = keyptr(set->page, cnt);
2428                 nxt -= key->len + sizeof(BtKey);
2429                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2430                 memcpy (ptr, key, key->len + sizeof(BtKey));
2431
2432                 //      add librarian slot
2433
2434                 slotptr(bt->frame, ++idx)->off = nxt;
2435                 slotptr(bt->frame, idx)->type = Librarian;
2436                 slotptr(bt->frame, idx)->dead = 1;
2437
2438                 //  add actual slot
2439
2440                 slotptr(bt->frame, ++idx)->off = nxt;
2441                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2442
2443                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2444                         bt->frame->act++;
2445         }
2446
2447         bt->frame->bits = bt->mgr->page_bits;
2448         bt->frame->min = nxt;
2449         bt->frame->cnt = idx;
2450         bt->frame->lvl = lvl;
2451
2452         // link right node
2453
2454         if( set->latch->page_no > ROOT_page )
2455                 bt_putid (bt->frame->right, bt_getid (set->page->right));
2456
2457         //      get new free page and write higher keys to it.
2458
2459         if( bt_newpage(bt, right, bt->frame) )
2460                 return 0;
2461
2462         // process lower keys
2463
2464         memcpy (bt->frame, set->page, bt->mgr->page_size);
2465         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2466         set->latch->dirty = 1;
2467
2468         nxt = bt->mgr->page_size;
2469         set->page->garbage = 0;
2470         set->page->act = 0;
2471         max /= 2;
2472         cnt = 0;
2473         idx = 0;
2474
2475         if( slotptr(bt->frame, max)->type == Librarian )
2476                 max--;
2477
2478         //  assemble page of smaller keys
2479
2480         while( cnt++ < max ) {
2481                 if( slotptr(bt->frame, cnt)->dead )
2482                         continue;
2483                 val = valptr(bt->frame, cnt);
2484                 nxt -= val->len + sizeof(BtVal);
2485                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2486
2487                 key = keyptr(bt->frame, cnt);
2488                 nxt -= key->len + sizeof(BtKey);
2489                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2490
2491                 //      add librarian slot
2492
2493                 slotptr(set->page, ++idx)->off = nxt;
2494                 slotptr(set->page, idx)->type = Librarian;
2495                 slotptr(set->page, idx)->dead = 1;
2496
2497                 //      add actual slot
2498
2499                 slotptr(set->page, ++idx)->off = nxt;
2500                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2501                 set->page->act++;
2502         }
2503
2504         bt_putid(set->page->right, right->latch->page_no);
2505         set->page->min = nxt;
2506         set->page->cnt = idx;
2507
2508         return right->latch->entry;
2509 }
2510
2511 //      fix keys for newly split page
2512 //      call with page locked, return
2513 //      unlocked
2514
2515 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2516 {
2517 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2518 unsigned char value[BtId];
2519 uint lvl = set->page->lvl;
2520 BtPage page;
2521 BtKey *ptr;
2522
2523         // if current page is the root page, split it
2524
2525         if( set->latch->page_no == ROOT_page )
2526                 return bt_splitroot (bt, set, right);
2527
2528         ptr = keyptr(set->page, set->page->cnt);
2529         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2530
2531         page = bt_mappage (bt, right);
2532
2533         ptr = keyptr(page, page->cnt);
2534         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2535
2536         // insert new fences in their parent pages
2537
2538         bt_lockpage (bt, BtLockParent, right);
2539
2540         bt_lockpage (bt, BtLockParent, set->latch);
2541         bt_unlockpage (bt, BtLockWrite, set->latch);
2542
2543         // insert new fence for reformulated left block of smaller keys
2544
2545         bt_putid (value, set->latch->page_no);
2546         ptr = (BtKey *)leftkey;
2547
2548         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2549                 return bt->err;
2550
2551         // switch fence for right block of larger keys to new right page
2552
2553         bt_putid (value, right->page_no);
2554         ptr = (BtKey *)rightkey;
2555
2556         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2557                 return bt->err;
2558
2559         bt_unlockpage (bt, BtLockParent, set->latch);
2560         bt_unpinlatch (set->latch);
2561
2562         bt_unlockpage (bt, BtLockParent, right);
2563         bt_unpinlatch (right);
2564         return 0;
2565 }
2566
2567 //      install new key and value onto page
2568 //      page must already be checked for
2569 //      adequate space
2570
2571 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2572 {
2573 uint idx, librarian;
2574 BtSlot *node;
2575 BtKey *ptr;
2576 BtVal *val;
2577
2578         //      if found slot > desired slot and previous slot
2579         //      is a librarian slot, use it
2580
2581         if( slot > 1 )
2582           if( slotptr(set->page, slot-1)->type == Librarian )
2583                 slot--;
2584
2585         // copy value onto page
2586
2587         set->page->min -= vallen + sizeof(BtVal);
2588         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2589         memcpy (val->value, value, vallen);
2590         val->len = vallen;
2591
2592         // copy key onto page
2593
2594         set->page->min -= keylen + sizeof(BtKey);
2595         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2596         memcpy (ptr->key, key, keylen);
2597         ptr->len = keylen;
2598         
2599         //      find first empty slot
2600
2601         for( idx = slot; idx < set->page->cnt; idx++ )
2602           if( slotptr(set->page, idx)->dead )
2603                 break;
2604
2605         // now insert key into array before slot
2606
2607         if( idx == set->page->cnt )
2608                 idx += 2, set->page->cnt += 2, librarian = 2;
2609         else
2610                 librarian = 1;
2611
2612         set->latch->dirty = 1;
2613         set->page->act++;
2614
2615         while( idx > slot + librarian - 1 )
2616                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2617
2618         //      add librarian slot
2619
2620         if( librarian > 1 ) {
2621                 node = slotptr(set->page, slot++);
2622                 node->off = set->page->min;
2623                 node->type = Librarian;
2624                 node->dead = 1;
2625         }
2626
2627         //      fill in new slot
2628
2629         node = slotptr(set->page, slot);
2630         node->off = set->page->min;
2631         node->type = type;
2632         node->dead = 0;
2633
2634         if( release ) {
2635                 bt_unlockpage (bt, BtLockWrite, set->latch);
2636                 bt_unpinlatch (set->latch);
2637         }
2638
2639         return 0;
2640 }
2641
2642 //  Insert new key into the btree at given level.
2643 //      either add a new key or update/add an existing one
2644
2645 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2646 {
2647 unsigned char newkey[BT_keyarray];
2648 uint slot, idx, len, entry;
2649 BtPageSet set[1];
2650 BtKey *ptr, *ins;
2651 uid sequence;
2652 BtVal *val;
2653 uint type;
2654
2655   // set up the key we're working on
2656
2657   ins = (BtKey*)newkey;
2658   memcpy (ins->key, key, keylen);
2659   ins->len = keylen;
2660
2661   // is this a non-unique index value?
2662
2663   if( unique )
2664         type = Unique;
2665   else {
2666         type = Duplicate;
2667         sequence = bt_newdup (bt);
2668         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2669         ins->len += BtId;
2670   }
2671
2672   while( 1 ) { // find the page and slot for the current key
2673         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2674                 ptr = keyptr(set->page, slot);
2675         else {
2676                 if( !bt->err )
2677                         bt->line = __LINE__, bt->err = BTERR_ovflw;
2678                 return bt->err;
2679         }
2680
2681         // if librarian slot == found slot, advance to real slot
2682
2683         if( slotptr(set->page, slot)->type == Librarian )
2684           if( !keycmp (ptr, key, keylen) )
2685                 ptr = keyptr(set->page, ++slot);
2686
2687         len = ptr->len;
2688
2689         if( slotptr(set->page, slot)->type == Duplicate )
2690                 len -= BtId;
2691
2692         //  if inserting a duplicate key or unique key
2693         //      check for adequate space on the page
2694         //      and insert the new key before slot.
2695
2696         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2697           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2698                 if( !(entry = bt_splitpage (bt, set)) )
2699                   return bt->err;
2700                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2701                   return bt->err;
2702                 else
2703                   continue;
2704
2705           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2706         }
2707
2708         // if key already exists, update value and return
2709
2710         val = valptr(set->page, slot);
2711
2712         if( val->len >= vallen ) {
2713                 if( slotptr(set->page, slot)->dead )
2714                         set->page->act++;
2715                 set->page->garbage += val->len - vallen;
2716                 set->latch->dirty = 1;
2717                 slotptr(set->page, slot)->dead = 0;
2718                 val->len = vallen;
2719                 memcpy (val->value, value, vallen);
2720                 bt_unlockpage(bt, BtLockWrite, set->latch);
2721                 bt_unpinlatch (set->latch);
2722                 return 0;
2723         }
2724
2725         //      new update value doesn't fit in existing value area
2726
2727         if( !slotptr(set->page, slot)->dead )
2728                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2729         else {
2730                 slotptr(set->page, slot)->dead = 0;
2731                 set->page->act++;
2732         }
2733
2734         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2735           if( !(entry = bt_splitpage (bt, set)) )
2736                 return bt->err;
2737           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2738                 return bt->err;
2739           else
2740                 continue;
2741
2742         set->page->min -= vallen + sizeof(BtVal);
2743         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2744         memcpy (val->value, value, vallen);
2745         val->len = vallen;
2746
2747         set->latch->dirty = 1;
2748         set->page->min -= keylen + sizeof(BtKey);
2749         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2750         memcpy (ptr->key, key, keylen);
2751         ptr->len = keylen;
2752         
2753         slotptr(set->page, slot)->off = set->page->min;
2754         bt_unlockpage(bt, BtLockWrite, set->latch);
2755         bt_unpinlatch (set->latch);
2756         return 0;
2757   }
2758   return 0;
2759 }
2760
2761 typedef struct {
2762         logseqno reqlsn;        // redo log seq no required
2763         logseqno lsn;           // redo log sequence number
2764         uint entry;                     // latch table entry number
2765         uint slot:31;           // page slot number
2766         uint reuse:1;           // reused previous page
2767 } AtomicTxn;
2768
2769 typedef struct {
2770         uid page_no;            // page number for split leaf
2771         void *next;                     // next key to insert
2772         uint entry:29;          // latch table entry number
2773         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2774         uint nounlock:1;        // don't unlock ParentModification
2775         unsigned char leafkey[BT_keyarray];
2776 } AtomicKey;
2777
2778 //      determine actual page where key is located
2779 //  return slot number
2780
2781 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2782 {
2783 BtKey *key = keyptr(source,src);
2784 uint slot = locks[src].slot;
2785 uint entry;
2786
2787         if( src > 1 && locks[src].reuse )
2788           entry = locks[src-1].entry, slot = 0;
2789         else
2790           entry = locks[src].entry;
2791
2792         if( slot ) {
2793                 set->latch = bt->mgr->latchsets + entry;
2794                 set->page = bt_mappage (bt, set->latch);
2795                 return slot;
2796         }
2797
2798         //      is locks->reuse set? or was slot zeroed?
2799         //      if so, find where our key is located 
2800         //      on current page or pages split on
2801         //      same page txn operations.
2802
2803         do {
2804                 set->latch = bt->mgr->latchsets + entry;
2805                 set->page = bt_mappage (bt, set->latch);
2806
2807                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2808                   if( slotptr(set->page, slot)->type == Librarian )
2809                         slot++;
2810                   if( locks[src].reuse )
2811                         locks[src].entry = entry;
2812                   return slot;
2813                 }
2814         } while( entry = set->latch->split );
2815
2816         bt->line = __LINE__, bt->err = BTERR_atomic;
2817         return 0;
2818 }
2819
2820 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2821 {
2822 BtKey *key = keyptr(source, src);
2823 BtVal *val = valptr(source, src);
2824 BtLatchSet *latch;
2825 BtPageSet set[1];
2826 uint entry, slot;
2827
2828   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2829         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
2830           if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2831                 return bt->err;
2832           set->page->lsn = locks[src].lsn;
2833           return 0;
2834         }
2835
2836         if( entry = bt_splitpage (bt, set) )
2837           latch = bt->mgr->latchsets + entry;
2838         else
2839           return bt->err;
2840
2841         //      splice right page into split chain
2842         //      and WriteLock it.
2843
2844         bt_lockpage(bt, BtLockWrite, latch);
2845         latch->split = set->latch->split;
2846         set->latch->split = entry;
2847         locks[src].slot = 0;
2848   }
2849
2850   return bt->line = __LINE__, bt->err = BTERR_atomic;
2851 }
2852
2853 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2854 {
2855 BtKey *key = keyptr(source, src);
2856 BtPageSet set[1];
2857 uint idx, slot;
2858 BtKey *ptr;
2859 BtVal *val;
2860
2861         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2862           ptr = keyptr(set->page, slot);
2863         else
2864           return bt->line = __LINE__, bt->err = BTERR_struct;
2865
2866         if( !keycmp (ptr, key->key, key->len) )
2867           if( !slotptr(set->page, slot)->dead )
2868                 slotptr(set->page, slot)->dead = 1;
2869           else
2870                 return 0;
2871         else
2872                 return 0;
2873
2874         val = valptr(set->page, slot);
2875         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2876         set->latch->dirty = 1;
2877         set->page->lsn = locks[src].lsn;
2878         set->page->act--;
2879         bt->found++;
2880         return 0;
2881 }
2882
2883 //      delete an empty master page for a transaction
2884
2885 //      note that the far right page never empties because
2886 //      it always contains (at least) the infinite stopper key
2887 //      and that all pages that don't contain any keys are
2888 //      deleted, or are being held under Atomic lock.
2889
2890 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2891 {
2892 BtPageSet right[1], temp[1];
2893 unsigned char value[BtId];
2894 uid right_page_no;
2895 BtKey *ptr;
2896
2897         bt_lockpage(bt, BtLockWrite, prev->latch);
2898
2899         //      grab the right sibling
2900
2901         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), NULL) )
2902                 right->page = bt_mappage (bt, right->latch);
2903         else
2904                 return bt->err;
2905
2906         bt_lockpage(bt, BtLockAtomic, right->latch);
2907         bt_lockpage(bt, BtLockWrite, right->latch);
2908
2909         //      and pull contents over empty page
2910         //      while preserving master's left link
2911
2912         memcpy (right->page->left, prev->page->left, BtId);
2913         memcpy (prev->page, right->page, bt->mgr->page_size);
2914
2915         //      forward seekers to old right sibling
2916         //      to new page location in set
2917
2918         bt_putid (right->page->right, prev->latch->page_no);
2919         right->latch->dirty = 1;
2920         right->page->kill = 1;
2921
2922         //      remove pointer to right page for searchers by
2923         //      changing right fence key to point to the master page
2924
2925         ptr = keyptr(right->page,right->page->cnt);
2926         bt_putid (value, prev->latch->page_no);
2927
2928         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2929                 return bt->err;
2930
2931         //  now that master page is in good shape we can
2932         //      remove its locks.
2933
2934         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2935         bt_unlockpage (bt, BtLockWrite, prev->latch);
2936
2937         //  fix master's right sibling's left pointer
2938         //      to remove scanner's poiner to the right page
2939
2940         if( right_page_no = bt_getid (prev->page->right) ) {
2941           if( temp->latch = bt_pinlatch (bt, right_page_no, NULL) )
2942                 temp->page = bt_mappage (bt, temp->latch);
2943
2944           bt_lockpage (bt, BtLockWrite, temp->latch);
2945           bt_putid (temp->page->left, prev->latch->page_no);
2946           temp->latch->dirty = 1;
2947
2948           bt_unlockpage (bt, BtLockWrite, temp->latch);
2949           bt_unpinlatch (temp->latch);
2950         } else {        // master is now the far right page
2951           bt_mutexlock (bt->mgr->lock);
2952           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2953           bt_releasemutex(bt->mgr->lock);
2954         }
2955
2956         //      now that there are no pointers to the right page
2957         //      we can delete it after the last read access occurs
2958
2959         bt_unlockpage (bt, BtLockWrite, right->latch);
2960         bt_unlockpage (bt, BtLockAtomic, right->latch);
2961         bt_lockpage (bt, BtLockDelete, right->latch);
2962         bt_lockpage (bt, BtLockWrite, right->latch);
2963         bt_freepage (bt, right);
2964         return 0;
2965 }
2966
2967 //      atomic modification of a batch of keys.
2968
2969 //      return -1 if BTERR is set
2970 //      otherwise return slot number
2971 //      causing the key constraint violation
2972 //      or zero on successful completion.
2973
2974 int bt_atomictxn (BtDb *bt, BtPage source)
2975 {
2976 uint src, idx, slot, samepage, entry, avail, que = 0;
2977 AtomicKey *head, *tail, *leaf;
2978 BtPageSet set[1], prev[1];
2979 unsigned char value[BtId];
2980 BtKey *key, *ptr, *key2;
2981 BtLatchSet *latch;
2982 AtomicTxn *locks;
2983 int result = 0;
2984 BtSlot temp[1];
2985 logseqno lsn;
2986 BtPage page;
2987 BtVal *val;
2988 uid right;
2989 int type;
2990
2991   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2992   head = NULL;
2993   tail = NULL;
2994
2995   // stable sort the list of keys into order to
2996   //    prevent deadlocks between threads.
2997
2998   for( src = 1; src++ < source->cnt; ) {
2999         *temp = *slotptr(source,src);
3000         key = keyptr (source,src);
3001
3002         for( idx = src; --idx; ) {
3003           key2 = keyptr (source,idx);
3004           if( keycmp (key, key2->key, key2->len) < 0 ) {
3005                 *slotptr(source,idx+1) = *slotptr(source,idx);
3006                 *slotptr(source,idx) = *temp;
3007           } else
3008                 break;
3009         }
3010   }
3011
3012   // reserve enough buffer pool entries
3013
3014   avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
3015   bt_availrequest (bt, avail);
3016
3017   // Load the leaf page for each key
3018   // group same page references with reuse bit
3019   // and determine any constraint violations
3020
3021   for( src = 0; src++ < source->cnt; ) {
3022         key = keyptr(source, src);
3023         slot = 0;
3024
3025         // first determine if this modification falls
3026         // on the same page as the previous modification
3027         //      note that the far right leaf page is a special case
3028
3029         if( samepage = src > 1 )
3030           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3031                 slot = bt_findslot(set->page, key->key, key->len);
3032           else
3033                 bt_unlockpage(bt, BtLockRead, set->latch); 
3034
3035         if( !slot )
3036           if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
3037                 set->latch->split = 0;
3038           else
3039                 goto atomicerr;
3040
3041         if( slotptr(set->page, slot)->type == Librarian )
3042           ptr = keyptr(set->page, ++slot);
3043         else
3044           ptr = keyptr(set->page, slot);
3045
3046         if( !samepage ) {
3047           locks[src].entry = set->latch->entry;
3048           locks[src].slot = slot;
3049           locks[src].reuse = 0;
3050         } else {
3051           locks[src].entry = 0;
3052           locks[src].slot = 0;
3053           locks[src].reuse = 1;
3054         }
3055
3056         //      capture current lsn for master page
3057
3058         locks[src].reqlsn = set->page->lsn;
3059
3060         //      perform constraint checks
3061
3062         switch( slotptr(source, src)->type ) {
3063         case Duplicate:
3064         case Unique:
3065           if( !slotptr(set->page, slot)->dead )
3066            if( slot < set->page->cnt || bt_getid (set->page->right) )
3067             if( !keycmp (ptr, key->key, key->len) ) {
3068
3069                   // return constraint violation if key already exists
3070
3071                   bt_unlockpage(bt, BtLockRead, set->latch);
3072                   result = src;
3073
3074                   while( src ) {
3075                         if( locks[src].entry ) {
3076                           set->latch = bt->mgr->latchsets + locks[src].entry;
3077                           bt_unlockpage(bt, BtLockAtomic, set->latch);
3078                           bt_unpinlatch (set->latch);
3079                         }
3080                         src--;
3081                   }
3082                   free (locks);
3083                   return result;
3084                 }
3085           break;
3086         }
3087   }
3088
3089   //  unlock last loadpage lock
3090
3091   if( source->cnt )
3092         bt_unlockpage(bt, BtLockRead, set->latch);
3093
3094   //  and add entries to redo log
3095
3096   if( bt->mgr->redopages )
3097    if( lsn = bt_txnredo (bt, source) )
3098     for( src = 0; src++ < source->cnt; )
3099          locks[src].lsn = lsn;
3100     else
3101          goto atomicerr;
3102
3103   //  obtain write lock for each master page
3104
3105   for( src = 0; src++ < source->cnt; ) {
3106         if( locks[src].reuse )
3107           continue;
3108         else
3109           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
3110   }
3111
3112   // insert or delete each key
3113   // process any splits or merges
3114   // release Write & Atomic latches
3115   // set ParentModifications and build
3116   // queue of keys to insert for split pages
3117   // or delete for deleted pages.
3118
3119   // run through txn list backwards
3120
3121   samepage = source->cnt + 1;
3122
3123   for( src = source->cnt; src; src-- ) {
3124         if( locks[src].reuse )
3125           continue;
3126
3127         //  perform the txn operations
3128         //      from smaller to larger on
3129         //  the same page
3130
3131         for( idx = src; idx < samepage; idx++ )
3132          switch( slotptr(source,idx)->type ) {
3133          case Delete:
3134           if( bt_atomicdelete (bt, source, locks, idx) )
3135                 goto atomicerr;
3136           break;
3137
3138         case Duplicate:
3139         case Unique:
3140           if( bt_atomicinsert (bt, source, locks, idx) )
3141                 goto atomicerr;
3142           break;
3143         }
3144
3145         //      after the same page operations have finished,
3146         //  process master page for splits or deletion.
3147
3148         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3149         prev->page = bt_mappage (bt, prev->latch);
3150         samepage = src;
3151
3152         //  pick-up all splits from master page
3153         //      each one is already WriteLocked.
3154
3155         entry = prev->latch->split;
3156
3157         while( entry ) {
3158           set->latch = bt->mgr->latchsets + entry;
3159           set->page = bt_mappage (bt, set->latch);
3160           entry = set->latch->split;
3161
3162           // delete empty master page by undoing its split
3163           //  (this is potentially another empty page)
3164           //  note that there are no new left pointers yet
3165
3166           if( !prev->page->act ) {
3167                 memcpy (set->page->left, prev->page->left, BtId);
3168                 memcpy (prev->page, set->page, bt->mgr->page_size);
3169                 bt_lockpage (bt, BtLockDelete, set->latch);
3170                 bt_freepage (bt, set);
3171
3172                 prev->latch->split = set->latch->split;
3173                 prev->latch->dirty = 1;
3174                 continue;
3175           }
3176
3177           // remove empty page from the split chain
3178           // and return it to the free list.
3179
3180           if( !set->page->act ) {
3181                 memcpy (prev->page->right, set->page->right, BtId);
3182                 prev->latch->split = set->latch->split;
3183                 bt_lockpage (bt, BtLockDelete, set->latch);
3184                 bt_freepage (bt, set);
3185                 continue;
3186           }
3187
3188           //  schedule prev fence key update
3189
3190           ptr = keyptr(prev->page,prev->page->cnt);
3191           leaf = calloc (sizeof(AtomicKey), 1), que++;
3192
3193           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3194           leaf->page_no = prev->latch->page_no;
3195           leaf->entry = prev->latch->entry;
3196           leaf->type = 0;
3197
3198           if( tail )
3199                 tail->next = leaf;
3200           else
3201                 head = leaf;
3202
3203           tail = leaf;
3204
3205           // splice in the left link into the split page
3206
3207           bt_putid (set->page->left, prev->latch->page_no);
3208           bt_lockpage(bt, BtLockParent, prev->latch);
3209           bt_unlockpage(bt, BtLockWrite, prev->latch);
3210           *prev = *set;
3211         }
3212
3213         //  update left pointer in next right page from last split page
3214         //      (if all splits were reversed, latch->split == 0)
3215
3216         if( latch->split ) {
3217           //  fix left pointer in master's original (now split)
3218           //  far right sibling or set rightmost page in page zero
3219
3220           if( right = bt_getid (prev->page->right) ) {
3221                 if( set->latch = bt_pinlatch (bt, right, NULL) )
3222                   set->page = bt_mappage (bt, set->latch);
3223                 else
3224                   goto atomicerr;
3225
3226             bt_lockpage (bt, BtLockWrite, set->latch);
3227             bt_putid (set->page->left, prev->latch->page_no);
3228                 set->latch->dirty = 1;
3229             bt_unlockpage (bt, BtLockWrite, set->latch);
3230                 bt_unpinlatch (set->latch);
3231           } else {      // prev is rightmost page
3232             bt_mutexlock (bt->mgr->lock);
3233                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3234             bt_releasemutex(bt->mgr->lock);
3235           }
3236
3237           //  Process last page split in chain
3238
3239           ptr = keyptr(prev->page,prev->page->cnt);
3240           leaf = calloc (sizeof(AtomicKey), 1), que++;
3241
3242           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3243           leaf->page_no = prev->latch->page_no;
3244           leaf->entry = prev->latch->entry;
3245           leaf->type = 0;
3246   
3247           if( tail )
3248                 tail->next = leaf;
3249           else
3250                 head = leaf;
3251
3252           tail = leaf;
3253
3254           bt_lockpage(bt, BtLockParent, prev->latch);
3255           bt_unlockpage(bt, BtLockWrite, prev->latch);
3256
3257           //  remove atomic lock on master page
3258
3259           bt_unlockpage(bt, BtLockAtomic, latch);
3260           continue;
3261         }
3262
3263         //  finished if prev page occupied (either master or final split)
3264
3265         if( prev->page->act ) {
3266           bt_unlockpage(bt, BtLockWrite, latch);
3267           bt_unlockpage(bt, BtLockAtomic, latch);
3268           bt_unpinlatch(latch);
3269           continue;
3270         }
3271
3272         // any and all splits were reversed, and the
3273         // master page located in prev is empty, delete it
3274         // by pulling over master's right sibling.
3275
3276         // Remove empty master's fence key
3277
3278         ptr = keyptr(prev->page,prev->page->cnt);
3279
3280         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3281                 goto atomicerr;
3282
3283         //      perform the remainder of the delete
3284         //      from the FIFO queue
3285
3286         leaf = calloc (sizeof(AtomicKey), 1), que++;
3287
3288         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3289         leaf->page_no = prev->latch->page_no;
3290         leaf->entry = prev->latch->entry;
3291         leaf->nounlock = 1;
3292         leaf->type = 2;
3293   
3294         if( tail )
3295           tail->next = leaf;
3296         else
3297           head = leaf;
3298
3299         tail = leaf;
3300
3301         //      leave atomic lock in place until
3302         //      deletion completes in next phase.
3303
3304         bt_unlockpage(bt, BtLockWrite, prev->latch);
3305   }
3306
3307   bt_availrelease (bt, avail);
3308
3309   que *= bt->mgr->pagezero->alloc->lvl;
3310   bt_availrequest (bt, que);
3311
3312   //  add & delete keys for any pages split or merged during transaction
3313
3314   if( leaf = head )
3315     do {
3316           set->latch = bt->mgr->latchsets + leaf->entry;
3317           set->page = bt_mappage (bt, set->latch);
3318
3319           bt_putid (value, leaf->page_no);
3320           ptr = (BtKey *)leaf->leafkey;
3321
3322           switch( leaf->type ) {
3323           case 0:       // insert key
3324             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3325                   goto atomicerr;
3326
3327                 break;
3328
3329           case 1:       // delete key
3330                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3331                   goto atomicerr;
3332
3333                 break;
3334
3335           case 2:       // free page
3336                 if( bt_atomicfree (bt, set) )
3337                   goto atomicerr;
3338
3339                 break;
3340           }
3341
3342           if( !leaf->nounlock )
3343             bt_unlockpage (bt, BtLockParent, set->latch);
3344
3345           bt_unpinlatch (set->latch);
3346           tail = leaf->next;
3347           free (leaf);
3348         } while( leaf = tail );
3349
3350   bt_availrelease (bt, que);
3351
3352   // return success
3353
3354   free (locks);
3355   return 0;
3356 atomicerr:
3357   return -1;
3358 }
3359
3360 //      set cursor to highest slot on highest page
3361
3362 uint bt_lastkey (BtDb *bt)
3363 {
3364 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3365 BtPageSet set[1];
3366
3367         if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3368                 set->page = bt_mappage (bt, set->latch);
3369         else
3370                 return 0;
3371
3372     bt_lockpage(bt, BtLockRead, set->latch);
3373         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3374     bt_unlockpage(bt, BtLockRead, set->latch);
3375         bt_unpinlatch (set->latch);
3376
3377         bt->cursor_page = page_no;
3378         return bt->cursor->cnt;
3379 }
3380
3381 //      return previous slot on cursor page
3382
3383 uint bt_prevkey (BtDb *bt, uint slot)
3384 {
3385 uid ourright, next, us = bt->cursor_page;
3386 BtPageSet set[1];
3387
3388         if( --slot )
3389                 return slot;
3390
3391         ourright = bt_getid(bt->cursor->right);
3392
3393 goleft:
3394         if( !(next = bt_getid(bt->cursor->left)) )
3395                 return 0;
3396
3397 findourself:
3398         bt->cursor_page = next;
3399
3400         if( set->latch = bt_pinlatch (bt, next, NULL) )
3401                 set->page = bt_mappage (bt, set->latch);
3402         else
3403                 return 0;
3404
3405     bt_lockpage(bt, BtLockRead, set->latch);
3406         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3407         bt_unlockpage(bt, BtLockRead, set->latch);
3408         bt_unpinlatch (set->latch);
3409         
3410         next = bt_getid (bt->cursor->right);
3411
3412         if( bt->cursor->kill )
3413                 goto findourself;
3414
3415         if( next != us )
3416           if( next == ourright )
3417                 goto goleft;
3418           else
3419                 goto findourself;
3420
3421         return bt->cursor->cnt;
3422 }
3423
3424 //  return next slot on cursor page
3425 //  or slide cursor right into next page
3426
3427 uint bt_nextkey (BtDb *bt, uint slot)
3428 {
3429 BtPageSet set[1];
3430 uid right;
3431
3432   do {
3433         right = bt_getid(bt->cursor->right);
3434
3435         while( slot++ < bt->cursor->cnt )
3436           if( slotptr(bt->cursor,slot)->dead )
3437                 continue;
3438           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3439                 return slot;
3440           else
3441                 break;
3442
3443         if( !right )
3444                 break;
3445
3446         bt->cursor_page = right;
3447
3448         if( set->latch = bt_pinlatch (bt, right, NULL) )
3449                 set->page = bt_mappage (bt, set->latch);
3450         else
3451                 return 0;
3452
3453     bt_lockpage(bt, BtLockRead, set->latch);
3454
3455         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3456
3457         bt_unlockpage(bt, BtLockRead, set->latch);
3458         bt_unpinlatch (set->latch);
3459         slot = 0;
3460
3461   } while( 1 );
3462
3463   return bt->err = 0;
3464 }
3465
3466 //  cache page of keys into cursor and return starting slot for given key
3467
3468 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3469 {
3470 BtPageSet set[1];
3471 uint slot;
3472
3473         // cache page for retrieval
3474
3475         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3476           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3477         else
3478           return 0;
3479
3480         bt->cursor_page = set->latch->page_no;
3481
3482         bt_unlockpage(bt, BtLockRead, set->latch);
3483         bt_unpinlatch (set->latch);
3484         return slot;
3485 }
3486
3487 BtKey *bt_key(BtDb *bt, uint slot)
3488 {
3489         return keyptr(bt->cursor, slot);
3490 }
3491
3492 BtVal *bt_val(BtDb *bt, uint slot)
3493 {
3494         return valptr(bt->cursor,slot);
3495 }
3496
3497 #ifdef STANDALONE
3498
3499 #ifndef unix
3500 double getCpuTime(int type)
3501 {
3502 FILETIME crtime[1];
3503 FILETIME xittime[1];
3504 FILETIME systime[1];
3505 FILETIME usrtime[1];
3506 SYSTEMTIME timeconv[1];
3507 double ans = 0;
3508
3509         memset (timeconv, 0, sizeof(SYSTEMTIME));
3510
3511         switch( type ) {
3512         case 0:
3513                 GetSystemTimeAsFileTime (xittime);
3514                 FileTimeToSystemTime (xittime, timeconv);
3515                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3516                 break;
3517         case 1:
3518                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3519                 FileTimeToSystemTime (usrtime, timeconv);
3520                 break;
3521         case 2:
3522                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3523                 FileTimeToSystemTime (systime, timeconv);
3524                 break;
3525         }
3526
3527         ans += (double)timeconv->wHour * 3600;
3528         ans += (double)timeconv->wMinute * 60;
3529         ans += (double)timeconv->wSecond;
3530         ans += (double)timeconv->wMilliseconds / 1000;
3531         return ans;
3532 }
3533 #else
3534 #include <time.h>
3535 #include <sys/resource.h>
3536
3537 double getCpuTime(int type)
3538 {
3539 struct rusage used[1];
3540 struct timeval tv[1];
3541
3542         switch( type ) {
3543         case 0:
3544                 gettimeofday(tv, NULL);
3545                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3546
3547         case 1:
3548                 getrusage(RUSAGE_SELF, used);
3549                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3550
3551         case 2:
3552                 getrusage(RUSAGE_SELF, used);
3553                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3554         }
3555
3556         return 0;
3557 }
3558 #endif
3559
3560 void bt_poolaudit (BtMgr *mgr)
3561 {
3562 BtLatchSet *latch;
3563 uint entry = 0;
3564
3565         while( ++entry < mgr->latchtotal ) {
3566                 latch = mgr->latchsets + entry;
3567
3568                 if( *latch->readwr->rin & MASK )
3569                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3570
3571                 if( *latch->access->rin & MASK )
3572                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3573
3574                 if( *latch->parent->exclusive )
3575                         fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3576
3577                 if( *latch->atomic->exclusive )
3578                         fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3579
3580                 if( *latch->modify->exclusive )
3581                         fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3582
3583                 if( latch->pin & ~CLOCK_bit )
3584                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3585         }
3586 }
3587
3588 typedef struct {
3589         char idx;
3590         char *type;
3591         char *infile;
3592         BtMgr *mgr;
3593         int num;
3594 } ThreadArg;
3595
3596 //  standalone program to index file of keys
3597 //  then list them onto std-out
3598
3599 #ifdef unix
3600 void *index_file (void *arg)
3601 #else
3602 uint __stdcall index_file (void *arg)
3603 #endif
3604 {
3605 int line = 0, found = 0, cnt = 0, idx;
3606 uid next, page_no = LEAF_page;  // start on first page of leaves
3607 int ch, len = 0, slot, type = 0;
3608 unsigned char key[BT_maxkey];
3609 unsigned char txn[65536];
3610 ThreadArg *args = arg;
3611 BtPageSet set[1];
3612 uint nxt = 65536;
3613 BtPage page;
3614 BtKey *ptr;
3615 BtVal *val;
3616 BtDb *bt;
3617 FILE *in;
3618
3619         bt = bt_open (args->mgr);
3620         page = (BtPage)txn;
3621
3622         if( args->idx < strlen (args->type) )
3623                 ch = args->type[args->idx];
3624         else
3625                 ch = args->type[strlen(args->type) - 1];
3626
3627         switch(ch | 0x20)
3628         {
3629         case 'd':
3630                 type = Delete;
3631
3632         case 'p':
3633                 if( !type )
3634                         type = Unique;
3635
3636                 if( args->num )
3637                  if( type == Delete )
3638                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3639                  else
3640                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3641                 else
3642                  if( type == Delete )
3643                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3644                  else
3645                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3646
3647                 if( in = fopen (args->infile, "rb") )
3648                   while( ch = getc(in), ch != EOF )
3649                         if( ch == '\n' )
3650                         {
3651                           line++;
3652
3653                           if( !args->num ) {
3654                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3655                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3656                             len = 0;
3657                                 continue;
3658                           }
3659
3660                           nxt -= len - 10;
3661                           memcpy (txn + nxt, key + 10, len - 10);
3662                           nxt -= 1;
3663                           txn[nxt] = len - 10;
3664                           nxt -= 10;
3665                           memcpy (txn + nxt, key, 10);
3666                           nxt -= 1;
3667                           txn[nxt] = 10;
3668                           slotptr(page,++cnt)->off  = nxt;
3669                           slotptr(page,cnt)->type = type;
3670                           len = 0;
3671
3672                           if( cnt < args->num )
3673                                 continue;
3674
3675                           page->cnt = cnt;
3676                           page->act = cnt;
3677                           page->min = nxt;
3678
3679                           if( bt_atomictxn (bt, page) )
3680                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3681                           nxt = sizeof(txn);
3682                           cnt = 0;
3683
3684                         }
3685                         else if( len < BT_maxkey )
3686                                 key[len++] = ch;
3687                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3688                 break;
3689
3690         case 'w':
3691                 fprintf(stderr, "started indexing for %s\n", args->infile);
3692                 if( in = fopen (args->infile, "r") )
3693                   while( ch = getc(in), ch != EOF )
3694                         if( ch == '\n' )
3695                         {
3696                           line++;
3697
3698                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3699                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3700                           len = 0;
3701                         }
3702                         else if( len < BT_maxkey )
3703                                 key[len++] = ch;
3704                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3705                 break;
3706
3707         case 'f':
3708                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3709                 if( in = fopen (args->infile, "rb") )
3710                   while( ch = getc(in), ch != EOF )
3711                         if( ch == '\n' )
3712                         {
3713                           line++;
3714                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3715                                 found++;
3716                           else if( bt->err )
3717                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
3718                           len = 0;
3719                         }
3720                         else if( len < BT_maxkey )
3721                                 key[len++] = ch;
3722                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3723                 break;
3724
3725         case 's':
3726                 fprintf(stderr, "started scanning\n");
3727                 
3728                 do {
3729                         if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3730                                 set->page = bt_mappage (bt, set->latch);
3731                         else
3732                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3733                         bt_lockpage (bt, BtLockRead, set->latch);
3734                         next = bt_getid (set->page->right);
3735
3736                         for( slot = 0; slot++ < set->page->cnt; )
3737                          if( next || slot < set->page->cnt )
3738                           if( !slotptr(set->page, slot)->dead ) {
3739                                 ptr = keyptr(set->page, slot);
3740                                 len = ptr->len;
3741
3742                             if( slotptr(set->page, slot)->type == Duplicate )
3743                                         len -= BtId;
3744
3745                                 fwrite (ptr->key, len, 1, stdout);
3746                                 val = valptr(set->page, slot);
3747                                 fwrite (val->value, val->len, 1, stdout);
3748                                 fputc ('\n', stdout);
3749                                 cnt++;
3750                            }
3751
3752                         set->latch->avail = 1;
3753                         bt_unlockpage (bt, BtLockRead, set->latch);
3754                         bt_unpinlatch (set->latch);
3755                 } while( page_no = next );
3756
3757                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3758                 break;
3759
3760         case 'r':
3761                 fprintf(stderr, "started reverse scan\n");
3762                 if( slot = bt_lastkey (bt) )
3763                    while( slot = bt_prevkey (bt, slot) ) {
3764                         if( slotptr(bt->cursor, slot)->dead )
3765                           continue;
3766
3767                         ptr = keyptr(bt->cursor, slot);
3768                         len = ptr->len;
3769
3770                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3771                                 len -= BtId;
3772
3773                         fwrite (ptr->key, len, 1, stdout);
3774                         val = valptr(bt->cursor, slot);
3775                         fwrite (val->value, val->len, 1, stdout);
3776                         fputc ('\n', stdout);
3777                         cnt++;
3778                   }
3779
3780                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3781                 break;
3782
3783         case 'c':
3784 #ifdef unix
3785                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3786 #endif
3787                 fprintf(stderr, "started counting\n");
3788                 page_no = LEAF_page;
3789
3790                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3791                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3792                                 break;
3793
3794                         if( !bt->frame->free && !bt->frame->lvl )
3795                                 cnt += bt->frame->act;
3796
3797                         bt->reads++;
3798                         page_no++;
3799                 }
3800                 
3801                 cnt--;  // remove stopper key
3802                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3803                 break;
3804         }
3805
3806         bt_close (bt);
3807 #ifdef unix
3808         return NULL;
3809 #else
3810         return 0;
3811 #endif
3812 }
3813
3814 typedef struct timeval timer;
3815
3816 int main (int argc, char **argv)
3817 {
3818 int idx, cnt, len, slot, err;
3819 int segsize, bits = 16;
3820 double start, stop;
3821 #ifdef unix
3822 pthread_t *threads;
3823 #else
3824 HANDLE *threads;
3825 #endif
3826 ThreadArg *args;
3827 uint poolsize = 0;
3828 uint recovery = 0;
3829 float elapsed;
3830 int num = 0;
3831 char key[1];
3832 BtMgr *mgr;
3833 BtKey *ptr;
3834 BtDb *bt;
3835
3836         if( argc < 3 ) {
3837                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3838                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3839                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3840                 fprintf (stderr, "  page_bits is the page size in bits\n");
3841                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3842                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3843                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3844                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3845                 exit(0);
3846         }
3847
3848         start = getCpuTime(0);
3849
3850         if( argc > 3 )
3851                 bits = atoi(argv[3]);
3852
3853         if( argc > 4 )
3854                 poolsize = atoi(argv[4]);
3855
3856         if( !poolsize )
3857                 fprintf (stderr, "Warning: no mapped_pool\n");
3858
3859         if( argc > 5 )
3860                 num = atoi(argv[5]);
3861
3862         if( argc > 6 )
3863                 recovery = atoi(argv[6]);
3864
3865         cnt = argc - 7;
3866 #ifdef unix
3867         threads = malloc (cnt * sizeof(pthread_t));
3868 #else
3869         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3870 #endif
3871         args = malloc (cnt * sizeof(ThreadArg));
3872
3873         mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3874
3875         if( !mgr ) {
3876                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3877                 exit (1);
3878         }
3879
3880         //      fire off threads
3881
3882         for( idx = 0; idx < cnt; idx++ ) {
3883                 args[idx].infile = argv[idx + 7];
3884                 args[idx].type = argv[2];
3885                 args[idx].mgr = mgr;
3886                 args[idx].num = num;
3887                 args[idx].idx = idx;
3888 #ifdef unix
3889                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3890                         fprintf(stderr, "Error creating thread %d\n", err);
3891 #else
3892                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3893 #endif
3894         }
3895
3896         //      wait for termination
3897
3898 #ifdef unix
3899         for( idx = 0; idx < cnt; idx++ )
3900                 pthread_join (threads[idx], NULL);
3901 #else
3902         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3903
3904         for( idx = 0; idx < cnt; idx++ )
3905                 CloseHandle(threads[idx]);
3906
3907 #endif
3908         bt_poolaudit(mgr);
3909         bt_mgrclose (mgr);
3910
3911         elapsed = getCpuTime(0) - start;
3912         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3913         elapsed = getCpuTime(1);
3914         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3915         elapsed = getCpuTime(2);
3916         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3917 }
3918
3919 #endif  //STANDALONE