]> pd.if.org Git - btree/blob - threadskv10.c
More progress on the LSM btrees implementation
[btree] / threadskv10.c
1 // btree version threadskv10 futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      and LSM B-trees for write optimization
11
12 // 11 OCT 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <linux/futex.h>
39 #define SYS_futex 202
40 #endif
41
42 #ifdef unix
43 #include <unistd.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <fcntl.h>
47 #include <sys/time.h>
48 #include <sys/mman.h>
49 #include <errno.h>
50 #include <pthread.h>
51 #include <limits.h>
52 #else
53 #define WIN32_LEAN_AND_MEAN
54 #include <windows.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <time.h>
58 #include <fcntl.h>
59 #include <process.h>
60 #include <intrin.h>
61 #endif
62
63 #include <memory.h>
64 #include <string.h>
65 #include <stddef.h>
66
67 typedef unsigned long long      uid;
68 typedef unsigned long long      logseqno;
69
70 #ifndef unix
71 typedef unsigned long long      off64_t;
72 typedef unsigned short          ushort;
73 typedef unsigned int            uint;
74 #endif
75
76 #define BT_ro 0x6f72    // ro
77 #define BT_rw 0x7772    // rw
78
79 #define BT_maxbits              26                                      // maximum page size in bits
80 #define BT_minbits              9                                       // minimum page size in bits
81 #define BT_minpage              (1 << BT_minbits)       // minimum page size
82 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
83
84 //  BTree page number constants
85 #define ALLOC_page              0       // allocation page
86 #define ROOT_page               1       // root of the btree
87 #define LEAF_page               2       // first page of leaves
88 #define REDO_page               3       // first page of redo buffer
89
90 //      Number of levels to create in a new BTree
91
92 #define MIN_lvl                 2
93
94 /*
95 There are six lock types for each node in four independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
102 */
103
104 typedef enum{
105         BtLockAccess = 1,
106         BtLockDelete = 2,
107         BtLockRead   = 4,
108         BtLockWrite  = 8,
109         BtLockParent = 16,
110         BtLockAtomic = 32
111 } BtLock;
112
113 typedef struct {
114   union {
115         struct {
116           volatile uint xlock:1;        // one writer has exclusive lock
117           volatile uint wrt:31;         // count of other writers waiting
118         } bits[1];
119         uint value[1];
120   };
121 } BtMutexLatch;
122
123 #define XCL 1
124 #define WRT 2
125
126 //      definition for phase-fair reader/writer lock implementation
127
128 typedef struct {
129         volatile ushort rin[1];
130         volatile ushort rout[1];
131         volatile ushort ticket[1];
132         volatile ushort serving[1];
133         volatile ushort tid[1];
134         volatile ushort dup[1];
135 } RWLock;
136
137 //      write only lock
138
139 typedef struct {
140         BtMutexLatch xcl[1];
141         volatile ushort tid[1];
142         volatile ushort dup[1];
143 } WOLock;
144
145 #define PHID 0x1
146 #define PRES 0x2
147 #define MASK 0x3
148 #define RINC 0x4
149
150 //      mode & definition for lite latch implementation
151
152 enum {
153         QueRd = 1,              // reader queue
154         QueWr = 2               // writer queue
155 } RWQueue;
156
157 //  hash table entries
158
159 typedef struct {
160         uint entry;             // Latch table entry at head of chain
161         BtMutexLatch latch[1];
162 } BtHashEntry;
163
164 //      latch manager table structure
165
166 typedef struct {
167         uid page_no;                    // latch set page number
168         RWLock readwr[1];               // read/write page lock
169         RWLock access[1];               // Access Intent/Page delete
170         WOLock parent[1];               // Posting of fence key in parent
171         WOLock atomic[1];               // Atomic update in progress
172         uint split;                             // right split page atomic insert
173         uint next;                              // next entry in hash table chain
174         uint prev;                              // prev entry in hash table chain
175         ushort pin;                             // number of accessing threads
176         unsigned char dirty;    // page in cache is dirty (atomic set)
177         BtMutexLatch modify[1]; // modify entry lite latch
178 } BtLatchSet;
179
180 //      Define the length of the page record numbers
181
182 #define BtId 6
183
184 //      Page key slot definition.
185
186 //      Keys are marked dead, but remain on the page until
187 //      it cleanup is called. The fence key (highest key) for
188 //      a leaf page is always present, even after cleanup.
189
190 //      Slot types
191
192 //      In addition to the Unique keys that occupy slots
193 //      there are Librarian and Duplicate key
194 //      slots occupying the key slot array.
195
196 //      The Librarian slots are dead keys that
197 //      serve as filler, available to add new Unique
198 //      or Dup slots that are inserted into the B-tree.
199
200 //      The Duplicate slots have had their key bytes extended
201 //      by 6 bytes to contain a binary duplicate key uniqueifier.
202
203 typedef enum {
204         Unique,
205         Librarian,
206         Duplicate,
207         Delete
208 } BtSlotType;
209
210 typedef struct {
211         uint off:BT_maxbits;    // page offset for key start
212         uint type:3;                    // type of slot
213         uint dead:1;                    // set for deleted slot
214 } BtSlot;
215
216 //      The key structure occupies space at the upper end of
217 //      each page.  It's a length byte followed by the key
218 //      bytes.
219
220 typedef struct {
221         unsigned char len;              // this can be changed to a ushort or uint
222         unsigned char key[0];
223 } BtKey;
224
225 //      the value structure also occupies space at the upper
226 //      end of the page. Each key is immediately followed by a value.
227
228 typedef struct {
229         unsigned char len;              // this can be changed to a ushort or uint
230         unsigned char value[0];
231 } BtVal;
232
233 #define BT_maxkey       255             // maximum number of bytes in a key
234 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
235
236 //      The first part of an index page.
237 //      It is immediately followed
238 //      by the BtSlot array of keys.
239
240 //      note that this structure size
241 //      must be a multiple of 8 bytes
242 //      in order to place PageZero correctly.
243
244 typedef struct BtPage_ {
245         uint cnt;                                       // count of keys in page
246         uint act;                                       // count of active keys
247         uint min;                                       // next key offset
248         uint garbage;                           // page garbage in bytes
249         unsigned char bits:7;           // page size in bits
250         unsigned char free:1;           // page is on free chain
251         unsigned char lvl:7;            // level of page
252         unsigned char kill:1;           // page is being deleted
253         unsigned char right[BtId];      // page number to right
254         unsigned char left[BtId];       // page number to left
255         unsigned char filler[2];        // padding to multiple of 8
256         logseqno lsn;                           // log sequence number applied
257         uid page_no;                            // this page number
258 } *BtPage;
259
260 //  The loadpage interface object
261
262 typedef struct {
263         BtPage page;            // current page pointer
264         BtLatchSet *latch;      // current page latch set
265 } BtPageSet;
266
267 //      structure for latch manager on ALLOC_page
268
269 typedef struct {
270         struct BtPage_ alloc[1];                // next page_no in right ptr
271         unsigned char freechain[BtId];  // head of free page_nos chain
272         unsigned long long activepages; // number of active pages
273         uint redopages;                                 // number of redo pages in file
274 } BtPageZero;
275
276 //      The object structure for Btree access
277
278 typedef struct {
279         uint page_size;                         // page size    
280         uint page_bits;                         // page size in bits    
281 #ifdef unix
282         int idx;
283 #else
284         HANDLE idx;
285 #endif
286         BtPageZero *pagezero;           // mapped allocation page
287         BtHashEntry *hashtable;         // the buffer pool hash table entries
288         BtLatchSet *latchsets;          // mapped latch set from buffer pool
289         unsigned char *pagepool;        // mapped to the buffer pool pages
290         unsigned char *redobuff;        // mapped recovery buffer pointer
291         logseqno lsn, flushlsn;         // current & first lsn flushed
292         BtMutexLatch redo[1];           // redo area lite latch
293         BtMutexLatch lock[1];           // allocation area lite latch
294         BtMutexLatch maps[1];           // mapping segments lite latch
295         ushort thread_no[1];            // next thread number
296         uint nlatchpage;                        // number of latch pages at BT_latch
297         uint latchtotal;                        // number of page latch entries
298         uint latchhash;                         // number of latch hash table slots
299         uint latchvictim;                       // next latch entry to examine
300         uint latchpromote;                      // next latch entry to promote
301         uint redolast;                          // last msync size of recovery buff
302         uint redoend;                           // eof/end element in recovery buff
303         int err;                                        // last error
304         int line;                                       // last error line no
305         int found;                                      // number of keys found by delete
306         int reads, writes;                      // number of reads and writes
307 #ifndef unix
308         HANDLE halloc;                          // allocation handle
309         HANDLE hpool;                           // buffer pool handle
310 #endif
311         uint segments;                          // number of memory mapped segments
312         unsigned char *pages[64000];// memory mapped segments of b-tree
313 } BtMgr;
314
315 typedef struct {
316         BtMgr *mgr;                                     // buffer manager for entire process
317         BtMgr *main;                            // buffer manager for main btree
318         BtPage cursor;                          // cached page frame for start/next
319         ushort thread_no;                       // thread number
320         unsigned char key[BT_keyarray]; // last found complete key
321 } BtDb;
322
323 //      Catastrophic errors
324
325 typedef enum {
326         BTERR_ok = 0,
327         BTERR_struct,
328         BTERR_ovflw,
329         BTERR_lock,
330         BTERR_map,
331         BTERR_read,
332         BTERR_wrt,
333         BTERR_atomic,
334         BTERR_recovery
335 } BTERR;
336
337 #define CLOCK_bit 0x8000
338
339 // recovery manager entry types
340
341 typedef enum {
342         BTRM_eof = 0,   // rest of buffer is emtpy
343         BTRM_add,               // add a unique key-value to btree
344         BTRM_dup,               // add a duplicate key-value to btree
345         BTRM_del,               // delete a key-value from btree
346         BTRM_upd,               // update a key with a new value
347         BTRM_new,               // allocate a new empty page
348         BTRM_old                // reuse an old empty page
349 } BTRM;
350
351 // recovery manager entry
352 //      structure followed by BtKey & BtVal
353
354 typedef struct {
355         logseqno reqlsn;        // log sequence number required
356         logseqno lsn;           // log sequence number for entry
357         uint len;                       // length of entry
358         unsigned char type;     // type of entry
359         unsigned char lvl;      // level of btree entry pertains to
360 } BtLogHdr;
361
362 // B-Tree functions
363
364 extern void bt_close (BtDb *bt);
365 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
366 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
367 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
368 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
369 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
370 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
371 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
372
373 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
374
375 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
376 extern uint bt_nextkey   (BtDb *bt, uint slot);
377 extern uint bt_prevkey (BtDb *db, uint slot);
378 extern uint bt_lastkey (BtDb *db);
379
380 //      manager functions
381 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
382 extern void bt_mgrclose (BtMgr *mgr);
383 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
384 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
385
386 //      transaction functions
387 BTERR bt_txnpromote (BtDb *bt);
388
389 //  The page is allocated from low and hi ends.
390 //  The key slots are allocated from the bottom,
391 //      while the text and value of the key
392 //  are allocated from the top.  When the two
393 //  areas meet, the page is split into two.
394
395 //  A key consists of a length byte, two bytes of
396 //  index number (0 - 65535), and up to 253 bytes
397 //  of key value.
398
399 //  Associated with each key is a value byte string
400 //      containing any value desired.
401
402 //  The b-tree root is always located at page 1.
403 //      The first leaf page of level zero is always
404 //      located on page 2.
405
406 //      The b-tree pages are linked with next
407 //      pointers to facilitate enumerators,
408 //      and provide for concurrency.
409
410 //      When to root page fills, it is split in two and
411 //      the tree height is raised by a new root at page
412 //      one with two keys.
413
414 //      Deleted keys are marked with a dead bit until
415 //      page cleanup. The fence key for a leaf node is
416 //      always present
417
418 //  To achieve maximum concurrency one page is locked at a time
419 //  as the tree is traversed to find leaf key in question. The right
420 //  page numbers are used in cases where the page is being split,
421 //      or consolidated.
422
423 //  Page 0 is dedicated to lock for new page extensions,
424 //      and chains empty pages together for reuse. It also
425 //      contains the latch manager hash table.
426
427 //      The ParentModification lock on a node is obtained to serialize posting
428 //      or changing the fence key for a node.
429
430 //      Empty pages are chained together through the ALLOC page and reused.
431
432 //      Access macros to address slot and key values from the page
433 //      Page slots use 1 based indexing.
434
435 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
436 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
437 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
438
439 void bt_putid(unsigned char *dest, uid id)
440 {
441 int i = BtId;
442
443         while( i-- )
444                 dest[i] = (unsigned char)id, id >>= 8;
445 }
446
447 uid bt_getid(unsigned char *src)
448 {
449 uid id = 0;
450 int i;
451
452         for( i = 0; i < BtId; i++ )
453                 id <<= 8, id |= *src++; 
454
455         return id;
456 }
457
458 //      lite weight spin lock Latch Manager
459
460 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
461 {
462         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
463 }
464
465 void bt_mutexlock(BtMutexLatch *latch)
466 {
467 BtMutexLatch prev[1];
468 uint slept = 0;
469
470   while( 1 ) {
471         *prev->value = __sync_fetch_and_or(latch->value, XCL);
472
473         if( !prev->bits->xlock ) {                      // did we set XCL bit?
474             if( slept )
475                   __sync_fetch_and_sub(latch->value, WRT);
476                 return;
477         }
478
479         if( !slept ) {
480                 prev->bits->wrt++;
481                 __sync_fetch_and_add(latch->value, WRT);
482         }
483
484         sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
485         slept = 1;
486   }
487 }
488
489 //      try to obtain write lock
490
491 //      return 1 if obtained,
492 //              0 otherwise
493
494 int bt_mutextry(BtMutexLatch *latch)
495 {
496 BtMutexLatch prev[1];
497
498         *prev->value = __sync_fetch_and_or(latch->value, XCL);
499
500         //      take write access if exclusive bit is clear
501
502         return !prev->bits->xlock;
503 }
504
505 //      clear write mode
506
507 void bt_releasemutex(BtMutexLatch *latch)
508 {
509 BtMutexLatch prev[1];
510
511         *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
512
513         if( prev->bits->wrt )
514           sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
515 }
516
517 //      Write-Only Queue Lock
518
519 void WriteOLock (WOLock *lock, ushort tid)
520 {
521         if( *lock->tid == tid ) {
522                 *lock->dup += 1;
523                 return;
524         }
525
526         bt_mutexlock(lock->xcl);
527         *lock->tid = tid;
528 }
529
530 void WriteORelease (WOLock *lock)
531 {
532         if( *lock->dup ) {
533                 *lock->dup -= 1;
534                 return;
535         }
536
537         *lock->tid = 0;
538         bt_releasemutex(lock->xcl);
539 }
540
541 //      Phase-Fair reader/writer lock implementation
542
543 void WriteLock (RWLock *lock, ushort tid)
544 {
545 ushort w, r, tix;
546
547         if( *lock->tid == tid ) {
548                 *lock->dup += 1;
549                 return;
550         }
551 #ifdef unix
552         tix = __sync_fetch_and_add (lock->ticket, 1);
553 #else
554         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
555 #endif
556         // wait for our ticket to come up
557
558         while( tix != lock->serving[0] )
559 #ifdef unix
560                 sched_yield();
561 #else
562                 SwitchToThread ();
563 #endif
564
565         w = PRES | (tix & PHID);
566 #ifdef  unix
567         r = __sync_fetch_and_add (lock->rin, w);
568 #else
569         r = _InterlockedExchangeAdd16 (lock->rin, w);
570 #endif
571         while( r != *lock->rout )
572 #ifdef unix
573                 sched_yield();
574 #else
575                 SwitchToThread();
576 #endif
577         *lock->tid = tid;
578 }
579
580 void WriteRelease (RWLock *lock)
581 {
582         if( *lock->dup ) {
583                 *lock->dup -= 1;
584                 return;
585         }
586
587         *lock->tid = 0;
588 #ifdef unix
589         __sync_fetch_and_and (lock->rin, ~MASK);
590 #else
591         _InterlockedAnd16 (lock->rin, ~MASK);
592 #endif
593         lock->serving[0]++;
594 }
595
596 //      try to obtain read lock
597 //  return 1 if successful
598
599 int ReadTry (RWLock *lock, ushort tid)
600 {
601 ushort w;
602
603         //  OK if write lock already held by same thread
604
605         if( *lock->tid == tid ) {
606                 *lock->dup += 1;
607                 return 1;
608         }
609 #ifdef unix
610         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
611 #else
612         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
613 #endif
614         if( w )
615           if( w == (*lock->rin & MASK) ) {
616 #ifdef unix
617                 __sync_fetch_and_add (lock->rin, -RINC);
618 #else
619                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
620 #endif
621                 return 0;
622           }
623
624         return 1;
625 }
626
627 void ReadLock (RWLock *lock, ushort tid)
628 {
629 ushort w;
630
631         if( *lock->tid == tid ) {
632                 *lock->dup += 1;
633                 return;
634         }
635 #ifdef unix
636         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
637 #else
638         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
639 #endif
640         if( w )
641           while( w == (*lock->rin & MASK) )
642 #ifdef unix
643                 sched_yield ();
644 #else
645                 SwitchToThread ();
646 #endif
647 }
648
649 void ReadRelease (RWLock *lock)
650 {
651         if( *lock->dup ) {
652                 *lock->dup -= 1;
653                 return;
654         }
655
656 #ifdef unix
657         __sync_fetch_and_add (lock->rout, RINC);
658 #else
659         _InterlockedExchangeAdd16 (lock->rout, RINC);
660 #endif
661 }
662
663 //      recovery manager -- flush dirty pages
664
665 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
666 {
667 uint cnt3 = 0, cnt2 = 0, cnt = 0;
668 uint entry, segment;
669 BtLatchSet *latch;
670 BtPage page;
671
672   //    flush dirty pool pages to the btree
673
674 fprintf(stderr, "Start flushlsn  ");
675   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
676         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
677         latch = mgr->latchsets + entry;
678         bt_mutexlock (latch->modify);
679         bt_lockpage(BtLockRead, latch, thread_no);
680
681         if( latch->dirty ) {
682           bt_writepage(mgr, page, latch->page_no);
683           latch->dirty = 0, cnt++;
684         }
685 if( latch->pin & ~CLOCK_bit )
686 cnt2++;
687         bt_unlockpage(BtLockRead, latch);
688         bt_releasemutex (latch->modify);
689   }
690 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
691 fprintf(stderr, "begin sync");
692         for( segment = 0; segment < mgr->segments; segment++ )
693           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
694                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
695 fprintf(stderr, " end sync\n");
696 }
697
698 //      recovery manager -- process current recovery buff on startup
699 //      this won't do much if previous session was properly closed.
700
701 BTERR bt_recoveryredo (BtMgr *mgr)
702 {
703 BtLogHdr *hdr, *eof;
704 uint offset = 0;
705 BtKey *key;
706 BtVal *val;
707
708   hdr = (BtLogHdr *)mgr->redobuff;
709   mgr->flushlsn = hdr->lsn;
710
711   while( 1 ) {
712         hdr = (BtLogHdr *)(mgr->redobuff + offset);
713         switch( hdr->type ) {
714         case BTRM_eof:
715                 mgr->lsn = hdr->lsn;
716                 return 0;
717         case BTRM_add:          // add a unique key-value to btree
718         
719         case BTRM_dup:          // add a duplicate key-value to btree
720         case BTRM_del:          // delete a key-value from btree
721         case BTRM_upd:          // update a key with a new value
722         case BTRM_new:          // allocate a new empty page
723         case BTRM_old:          // reuse an old empty page
724                 return 0;
725         }
726   }
727 }
728
729 //      recovery manager -- append new entry to recovery log
730 //      flush dirty pages to disk when it overflows.
731
732 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
733 {
734 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
735 uint amt = sizeof(BtLogHdr);
736 BtLogHdr *hdr, *eof;
737 uint last, end;
738
739         bt_mutexlock (mgr->redo);
740
741         if( key )
742           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
743
744         //      see if new entry fits in buffer
745         //      flush and reset if it doesn't
746
747         if( amt > size - mgr->redoend ) {
748           mgr->flushlsn = mgr->lsn;
749           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
750                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
751           mgr->redolast = 0;
752           mgr->redoend = 0;
753           bt_flushlsn(mgr, thread_no);
754         }
755
756         //      fill in new entry & either eof or end block
757
758         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
759
760         hdr->len = amt;
761         hdr->type = type;
762         hdr->lvl = lvl;
763         hdr->lsn = ++mgr->lsn;
764
765         mgr->redoend += amt;
766
767         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
768         memset (eof, 0, sizeof(BtLogHdr));
769
770         //  fill in key and value
771
772         if( key ) {
773           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
774           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
775         }
776
777         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
778         memset (eof, 0, sizeof(BtLogHdr));
779         eof->lsn = mgr->lsn;
780
781         last = mgr->redolast & ~0xfff;
782         end = mgr->redoend;
783
784         if( end - last + sizeof(BtLogHdr) >= 8192 )
785           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
786                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
787           else
788                 mgr->redolast = end;
789
790         bt_releasemutex(mgr->redo);
791         return hdr->lsn;
792 }
793
794 //      recovery manager -- append transaction to recovery log
795 //      flush dirty pages to disk when it overflows.
796
797 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
798 {
799 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
800 uint amt = 0, src, type;
801 BtLogHdr *hdr, *eof;
802 uint last, end;
803 logseqno lsn;
804 BtKey *key;
805 BtVal *val;
806
807         //      determine amount of redo recovery log space required
808
809         for( src = 0; src++ < source->cnt; ) {
810           key = keyptr(source,src);
811           val = valptr(source,src);
812           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
813           amt += sizeof(BtLogHdr);
814         }
815
816         bt_mutexlock (mgr->redo);
817
818         //      see if new entry fits in buffer
819         //      flush and reset if it doesn't
820
821         if( amt > size - mgr->redoend ) {
822           mgr->flushlsn = mgr->lsn;
823           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
824                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
825           mgr->redolast = 0;
826           mgr->redoend = 0;
827           bt_flushlsn (mgr, thread_no);
828         }
829
830         //      assign new lsn to transaction
831
832         lsn = ++mgr->lsn;
833
834         //      fill in new entries
835
836         for( src = 0; src++ < source->cnt; ) {
837           key = keyptr(source, src);
838           val = valptr(source, src);
839
840           switch( slotptr(source, src)->type ) {
841           case Unique:
842                 type = BTRM_add;
843                 break;
844           case Duplicate:
845                 type = BTRM_dup;
846                 break;
847           case Delete:
848                 type = BTRM_del;
849                 break;
850           }
851
852           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
853           amt += sizeof(BtLogHdr);
854
855           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
856           hdr->len = amt;
857           hdr->type = type;
858           hdr->lsn = lsn;
859           hdr->lvl = 0;
860
861           //  fill in key and value
862
863           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
864           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
865
866           mgr->redoend += amt;
867         }
868
869         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
870         memset (eof, 0, sizeof(BtLogHdr));
871         eof->lsn = lsn;
872
873         last = mgr->redolast & ~0xfff;
874         end = mgr->redoend;
875
876         if( end - last + sizeof(BtLogHdr) >= 8192 )
877           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
878                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
879           else
880                 mgr->redolast = end;
881
882         bt_releasemutex(mgr->redo);
883         return lsn;
884 }
885
886 //      read page into buffer pool from permanent location in Btree file
887
888 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
889 {
890 int flag = PROT_READ | PROT_WRITE;
891 uint segment = page_no >> 16;
892 BtPage perm;
893
894   while( 1 ) {
895         if( segment < mgr->segments ) {
896           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
897 if( perm->page_no != page_no )
898 abort();
899                 memcpy (page, perm, mgr->page_size);
900                 mgr->reads++;
901                 return 0;
902         }
903
904         bt_mutexlock (mgr->maps);
905
906         if( segment < mgr->segments ) {
907                 bt_releasemutex (mgr->maps);
908                 continue;
909         }
910
911         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
912         mgr->segments++;
913
914         bt_releasemutex (mgr->maps);
915   }
916 }
917
918 //      write page to permanent location in Btree file
919 //      clear the dirty bit
920
921 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
922 {
923 int flag = PROT_READ | PROT_WRITE;
924 uint segment = page_no >> 16;
925 BtPage perm;
926
927   while( 1 ) {
928         if( segment < mgr->segments ) {
929           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
930 if( page_no > LEAF_page && perm->page_no != page_no)
931 abort();
932                 memcpy (perm, page, mgr->page_size);
933                 mgr->writes++;
934                 return 0;
935         }
936
937         bt_mutexlock (mgr->maps);
938
939         if( segment < mgr->segments ) {
940                 bt_releasemutex (mgr->maps);
941                 continue;
942         }
943
944         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
945         bt_releasemutex (mgr->maps);
946         mgr->segments++;
947   }
948 }
949
950 //      set CLOCK bit in latch
951 //      decrement pin count
952
953 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
954 {
955         bt_mutexlock(latch->modify);
956         latch->pin |= CLOCK_bit;
957         latch->pin--;
958
959         bt_releasemutex(latch->modify);
960 }
961
962 //  return the btree cached page address
963
964 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
965 {
966 uid entry = latch - mgr->latchsets;
967 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
968
969   return page;
970 }
971
972 //  return next available latch entry
973 //        and with latch entry locked
974
975 uint bt_availnext (BtMgr *mgr)
976 {
977 BtLatchSet *latch;
978 uint entry;
979
980   while( 1 ) {
981 #ifdef unix
982         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
983 #else
984         entry = _InterlockedIncrement (&mgr->latchvictim);
985 #endif
986         entry %= mgr->latchtotal;
987
988         if( !entry )
989                 continue;
990
991         latch = mgr->latchsets + entry;
992
993         if( !bt_mutextry(latch->modify) )
994                 continue;
995
996         //  return this entry if it is not pinned
997
998         if( !latch->pin )
999                 return entry;
1000
1001         //      if the CLOCK bit is set
1002         //      reset it to zero.
1003
1004         latch->pin &= ~CLOCK_bit;
1005         bt_releasemutex(latch->modify);
1006   }
1007 }
1008
1009 //      pin page in buffer pool
1010 //      return with latchset pinned
1011
1012 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1013 {
1014 uint hashidx = page_no % mgr->latchhash;
1015 BtLatchSet *latch;
1016 uint entry, idx;
1017 BtPage page;
1018
1019   //  try to find our entry
1020
1021   bt_mutexlock(mgr->hashtable[hashidx].latch);
1022
1023   if( entry = mgr->hashtable[hashidx].entry ) do
1024   {
1025         latch = mgr->latchsets + entry;
1026         if( page_no == latch->page_no )
1027                 break;
1028   } while( entry = latch->next );
1029
1030   //  found our entry: increment pin
1031
1032   if( entry ) {
1033         latch = mgr->latchsets + entry;
1034         bt_mutexlock(latch->modify);
1035         latch->pin |= CLOCK_bit;
1036         latch->pin++;
1037 if(contents)
1038 abort();
1039         bt_releasemutex(latch->modify);
1040         bt_releasemutex(mgr->hashtable[hashidx].latch);
1041         return latch;
1042   }
1043
1044   //  find and reuse unpinned entry
1045
1046 trynext:
1047
1048   entry = bt_availnext (mgr);
1049   latch = mgr->latchsets + entry;
1050
1051   idx = latch->page_no % mgr->latchhash;
1052
1053   //  if latch is on a different hash chain
1054   //    unlink from the old page_no chain
1055
1056   if( latch->page_no )
1057    if( idx != hashidx ) {
1058
1059         //  skip over this entry if latch not available
1060
1061     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1062           bt_releasemutex(latch->modify);
1063           goto trynext;
1064         }
1065
1066         if( latch->prev )
1067           mgr->latchsets[latch->prev].next = latch->next;
1068         else
1069           mgr->hashtable[idx].entry = latch->next;
1070
1071         if( latch->next )
1072           mgr->latchsets[latch->next].prev = latch->prev;
1073
1074     bt_releasemutex (mgr->hashtable[idx].latch);
1075    }
1076
1077   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1078
1079   //  update permanent page area in btree from buffer pool
1080   //    no read-lock is required since page is not pinned.
1081
1082   if( latch->dirty )
1083         if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1084           return mgr->line = __LINE__, NULL;
1085         else
1086           latch->dirty = 0;
1087
1088   if( contents ) {
1089         memcpy (page, contents, mgr->page_size);
1090         latch->dirty = 1;
1091   } else if( bt_readpage (mgr, page, page_no) )
1092         return mgr->line = __LINE__, NULL;
1093
1094   //  link page as head of hash table chain
1095   //  if this is a never before used entry,
1096   //  or it was previously on a different
1097   //  hash table chain. Otherwise, just
1098   //  leave it in its current hash table
1099   //  chain position.
1100
1101   if( !latch->page_no || hashidx != idx ) {
1102         if( latch->next = mgr->hashtable[hashidx].entry )
1103           mgr->latchsets[latch->next].prev = entry;
1104
1105         mgr->hashtable[hashidx].entry = entry;
1106     latch->prev = 0;
1107   }
1108
1109   //  fill in latch structure
1110
1111   latch->pin = CLOCK_bit | 1;
1112   latch->page_no = page_no;
1113   latch->split = 0;
1114
1115   bt_releasemutex (latch->modify);
1116   bt_releasemutex (mgr->hashtable[hashidx].latch);
1117   return latch;
1118 }
1119   
1120 void bt_mgrclose (BtMgr *mgr)
1121 {
1122 BtLatchSet *latch;
1123 BtLogHdr *eof;
1124 uint num = 0;
1125 BtPage page;
1126 uint slot;
1127
1128         //      flush previously written dirty pages
1129         //      and write recovery buffer to disk
1130
1131         fdatasync (mgr->idx);
1132
1133         if( mgr->redoend ) {
1134                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1135                 memset (eof, 0, sizeof(BtLogHdr));
1136         }
1137
1138         //      write remaining dirty pool pages to the btree
1139
1140         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1141                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1142                 latch = mgr->latchsets + slot;
1143
1144                 if( latch->dirty ) {
1145                         bt_writepage(mgr, page, latch->page_no);
1146                         latch->dirty = 0, num++;
1147                 }
1148         }
1149
1150         //      clear redo recovery buffer on disk.
1151
1152         if( mgr->pagezero->redopages ) {
1153                 eof = (BtLogHdr *)mgr->redobuff;
1154                 memset (eof, 0, sizeof(BtLogHdr));
1155                 eof->lsn = mgr->lsn;
1156                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1157                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1158         }
1159
1160         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1161
1162 #ifdef unix
1163         while( mgr->segments )
1164                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1165
1166         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1167         munmap (mgr->pagezero, mgr->page_size);
1168 #else
1169         FlushViewOfFile(mgr->pagezero, 0);
1170         UnmapViewOfFile(mgr->pagezero);
1171         UnmapViewOfFile(mgr->pagepool);
1172         CloseHandle(mgr->halloc);
1173         CloseHandle(mgr->hpool);
1174 #endif
1175 #ifdef unix
1176         close (mgr->idx);
1177         free (mgr);
1178 #else
1179         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1180         FlushFileBuffers(mgr->idx);
1181         CloseHandle(mgr->idx);
1182         GlobalFree (mgr);
1183 #endif
1184 }
1185
1186 //      close and release memory
1187
1188 void bt_close (BtDb *bt)
1189 {
1190 #ifdef unix
1191         if( bt->cursor )
1192                 free (bt->cursor);
1193 #else
1194         if( bt->cursor)
1195                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1196 #endif
1197         free (bt);
1198 }
1199
1200 //  open/create new btree buffer manager
1201
1202 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1203 //              size of page pool (e.g. 262144)
1204
1205 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1206 {
1207 uint lvl, attr, last, slot, idx;
1208 uint nlatchpage, latchhash;
1209 unsigned char value[BtId];
1210 int flag, initit = 0;
1211 BtPageZero *pagezero;
1212 BtLatchSet *latch;
1213 off64_t size;
1214 uint amt[1];
1215 BtMgr* mgr;
1216 BtKey* key;
1217 BtVal *val;
1218
1219         // determine sanity of page size and buffer pool
1220
1221         if( bits > BT_maxbits )
1222                 bits = BT_maxbits;
1223         else if( bits < BT_minbits )
1224                 bits = BT_minbits;
1225 #ifdef unix
1226         mgr = calloc (1, sizeof(BtMgr));
1227
1228         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1229
1230         if( mgr->idx == -1 ) {
1231                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1232                 return free(mgr), NULL;
1233         }
1234 #else
1235         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1236         attr = FILE_ATTRIBUTE_NORMAL;
1237         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1238
1239         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1240                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1241                 return GlobalFree(mgr), NULL;
1242         }
1243 #endif
1244
1245 #ifdef unix
1246         pagezero = valloc (BT_maxpage);
1247         *amt = 0;
1248
1249         // read minimum page size to get root info
1250         //      to support raw disk partition files
1251         //      check if bits == 0 on the disk.
1252
1253         if( size = lseek (mgr->idx, 0L, 2) )
1254                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1255                         if( pagezero->alloc->bits )
1256                                 bits = pagezero->alloc->bits;
1257                         else
1258                                 initit = 1;
1259                 else
1260                         return free(mgr), free(pagezero), NULL;
1261         else
1262                 initit = 1;
1263 #else
1264         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1265         size = GetFileSize(mgr->idx, amt);
1266
1267         if( size || *amt ) {
1268                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1269                         return bt_mgrclose (mgr), NULL;
1270                 bits = pagezero->alloc->bits;
1271         } else
1272                 initit = 1;
1273 #endif
1274
1275         mgr->page_size = 1 << bits;
1276         mgr->page_bits = bits;
1277
1278         //  calculate number of latch hash table entries
1279
1280         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1281
1282         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1283         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1284         mgr->latchtotal = nodemax;
1285
1286         if( !initit )
1287                 goto mgrlatch;
1288
1289         // initialize an empty b-tree with latch page, root page, page of leaves
1290         // and page(s) of latches and page pool cache
1291
1292         memset (pagezero, 0, 1 << bits);
1293         pagezero->alloc->lvl = MIN_lvl - 1;
1294         pagezero->alloc->bits = mgr->page_bits;
1295         pagezero->redopages = redopages;
1296
1297         bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1298         pagezero->activepages = 2;
1299
1300         //  initialize left-most LEAF page in
1301         //      alloc->left and count of active leaf pages.
1302
1303         bt_putid (pagezero->alloc->left, LEAF_page);
1304         ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1305
1306         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1307                 fprintf (stderr, "Unable to create btree page zero\n");
1308                 return bt_mgrclose (mgr), NULL;
1309         }
1310
1311         memset (pagezero, 0, 1 << bits);
1312         pagezero->alloc->bits = mgr->page_bits;
1313
1314         for( lvl=MIN_lvl; lvl--; ) {
1315         BtSlot *node = slotptr(pagezero->alloc, 1);
1316                 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1317                 key = keyptr(pagezero->alloc, 1);
1318                 key->len = 2;           // create stopper key
1319                 key->key[0] = 0xff;
1320                 key->key[1] = 0xff;
1321
1322                 bt_putid(value, MIN_lvl - lvl + 1);
1323                 val = valptr(pagezero->alloc, 1);
1324                 val->len = lvl ? BtId : 0;
1325                 memcpy (val->value, value, val->len);
1326
1327                 pagezero->alloc->min = node->off;
1328                 pagezero->alloc->lvl = lvl;
1329                 pagezero->alloc->cnt = 1;
1330                 pagezero->alloc->act = 1;
1331                 pagezero->alloc->page_no = MIN_lvl - lvl;
1332
1333                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1334                         fprintf (stderr, "Unable to create btree page\n");
1335                         return bt_mgrclose (mgr), NULL;
1336                 }
1337         }
1338
1339 mgrlatch:
1340 #ifdef unix
1341         free (pagezero);
1342 #else
1343         VirtualFree (pagezero, 0, MEM_RELEASE);
1344 #endif
1345 #ifdef unix
1346         // mlock the first segment of 64K pages
1347
1348         flag = PROT_READ | PROT_WRITE;
1349         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1350         mgr->segments = 1;
1351
1352         if( mgr->pages[0] == MAP_FAILED ) {
1353                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1354                 return bt_mgrclose (mgr), NULL;
1355         }
1356
1357         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1358         mlock (mgr->pagezero, mgr->page_size);
1359
1360         mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1361         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1362
1363         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1364         if( mgr->pagepool == MAP_FAILED ) {
1365                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1366                 return bt_mgrclose (mgr), NULL;
1367         }
1368 #else
1369         flag = PAGE_READWRITE;
1370         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1371         if( !mgr->halloc ) {
1372                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1373                 return bt_mgrclose (mgr), NULL;
1374         }
1375
1376         flag = FILE_MAP_WRITE;
1377         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1378         if( !mgr->pagezero ) {
1379                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1380                 return bt_mgrclose (mgr), NULL;
1381         }
1382
1383         flag = PAGE_READWRITE;
1384         size = (uid)mgr->nlatchpage << mgr->page_bits;
1385         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1386         if( !mgr->hpool ) {
1387                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1388                 return bt_mgrclose (mgr), NULL;
1389         }
1390
1391         flag = FILE_MAP_WRITE;
1392         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1393         if( !mgr->pagepool ) {
1394                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1395                 return bt_mgrclose (mgr), NULL;
1396         }
1397 #endif
1398
1399         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1400         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1401         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1402
1403         return mgr;
1404 }
1405
1406 //      open BTree access method
1407 //      based on buffer manager
1408
1409 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1410 {
1411 BtDb *bt = malloc (sizeof(*bt));
1412
1413         memset (bt, 0, sizeof(*bt));
1414         bt->main = main;
1415         bt->mgr = mgr;
1416 #ifdef unix
1417         bt->cursor = valloc (mgr->page_size);
1418 #else
1419         bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1420 #endif
1421 #ifdef unix
1422         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1423 #else
1424         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1425 #endif
1426         return bt;
1427 }
1428
1429 //  compare two keys, return > 0, = 0, or < 0
1430 //  =0: keys are same
1431 //  -1: key2 > key1
1432 //  +1: key2 < key1
1433 //  as the comparison value
1434
1435 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1436 {
1437 uint len1 = key1->len;
1438 int ans;
1439
1440         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1441                 return ans;
1442
1443         if( len1 > len2 )
1444                 return 1;
1445         if( len1 < len2 )
1446                 return -1;
1447
1448         return 0;
1449 }
1450
1451 // place write, read, or parent lock on requested page_no.
1452
1453 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1454 {
1455         switch( mode ) {
1456         case BtLockRead:
1457                 ReadLock (latch->readwr, thread_no);
1458                 break;
1459         case BtLockWrite:
1460                 WriteLock (latch->readwr, thread_no);
1461                 break;
1462         case BtLockAccess:
1463                 ReadLock (latch->access, thread_no);
1464                 break;
1465         case BtLockDelete:
1466                 WriteLock (latch->access, thread_no);
1467                 break;
1468         case BtLockParent:
1469                 WriteOLock (latch->parent, thread_no);
1470                 break;
1471         case BtLockAtomic:
1472                 WriteOLock (latch->atomic, thread_no);
1473                 break;
1474         case BtLockAtomic | BtLockRead:
1475                 WriteOLock (latch->atomic, thread_no);
1476                 ReadLock (latch->readwr, thread_no);
1477                 break;
1478         }
1479 }
1480
1481 // remove write, read, or parent lock on requested page
1482
1483 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1484 {
1485         switch( mode ) {
1486         case BtLockRead:
1487                 ReadRelease (latch->readwr);
1488                 break;
1489         case BtLockWrite:
1490                 WriteRelease (latch->readwr);
1491                 break;
1492         case BtLockAccess:
1493                 ReadRelease (latch->access);
1494                 break;
1495         case BtLockDelete:
1496                 WriteRelease (latch->access);
1497                 break;
1498         case BtLockParent:
1499                 WriteORelease (latch->parent);
1500                 break;
1501         case BtLockAtomic:
1502                 WriteORelease (latch->atomic);
1503                 break;
1504         case BtLockAtomic | BtLockRead:
1505                 WriteORelease (latch->atomic);
1506                 ReadRelease (latch->readwr);
1507                 break;
1508         }
1509 }
1510
1511 //      allocate a new page
1512 //      return with page latched, but unlocked.
1513
1514 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1515 {
1516 uid page_no;
1517 int blk;
1518
1519         //      lock allocation page
1520
1521         bt_mutexlock(mgr->lock);
1522
1523         // use empty chain first
1524         // else allocate empty page
1525
1526         if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1527                 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1528                         set->page = bt_mappage (mgr, set->latch);
1529                 else
1530                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1531
1532                 mgr->pagezero->activepages++;
1533                 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1534                 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1535                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1536
1537                 bt_releasemutex(mgr->lock);
1538
1539                 contents->page_no = page_no;
1540                 memcpy (set->page, contents, mgr->page_size);
1541                 set->latch->dirty = 1;
1542                 return 0;
1543         }
1544
1545         page_no = bt_getid(mgr->pagezero->alloc->right);
1546         bt_putid(mgr->pagezero->alloc->right, page_no+1);
1547
1548         // unlock allocation latch and
1549         //      extend file into new page.
1550
1551         mgr->pagezero->activepages++;
1552         if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1553           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1554         bt_releasemutex(mgr->lock);
1555
1556         contents->page_no = page_no;
1557         pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1558
1559         //      don't load cache from btree page, load it from contents
1560
1561         if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1562                 set->page = bt_mappage (mgr, set->latch);
1563         else
1564                 return mgr->err;
1565
1566         return 0;
1567 }
1568
1569 //  find slot in page for given key at a given level
1570
1571 int bt_findslot (BtPage page, unsigned char *key, uint len)
1572 {
1573 uint diff, higher = page->cnt, low = 1, slot;
1574 uint good = 0;
1575
1576         //        make stopper key an infinite fence value
1577
1578         if( bt_getid (page->right) )
1579                 higher++;
1580         else
1581                 good++;
1582
1583         //      low is the lowest candidate.
1584         //  loop ends when they meet
1585
1586         //  higher is already
1587         //      tested as .ge. the passed key.
1588
1589         while( diff = higher - low ) {
1590                 slot = low + ( diff >> 1 );
1591                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1592                         low = slot + 1;
1593                 else
1594                         higher = slot, good++;
1595         }
1596
1597         //      return zero if key is on right link page
1598
1599         return good ? higher : 0;
1600 }
1601
1602 //  find and load page at given level for given key
1603 //      leave page rd or wr locked as requested
1604
1605 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1606 {
1607 uid page_no = ROOT_page, prevpage = 0;
1608 uint drill = 0xff, slot;
1609 BtLatchSet *prevlatch;
1610 uint mode, prevmode;
1611 BtVal *val;
1612
1613   //  start at root of btree and drill down
1614
1615   do {
1616         // determine lock mode of drill level
1617         mode = (drill == lvl) ? lock : BtLockRead; 
1618
1619         if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1620           return 0;
1621
1622         // obtain access lock using lock chaining with Access mode
1623
1624         if( page_no > ROOT_page )
1625           bt_lockpage(BtLockAccess, set->latch, thread_no);
1626
1627         set->page = bt_mappage (mgr, set->latch);
1628
1629         //      release & unpin parent or left sibling page
1630
1631         if( prevpage ) {
1632           bt_unlockpage(prevmode, prevlatch);
1633           bt_unpinlatch (mgr, prevlatch);
1634           prevpage = 0;
1635         }
1636
1637         // obtain mode lock using lock chaining through AccessLock
1638
1639         bt_lockpage(mode, set->latch, thread_no);
1640
1641         if( set->page->free )
1642                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1643
1644         if( page_no > ROOT_page )
1645           bt_unlockpage(BtLockAccess, set->latch);
1646
1647         // re-read and re-lock root after determining actual level of root
1648
1649         if( set->page->lvl != drill) {
1650                 if( set->latch->page_no != ROOT_page )
1651                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1652                         
1653                 drill = set->page->lvl;
1654
1655                 if( lock != BtLockRead && drill == lvl ) {
1656                   bt_unlockpage(mode, set->latch);
1657                   bt_unpinlatch (mgr, set->latch);
1658                   continue;
1659                 }
1660         }
1661
1662         prevpage = set->latch->page_no;
1663         prevlatch = set->latch;
1664         prevmode = mode;
1665
1666         //  find key on page at this level
1667         //  and descend to requested level
1668
1669         if( !set->page->kill )
1670          if( slot = bt_findslot (set->page, key, len) ) {
1671           if( drill == lvl )
1672                 return slot;
1673
1674           // find next non-dead slot -- the fence key if nothing else
1675
1676           while( slotptr(set->page, slot)->dead )
1677                 if( slot++ < set->page->cnt )
1678                   continue;
1679                 else
1680                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1681
1682           val = valptr(set->page, slot);
1683
1684           if( val->len == BtId )
1685                 page_no = bt_getid(valptr(set->page, slot)->value);
1686           else
1687                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1688
1689           drill--;
1690           continue;
1691          }
1692
1693         //  slide right into next page
1694
1695         page_no = bt_getid(set->page->right);
1696   } while( page_no );
1697
1698   // return error on end of right chain
1699
1700   mgr->line = __LINE__, mgr->err = BTERR_struct;
1701   return 0;     // return error
1702 }
1703
1704 //      return page to free list
1705 //      page must be delete & write locked
1706
1707 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1708 {
1709         //      lock allocation page
1710
1711         bt_mutexlock (mgr->lock);
1712
1713         //      store chain
1714
1715         memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1716         bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1717         set->latch->dirty = 1;
1718         set->page->free = 1;
1719
1720         // decrement active page count
1721
1722         mgr->pagezero->activepages--;
1723         if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1724           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1725
1726         // unlock released page
1727         // and unlock allocation page
1728
1729         bt_unlockpage (BtLockDelete, set->latch);
1730         bt_unlockpage (BtLockWrite, set->latch);
1731         bt_unpinlatch (mgr, set->latch);
1732         bt_releasemutex (mgr->lock);
1733 }
1734
1735 //      a fence key was deleted from a page
1736 //      push new fence value upwards
1737
1738 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1739 {
1740 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1741 unsigned char value[BtId];
1742 BtKey* ptr;
1743 uint idx;
1744
1745         //      remove the old fence value
1746
1747         ptr = keyptr(set->page, set->page->cnt);
1748         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1749         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1750         set->latch->dirty = 1;
1751
1752         //  cache new fence value
1753
1754         ptr = keyptr(set->page, set->page->cnt);
1755         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1756
1757         bt_lockpage (BtLockParent, set->latch, thread_no);
1758         bt_unlockpage (BtLockWrite, set->latch);
1759
1760         //      insert new (now smaller) fence key
1761
1762         bt_putid (value, set->latch->page_no);
1763         ptr = (BtKey*)leftkey;
1764
1765         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1766           return mgr->err;
1767
1768         //      now delete old fence key
1769
1770         ptr = (BtKey*)rightkey;
1771
1772         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1773                 return mgr->err;
1774
1775         bt_unlockpage (BtLockParent, set->latch);
1776         bt_unpinlatch(mgr, set->latch);
1777         return 0;
1778 }
1779
1780 //      root has a single child
1781 //      collapse a level from the tree
1782
1783 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1784 {
1785 BtPageSet child[1];
1786 uid page_no;
1787 BtVal *val;
1788 uint idx;
1789
1790   // find the child entry and promote as new root contents
1791
1792   do {
1793         for( idx = 0; idx++ < root->page->cnt; )
1794           if( !slotptr(root->page, idx)->dead )
1795                 break;
1796
1797         val = valptr(root->page, idx);
1798
1799         if( val->len == BtId )
1800                 page_no = bt_getid (valptr(root->page, idx)->value);
1801         else
1802                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1803
1804         if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1805                 child->page = bt_mappage (mgr, child->latch);
1806         else
1807                 return mgr->err;
1808
1809         bt_lockpage (BtLockDelete, child->latch, thread_no);
1810         bt_lockpage (BtLockWrite, child->latch, thread_no);
1811
1812         memcpy (root->page, child->page, mgr->page_size);
1813         root->latch->dirty = 1;
1814
1815         bt_freepage (mgr, child);
1816
1817   } while( root->page->lvl > 1 && root->page->act == 1 );
1818
1819   bt_unlockpage (BtLockWrite, root->latch);
1820   bt_unpinlatch (mgr, root->latch);
1821   return 0;
1822 }
1823
1824 //  delete a page and manage keys
1825 //  call with page writelocked
1826
1827 //      returns with page removed
1828 //      from the page pool.
1829
1830 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1831 {
1832 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1833 unsigned char value[BtId];
1834 uint lvl = set->page->lvl;
1835 BtPageSet right[1];
1836 uid page_no;
1837 BtKey *ptr;
1838
1839
1840         //      cache copy of fence key
1841         //      to remove in parent
1842
1843         ptr = keyptr(set->page, set->page->cnt);
1844         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1845
1846         //      obtain lock on right page
1847
1848         page_no = bt_getid(set->page->right);
1849
1850         if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1851                 right->page = bt_mappage (mgr, right->latch);
1852         else
1853                 return 0;
1854
1855         bt_lockpage (BtLockWrite, right->latch, thread_no);
1856
1857         // cache copy of key to update
1858
1859         ptr = keyptr(right->page, right->page->cnt);
1860         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1861
1862         if( right->page->kill )
1863                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1864
1865         // pull contents of right peer into our empty page
1866
1867         memcpy (set->page, right->page, mgr->page_size);
1868         set->page->page_no = set->latch->page_no;
1869         set->latch->dirty = 1;
1870
1871         // mark right page deleted and point it to left page
1872         //      until we can post parent updates that remove access
1873         //      to the deleted page.
1874
1875         bt_putid (right->page->right, set->latch->page_no);
1876         right->latch->dirty = 1;
1877         right->page->kill = 1;
1878
1879         bt_lockpage (BtLockParent, right->latch, thread_no);
1880         bt_unlockpage (BtLockWrite, right->latch);
1881
1882         bt_lockpage (BtLockParent, set->latch, thread_no);
1883         bt_unlockpage (BtLockWrite, set->latch);
1884
1885         // redirect higher key directly to our new node contents
1886
1887         bt_putid (value, set->latch->page_no);
1888         ptr = (BtKey*)higherfence;
1889
1890         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1891           return mgr->err;
1892
1893         //      delete old lower key to our node
1894
1895         ptr = (BtKey*)lowerfence;
1896
1897         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1898           return mgr->err;
1899
1900         //      obtain delete and write locks to right node
1901
1902         bt_unlockpage (BtLockParent, right->latch);
1903         bt_lockpage (BtLockDelete, right->latch, thread_no);
1904         bt_lockpage (BtLockWrite, right->latch, thread_no);
1905         bt_freepage (mgr, right);
1906
1907         bt_unlockpage (BtLockParent, set->latch);
1908         bt_unpinlatch (mgr, set->latch);
1909         return 0;
1910 }
1911
1912 //  find and delete key on page by marking delete flag bit
1913 //  if page becomes empty, delete it from the btree
1914
1915 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1916 {
1917 uint slot, idx, found, fence;
1918 BtPageSet set[1];
1919 BtSlot *node;
1920 BtKey *ptr;
1921 BtVal *val;
1922
1923         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1924                 node = slotptr(set->page, slot);
1925                 ptr = keyptr(set->page, slot);
1926         } else
1927                 return mgr->err;
1928
1929         // if librarian slot, advance to real slot
1930
1931         if( node->type == Librarian ) {
1932                 ptr = keyptr(set->page, ++slot);
1933                 node = slotptr(set->page, slot);
1934         }
1935
1936         fence = slot == set->page->cnt;
1937
1938         // delete the key, ignore request if already dead
1939
1940         if( found = !keycmp (ptr, key, len) )
1941           if( found = node->dead == 0 ) {
1942                 val = valptr(set->page,slot);
1943                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1944                 set->page->act--;
1945
1946                 //  mark node type as delete
1947
1948                 node->type = Delete;
1949                 node->dead = 1;
1950
1951                 // collapse empty slots beneath the fence
1952                 // on interiour nodes
1953
1954                 if( lvl )
1955                  while( idx = set->page->cnt - 1 )
1956                   if( slotptr(set->page, idx)->dead ) {
1957                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1958                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1959                   } else
1960                         break;
1961           }
1962
1963         //      did we delete a fence key in an upper level?
1964
1965         if( found && lvl && set->page->act && fence )
1966           if( bt_fixfence (mgr, set, lvl, thread_no) )
1967                 return mgr->err;
1968           else
1969                 return 0;
1970
1971         //      do we need to collapse root?
1972
1973         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1974           if( bt_collapseroot (mgr, set, thread_no) )
1975                 return mgr->err;
1976           else
1977                 return 0;
1978
1979         //      delete empty page
1980
1981         if( !set->page->act ) {
1982           if( bt_deletepage (mgr, set, thread_no) )
1983                 return mgr->err;
1984         } else {
1985           set->latch->dirty = 1;
1986           bt_unlockpage(BtLockWrite, set->latch);
1987           bt_unpinlatch (mgr, set->latch);
1988         }
1989
1990         return 0;
1991 }
1992
1993 //      advance to next slot
1994
1995 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1996 {
1997 BtLatchSet *prevlatch;
1998 uid page_no;
1999
2000         if( slot < set->page->cnt )
2001                 return slot + 1;
2002
2003         prevlatch = set->latch;
2004
2005         if( page_no = bt_getid(set->page->right) )
2006           if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2007                 set->page = bt_mappage (bt->mgr, set->latch);
2008           else
2009                 return 0;
2010         else
2011           return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2012
2013         // obtain access lock using lock chaining with Access mode
2014
2015         bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2016
2017         bt_unlockpage(BtLockRead, prevlatch);
2018         bt_unpinlatch (bt->mgr, prevlatch);
2019
2020         bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2021         bt_unlockpage(BtLockAccess, set->latch);
2022         return 1;
2023 }
2024
2025 //      find unique key == given key, or first duplicate key in
2026 //      leaf level and return number of value bytes
2027 //      or (-1) if not found.
2028
2029 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2030 {
2031 BtPageSet set[1];
2032 uint len, slot;
2033 int ret = -1;
2034 BtKey *ptr;
2035 BtVal *val;
2036
2037   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2038    do {
2039         ptr = keyptr(set->page, slot);
2040
2041         //      skip librarian slot place holder
2042
2043         if( slotptr(set->page, slot)->type == Librarian )
2044                 ptr = keyptr(set->page, ++slot);
2045
2046         //      return actual key found
2047
2048         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2049         len = ptr->len;
2050
2051         if( slotptr(set->page, slot)->type == Duplicate )
2052                 len -= BtId;
2053
2054         //      not there if we reach the stopper key
2055
2056         if( slot == set->page->cnt )
2057           if( !bt_getid (set->page->right) )
2058                 break;
2059
2060         // if key exists, return >= 0 value bytes copied
2061         //      otherwise return (-1)
2062
2063         if( slotptr(set->page, slot)->dead )
2064                 continue;
2065
2066         if( keylen == len )
2067           if( !memcmp (ptr->key, key, len) ) {
2068                 val = valptr (set->page,slot);
2069                 if( valmax > val->len )
2070                         valmax = val->len;
2071                 memcpy (value, val->value, valmax);
2072                 ret = valmax;
2073           }
2074
2075         break;
2076
2077    } while( slot = bt_findnext (bt, set, slot) );
2078
2079   bt_unlockpage (BtLockRead, set->latch);
2080   bt_unpinlatch (bt->mgr, set->latch);
2081   return ret;
2082 }
2083
2084 //      check page for space available,
2085 //      clean if necessary and return
2086 //      0 - page needs splitting
2087 //      >0  new slot value
2088
2089 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2090 {
2091 BtPage page = set->page, frame;
2092 uint nxt = mgr->page_size;
2093 uint cnt = 0, idx = 0;
2094 uint max = page->cnt;
2095 uint newslot = max;
2096 BtKey *key;
2097 BtVal *val;
2098
2099         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2100                 return slot;
2101
2102         //      skip cleanup and proceed to split
2103         //      if there's not enough garbage
2104         //      to bother with.
2105
2106         if( page->garbage < nxt / 5 )
2107                 return 0;
2108
2109         frame = malloc (mgr->page_size);
2110         memcpy (frame, page, mgr->page_size);
2111
2112         // skip page info and set rest of page to zero
2113
2114         memset (page+1, 0, mgr->page_size - sizeof(*page));
2115         set->latch->dirty = 1;
2116
2117         page->garbage = 0;
2118         page->act = 0;
2119
2120         // clean up page first by
2121         // removing deleted keys
2122
2123         while( cnt++ < max ) {
2124                 if( cnt == slot )
2125                         newslot = idx + 2;
2126
2127                 if( cnt < max || frame->lvl )
2128                   if( slotptr(frame,cnt)->dead )
2129                         continue;
2130
2131                 // copy the value across
2132
2133                 val = valptr(frame, cnt);
2134                 nxt -= val->len + sizeof(BtVal);
2135                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2136
2137                 // copy the key across
2138
2139                 key = keyptr(frame, cnt);
2140                 nxt -= key->len + sizeof(BtKey);
2141                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2142
2143                 // make a librarian slot
2144
2145                 slotptr(page, ++idx)->off = nxt;
2146                 slotptr(page, idx)->type = Librarian;
2147                 slotptr(page, idx)->dead = 1;
2148
2149                 // set up the slot
2150
2151                 slotptr(page, ++idx)->off = nxt;
2152                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2153
2154                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2155                         page->act++;
2156         }
2157
2158         page->min = nxt;
2159         page->cnt = idx;
2160         free (frame);
2161
2162         //      see if page has enough space now, or does it need splitting?
2163
2164         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2165                 return newslot;
2166
2167         return 0;
2168 }
2169
2170 // split the root and raise the height of the btree
2171
2172 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2173 {  
2174 unsigned char leftkey[BT_keyarray];
2175 uint nxt = mgr->page_size;
2176 unsigned char value[BtId];
2177 BtPageSet left[1];
2178 uid left_page_no;
2179 BtPage frame;
2180 BtKey *ptr;
2181 BtVal *val;
2182
2183         frame = malloc (mgr->page_size);
2184         memcpy (frame, root->page, mgr->page_size);
2185
2186         //      save left page fence key for new root
2187
2188         ptr = keyptr(root->page, root->page->cnt);
2189         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2190
2191         //  Obtain an empty page to use, and copy the current
2192         //  root contents into it, e.g. lower keys
2193
2194         if( bt_newpage(mgr, left, frame, page_no) )
2195                 return mgr->err;
2196
2197         left_page_no = left->latch->page_no;
2198         bt_unpinlatch (mgr, left->latch);
2199         free (frame);
2200
2201         // preserve the page info at the bottom
2202         // of higher keys and set rest to zero
2203
2204         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2205
2206         // insert stopper key at top of newroot page
2207         // and increase the root height
2208
2209         nxt -= BtId + sizeof(BtVal);
2210         bt_putid (value, right->page_no);
2211         val = (BtVal *)((unsigned char *)root->page + nxt);
2212         memcpy (val->value, value, BtId);
2213         val->len = BtId;
2214
2215         nxt -= 2 + sizeof(BtKey);
2216         slotptr(root->page, 2)->off = nxt;
2217         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2218         ptr->len = 2;
2219         ptr->key[0] = 0xff;
2220         ptr->key[1] = 0xff;
2221
2222         // insert lower keys page fence key on newroot page as first key
2223
2224         nxt -= BtId + sizeof(BtVal);
2225         bt_putid (value, left_page_no);
2226         val = (BtVal *)((unsigned char *)root->page + nxt);
2227         memcpy (val->value, value, BtId);
2228         val->len = BtId;
2229
2230         ptr = (BtKey *)leftkey;
2231         nxt -= ptr->len + sizeof(BtKey);
2232         slotptr(root->page, 1)->off = nxt;
2233         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2234         
2235         bt_putid(root->page->right, 0);
2236         root->page->min = nxt;          // reset lowest used offset and key count
2237         root->page->cnt = 2;
2238         root->page->act = 2;
2239         root->page->lvl++;
2240
2241         mgr->pagezero->alloc->lvl = root->page->lvl;
2242
2243         // release and unpin root pages
2244
2245         bt_unlockpage(BtLockWrite, root->latch);
2246         bt_unpinlatch (mgr, root->latch);
2247
2248         bt_unpinlatch (mgr, right);
2249         return 0;
2250 }
2251
2252 //  split already locked full node
2253 //      leave it locked.
2254 //      return pool entry for new right
2255 //      page, unlocked
2256
2257 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2258 {
2259 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2260 BtPage frame = malloc (mgr->page_size);
2261 uint lvl = set->page->lvl;
2262 BtPageSet right[1];
2263 BtKey *key, *ptr;
2264 BtVal *val, *src;
2265 uid right2;
2266 uint prev;
2267
2268         //  split higher half of keys to frame
2269
2270         memset (frame, 0, mgr->page_size);
2271         max = set->page->cnt;
2272         cnt = max / 2;
2273         idx = 0;
2274
2275         while( cnt++ < max ) {
2276                 if( cnt < max || set->page->lvl )
2277                   if( slotptr(set->page, cnt)->dead )
2278                         continue;
2279
2280                 src = valptr(set->page, cnt);
2281                 nxt -= src->len + sizeof(BtVal);
2282                 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2283
2284                 key = keyptr(set->page, cnt);
2285                 nxt -= key->len + sizeof(BtKey);
2286                 ptr = (BtKey*)((unsigned char *)frame + nxt);
2287                 memcpy (ptr, key, key->len + sizeof(BtKey));
2288
2289                 //      add librarian slot
2290
2291                 slotptr(frame, ++idx)->off = nxt;
2292                 slotptr(frame, idx)->type = Librarian;
2293                 slotptr(frame, idx)->dead = 1;
2294
2295                 //  add actual slot
2296
2297                 slotptr(frame, ++idx)->off = nxt;
2298                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2299
2300                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2301                         frame->act++;
2302         }
2303
2304         frame->bits = mgr->page_bits;
2305         frame->min = nxt;
2306         frame->cnt = idx;
2307         frame->lvl = lvl;
2308
2309         // link right node
2310
2311         if( set->latch->page_no > ROOT_page )
2312                 bt_putid (frame->right, bt_getid (set->page->right));
2313
2314         //      get new free page and write higher keys to it.
2315
2316         if( bt_newpage(mgr, right, frame, thread_no) )
2317                 return 0;
2318
2319         // process lower keys
2320
2321         memcpy (frame, set->page, mgr->page_size);
2322         memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2323         set->latch->dirty = 1;
2324
2325         nxt = mgr->page_size;
2326         set->page->garbage = 0;
2327         set->page->act = 0;
2328         max /= 2;
2329         cnt = 0;
2330         idx = 0;
2331
2332         if( slotptr(frame, max)->type == Librarian )
2333                 max--;
2334
2335         //  assemble page of smaller keys
2336
2337         while( cnt++ < max ) {
2338                 if( slotptr(frame, cnt)->dead )
2339                         continue;
2340                 val = valptr(frame, cnt);
2341                 nxt -= val->len + sizeof(BtVal);
2342                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2343
2344                 key = keyptr(frame, cnt);
2345                 nxt -= key->len + sizeof(BtKey);
2346                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2347
2348                 //      add librarian slot
2349
2350                 slotptr(set->page, ++idx)->off = nxt;
2351                 slotptr(set->page, idx)->type = Librarian;
2352                 slotptr(set->page, idx)->dead = 1;
2353
2354                 //      add actual slot
2355
2356                 slotptr(set->page, ++idx)->off = nxt;
2357                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2358                 set->page->act++;
2359         }
2360
2361         bt_putid(set->page->right, right->latch->page_no);
2362         set->page->min = nxt;
2363         set->page->cnt = idx;
2364         free(frame);
2365
2366         return right->latch - mgr->latchsets;
2367 }
2368
2369 //      fix keys for newly split page
2370 //      call with page locked, return
2371 //      unlocked
2372
2373 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2374 {
2375 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2376 unsigned char value[BtId];
2377 uint lvl = set->page->lvl;
2378 BtPage page;
2379 BtKey *ptr;
2380
2381         // if current page is the root page, split it
2382
2383         if( set->latch->page_no == ROOT_page )
2384                 return bt_splitroot (mgr, set, right, thread_no);
2385
2386         ptr = keyptr(set->page, set->page->cnt);
2387         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2388
2389         page = bt_mappage (mgr, right);
2390
2391         ptr = keyptr(page, page->cnt);
2392         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2393
2394         // insert new fences in their parent pages
2395
2396         bt_lockpage (BtLockParent, right, thread_no);
2397
2398         bt_lockpage (BtLockParent, set->latch, thread_no);
2399         bt_unlockpage (BtLockWrite, set->latch);
2400
2401         // insert new fence for reformulated left block of smaller keys
2402
2403         bt_putid (value, set->latch->page_no);
2404         ptr = (BtKey *)leftkey;
2405
2406         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2407                 return mgr->err;
2408
2409         // switch fence for right block of larger keys to new right page
2410
2411         bt_putid (value, right->page_no);
2412         ptr = (BtKey *)rightkey;
2413
2414         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2415                 return mgr->err;
2416
2417         bt_unlockpage (BtLockParent, set->latch);
2418         bt_unpinlatch (mgr, set->latch);
2419
2420         bt_unlockpage (BtLockParent, right);
2421         bt_unpinlatch (mgr, right);
2422         return 0;
2423 }
2424
2425 //      install new key and value onto page
2426 //      page must already be checked for
2427 //      adequate space
2428
2429 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2430 {
2431 uint idx, librarian;
2432 BtSlot *node;
2433 BtKey *ptr;
2434 BtVal *val;
2435
2436         //      if found slot > desired slot and previous slot
2437         //      is a librarian slot, use it
2438
2439         if( slot > 1 )
2440           if( slotptr(set->page, slot-1)->type == Librarian )
2441                 slot--;
2442
2443         // copy value onto page
2444
2445         set->page->min -= vallen + sizeof(BtVal);
2446         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2447         memcpy (val->value, value, vallen);
2448         val->len = vallen;
2449
2450         // copy key onto page
2451
2452         set->page->min -= keylen + sizeof(BtKey);
2453         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2454         memcpy (ptr->key, key, keylen);
2455         ptr->len = keylen;
2456         
2457         //      find first empty slot
2458
2459         for( idx = slot; idx < set->page->cnt; idx++ )
2460           if( slotptr(set->page, idx)->dead )
2461                 break;
2462
2463         // now insert key into array before slot
2464
2465         if( idx == set->page->cnt )
2466                 idx += 2, set->page->cnt += 2, librarian = 2;
2467         else
2468                 librarian = 1;
2469
2470         set->latch->dirty = 1;
2471         set->page->act++;
2472
2473         while( idx > slot + librarian - 1 )
2474                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2475
2476         //      add librarian slot
2477
2478         if( librarian > 1 ) {
2479                 node = slotptr(set->page, slot++);
2480                 node->off = set->page->min;
2481                 node->type = Librarian;
2482                 node->dead = 1;
2483         }
2484
2485         //      fill in new slot
2486
2487         node = slotptr(set->page, slot);
2488         node->off = set->page->min;
2489         node->type = type;
2490         node->dead = 0;
2491
2492         if( release ) {
2493                 bt_unlockpage (BtLockWrite, set->latch);
2494                 bt_unpinlatch (mgr, set->latch);
2495         }
2496
2497         return 0;
2498 }
2499
2500 //  Insert new key into the btree at given level.
2501 //      either add a new key or update/add an existing one
2502
2503 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2504 {
2505 uint slot, idx, len, entry;
2506 BtPageSet set[1];
2507 BtSlot *node;
2508 BtKey *ptr;
2509 BtVal *val;
2510
2511   while( 1 ) { // find the page and slot for the current key
2512         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2513                 node = slotptr(set->page, slot);
2514                 ptr = keyptr(set->page, slot);
2515         } else {
2516                 if( !mgr->err )
2517                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2518                 return mgr->err;
2519         }
2520
2521         // if librarian slot == found slot, advance to real slot
2522
2523         if( node->type == Librarian )
2524           if( !keycmp (ptr, key, keylen) ) {
2525                 ptr = keyptr(set->page, ++slot);
2526                 node = slotptr(set->page, slot);
2527           }
2528
2529         //  if inserting a duplicate key or unique
2530         //      key that doesn't exist on the page,
2531         //      check for adequate space on the page
2532         //      and insert the new key before slot.
2533
2534         switch( type ) {
2535         case Unique:
2536         case Duplicate:
2537           if( keycmp (ptr, key, keylen) )
2538            if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2539             return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2540            else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2541                 return mgr->err;
2542            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2543                 return mgr->err;
2544            else
2545                 continue;
2546
2547           // if key already exists, update value and return
2548
2549           val = valptr(set->page, slot);
2550
2551           if( val->len >= vallen ) {
2552                 if( slotptr(set->page, slot)->dead )
2553                         set->page->act++;
2554                 node->type = type;
2555                 node->dead = 0;
2556
2557                 set->page->garbage += val->len - vallen;
2558                 set->latch->dirty = 1;
2559                 val->len = vallen;
2560                 memcpy (val->value, value, vallen);
2561                 bt_unlockpage(BtLockWrite, set->latch);
2562                 bt_unpinlatch (mgr, set->latch);
2563                 return 0;
2564           }
2565
2566           //  new update value doesn't fit in existing value area
2567           //    make sure page has room
2568
2569           if( !node->dead )
2570                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2571           else
2572                 set->page->act++;
2573
2574           node->type = type;
2575           node->dead = 0;
2576
2577           if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2578            if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2579                 return mgr->err;
2580            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2581                 return mgr->err;
2582            else
2583                 continue;
2584
2585           //  copy key and value onto page and update slot
2586
2587           set->page->min -= vallen + sizeof(BtVal);
2588           val = (BtVal*)((unsigned char *)set->page + set->page->min);
2589           memcpy (val->value, value, vallen);
2590           val->len = vallen;
2591
2592           set->latch->dirty = 1;
2593           set->page->min -= keylen + sizeof(BtKey);
2594           ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2595           memcpy (ptr->key, key, keylen);
2596           ptr->len = keylen;
2597         
2598           node->off = set->page->min;
2599           bt_unlockpage(BtLockWrite, set->latch);
2600           bt_unpinlatch (mgr, set->latch);
2601           return 0;
2602     }
2603   }
2604   return 0;
2605 }
2606
2607 typedef struct {
2608         logseqno reqlsn;        // redo log seq no required
2609         logseqno lsn;           // redo log sequence number
2610         uint entry;                     // latch table entry number
2611         uint slot:31;           // page slot number
2612         uint reuse:1;           // reused previous page
2613 } AtomicTxn;
2614
2615 typedef struct {
2616         uid page_no;            // page number for split leaf
2617         void *next;                     // next key to insert
2618         uint entry:29;          // latch table entry number
2619         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2620         uint nounlock:1;        // don't unlock ParentModification
2621         unsigned char leafkey[BT_keyarray];
2622 } AtomicKey;
2623
2624 //      determine actual page where key is located
2625 //  return slot number
2626
2627 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2628 {
2629 BtKey *key = keyptr(source,src);
2630 uint slot = locks[src].slot;
2631 uint entry;
2632
2633         if( src > 1 && locks[src].reuse )
2634           entry = locks[src-1].entry, slot = 0;
2635         else
2636           entry = locks[src].entry;
2637
2638         if( slot ) {
2639                 set->latch = mgr->latchsets + entry;
2640                 set->page = bt_mappage (mgr, set->latch);
2641                 return slot;
2642         }
2643
2644         //      is locks->reuse set? or was slot zeroed?
2645         //      if so, find where our key is located 
2646         //      on current page or pages split on
2647         //      same page txn operations.
2648
2649         do {
2650                 set->latch = mgr->latchsets + entry;
2651                 set->page = bt_mappage (mgr, set->latch);
2652
2653                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2654                   if( slotptr(set->page, slot)->type == Librarian )
2655                         slot++;
2656                   if( locks[src].reuse )
2657                         locks[src].entry = entry;
2658                   return slot;
2659                 }
2660         } while( entry = set->latch->split );
2661
2662         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2663         return 0;
2664 }
2665
2666 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2667 {
2668 BtKey *key = keyptr(source, src);
2669 BtVal *val = valptr(source, src);
2670 BtLatchSet *latch;
2671 BtPageSet set[1];
2672 uint entry, slot;
2673
2674   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2675         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2676           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2677                 return mgr->err;
2678           set->page->lsn = locks[src].lsn;
2679           return 0;
2680         }
2681
2682         if( entry = bt_splitpage (mgr, set, thread_no) )
2683           latch = mgr->latchsets + entry;
2684         else
2685           return mgr->err;
2686
2687         //      splice right page into split chain
2688         //      and WriteLock it.
2689
2690         bt_lockpage(BtLockWrite, latch, thread_no);
2691         latch->split = set->latch->split;
2692         set->latch->split = entry;
2693         locks[src].slot = 0;
2694   }
2695
2696   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2697 }
2698
2699 //      perform delete from smaller btree
2700 //  insert a delete slot if not found there
2701
2702 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2703 {
2704 BtKey *key = keyptr(source, src);
2705 BtPageSet set[1];
2706 uint idx, slot;
2707 BtSlot *node;
2708 BtKey *ptr;
2709 BtVal *val;
2710
2711         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2712           node = slotptr(set->page, slot);
2713           ptr = keyptr(set->page, slot);
2714           val = valptr(set->page, slot);
2715         } else
2716           return mgr->line = __LINE__, mgr->err = BTERR_struct;
2717
2718         //  if slot is not found, insert a delete slot
2719
2720         if( keycmp (ptr, key->key, key->len) )
2721           return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2722
2723         //      if node is already dead,
2724         //      ignore the request.
2725
2726         if( node->dead )
2727                 return 0;
2728
2729         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2730         set->latch->dirty = 1;
2731         set->page->lsn = locks[src].lsn;
2732         set->page->act--;
2733
2734         node->dead = 0;
2735         __sync_fetch_and_add(&mgr->found, 1);
2736         return 0;
2737 }
2738
2739 //      delete an empty master page for a transaction
2740
2741 //      note that the far right page never empties because
2742 //      it always contains (at least) the infinite stopper key
2743 //      and that all pages that don't contain any keys are
2744 //      deleted, or are being held under Atomic lock.
2745
2746 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2747 {
2748 BtPageSet right[1], temp[1];
2749 unsigned char value[BtId];
2750 uid right_page_no;
2751 BtKey *ptr;
2752
2753         bt_lockpage(BtLockWrite, prev->latch, thread_no);
2754
2755         //      grab the right sibling
2756
2757         if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2758                 right->page = bt_mappage (mgr, right->latch);
2759         else
2760                 return mgr->err;
2761
2762         bt_lockpage(BtLockAtomic, right->latch, thread_no);
2763         bt_lockpage(BtLockWrite, right->latch, thread_no);
2764
2765         //      and pull contents over empty page
2766         //      while preserving master's left link
2767
2768         memcpy (right->page->left, prev->page->left, BtId);
2769         memcpy (prev->page, right->page, mgr->page_size);
2770
2771         //      forward seekers to old right sibling
2772         //      to new page location in set
2773
2774         bt_putid (right->page->right, prev->latch->page_no);
2775         right->latch->dirty = 1;
2776         right->page->kill = 1;
2777
2778         //      remove pointer to right page for searchers by
2779         //      changing right fence key to point to the master page
2780
2781         ptr = keyptr(right->page,right->page->cnt);
2782         bt_putid (value, prev->latch->page_no);
2783
2784         if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
2785                 return mgr->err;
2786
2787         //  now that master page is in good shape we can
2788         //      remove its locks.
2789
2790         bt_unlockpage (BtLockAtomic, prev->latch);
2791         bt_unlockpage (BtLockWrite, prev->latch);
2792
2793         //  fix master's right sibling's left pointer
2794         //      to remove scanner's poiner to the right page
2795
2796         if( right_page_no = bt_getid (prev->page->right) ) {
2797           if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2798                 temp->page = bt_mappage (mgr, temp->latch);
2799           else
2800                 return mgr->err;
2801
2802           bt_lockpage (BtLockWrite, temp->latch, thread_no);
2803           bt_putid (temp->page->left, prev->latch->page_no);
2804           temp->latch->dirty = 1;
2805
2806           bt_unlockpage (BtLockWrite, temp->latch);
2807           bt_unpinlatch (mgr, temp->latch);
2808         } else {        // master is now the far right page
2809           bt_mutexlock (mgr->lock);
2810           bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2811           bt_releasemutex(mgr->lock);
2812         }
2813
2814         //      now that there are no pointers to the right page
2815         //      we can delete it after the last read access occurs
2816
2817         bt_unlockpage (BtLockWrite, right->latch);
2818         bt_unlockpage (BtLockAtomic, right->latch);
2819         bt_lockpage (BtLockDelete, right->latch, thread_no);
2820         bt_lockpage (BtLockWrite, right->latch, thread_no);
2821         bt_freepage (mgr, right);
2822         return 0;
2823 }
2824
2825 //      atomic modification of a batch of keys.
2826
2827 //      return -1 if BTERR is set
2828 //      otherwise return slot number
2829 //      causing the key constraint violation
2830 //      or zero on successful completion.
2831
2832 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2833 {
2834 uint src, idx, slot, samepage, entry, que = 0;
2835 AtomicKey *head, *tail, *leaf;
2836 BtPageSet set[1], prev[1];
2837 unsigned char value[BtId];
2838 BtKey *key, *ptr, *key2;
2839 BtLatchSet *latch;
2840 AtomicTxn *locks;
2841 int result = 0;
2842 BtSlot temp[1];
2843 logseqno lsn;
2844 BtPage page;
2845 BtVal *val;
2846 uid right;
2847 int type;
2848
2849   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2850   head = NULL;
2851   tail = NULL;
2852
2853   // stable sort the list of keys into order to
2854   //    prevent deadlocks between threads.
2855
2856   for( src = 1; src++ < source->cnt; ) {
2857         *temp = *slotptr(source,src);
2858         key = keyptr (source,src);
2859
2860         for( idx = src; --idx; ) {
2861           key2 = keyptr (source,idx);
2862           if( keycmp (key, key2->key, key2->len) < 0 ) {
2863                 *slotptr(source,idx+1) = *slotptr(source,idx);
2864                 *slotptr(source,idx) = *temp;
2865           } else
2866                 break;
2867         }
2868   }
2869
2870   //  add entries to redo log
2871
2872   if( bt->mgr->pagezero->redopages )
2873    if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
2874     for( src = 0; src++ < source->cnt; )
2875          locks[src].lsn = lsn;
2876     else
2877          return bt->mgr->err;
2878
2879   // Load the leaf page for each key
2880   // group same page references with reuse bit
2881   // and determine any constraint violations
2882
2883   for( src = 0; src++ < source->cnt; ) {
2884         key = keyptr(source, src);
2885         slot = 0;
2886
2887         // first determine if this modification falls
2888         // on the same page as the previous modification
2889         //      note that the far right leaf page is a special case
2890
2891         if( samepage = src > 1 )
2892           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2893                 slot = bt_findslot(set->page, key->key, key->len);
2894           else
2895                 bt_unlockpage(BtLockRead, set->latch); 
2896
2897         if( !slot )
2898           if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
2899                 set->latch->split = 0;
2900           else
2901                 return bt->mgr->err;
2902
2903         if( slotptr(set->page, slot)->type == Librarian )
2904           ptr = keyptr(set->page, ++slot);
2905         else
2906           ptr = keyptr(set->page, slot);
2907
2908         if( !samepage ) {
2909           locks[src].entry = set->latch - bt->mgr->latchsets;
2910           locks[src].slot = slot;
2911           locks[src].reuse = 0;
2912         } else {
2913           locks[src].entry = 0;
2914           locks[src].slot = 0;
2915           locks[src].reuse = 1;
2916         }
2917
2918         //      capture current lsn for master page
2919
2920         locks[src].reqlsn = set->page->lsn;
2921   }
2922
2923   //  unlock last loadpage lock
2924
2925   if( source->cnt )
2926         bt_unlockpage(BtLockRead, set->latch);
2927
2928   //  obtain write lock for each master page
2929   //  sync flushed pages to disk
2930
2931   for( src = 0; src++ < source->cnt; ) {
2932         if( locks[src].reuse )
2933           continue;
2934
2935         set->latch = bt->mgr->latchsets + locks[src].entry;
2936         bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
2937   }
2938
2939   // insert or delete each key
2940   // process any splits or merges
2941   // release Write & Atomic latches
2942   // set ParentModifications and build
2943   // queue of keys to insert for split pages
2944   // or delete for deleted pages.
2945
2946   // run through txn list backwards
2947
2948   samepage = source->cnt + 1;
2949
2950   for( src = source->cnt; src; src-- ) {
2951         if( locks[src].reuse )
2952           continue;
2953
2954         //  perform the txn operations
2955         //      from smaller to larger on
2956         //  the same page
2957
2958         for( idx = src; idx < samepage; idx++ )
2959          switch( slotptr(source,idx)->type ) {
2960          case Delete:
2961           if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
2962                 return bt->mgr->err;
2963           break;
2964
2965         case Duplicate:
2966         case Unique:
2967           if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
2968                 return bt->mgr->err;
2969           break;
2970         }
2971
2972         //      after the same page operations have finished,
2973         //  process master page for splits or deletion.
2974
2975         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2976         prev->page = bt_mappage (bt->mgr, prev->latch);
2977         samepage = src;
2978
2979         //  pick-up all splits from master page
2980         //      each one is already WriteLocked.
2981
2982         entry = prev->latch->split;
2983
2984         while( entry ) {
2985           set->latch = bt->mgr->latchsets + entry;
2986           set->page = bt_mappage (bt->mgr, set->latch);
2987           entry = set->latch->split;
2988
2989           // delete empty master page by undoing its split
2990           //  (this is potentially another empty page)
2991           //  note that there are no new left pointers yet
2992
2993           if( !prev->page->act ) {
2994                 memcpy (set->page->left, prev->page->left, BtId);
2995                 memcpy (prev->page, set->page, bt->mgr->page_size);
2996                 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
2997                 bt_freepage (bt->mgr, set);
2998
2999                 prev->latch->split = set->latch->split;
3000                 prev->latch->dirty = 1;
3001                 continue;
3002           }
3003
3004           // remove empty page from the split chain
3005           // and return it to the free list.
3006
3007           if( !set->page->act ) {
3008                 memcpy (prev->page->right, set->page->right, BtId);
3009                 prev->latch->split = set->latch->split;
3010                 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3011                 bt_freepage (bt->mgr, set);
3012                 continue;
3013           }
3014
3015           //  schedule prev fence key update
3016
3017           ptr = keyptr(prev->page,prev->page->cnt);
3018           leaf = calloc (sizeof(AtomicKey), 1), que++;
3019
3020           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3021           leaf->entry = prev->latch - bt->mgr->latchsets;
3022           leaf->page_no = prev->latch->page_no;
3023           leaf->type = 0;
3024
3025           if( tail )
3026                 tail->next = leaf;
3027           else
3028                 head = leaf;
3029
3030           tail = leaf;
3031
3032           // splice in the left link into the split page
3033
3034           bt_putid (set->page->left, prev->latch->page_no);
3035           bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3036           bt_unlockpage(BtLockWrite, prev->latch);
3037           *prev = *set;
3038         }
3039
3040         //  update left pointer in next right page from last split page
3041         //      (if all splits were reversed, latch->split == 0)
3042
3043         if( latch->split ) {
3044           //  fix left pointer in master's original (now split)
3045           //  far right sibling or set rightmost page in page zero
3046
3047           if( right = bt_getid (prev->page->right) ) {
3048                 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3049                   set->page = bt_mappage (bt->mgr, set->latch);
3050                 else
3051                   return bt->mgr->err;
3052
3053             bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3054             bt_putid (set->page->left, prev->latch->page_no);
3055                 set->latch->dirty = 1;
3056             bt_unlockpage (BtLockWrite, set->latch);
3057                 bt_unpinlatch (bt->mgr, set->latch);
3058           } else {      // prev is rightmost page
3059             bt_mutexlock (bt->mgr->lock);
3060                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3061             bt_releasemutex(bt->mgr->lock);
3062           }
3063
3064           //  Process last page split in chain
3065
3066           ptr = keyptr(prev->page,prev->page->cnt);
3067           leaf = calloc (sizeof(AtomicKey), 1), que++;
3068
3069           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3070           leaf->entry = prev->latch - bt->mgr->latchsets;
3071           leaf->page_no = prev->latch->page_no;
3072           leaf->type = 0;
3073   
3074           if( tail )
3075                 tail->next = leaf;
3076           else
3077                 head = leaf;
3078
3079           tail = leaf;
3080
3081           bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3082           bt_unlockpage(BtLockWrite, prev->latch);
3083
3084           //  remove atomic lock on master page
3085
3086           bt_unlockpage(BtLockAtomic, latch);
3087           continue;
3088         }
3089
3090         //  finished if prev page occupied (either master or final split)
3091
3092         if( prev->page->act ) {
3093           bt_unlockpage(BtLockWrite, latch);
3094           bt_unlockpage(BtLockAtomic, latch);
3095           bt_unpinlatch(bt->mgr, latch);
3096           continue;
3097         }
3098
3099         // any and all splits were reversed, and the
3100         // master page located in prev is empty, delete it
3101         // by pulling over master's right sibling.
3102
3103         // Remove empty master's fence key
3104
3105         ptr = keyptr(prev->page,prev->page->cnt);
3106
3107         if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3108                 return bt->mgr->err;
3109
3110         //      perform the remainder of the delete
3111         //      from the FIFO queue
3112
3113         leaf = calloc (sizeof(AtomicKey), 1), que++;
3114
3115         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3116         leaf->entry = prev->latch - bt->mgr->latchsets;
3117         leaf->page_no = prev->latch->page_no;
3118         leaf->nounlock = 1;
3119         leaf->type = 2;
3120   
3121         if( tail )
3122           tail->next = leaf;
3123         else
3124           head = leaf;
3125
3126         tail = leaf;
3127
3128         //      leave atomic lock in place until
3129         //      deletion completes in next phase.
3130
3131         bt_unlockpage(BtLockWrite, prev->latch);
3132   }
3133
3134   //  add & delete keys for any pages split or merged during transaction
3135
3136   if( leaf = head )
3137     do {
3138           set->latch = bt->mgr->latchsets + leaf->entry;
3139           set->page = bt_mappage (bt->mgr, set->latch);
3140
3141           bt_putid (value, leaf->page_no);
3142           ptr = (BtKey *)leaf->leafkey;
3143
3144           switch( leaf->type ) {
3145           case 0:       // insert key
3146             if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
3147                   return bt->mgr->err;
3148
3149                 break;
3150
3151           case 1:       // delete key
3152                 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3153                   return bt->mgr->err;
3154
3155                 break;
3156
3157           case 2:       // free page
3158                 if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
3159                   return bt->mgr->err;
3160
3161                 break;
3162           }
3163
3164           if( !leaf->nounlock )
3165             bt_unlockpage (BtLockParent, set->latch);
3166
3167           bt_unpinlatch (bt->mgr, set->latch);
3168           tail = leaf->next;
3169           free (leaf);
3170         } while( leaf = tail );
3171
3172   // if number of active pages
3173   // is greater than the buffer pool
3174   // promote page into larger btree
3175
3176   if( bt->main )
3177    while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
3178         if( bt_txnpromote (bt) )
3179           return bt->mgr->err;
3180
3181   // return success
3182
3183   free (locks);
3184   return 0;
3185 }
3186
3187 //  promote a page into the larger btree
3188
3189 BTERR bt_txnpromote (BtDb *bt)
3190 {
3191 uint entry, slot, idx;
3192 BtPageSet set[1];
3193 BtSlot *node;
3194 BtKey *ptr;
3195 BtVal *val;
3196
3197   while( 1 ) {
3198 #ifdef unix
3199         entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3200 #else
3201         entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3202 #endif
3203         entry %= bt->mgr->latchtotal;
3204
3205         if( !entry )
3206                 continue;
3207
3208         set->latch = bt->mgr->latchsets + entry;
3209         idx = set->latch->page_no % bt->mgr->latchhash;
3210
3211         if( !bt_mutextry(set->latch->modify) )
3212                 continue;
3213
3214         //  skip this entry if it is pinned
3215
3216         if( set->latch->pin & ~CLOCK_bit ) {
3217           bt_releasemutex(set->latch->modify);
3218           continue;
3219         }
3220
3221         set->page = bt_mappage (bt->mgr, set->latch);
3222
3223         // entry never used or has no right sibling
3224
3225         if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3226           bt_releasemutex(set->latch->modify);
3227           continue;
3228         }
3229
3230         bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
3231         bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3232     bt_unlockpage(BtLockAccess, set->latch);
3233
3234         // entry interiour node or being or was killed
3235
3236         if( set->page->lvl || set->page->free || set->page->kill ) {
3237           bt_releasemutex(set->latch->modify);
3238       bt_unlockpage(BtLockWrite, set->latch);
3239           continue;
3240         }
3241
3242         //  pin the page for our useage
3243
3244         set->latch->pin++;
3245         bt_releasemutex(set->latch->modify);
3246
3247         // transfer slots in our selected page to larger btree
3248 if( !(set->latch->page_no % 100) )
3249 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
3250
3251         for( slot = 0; slot++ < set->page->cnt; ) {
3252                 ptr = keyptr (set->page, slot);
3253                 val = valptr (set->page, slot);
3254                 node = slotptr(set->page, slot);
3255
3256                 switch( node->type ) {
3257                 case Duplicate:
3258                 case Unique:
3259                   if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
3260                         return bt->main->err;
3261
3262                   continue;
3263
3264                 case Delete:
3265                   if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
3266                         return bt->main->err;
3267
3268                   continue;
3269                 }
3270         }
3271
3272         //  now delete the page
3273
3274         if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3275                 return bt->mgr->err;
3276
3277         return 0;
3278   }
3279 }
3280
3281 //      set cursor to highest slot on highest page
3282
3283 uint bt_lastkey (BtDb *bt)
3284 {
3285 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3286 BtPageSet set[1];
3287
3288         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3289                 set->page = bt_mappage (bt->mgr, set->latch);
3290         else
3291                 return 0;
3292
3293     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3294         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3295     bt_unlockpage(BtLockRead, set->latch);
3296         bt_unpinlatch (bt->mgr, set->latch);
3297         return bt->cursor->cnt;
3298 }
3299
3300 //      return previous slot on cursor page
3301
3302 uint bt_prevkey (BtDb *bt, uint slot)
3303 {
3304 uid cursor_page = bt->cursor->page_no;
3305 uid ourright, next, us = cursor_page;
3306 BtPageSet set[1];
3307
3308         if( --slot )
3309                 return slot;
3310
3311         ourright = bt_getid(bt->cursor->right);
3312
3313 goleft:
3314         if( !(next = bt_getid(bt->cursor->left)) )
3315                 return 0;
3316
3317 findourself:
3318         cursor_page = next;
3319
3320         if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3321                 set->page = bt_mappage (bt->mgr, set->latch);
3322         else
3323                 return 0;
3324
3325     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3326         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3327         bt_unlockpage(BtLockRead, set->latch);
3328         bt_unpinlatch (bt->mgr, set->latch);
3329         
3330         next = bt_getid (bt->cursor->right);
3331
3332         if( bt->cursor->kill )
3333                 goto findourself;
3334
3335         if( next != us )
3336           if( next == ourright )
3337                 goto goleft;
3338           else
3339                 goto findourself;
3340
3341         return bt->cursor->cnt;
3342 }
3343
3344 //  return next slot on cursor page
3345 //  or slide cursor right into next page
3346
3347 uint bt_nextkey (BtDb *bt, uint slot)
3348 {
3349 BtPageSet set[1];
3350 uid right;
3351
3352   do {
3353         right = bt_getid(bt->cursor->right);
3354
3355         while( slot++ < bt->cursor->cnt )
3356           if( slotptr(bt->cursor,slot)->dead )
3357                 continue;
3358           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3359                 return slot;
3360           else
3361                 break;
3362
3363         if( !right )
3364                 break;
3365
3366         if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3367                 set->page = bt_mappage (bt->mgr, set->latch);
3368         else
3369                 return 0;
3370
3371     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3372         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3373         bt_unlockpage(BtLockRead, set->latch);
3374         bt_unpinlatch (bt->mgr, set->latch);
3375         slot = 0;
3376
3377   } while( 1 );
3378
3379   return bt->mgr->err = 0;
3380 }
3381
3382 //  cache page of keys into cursor and return starting slot for given key
3383
3384 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3385 {
3386 BtPageSet set[1];
3387 uint slot;
3388
3389         // cache page for retrieval
3390
3391         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3392           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3393         else
3394           return 0;
3395
3396         bt_unlockpage(BtLockRead, set->latch);
3397         bt_unpinlatch (bt->mgr, set->latch);
3398         return slot;
3399 }
3400
3401 #ifdef STANDALONE
3402
3403 #ifndef unix
3404 double getCpuTime(int type)
3405 {
3406 FILETIME crtime[1];
3407 FILETIME xittime[1];
3408 FILETIME systime[1];
3409 FILETIME usrtime[1];
3410 SYSTEMTIME timeconv[1];
3411 double ans = 0;
3412
3413         memset (timeconv, 0, sizeof(SYSTEMTIME));
3414
3415         switch( type ) {
3416         case 0:
3417                 GetSystemTimeAsFileTime (xittime);
3418                 FileTimeToSystemTime (xittime, timeconv);
3419                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3420                 break;
3421         case 1:
3422                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3423                 FileTimeToSystemTime (usrtime, timeconv);
3424                 break;
3425         case 2:
3426                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3427                 FileTimeToSystemTime (systime, timeconv);
3428                 break;
3429         }
3430
3431         ans += (double)timeconv->wHour * 3600;
3432         ans += (double)timeconv->wMinute * 60;
3433         ans += (double)timeconv->wSecond;
3434         ans += (double)timeconv->wMilliseconds / 1000;
3435         return ans;
3436 }
3437 #else
3438 #include <time.h>
3439 #include <sys/resource.h>
3440
3441 double getCpuTime(int type)
3442 {
3443 struct rusage used[1];
3444 struct timeval tv[1];
3445
3446         switch( type ) {
3447         case 0:
3448                 gettimeofday(tv, NULL);
3449                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3450
3451         case 1:
3452                 getrusage(RUSAGE_SELF, used);
3453                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3454
3455         case 2:
3456                 getrusage(RUSAGE_SELF, used);
3457                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3458         }
3459
3460         return 0;
3461 }
3462 #endif
3463
3464 void bt_poolaudit (BtMgr *mgr)
3465 {
3466 BtLatchSet *latch;
3467 uint entry = 0;
3468
3469         while( ++entry < mgr->latchtotal ) {
3470                 latch = mgr->latchsets + entry;
3471
3472                 if( *latch->readwr->rin & MASK )
3473                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3474
3475                 if( *latch->access->rin & MASK )
3476                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3477
3478 //              if( *latch->parent->xcl->value )
3479 //                      fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3480
3481 //              if( *latch->atomic->xcl->value )
3482 //                      fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3483
3484 //              if( *latch->modify->value )
3485 //                      fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3486
3487                 if( latch->pin & ~CLOCK_bit )
3488                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3489         }
3490 }
3491
3492 typedef struct {
3493         char idx;
3494         char *type;
3495         char *infile;
3496         BtMgr *main;
3497         BtMgr *mgr;
3498         int num;
3499 } ThreadArg;
3500
3501 //  standalone program to index file of keys
3502 //  then list them onto std-out
3503
3504 #ifdef unix
3505 void *index_file (void *arg)
3506 #else
3507 uint __stdcall index_file (void *arg)
3508 #endif
3509 {
3510 int line = 0, found = 0, cnt = 0, idx;
3511 uid next, page_no = LEAF_page;  // start on first page of leaves
3512 int ch, len = 0, slot, type = 0;
3513 unsigned char key[BT_maxkey];
3514 unsigned char txn[65536];
3515 ThreadArg *args = arg;
3516 BtPage page, frame;
3517 BtPageSet set[1];
3518 uint nxt = 65536;
3519 BtKey *ptr;
3520 BtVal *val;
3521 BtDb *bt;
3522 FILE *in;
3523
3524         bt = bt_open (args->mgr, args->main);
3525         page = (BtPage)txn;
3526
3527         if( args->idx < strlen (args->type) )
3528                 ch = args->type[args->idx];
3529         else
3530                 ch = args->type[strlen(args->type) - 1];
3531
3532         switch(ch | 0x20)
3533         {
3534         case 'd':
3535                 type = Delete;
3536
3537         case 'p':
3538                 if( !type )
3539                         type = Unique;
3540
3541                 if( args->num )
3542                  if( type == Delete )
3543                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3544                  else
3545                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3546                 else
3547                  if( type == Delete )
3548                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3549                  else
3550                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3551
3552                 if( in = fopen (args->infile, "rb") )
3553                   while( ch = getc(in), ch != EOF )
3554                         if( ch == '\n' )
3555                         {
3556                           line++;
3557
3558                           if( !args->num ) {
3559                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3560                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3561                             len = 0;
3562                                 continue;
3563                           }
3564
3565                           nxt -= len - 10;
3566                           memcpy (txn + nxt, key + 10, len - 10);
3567                           nxt -= 1;
3568                           txn[nxt] = len - 10;
3569                           nxt -= 10;
3570                           memcpy (txn + nxt, key, 10);
3571                           nxt -= 1;
3572                           txn[nxt] = 10;
3573                           slotptr(page,++cnt)->off  = nxt;
3574                           slotptr(page,cnt)->type = type;
3575                           len = 0;
3576
3577                           if( cnt < args->num )
3578                                 continue;
3579
3580                           page->cnt = cnt;
3581                           page->act = cnt;
3582                           page->min = nxt;
3583
3584                           if( bt_atomictxn (bt, page) )
3585                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3586                           nxt = sizeof(txn);
3587                           cnt = 0;
3588
3589                         }
3590                         else if( len < BT_maxkey )
3591                                 key[len++] = ch;
3592                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3593                 break;
3594
3595         case 'w':
3596                 fprintf(stderr, "started indexing for %s\n", args->infile);
3597                 if( in = fopen (args->infile, "r") )
3598                   while( ch = getc(in), ch != EOF )
3599                         if( ch == '\n' )
3600                         {
3601                           line++;
3602
3603                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3604                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3605                           len = 0;
3606                         }
3607                         else if( len < BT_maxkey )
3608                                 key[len++] = ch;
3609                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3610                 break;
3611
3612         case 'f':
3613                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3614                 if( in = fopen (args->infile, "rb") )
3615                   while( ch = getc(in), ch != EOF )
3616                         if( ch == '\n' )
3617                         {
3618                           line++;
3619                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3620                                 found++;
3621                           else if( bt->mgr->err )
3622                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3623                           len = 0;
3624                         }
3625                         else if( len < BT_maxkey )
3626                                 key[len++] = ch;
3627                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3628                 break;
3629
3630         case 's':
3631                 fprintf(stderr, "started scanning\n");
3632                 
3633                 do {
3634                         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3635                                 set->page = bt_mappage (bt->mgr, set->latch);
3636                         else
3637                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3638
3639                         bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3640                         next = bt_getid (set->page->right);
3641
3642                         for( slot = 0; slot++ < set->page->cnt; )
3643                          if( next || slot < set->page->cnt )
3644                           if( !slotptr(set->page, slot)->dead ) {
3645                                 ptr = keyptr(set->page, slot);
3646                                 len = ptr->len;
3647
3648                             if( slotptr(set->page, slot)->type == Duplicate )
3649                                         len -= BtId;
3650
3651                                 fwrite (ptr->key, len, 1, stdout);
3652                                 val = valptr(set->page, slot);
3653                                 fwrite (val->value, val->len, 1, stdout);
3654                                 fputc ('\n', stdout);
3655                                 cnt++;
3656                            }
3657
3658                         bt_unlockpage (BtLockRead, set->latch);
3659                         bt_unpinlatch (bt->mgr, set->latch);
3660                 } while( page_no = next );
3661
3662                 fprintf(stderr, " Total keys read %d\n", cnt);
3663                 break;
3664
3665         case 'r':
3666                 fprintf(stderr, "started reverse scan\n");
3667                 if( slot = bt_lastkey (bt) )
3668                    while( slot = bt_prevkey (bt, slot) ) {
3669                         if( slotptr(bt->cursor, slot)->dead )
3670                           continue;
3671
3672                         ptr = keyptr(bt->cursor, slot);
3673                         len = ptr->len;
3674
3675                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3676                                 len -= BtId;
3677
3678                         fwrite (ptr->key, len, 1, stdout);
3679                         val = valptr(bt->cursor, slot);
3680                         fwrite (val->value, val->len, 1, stdout);
3681                         fputc ('\n', stdout);
3682                         cnt++;
3683                   }
3684
3685                 fprintf(stderr, " Total keys read %d\n", cnt);
3686                 break;
3687
3688         case 'c':
3689 #ifdef unix
3690                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3691 #endif
3692                 fprintf(stderr, "started counting\n");
3693                 next = REDO_page + bt->mgr->pagezero->redopages;
3694
3695                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3696                         if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3697                                 break;
3698
3699                         if( !bt->cursor->free && !bt->cursor->lvl )
3700                                 cnt += bt->cursor->act;
3701
3702                         bt->mgr->reads++;
3703                         page_no = next++;
3704                 }
3705                 
3706                 cnt--;  // remove stopper key
3707                 fprintf(stderr, " Total keys counted %d\n", cnt);
3708                 break;
3709         }
3710
3711         bt_close (bt);
3712 #ifdef unix
3713         return NULL;
3714 #else
3715         return 0;
3716 #endif
3717 }
3718
3719 typedef struct timeval timer;
3720
3721 int main (int argc, char **argv)
3722 {
3723 int idx, cnt, len, slot, err;
3724 int segsize, bits = 16;
3725 double start, stop;
3726 #ifdef unix
3727 pthread_t *threads;
3728 #else
3729 HANDLE *threads;
3730 #endif
3731 ThreadArg *args;
3732 uint redopages = 0;
3733 uint poolsize = 0;
3734 uint mainpool = 0;
3735 uint mainbits = 0;
3736 float elapsed;
3737 int num = 0;
3738 char key[1];
3739 BtMgr *main;
3740 BtMgr *mgr;
3741 BtKey *ptr;
3742
3743         if( argc < 3 ) {
3744                 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3745                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3746                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3747                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3748                 fprintf (stderr, "  page_bits is the page size in bits for the cache btree\n");
3749                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3750                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3751                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3752                 fprintf (stderr, "  main_bits is the page size of the main btree in bits\n");
3753                 fprintf (stderr, "  main_pool is the number of main pages in the main buffer pool\n");
3754                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3755                 exit(0);
3756         }
3757
3758         start = getCpuTime(0);
3759
3760         if( argc > 4 )
3761                 bits = atoi(argv[4]);
3762
3763         if( argc > 5 )
3764                 poolsize = atoi(argv[5]);
3765
3766         if( !poolsize )
3767                 fprintf (stderr, "Warning: no mapped_pool\n");
3768
3769         if( argc > 6 )
3770                 num = atoi(argv[6]);
3771
3772         if( argc > 7 )
3773                 redopages = atoi(argv[7]);
3774
3775         if( redopages + REDO_page > 65535 )
3776                 fprintf (stderr, "Warning: Recovery buffer too large\n");
3777
3778         if( argc > 8 )
3779                 mainbits = atoi(argv[8]);
3780
3781         if( argc > 9 )
3782                 mainpool = atoi(argv[9]);
3783
3784         cnt = argc - 10;
3785 #ifdef unix
3786         threads = malloc (cnt * sizeof(pthread_t));
3787 #else
3788         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3789 #endif
3790         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3791
3792         mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3793
3794         if( !mgr ) {
3795                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3796                 exit (1);
3797         }
3798
3799         if( mainbits ) {
3800                 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3801
3802                 if( !main ) {
3803                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3804                         exit (1);
3805                 }
3806         } else
3807                 main = NULL;
3808
3809         //      fire off threads
3810
3811         if( cnt )
3812           for( idx = 0; idx < cnt; idx++ ) {
3813                 args[idx].infile = argv[idx + 10];
3814                 args[idx].type = argv[3];
3815                 args[idx].main = main;
3816                 args[idx].mgr = mgr;
3817                 args[idx].num = num;
3818                 args[idx].idx = idx;
3819 #ifdef unix
3820                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3821                         fprintf(stderr, "Error creating thread %d\n", err);
3822 #else
3823                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3824 #endif
3825           }
3826         else {
3827                 args[0].infile = argv[idx + 10];
3828                 args[0].type = argv[3];
3829                 args[0].main = main;
3830                 args[0].mgr = mgr;
3831                 args[0].num = num;
3832                 args[0].idx = 0;
3833                 index_file (args);
3834         }
3835
3836         //      wait for termination
3837
3838 #ifdef unix
3839         for( idx = 0; idx < cnt; idx++ )
3840                 pthread_join (threads[idx], NULL);
3841 #else
3842         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3843
3844         for( idx = 0; idx < cnt; idx++ )
3845                 CloseHandle(threads[idx]);
3846 #endif
3847         bt_poolaudit(mgr);
3848
3849         if( main )
3850                 bt_poolaudit(main);
3851
3852         fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3853
3854         if( main )
3855                 bt_mgrclose (main);
3856         bt_mgrclose (mgr);
3857
3858         elapsed = getCpuTime(0) - start;
3859         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3860         elapsed = getCpuTime(1);
3861         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3862         elapsed = getCpuTime(2);
3863         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3864 }
3865
3866 #endif  //STANDALONE