]> pd.if.org Git - btree/blob - threadskv10.c
Rewrite re-entrant phase-fair latching, continue LSM btree code
[btree] / threadskv10.c
1 // btree version threadskv10 futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      and LSM B-trees for write optimization
11
12 // 11 OCT 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <linux/futex.h>
39 #define SYS_futex 202
40 #endif
41
42 #ifdef unix
43 #include <unistd.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <fcntl.h>
47 #include <sys/time.h>
48 #include <sys/mman.h>
49 #include <errno.h>
50 #include <pthread.h>
51 #include <limits.h>
52 #else
53 #define WIN32_LEAN_AND_MEAN
54 #include <windows.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <time.h>
58 #include <fcntl.h>
59 #include <process.h>
60 #include <intrin.h>
61 #endif
62
63 #include <memory.h>
64 #include <string.h>
65 #include <stddef.h>
66
67 typedef unsigned long long      uid;
68 typedef unsigned long long      logseqno;
69
70 #ifndef unix
71 typedef unsigned long long      off64_t;
72 typedef unsigned short          ushort;
73 typedef unsigned int            uint;
74 #endif
75
76 #define BT_ro 0x6f72    // ro
77 #define BT_rw 0x7772    // rw
78
79 #define BT_maxbits              26                                      // maximum page size in bits
80 #define BT_minbits              9                                       // minimum page size in bits
81 #define BT_minpage              (1 << BT_minbits)       // minimum page size
82 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
83
84 //  BTree page number constants
85 #define ALLOC_page              0       // allocation page
86 #define ROOT_page               1       // root of the btree
87 #define LEAF_page               2       // first page of leaves
88 #define REDO_page               3       // first page of redo buffer
89
90 //      Number of levels to create in a new BTree
91
92 #define MIN_lvl                 2
93
94 /*
95 There are six lock types for each node in four independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
102 */
103
104 typedef enum{
105         BtLockAccess = 1,
106         BtLockDelete = 2,
107         BtLockRead   = 4,
108         BtLockWrite  = 8,
109         BtLockParent = 16,
110         BtLockAtomic = 32
111 } BtLock;
112
113 typedef struct {
114   union {
115         struct {
116           volatile ushort xlock;        // one writer has exclusive lock
117           volatile ushort wrt;          // count of other writers waiting
118         } bits[1];
119         uint value[1];
120   };
121 } BtMutexLatch;
122
123 #define XCL 1
124 #define WRT 65536
125
126 //      definition for phase-fair reader/writer lock implementation
127
128 typedef struct {
129         volatile ushort rin[1];
130         volatile ushort rout[1];
131         volatile ushort ticket[1];
132         volatile ushort serving[1];
133 } RWLock;
134
135 //      write only lock
136
137 typedef struct {
138         BtMutexLatch xcl[1];
139         volatile ushort tid[1];
140         volatile ushort dup[1];
141 } WOLock;
142
143 #define PHID 0x1
144 #define PRES 0x2
145 #define MASK 0x3
146 #define RINC 0x4
147
148 //      mode & definition for lite latch implementation
149
150 enum {
151         QueRd = 1,              // reader queue
152         QueWr = 2               // writer queue
153 } RWQueue;
154
155 //  hash table entries
156
157 typedef struct {
158         uint entry;             // Latch table entry at head of chain
159         BtMutexLatch latch[1];
160 } BtHashEntry;
161
162 //      latch manager table structure
163
164 typedef struct {
165         uid page_no;                    // latch set page number
166         RWLock readwr[1];               // read/write page lock
167         RWLock access[1];               // Access Intent/Page delete
168         WOLock parent[1];               // Posting of fence key in parent
169         WOLock atomic[1];               // Atomic update in progress
170         uint split;                             // right split page atomic insert
171         uint next;                              // next entry in hash table chain
172         uint prev;                              // prev entry in hash table chain
173         ushort pin;                             // number of accessing threads
174         unsigned char dirty;    // page in cache is dirty (atomic set)
175         BtMutexLatch modify[1]; // modify entry lite latch
176 } BtLatchSet;
177
178 //      Define the length of the page record numbers
179
180 #define BtId 6
181
182 //      Page key slot definition.
183
184 //      Keys are marked dead, but remain on the page until
185 //      it cleanup is called. The fence key (highest key) for
186 //      a leaf page is always present, even after cleanup.
187
188 //      Slot types
189
190 //      In addition to the Unique keys that occupy slots
191 //      there are Librarian and Duplicate key
192 //      slots occupying the key slot array.
193
194 //      The Librarian slots are dead keys that
195 //      serve as filler, available to add new Unique
196 //      or Dup slots that are inserted into the B-tree.
197
198 //      The Duplicate slots have had their key bytes extended
199 //      by 6 bytes to contain a binary duplicate key uniqueifier.
200
201 typedef enum {
202         Unique,
203         Librarian,
204         Duplicate,
205         Delete
206 } BtSlotType;
207
208 typedef struct {
209         uint off:BT_maxbits;    // page offset for key start
210         uint type:3;                    // type of slot
211         uint dead:1;                    // set for deleted slot
212 } BtSlot;
213
214 //      The key structure occupies space at the upper end of
215 //      each page.  It's a length byte followed by the key
216 //      bytes.
217
218 typedef struct {
219         unsigned char len;              // this can be changed to a ushort or uint
220         unsigned char key[0];
221 } BtKey;
222
223 //      the value structure also occupies space at the upper
224 //      end of the page. Each key is immediately followed by a value.
225
226 typedef struct {
227         unsigned char len;              // this can be changed to a ushort or uint
228         unsigned char value[0];
229 } BtVal;
230
231 #define BT_maxkey       255             // maximum number of bytes in a key
232 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
233
234 //      The first part of an index page.
235 //      It is immediately followed
236 //      by the BtSlot array of keys.
237
238 //      note that this structure size
239 //      must be a multiple of 8 bytes
240 //      in order to place PageZero correctly.
241
242 typedef struct BtPage_ {
243         uint cnt;                                       // count of keys in page
244         uint act;                                       // count of active keys
245         uint min;                                       // next key offset
246         uint garbage;                           // page garbage in bytes
247         unsigned char bits:7;           // page size in bits
248         unsigned char free:1;           // page is on free chain
249         unsigned char lvl:7;            // level of page
250         unsigned char kill:1;           // page is being deleted
251         unsigned char right[BtId];      // page number to right
252         unsigned char left[BtId];       // page number to left
253         unsigned char filler[2];        // padding to multiple of 8
254         logseqno lsn;                           // log sequence number applied
255         uid page_no;                            // this page number
256 } *BtPage;
257
258 //  The loadpage interface object
259
260 typedef struct {
261         BtPage page;            // current page pointer
262         BtLatchSet *latch;      // current page latch set
263 } BtPageSet;
264
265 //      structure for latch manager on ALLOC_page
266
267 typedef struct {
268         struct BtPage_ alloc[1];                // next page_no in right ptr
269         unsigned char freechain[BtId];  // head of free page_nos chain
270         unsigned long long activepages; // number of active pages
271         uint redopages;                                 // number of redo pages in file
272 } BtPageZero;
273
274 //      The object structure for Btree access
275
276 typedef struct {
277         uint page_size;                         // page size    
278         uint page_bits;                         // page size in bits    
279 #ifdef unix
280         int idx;
281 #else
282         HANDLE idx;
283 #endif
284         BtPageZero *pagezero;           // mapped allocation page
285         BtHashEntry *hashtable;         // the buffer pool hash table entries
286         BtLatchSet *latchsets;          // mapped latch set from buffer pool
287         unsigned char *pagepool;        // mapped to the buffer pool pages
288         unsigned char *redobuff;        // mapped recovery buffer pointer
289         logseqno lsn, flushlsn;         // current & first lsn flushed
290         BtMutexLatch redo[1];           // redo area lite latch
291         BtMutexLatch lock[1];           // allocation area lite latch
292         BtMutexLatch maps[1];           // mapping segments lite latch
293         ushort thread_no[1];            // next thread number
294         uint nlatchpage;                        // number of latch pages at BT_latch
295         uint latchtotal;                        // number of page latch entries
296         uint latchhash;                         // number of latch hash table slots
297         uint latchvictim;                       // next latch entry to examine
298         uint latchpromote;                      // next latch entry to promote
299         uint redolast;                          // last msync size of recovery buff
300         uint redoend;                           // eof/end element in recovery buff
301         int err;                                        // last error
302         int line;                                       // last error line no
303         int found;                                      // number of keys found by delete
304         int reads, writes;                      // number of reads and writes
305 #ifndef unix
306         HANDLE halloc;                          // allocation handle
307         HANDLE hpool;                           // buffer pool handle
308 #endif
309         uint segments;                          // number of memory mapped segments
310         unsigned char *pages[64000];// memory mapped segments of b-tree
311 } BtMgr;
312
313 typedef struct {
314         BtMgr *mgr;                                     // buffer manager for entire process
315         BtMgr *main;                            // buffer manager for main btree
316         BtPage cursor;                          // cached page frame for start/next
317         ushort thread_no;                       // thread number
318         unsigned char key[BT_keyarray]; // last found complete key
319 } BtDb;
320
321 //      atomic txn structures
322
323 typedef struct {
324         logseqno reqlsn;        // redo log seq no required
325         uint entry;                     // latch table entry number
326         uint slot:31;           // page slot number
327         uint reuse:1;           // reused previous page
328 } AtomicTxn;
329
330 typedef struct {
331         uid page_no;            // page number for split leaf
332         void *next;                     // next key to insert
333         uint entry:29;          // latch table entry number
334         uint type:2;            // 0 == insert, 1 == delete, 2 == free
335         uint nounlock:1;        // don't unlock ParentModification
336         unsigned char leafkey[BT_keyarray];
337 } AtomicKey;
338
339 //      Catastrophic errors
340
341 typedef enum {
342         BTERR_ok = 0,
343         BTERR_struct,
344         BTERR_ovflw,
345         BTERR_lock,
346         BTERR_map,
347         BTERR_read,
348         BTERR_wrt,
349         BTERR_atomic,
350         BTERR_recovery
351 } BTERR;
352
353 #define CLOCK_bit 0x8000
354
355 // recovery manager entry types
356
357 typedef enum {
358         BTRM_eof = 0,   // rest of buffer is emtpy
359         BTRM_add,               // add a unique key-value to btree
360         BTRM_dup,               // add a duplicate key-value to btree
361         BTRM_del,               // delete a key-value from btree
362         BTRM_upd,               // update a key with a new value
363         BTRM_new,               // allocate a new empty page
364         BTRM_old                // reuse an old empty page
365 } BTRM;
366
367 // recovery manager entry
368 //      structure followed by BtKey & BtVal
369
370 typedef struct {
371         logseqno reqlsn;        // log sequence number required
372         logseqno lsn;           // log sequence number for entry
373         uint len;                       // length of entry
374         unsigned char type;     // type of entry
375         unsigned char lvl;      // level of btree entry pertains to
376 } BtLogHdr;
377
378 // B-Tree functions
379
380 extern void bt_close (BtDb *bt);
381 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
382 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
383 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
384 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
385 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
386 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
387 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
388
389 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
390
391 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
392 extern uint bt_nextkey   (BtDb *bt, uint slot);
393 extern uint bt_prevkey (BtDb *db, uint slot);
394 extern uint bt_lastkey (BtDb *db);
395
396 //      manager functions
397 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
398 extern void bt_mgrclose (BtMgr *mgr);
399 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
400 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
401
402 //      atomic transaction functions
403 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
404 BTERR bt_txnpromote (BtDb *bt);
405
406 //  The page is allocated from low and hi ends.
407 //  The key slots are allocated from the bottom,
408 //      while the text and value of the key
409 //  are allocated from the top.  When the two
410 //  areas meet, the page is split into two.
411
412 //  A key consists of a length byte, two bytes of
413 //  index number (0 - 65535), and up to 253 bytes
414 //  of key value.
415
416 //  Associated with each key is a value byte string
417 //      containing any value desired.
418
419 //  The b-tree root is always located at page 1.
420 //      The first leaf page of level zero is always
421 //      located on page 2.
422
423 //      The b-tree pages are linked with next
424 //      pointers to facilitate enumerators,
425 //      and provide for concurrency.
426
427 //      When to root page fills, it is split in two and
428 //      the tree height is raised by a new root at page
429 //      one with two keys.
430
431 //      Deleted keys are marked with a dead bit until
432 //      page cleanup. The fence key for a leaf node is
433 //      always present
434
435 //  To achieve maximum concurrency one page is locked at a time
436 //  as the tree is traversed to find leaf key in question. The right
437 //  page numbers are used in cases where the page is being split,
438 //      or consolidated.
439
440 //  Page 0 is dedicated to lock for new page extensions,
441 //      and chains empty pages together for reuse. It also
442 //      contains the latch manager hash table.
443
444 //      The ParentModification lock on a node is obtained to serialize posting
445 //      or changing the fence key for a node.
446
447 //      Empty pages are chained together through the ALLOC page and reused.
448
449 //      Access macros to address slot and key values from the page
450 //      Page slots use 1 based indexing.
451
452 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
453 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
454 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
455
456 void bt_putid(unsigned char *dest, uid id)
457 {
458 int i = BtId;
459
460         while( i-- )
461                 dest[i] = (unsigned char)id, id >>= 8;
462 }
463
464 uid bt_getid(unsigned char *src)
465 {
466 uid id = 0;
467 int i;
468
469         for( i = 0; i < BtId; i++ )
470                 id <<= 8, id |= *src++; 
471
472         return id;
473 }
474
475 //      lite weight spin lock Latch Manager
476
477 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
478 {
479         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
480 }
481
482 void bt_mutexlock(BtMutexLatch *latch)
483 {
484 BtMutexLatch prev[1];
485 uint slept = 0;
486
487   while( 1 ) {
488         *prev->value = __sync_fetch_and_or(latch->value, XCL);
489
490         if( !prev->bits->xlock ) {                      // did we set XCL?
491             if( slept )
492                   __sync_fetch_and_sub(latch->value, WRT);
493                 return;
494         }
495
496         if( !slept ) {
497                 prev->bits->wrt++;
498                 __sync_fetch_and_add(latch->value, WRT);
499         }
500
501         sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
502         slept = 1;
503   }
504 }
505
506 //      try to obtain write lock
507
508 //      return 1 if obtained,
509 //              0 otherwise
510
511 int bt_mutextry(BtMutexLatch *latch)
512 {
513 BtMutexLatch prev[1];
514
515         *prev->value = __sync_fetch_and_or(latch->value, XCL);
516
517         //      take write access if exclusive bit was clear
518
519         return !prev->bits->xlock;
520 }
521
522 //      clear write mode
523
524 void bt_releasemutex(BtMutexLatch *latch)
525 {
526 BtMutexLatch prev[1];
527
528         *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
529
530         if( prev->bits->wrt )
531           sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
532 }
533
534 //      Write-Only Queue Lock
535
536 void WriteOLock (WOLock *lock, ushort tid)
537 {
538   while( 1 ) {
539         bt_mutexlock(lock->xcl);
540
541         if( *lock->tid == tid ) {
542                 *lock->dup += 1;
543                 bt_releasemutex(lock->xcl);
544                 return;
545         }
546         if( !*lock->tid ) {
547                 *lock->tid = tid;
548                 bt_releasemutex(lock->xcl);
549                 return;
550         }
551         bt_releasemutex(lock->xcl);
552         sched_yield();
553   }
554 }
555
556 void WriteORelease (WOLock *lock)
557 {
558         if( *lock->dup ) {
559                 *lock->dup -= 1;
560                 return;
561         }
562
563         *lock->tid = 0;
564 }
565
566 //      Phase-Fair reader/writer lock implementation
567
568 void WriteLock (RWLock *lock, ushort tid)
569 {
570 ushort w, r, tix;
571
572 #ifdef unix
573         tix = __sync_fetch_and_add (lock->ticket, 1);
574 #else
575         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
576 #endif
577         // wait for our ticket to come up
578
579         while( tix != lock->serving[0] )
580 #ifdef unix
581                 sched_yield();
582 #else
583                 SwitchToThread ();
584 #endif
585
586         w = PRES | (tix & PHID);
587 #ifdef  unix
588         r = __sync_fetch_and_add (lock->rin, w);
589 #else
590         r = _InterlockedExchangeAdd16 (lock->rin, w);
591 #endif
592         while( r != *lock->rout )
593 #ifdef unix
594                 sched_yield();
595 #else
596                 SwitchToThread();
597 #endif
598 }
599
600 void WriteRelease (RWLock *lock)
601 {
602 #ifdef unix
603         __sync_fetch_and_and (lock->rin, ~MASK);
604 #else
605         _InterlockedAnd16 (lock->rin, ~MASK);
606 #endif
607         lock->serving[0]++;
608 }
609
610 //      try to obtain read lock
611 //  return 1 if successful
612
613 int ReadTry (RWLock *lock, ushort tid)
614 {
615 ushort w;
616
617 #ifdef unix
618         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
619 #else
620         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
621 #endif
622         if( w )
623           if( w == (*lock->rin & MASK) ) {
624 #ifdef unix
625                 __sync_fetch_and_add (lock->rin, -RINC);
626 #else
627                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
628 #endif
629                 return 0;
630           }
631
632         return 1;
633 }
634
635 void ReadLock (RWLock *lock, ushort tid)
636 {
637 ushort w;
638
639 #ifdef unix
640         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
641 #else
642         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
643 #endif
644         if( w )
645           while( w == (*lock->rin & MASK) )
646 #ifdef unix
647                 sched_yield ();
648 #else
649                 SwitchToThread ();
650 #endif
651 }
652
653 void ReadRelease (RWLock *lock)
654 {
655 #ifdef unix
656         __sync_fetch_and_add (lock->rout, RINC);
657 #else
658         _InterlockedExchangeAdd16 (lock->rout, RINC);
659 #endif
660 }
661
662 //      recovery manager -- flush dirty pages
663
664 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
665 {
666 uint cnt3 = 0, cnt2 = 0, cnt = 0;
667 uint entry, segment;
668 BtLatchSet *latch;
669 BtPage page;
670
671   //    flush dirty pool pages to the btree
672
673 fprintf(stderr, "Start flushlsn  ");
674   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
675         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
676         latch = mgr->latchsets + entry;
677         bt_mutexlock (latch->modify);
678         bt_lockpage(BtLockRead, latch, thread_no);
679
680         if( latch->dirty ) {
681           bt_writepage(mgr, page, latch->page_no);
682           latch->dirty = 0, cnt++;
683         }
684 if( latch->pin & ~CLOCK_bit )
685 cnt2++;
686         bt_unlockpage(BtLockRead, latch);
687         bt_releasemutex (latch->modify);
688   }
689 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
690 fprintf(stderr, "begin sync");
691         for( segment = 0; segment < mgr->segments; segment++ )
692           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
693                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
694 fprintf(stderr, " end sync\n");
695 }
696
697 //      recovery manager -- process current recovery buff on startup
698 //      this won't do much if previous session was properly closed.
699
700 BTERR bt_recoveryredo (BtMgr *mgr)
701 {
702 BtLogHdr *hdr, *eof;
703 uint offset = 0;
704 BtKey *key;
705 BtVal *val;
706
707   hdr = (BtLogHdr *)mgr->redobuff;
708   mgr->flushlsn = hdr->lsn;
709
710   while( 1 ) {
711         hdr = (BtLogHdr *)(mgr->redobuff + offset);
712         switch( hdr->type ) {
713         case BTRM_eof:
714                 mgr->lsn = hdr->lsn;
715                 return 0;
716         case BTRM_add:          // add a unique key-value to btree
717         
718         case BTRM_dup:          // add a duplicate key-value to btree
719         case BTRM_del:          // delete a key-value from btree
720         case BTRM_upd:          // update a key with a new value
721         case BTRM_new:          // allocate a new empty page
722         case BTRM_old:          // reuse an old empty page
723                 return 0;
724         }
725   }
726 }
727
728 //      recovery manager -- append new entry to recovery log
729 //      flush dirty pages to disk when it overflows.
730
731 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
732 {
733 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
734 uint amt = sizeof(BtLogHdr);
735 BtLogHdr *hdr, *eof;
736 uint last, end;
737
738         bt_mutexlock (mgr->redo);
739
740         if( key )
741           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
742
743         //      see if new entry fits in buffer
744         //      flush and reset if it doesn't
745
746         if( amt > size - mgr->redoend ) {
747           mgr->flushlsn = mgr->lsn;
748           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
749                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
750           mgr->redolast = 0;
751           mgr->redoend = 0;
752           bt_flushlsn(mgr, thread_no);
753         }
754
755         //      fill in new entry & either eof or end block
756
757         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
758
759         hdr->len = amt;
760         hdr->type = type;
761         hdr->lvl = lvl;
762         hdr->lsn = ++mgr->lsn;
763
764         mgr->redoend += amt;
765
766         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
767         memset (eof, 0, sizeof(BtLogHdr));
768
769         //  fill in key and value
770
771         if( key ) {
772           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
773           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
774         }
775
776         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
777         memset (eof, 0, sizeof(BtLogHdr));
778         eof->lsn = mgr->lsn;
779
780         last = mgr->redolast & ~0xfff;
781         end = mgr->redoend;
782
783         if( end - last + sizeof(BtLogHdr) >= 32768 )
784           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
785                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
786           else
787                 mgr->redolast = end;
788
789         bt_releasemutex(mgr->redo);
790         return hdr->lsn;
791 }
792
793 //      recovery manager -- append transaction to recovery log
794 //      flush dirty pages to disk when it overflows.
795
796 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
797 {
798 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
799 uint amt = 0, src, type;
800 BtLogHdr *hdr, *eof;
801 uint last, end;
802 logseqno lsn;
803 BtKey *key;
804 BtVal *val;
805
806         //      determine amount of redo recovery log space required
807
808         for( src = 0; src++ < source->cnt; ) {
809           key = keyptr(source,src);
810           val = valptr(source,src);
811           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
812           amt += sizeof(BtLogHdr);
813         }
814
815         bt_mutexlock (mgr->redo);
816
817         //      see if new entry fits in buffer
818         //      flush and reset if it doesn't
819
820         if( amt > size - mgr->redoend ) {
821           mgr->flushlsn = mgr->lsn;
822           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
823                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
824           mgr->redolast = 0;
825           mgr->redoend = 0;
826           bt_flushlsn (mgr, thread_no);
827         }
828
829         //      assign new lsn to transaction
830
831         lsn = ++mgr->lsn;
832
833         //      fill in new entries
834
835         for( src = 0; src++ < source->cnt; ) {
836           key = keyptr(source, src);
837           val = valptr(source, src);
838
839           switch( slotptr(source, src)->type ) {
840           case Unique:
841                 type = BTRM_add;
842                 break;
843           case Duplicate:
844                 type = BTRM_dup;
845                 break;
846           case Delete:
847                 type = BTRM_del;
848                 break;
849           }
850
851           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
852           amt += sizeof(BtLogHdr);
853
854           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
855           hdr->len = amt;
856           hdr->type = type;
857           hdr->lsn = lsn;
858           hdr->lvl = 0;
859
860           //  fill in key and value
861
862           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
863           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
864
865           mgr->redoend += amt;
866         }
867
868         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
869         memset (eof, 0, sizeof(BtLogHdr));
870         eof->lsn = lsn;
871
872         last = mgr->redolast & ~0xfff;
873         end = mgr->redoend;
874
875         if( end - last + sizeof(BtLogHdr) >= 32768 )
876           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
877                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
878           else
879                 mgr->redolast = end;
880
881         bt_releasemutex(mgr->redo);
882         return lsn;
883 }
884
885 //      sync a single btree page to disk
886
887 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
888 {
889 uint segment = latch->page_no >> 16;
890 BtPage perm;
891
892         if( bt_writepage (mgr, page, latch->page_no) )
893                 return mgr->err;
894
895         perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
896
897         if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
898           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
899
900         latch->dirty = 0;
901         return 0;
902 }
903
904 //      read page into buffer pool from permanent location in Btree file
905
906 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
907 {
908 int flag = PROT_READ | PROT_WRITE;
909 uint segment = page_no >> 16;
910 BtPage perm;
911
912   while( 1 ) {
913         if( segment < mgr->segments ) {
914           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
915 if( perm->page_no != page_no )
916 abort();
917                 memcpy (page, perm, mgr->page_size);
918                 mgr->reads++;
919                 return 0;
920         }
921
922         bt_mutexlock (mgr->maps);
923
924         if( segment < mgr->segments ) {
925                 bt_releasemutex (mgr->maps);
926                 continue;
927         }
928
929         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
930         mgr->segments++;
931
932         bt_releasemutex (mgr->maps);
933   }
934 }
935
936 //      write page to permanent location in Btree file
937 //      clear the dirty bit
938
939 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
940 {
941 int flag = PROT_READ | PROT_WRITE;
942 uint segment = page_no >> 16;
943 BtPage perm;
944
945   while( 1 ) {
946         if( segment < mgr->segments ) {
947           perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
948 if( page_no > LEAF_page && perm->page_no != page_no)
949 abort();
950                 memcpy (perm, page, mgr->page_size);
951                 mgr->writes++;
952                 return 0;
953         }
954
955         bt_mutexlock (mgr->maps);
956
957         if( segment < mgr->segments ) {
958                 bt_releasemutex (mgr->maps);
959                 continue;
960         }
961
962         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
963         bt_releasemutex (mgr->maps);
964         mgr->segments++;
965   }
966 }
967
968 //      set CLOCK bit in latch
969 //      decrement pin count
970
971 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
972 {
973         bt_mutexlock(latch->modify);
974         latch->pin |= CLOCK_bit;
975         latch->pin--;
976
977         bt_releasemutex(latch->modify);
978 }
979
980 //  return the btree cached page address
981
982 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
983 {
984 uid entry = latch - mgr->latchsets;
985 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
986
987   return page;
988 }
989
990 //  return next available latch entry
991 //        and with latch entry locked
992
993 uint bt_availnext (BtMgr *mgr)
994 {
995 BtLatchSet *latch;
996 uint entry;
997
998   while( 1 ) {
999 #ifdef unix
1000         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1001 #else
1002         entry = _InterlockedIncrement (&mgr->latchvictim);
1003 #endif
1004         entry %= mgr->latchtotal;
1005
1006         if( !entry )
1007                 continue;
1008
1009         latch = mgr->latchsets + entry;
1010
1011         if( !bt_mutextry(latch->modify) )
1012                 continue;
1013
1014         //  return this entry if it is not pinned
1015
1016         if( !latch->pin )
1017                 return entry;
1018
1019         //      if the CLOCK bit is set
1020         //      reset it to zero.
1021
1022         latch->pin &= ~CLOCK_bit;
1023         bt_releasemutex(latch->modify);
1024   }
1025 }
1026
1027 //      pin page in buffer pool
1028 //      return with latchset pinned
1029
1030 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1031 {
1032 uint hashidx = page_no % mgr->latchhash;
1033 BtLatchSet *latch;
1034 uint entry, idx;
1035 BtPage page;
1036
1037   //  try to find our entry
1038
1039   bt_mutexlock(mgr->hashtable[hashidx].latch);
1040
1041   if( entry = mgr->hashtable[hashidx].entry ) do
1042   {
1043         latch = mgr->latchsets + entry;
1044         if( page_no == latch->page_no )
1045                 break;
1046   } while( entry = latch->next );
1047
1048   //  found our entry: increment pin
1049
1050   if( entry ) {
1051         latch = mgr->latchsets + entry;
1052         bt_mutexlock(latch->modify);
1053         latch->pin |= CLOCK_bit;
1054         latch->pin++;
1055 if(contents)
1056 abort();
1057         bt_releasemutex(latch->modify);
1058         bt_releasemutex(mgr->hashtable[hashidx].latch);
1059         return latch;
1060   }
1061
1062   //  find and reuse unpinned entry
1063
1064 trynext:
1065
1066   entry = bt_availnext (mgr);
1067   latch = mgr->latchsets + entry;
1068
1069   idx = latch->page_no % mgr->latchhash;
1070
1071   //  if latch is on a different hash chain
1072   //    unlink from the old page_no chain
1073
1074   if( latch->page_no )
1075    if( idx != hashidx ) {
1076
1077         //  skip over this entry if latch not available
1078
1079     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1080           bt_releasemutex(latch->modify);
1081           goto trynext;
1082         }
1083
1084         if( latch->prev )
1085           mgr->latchsets[latch->prev].next = latch->next;
1086         else
1087           mgr->hashtable[idx].entry = latch->next;
1088
1089         if( latch->next )
1090           mgr->latchsets[latch->next].prev = latch->prev;
1091
1092     bt_releasemutex (mgr->hashtable[idx].latch);
1093    }
1094
1095   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1096
1097   //  update permanent page area in btree from buffer pool
1098   //    no read-lock is required since page is not pinned.
1099
1100   if( latch->dirty )
1101         if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1102           return mgr->line = __LINE__, NULL;
1103         else
1104           latch->dirty = 0;
1105
1106   if( contents ) {
1107         memcpy (page, contents, mgr->page_size);
1108         latch->dirty = 1;
1109   } else if( bt_readpage (mgr, page, page_no) )
1110         return mgr->line = __LINE__, NULL;
1111
1112   //  link page as head of hash table chain
1113   //  if this is a never before used entry,
1114   //  or it was previously on a different
1115   //  hash table chain. Otherwise, just
1116   //  leave it in its current hash table
1117   //  chain position.
1118
1119   if( !latch->page_no || hashidx != idx ) {
1120         if( latch->next = mgr->hashtable[hashidx].entry )
1121           mgr->latchsets[latch->next].prev = entry;
1122
1123         mgr->hashtable[hashidx].entry = entry;
1124     latch->prev = 0;
1125   }
1126
1127   //  fill in latch structure
1128
1129   latch->pin = CLOCK_bit | 1;
1130   latch->page_no = page_no;
1131   latch->split = 0;
1132
1133   bt_releasemutex (latch->modify);
1134   bt_releasemutex (mgr->hashtable[hashidx].latch);
1135   return latch;
1136 }
1137   
1138 void bt_mgrclose (BtMgr *mgr)
1139 {
1140 BtLatchSet *latch;
1141 BtLogHdr *eof;
1142 uint num = 0;
1143 BtPage page;
1144 uint slot;
1145
1146         //      flush previously written dirty pages
1147         //      and write recovery buffer to disk
1148
1149         fdatasync (mgr->idx);
1150
1151         if( mgr->redoend ) {
1152                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1153                 memset (eof, 0, sizeof(BtLogHdr));
1154         }
1155
1156         //      write remaining dirty pool pages to the btree
1157
1158         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1159                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1160                 latch = mgr->latchsets + slot;
1161
1162                 if( latch->dirty ) {
1163                         bt_writepage(mgr, page, latch->page_no);
1164                         latch->dirty = 0, num++;
1165                 }
1166         }
1167
1168         //      clear redo recovery buffer on disk.
1169
1170         if( mgr->pagezero->redopages ) {
1171                 eof = (BtLogHdr *)mgr->redobuff;
1172                 memset (eof, 0, sizeof(BtLogHdr));
1173                 eof->lsn = mgr->lsn;
1174                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1175                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1176         }
1177
1178         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1179
1180 #ifdef unix
1181         while( mgr->segments )
1182                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1183
1184         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1185         munmap (mgr->pagezero, mgr->page_size);
1186 #else
1187         FlushViewOfFile(mgr->pagezero, 0);
1188         UnmapViewOfFile(mgr->pagezero);
1189         UnmapViewOfFile(mgr->pagepool);
1190         CloseHandle(mgr->halloc);
1191         CloseHandle(mgr->hpool);
1192 #endif
1193 #ifdef unix
1194         close (mgr->idx);
1195         free (mgr);
1196 #else
1197         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1198         FlushFileBuffers(mgr->idx);
1199         CloseHandle(mgr->idx);
1200         GlobalFree (mgr);
1201 #endif
1202 }
1203
1204 //      close and release memory
1205
1206 void bt_close (BtDb *bt)
1207 {
1208 #ifdef unix
1209         if( bt->cursor )
1210                 free (bt->cursor);
1211 #else
1212         if( bt->cursor)
1213                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1214 #endif
1215         free (bt);
1216 }
1217
1218 //  open/create new btree buffer manager
1219
1220 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1221 //              size of page pool (e.g. 262144)
1222
1223 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1224 {
1225 uint lvl, attr, last, slot, idx;
1226 uint nlatchpage, latchhash;
1227 unsigned char value[BtId];
1228 int flag, initit = 0;
1229 BtPageZero *pagezero;
1230 BtLatchSet *latch;
1231 off64_t size;
1232 uint amt[1];
1233 BtMgr* mgr;
1234 BtKey* key;
1235 BtVal *val;
1236
1237         // determine sanity of page size and buffer pool
1238
1239         if( bits > BT_maxbits )
1240                 bits = BT_maxbits;
1241         else if( bits < BT_minbits )
1242                 bits = BT_minbits;
1243 #ifdef unix
1244         mgr = calloc (1, sizeof(BtMgr));
1245
1246         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1247
1248         if( mgr->idx == -1 ) {
1249                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1250                 return free(mgr), NULL;
1251         }
1252 #else
1253         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1254         attr = FILE_ATTRIBUTE_NORMAL;
1255         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1256
1257         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1258                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1259                 return GlobalFree(mgr), NULL;
1260         }
1261 #endif
1262
1263 #ifdef unix
1264         pagezero = valloc (BT_maxpage);
1265         *amt = 0;
1266
1267         // read minimum page size to get root info
1268         //      to support raw disk partition files
1269         //      check if bits == 0 on the disk.
1270
1271         if( size = lseek (mgr->idx, 0L, 2) )
1272                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1273                         if( pagezero->alloc->bits )
1274                                 bits = pagezero->alloc->bits;
1275                         else
1276                                 initit = 1;
1277                 else
1278                         return free(mgr), free(pagezero), NULL;
1279         else
1280                 initit = 1;
1281 #else
1282         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1283         size = GetFileSize(mgr->idx, amt);
1284
1285         if( size || *amt ) {
1286                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1287                         return bt_mgrclose (mgr), NULL;
1288                 bits = pagezero->alloc->bits;
1289         } else
1290                 initit = 1;
1291 #endif
1292
1293         mgr->page_size = 1 << bits;
1294         mgr->page_bits = bits;
1295
1296         //  calculate number of latch hash table entries
1297
1298         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1299
1300         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1301         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1302         mgr->latchtotal = nodemax;
1303
1304         if( !initit )
1305                 goto mgrlatch;
1306
1307         // initialize an empty b-tree with latch page, root page, page of leaves
1308         // and page(s) of latches and page pool cache
1309
1310         memset (pagezero, 0, 1 << bits);
1311         pagezero->alloc->lvl = MIN_lvl - 1;
1312         pagezero->alloc->bits = mgr->page_bits;
1313         pagezero->redopages = redopages;
1314
1315         bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1316         pagezero->activepages = 2;
1317
1318         //  initialize left-most LEAF page in
1319         //      alloc->left and count of active leaf pages.
1320
1321         bt_putid (pagezero->alloc->left, LEAF_page);
1322         ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1323
1324         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1325                 fprintf (stderr, "Unable to create btree page zero\n");
1326                 return bt_mgrclose (mgr), NULL;
1327         }
1328
1329         memset (pagezero, 0, 1 << bits);
1330         pagezero->alloc->bits = mgr->page_bits;
1331
1332         for( lvl=MIN_lvl; lvl--; ) {
1333         BtSlot *node = slotptr(pagezero->alloc, 1);
1334                 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1335                 key = keyptr(pagezero->alloc, 1);
1336                 key->len = 2;           // create stopper key
1337                 key->key[0] = 0xff;
1338                 key->key[1] = 0xff;
1339
1340                 bt_putid(value, MIN_lvl - lvl + 1);
1341                 val = valptr(pagezero->alloc, 1);
1342                 val->len = lvl ? BtId : 0;
1343                 memcpy (val->value, value, val->len);
1344
1345                 pagezero->alloc->min = node->off;
1346                 pagezero->alloc->lvl = lvl;
1347                 pagezero->alloc->cnt = 1;
1348                 pagezero->alloc->act = 1;
1349                 pagezero->alloc->page_no = MIN_lvl - lvl;
1350
1351                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1352                         fprintf (stderr, "Unable to create btree page\n");
1353                         return bt_mgrclose (mgr), NULL;
1354                 }
1355         }
1356
1357 mgrlatch:
1358 #ifdef unix
1359         free (pagezero);
1360 #else
1361         VirtualFree (pagezero, 0, MEM_RELEASE);
1362 #endif
1363 #ifdef unix
1364         // mlock the first segment of 64K pages
1365
1366         flag = PROT_READ | PROT_WRITE;
1367         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1368         mgr->segments = 1;
1369
1370         if( mgr->pages[0] == MAP_FAILED ) {
1371                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1372                 return bt_mgrclose (mgr), NULL;
1373         }
1374
1375         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1376         mlock (mgr->pagezero, mgr->page_size);
1377
1378         mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1379         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1380
1381         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1382         if( mgr->pagepool == MAP_FAILED ) {
1383                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1384                 return bt_mgrclose (mgr), NULL;
1385         }
1386 #else
1387         flag = PAGE_READWRITE;
1388         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1389         if( !mgr->halloc ) {
1390                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1391                 return bt_mgrclose (mgr), NULL;
1392         }
1393
1394         flag = FILE_MAP_WRITE;
1395         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1396         if( !mgr->pagezero ) {
1397                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1398                 return bt_mgrclose (mgr), NULL;
1399         }
1400
1401         flag = PAGE_READWRITE;
1402         size = (uid)mgr->nlatchpage << mgr->page_bits;
1403         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1404         if( !mgr->hpool ) {
1405                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1406                 return bt_mgrclose (mgr), NULL;
1407         }
1408
1409         flag = FILE_MAP_WRITE;
1410         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1411         if( !mgr->pagepool ) {
1412                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1413                 return bt_mgrclose (mgr), NULL;
1414         }
1415 #endif
1416
1417         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1418         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1419         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1420
1421         return mgr;
1422 }
1423
1424 //      open BTree access method
1425 //      based on buffer manager
1426
1427 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1428 {
1429 BtDb *bt = malloc (sizeof(*bt));
1430
1431         memset (bt, 0, sizeof(*bt));
1432         bt->main = main;
1433         bt->mgr = mgr;
1434 #ifdef unix
1435         bt->cursor = valloc (mgr->page_size);
1436 #else
1437         bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1438 #endif
1439 #ifdef unix
1440         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1441 #else
1442         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1443 #endif
1444         return bt;
1445 }
1446
1447 //  compare two keys, return > 0, = 0, or < 0
1448 //  =0: keys are same
1449 //  -1: key2 > key1
1450 //  +1: key2 < key1
1451 //  as the comparison value
1452
1453 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1454 {
1455 uint len1 = key1->len;
1456 int ans;
1457
1458         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1459                 return ans;
1460
1461         if( len1 > len2 )
1462                 return 1;
1463         if( len1 < len2 )
1464                 return -1;
1465
1466         return 0;
1467 }
1468
1469 // place write, read, or parent lock on requested page_no.
1470
1471 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1472 {
1473         switch( mode ) {
1474         case BtLockRead:
1475                 ReadLock (latch->readwr, thread_no);
1476                 break;
1477         case BtLockWrite:
1478                 WriteLock (latch->readwr, thread_no);
1479                 break;
1480         case BtLockAccess:
1481                 ReadLock (latch->access, thread_no);
1482                 break;
1483         case BtLockDelete:
1484                 WriteLock (latch->access, thread_no);
1485                 break;
1486         case BtLockParent:
1487                 WriteOLock (latch->parent, thread_no);
1488                 break;
1489         case BtLockAtomic:
1490                 WriteOLock (latch->atomic, thread_no);
1491                 break;
1492         case BtLockAtomic | BtLockRead:
1493                 WriteOLock (latch->atomic, thread_no);
1494                 ReadLock (latch->readwr, thread_no);
1495                 break;
1496         }
1497 }
1498
1499 // remove write, read, or parent lock on requested page
1500
1501 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1502 {
1503         switch( mode ) {
1504         case BtLockRead:
1505                 ReadRelease (latch->readwr);
1506                 break;
1507         case BtLockWrite:
1508                 WriteRelease (latch->readwr);
1509                 break;
1510         case BtLockAccess:
1511                 ReadRelease (latch->access);
1512                 break;
1513         case BtLockDelete:
1514                 WriteRelease (latch->access);
1515                 break;
1516         case BtLockParent:
1517                 WriteORelease (latch->parent);
1518                 break;
1519         case BtLockAtomic:
1520                 WriteORelease (latch->atomic);
1521                 break;
1522         case BtLockAtomic | BtLockRead:
1523                 WriteORelease (latch->atomic);
1524                 ReadRelease (latch->readwr);
1525                 break;
1526         }
1527 }
1528
1529 //      allocate a new page
1530 //      return with page latched, but unlocked.
1531
1532 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1533 {
1534 uid page_no;
1535 int blk;
1536
1537         //      lock allocation page
1538
1539         bt_mutexlock(mgr->lock);
1540
1541         // use empty chain first
1542         // else allocate empty page
1543
1544         if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1545                 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1546                         set->page = bt_mappage (mgr, set->latch);
1547                 else
1548                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1549
1550                 mgr->pagezero->activepages++;
1551                 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1552                 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1553                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1554
1555                 bt_releasemutex(mgr->lock);
1556
1557                 contents->page_no = page_no;
1558                 memcpy (set->page, contents, mgr->page_size);
1559                 set->latch->dirty = 1;
1560                 return 0;
1561         }
1562
1563         page_no = bt_getid(mgr->pagezero->alloc->right);
1564         bt_putid(mgr->pagezero->alloc->right, page_no+1);
1565
1566         // unlock allocation latch and
1567         //      extend file into new page.
1568
1569         mgr->pagezero->activepages++;
1570         if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1571           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1572         bt_releasemutex(mgr->lock);
1573
1574         contents->page_no = page_no;
1575         pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1576
1577         //      don't load cache from btree page, load it from contents
1578
1579         if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1580                 set->page = bt_mappage (mgr, set->latch);
1581         else
1582                 return mgr->err;
1583
1584         return 0;
1585 }
1586
1587 //  find slot in page for given key at a given level
1588
1589 int bt_findslot (BtPage page, unsigned char *key, uint len)
1590 {
1591 uint diff, higher = page->cnt, low = 1, slot;
1592 uint good = 0;
1593
1594         //        make stopper key an infinite fence value
1595
1596         if( bt_getid (page->right) )
1597                 higher++;
1598         else
1599                 good++;
1600
1601         //      low is the lowest candidate.
1602         //  loop ends when they meet
1603
1604         //  higher is already
1605         //      tested as .ge. the passed key.
1606
1607         while( diff = higher - low ) {
1608                 slot = low + ( diff >> 1 );
1609                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1610                         low = slot + 1;
1611                 else
1612                         higher = slot, good++;
1613         }
1614
1615         //      return zero if key is on right link page
1616
1617         return good ? higher : 0;
1618 }
1619
1620 //  find and load page at given level for given key
1621 //      leave page rd or wr locked as requested
1622
1623 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1624 {
1625 uid page_no = ROOT_page, prevpage = 0;
1626 uint drill = 0xff, slot;
1627 BtLatchSet *prevlatch;
1628 uint mode, prevmode;
1629 BtVal *val;
1630
1631   //  start at root of btree and drill down
1632
1633   do {
1634         // determine lock mode of drill level
1635         mode = (drill == lvl) ? lock : BtLockRead; 
1636
1637         if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1638           return 0;
1639
1640         // obtain access lock using lock chaining with Access mode
1641
1642         if( page_no > ROOT_page )
1643           bt_lockpage(BtLockAccess, set->latch, thread_no);
1644
1645         set->page = bt_mappage (mgr, set->latch);
1646
1647         //      release & unpin parent or left sibling page
1648
1649         if( prevpage ) {
1650           bt_unlockpage(prevmode, prevlatch);
1651           bt_unpinlatch (mgr, prevlatch);
1652           prevpage = 0;
1653         }
1654
1655         // obtain mode lock using lock chaining through AccessLock
1656
1657         bt_lockpage(mode, set->latch, thread_no);
1658
1659         if( set->page->free )
1660                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1661
1662         if( page_no > ROOT_page )
1663           bt_unlockpage(BtLockAccess, set->latch);
1664
1665         // re-read and re-lock root after determining actual level of root
1666
1667         if( set->page->lvl != drill) {
1668                 if( set->latch->page_no != ROOT_page )
1669                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1670                         
1671                 drill = set->page->lvl;
1672
1673                 if( lock != BtLockRead && drill == lvl ) {
1674                   bt_unlockpage(mode, set->latch);
1675                   bt_unpinlatch (mgr, set->latch);
1676                   continue;
1677                 }
1678         }
1679
1680         prevpage = set->latch->page_no;
1681         prevlatch = set->latch;
1682         prevmode = mode;
1683
1684         //  find key on page at this level
1685         //  and descend to requested level
1686
1687         if( !set->page->kill )
1688          if( slot = bt_findslot (set->page, key, len) ) {
1689           if( drill == lvl )
1690                 return slot;
1691
1692           // find next non-dead slot -- the fence key if nothing else
1693
1694           while( slotptr(set->page, slot)->dead )
1695                 if( slot++ < set->page->cnt )
1696                   continue;
1697                 else
1698                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1699
1700           val = valptr(set->page, slot);
1701
1702           if( val->len == BtId )
1703                 page_no = bt_getid(valptr(set->page, slot)->value);
1704           else
1705                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1706
1707           drill--;
1708           continue;
1709          }
1710
1711         //  slide right into next page
1712
1713         page_no = bt_getid(set->page->right);
1714   } while( page_no );
1715
1716   // return error on end of right chain
1717
1718   mgr->line = __LINE__, mgr->err = BTERR_struct;
1719   return 0;     // return error
1720 }
1721
1722 //      return page to free list
1723 //      page must be delete & write locked
1724
1725 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1726 {
1727         //      lock allocation page
1728
1729         bt_mutexlock (mgr->lock);
1730
1731         //      store chain
1732
1733         memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1734         bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1735         set->latch->dirty = 1;
1736         set->page->free = 1;
1737
1738         // decrement active page count
1739
1740         mgr->pagezero->activepages--;
1741         if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1742           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1743
1744         // unlock released page
1745         // and unlock allocation page
1746
1747         bt_unlockpage (BtLockDelete, set->latch);
1748         bt_unlockpage (BtLockWrite, set->latch);
1749         bt_unpinlatch (mgr, set->latch);
1750         bt_releasemutex (mgr->lock);
1751 }
1752
1753 //      a fence key was deleted from a page
1754 //      push new fence value upwards
1755
1756 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1757 {
1758 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1759 unsigned char value[BtId];
1760 BtKey* ptr;
1761 uint idx;
1762
1763         //      remove the old fence value
1764
1765         ptr = keyptr(set->page, set->page->cnt);
1766         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1767         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1768         set->latch->dirty = 1;
1769
1770         //  cache new fence value
1771
1772         ptr = keyptr(set->page, set->page->cnt);
1773         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1774
1775         bt_lockpage (BtLockParent, set->latch, thread_no);
1776         bt_unlockpage (BtLockWrite, set->latch);
1777
1778         //      insert new (now smaller) fence key
1779
1780         bt_putid (value, set->latch->page_no);
1781         ptr = (BtKey*)leftkey;
1782
1783         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1784           return mgr->err;
1785
1786         //      now delete old fence key
1787
1788         ptr = (BtKey*)rightkey;
1789
1790         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1791                 return mgr->err;
1792
1793         bt_unlockpage (BtLockParent, set->latch);
1794         bt_unpinlatch(mgr, set->latch);
1795         return 0;
1796 }
1797
1798 //      root has a single child
1799 //      collapse a level from the tree
1800
1801 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1802 {
1803 BtPageSet child[1];
1804 uid page_no;
1805 BtVal *val;
1806 uint idx;
1807
1808   // find the child entry and promote as new root contents
1809
1810   do {
1811         for( idx = 0; idx++ < root->page->cnt; )
1812           if( !slotptr(root->page, idx)->dead )
1813                 break;
1814
1815         val = valptr(root->page, idx);
1816
1817         if( val->len == BtId )
1818                 page_no = bt_getid (valptr(root->page, idx)->value);
1819         else
1820                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1821
1822         if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1823                 child->page = bt_mappage (mgr, child->latch);
1824         else
1825                 return mgr->err;
1826
1827         bt_lockpage (BtLockDelete, child->latch, thread_no);
1828         bt_lockpage (BtLockWrite, child->latch, thread_no);
1829
1830         memcpy (root->page, child->page, mgr->page_size);
1831         root->latch->dirty = 1;
1832
1833         bt_freepage (mgr, child);
1834
1835   } while( root->page->lvl > 1 && root->page->act == 1 );
1836
1837   bt_unlockpage (BtLockWrite, root->latch);
1838   bt_unpinlatch (mgr, root->latch);
1839   return 0;
1840 }
1841
1842 //  delete a page and manage keys
1843 //  call with page writelocked
1844
1845 //      returns with page removed
1846 //      from the page pool.
1847
1848 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1849 {
1850 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1851 unsigned char value[BtId];
1852 uint lvl = set->page->lvl;
1853 BtPageSet right[1];
1854 uid page_no;
1855 BtKey *ptr;
1856
1857
1858         //      cache copy of fence key
1859         //      to remove in parent
1860
1861         ptr = keyptr(set->page, set->page->cnt);
1862         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1863
1864         //      obtain lock on right page
1865
1866         page_no = bt_getid(set->page->right);
1867
1868         if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1869                 right->page = bt_mappage (mgr, right->latch);
1870         else
1871                 return 0;
1872
1873         bt_lockpage (BtLockWrite, right->latch, thread_no);
1874
1875         // cache copy of key to update
1876
1877         ptr = keyptr(right->page, right->page->cnt);
1878         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1879
1880         if( right->page->kill )
1881                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1882
1883         // pull contents of right peer into our empty page
1884
1885         memcpy (set->page, right->page, mgr->page_size);
1886         set->page->page_no = set->latch->page_no;
1887         set->latch->dirty = 1;
1888
1889         // mark right page deleted and point it to left page
1890         //      until we can post parent updates that remove access
1891         //      to the deleted page.
1892
1893         bt_putid (right->page->right, set->latch->page_no);
1894         right->latch->dirty = 1;
1895         right->page->kill = 1;
1896
1897         bt_lockpage (BtLockParent, right->latch, thread_no);
1898         bt_unlockpage (BtLockWrite, right->latch);
1899
1900         bt_lockpage (BtLockParent, set->latch, thread_no);
1901         bt_unlockpage (BtLockWrite, set->latch);
1902
1903         // redirect higher key directly to our new node contents
1904
1905         bt_putid (value, set->latch->page_no);
1906         ptr = (BtKey*)higherfence;
1907
1908         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1909           return mgr->err;
1910
1911         //      delete old lower key to our node
1912
1913         ptr = (BtKey*)lowerfence;
1914
1915         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1916           return mgr->err;
1917
1918         //      obtain delete and write locks to right node
1919
1920         bt_unlockpage (BtLockParent, right->latch);
1921         bt_lockpage (BtLockDelete, right->latch, thread_no);
1922         bt_lockpage (BtLockWrite, right->latch, thread_no);
1923         bt_freepage (mgr, right);
1924
1925         bt_unlockpage (BtLockParent, set->latch);
1926         bt_unpinlatch (mgr, set->latch);
1927         return 0;
1928 }
1929
1930 //  find and delete key on page by marking delete flag bit
1931 //  if page becomes empty, delete it from the btree
1932
1933 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1934 {
1935 uint slot, idx, found, fence;
1936 BtPageSet set[1];
1937 BtSlot *node;
1938 BtKey *ptr;
1939 BtVal *val;
1940
1941         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1942                 node = slotptr(set->page, slot);
1943                 ptr = keyptr(set->page, slot);
1944         } else
1945                 return mgr->err;
1946
1947         // if librarian slot, advance to real slot
1948
1949         if( node->type == Librarian ) {
1950                 ptr = keyptr(set->page, ++slot);
1951                 node = slotptr(set->page, slot);
1952         }
1953
1954         fence = slot == set->page->cnt;
1955
1956         // delete the key, ignore request if already dead
1957
1958         if( found = !keycmp (ptr, key, len) )
1959           if( found = node->dead == 0 ) {
1960                 val = valptr(set->page,slot);
1961                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1962                 set->page->act--;
1963
1964                 //  mark node type as delete
1965
1966                 node->type = Delete;
1967                 node->dead = 1;
1968
1969                 // collapse empty slots beneath the fence
1970                 // on interiour nodes
1971
1972                 if( lvl )
1973                  while( idx = set->page->cnt - 1 )
1974                   if( slotptr(set->page, idx)->dead ) {
1975                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1976                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1977                   } else
1978                         break;
1979           }
1980
1981         //      did we delete a fence key in an upper level?
1982
1983         if( found && lvl && set->page->act && fence )
1984           if( bt_fixfence (mgr, set, lvl, thread_no) )
1985                 return mgr->err;
1986           else
1987                 return 0;
1988
1989         //      do we need to collapse root?
1990
1991         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1992           if( bt_collapseroot (mgr, set, thread_no) )
1993                 return mgr->err;
1994           else
1995                 return 0;
1996
1997         //      delete empty page
1998
1999         if( !set->page->act ) {
2000           if( bt_deletepage (mgr, set, thread_no) )
2001                 return mgr->err;
2002         } else {
2003           set->latch->dirty = 1;
2004           bt_unlockpage(BtLockWrite, set->latch);
2005           bt_unpinlatch (mgr, set->latch);
2006         }
2007
2008         return 0;
2009 }
2010
2011 //      advance to next slot
2012
2013 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2014 {
2015 BtLatchSet *prevlatch;
2016 uid page_no;
2017
2018         if( slot < set->page->cnt )
2019                 return slot + 1;
2020
2021         prevlatch = set->latch;
2022
2023         if( page_no = bt_getid(set->page->right) )
2024           if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2025                 set->page = bt_mappage (bt->mgr, set->latch);
2026           else
2027                 return 0;
2028         else
2029           return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2030
2031         // obtain access lock using lock chaining with Access mode
2032
2033         bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2034
2035         bt_unlockpage(BtLockRead, prevlatch);
2036         bt_unpinlatch (bt->mgr, prevlatch);
2037
2038         bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2039         bt_unlockpage(BtLockAccess, set->latch);
2040         return 1;
2041 }
2042
2043 //      find unique key == given key, or first duplicate key in
2044 //      leaf level and return number of value bytes
2045 //      or (-1) if not found.
2046
2047 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2048 {
2049 BtPageSet set[1];
2050 uint len, slot;
2051 int ret = -1;
2052 BtKey *ptr;
2053 BtVal *val;
2054
2055   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2056    do {
2057         ptr = keyptr(set->page, slot);
2058
2059         //      skip librarian slot place holder
2060
2061         if( slotptr(set->page, slot)->type == Librarian )
2062                 ptr = keyptr(set->page, ++slot);
2063
2064         //      return actual key found
2065
2066         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2067         len = ptr->len;
2068
2069         if( slotptr(set->page, slot)->type == Duplicate )
2070                 len -= BtId;
2071
2072         //      not there if we reach the stopper key
2073
2074         if( slot == set->page->cnt )
2075           if( !bt_getid (set->page->right) )
2076                 break;
2077
2078         // if key exists, return >= 0 value bytes copied
2079         //      otherwise return (-1)
2080
2081         if( slotptr(set->page, slot)->dead )
2082                 continue;
2083
2084         if( keylen == len )
2085           if( !memcmp (ptr->key, key, len) ) {
2086                 val = valptr (set->page,slot);
2087                 if( valmax > val->len )
2088                         valmax = val->len;
2089                 memcpy (value, val->value, valmax);
2090                 ret = valmax;
2091           }
2092
2093         break;
2094
2095    } while( slot = bt_findnext (bt, set, slot) );
2096
2097   bt_unlockpage (BtLockRead, set->latch);
2098   bt_unpinlatch (bt->mgr, set->latch);
2099   return ret;
2100 }
2101
2102 //      check page for space available,
2103 //      clean if necessary and return
2104 //      0 - page needs splitting
2105 //      >0  new slot value
2106
2107 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2108 {
2109 BtPage page = set->page, frame;
2110 uint nxt = mgr->page_size;
2111 uint cnt = 0, idx = 0;
2112 uint max = page->cnt;
2113 uint newslot = max;
2114 BtKey *key;
2115 BtVal *val;
2116
2117         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2118                 return slot;
2119
2120         //      skip cleanup and proceed to split
2121         //      if there's not enough garbage
2122         //      to bother with.
2123
2124         if( page->garbage < nxt / 5 )
2125                 return 0;
2126
2127         frame = malloc (mgr->page_size);
2128         memcpy (frame, page, mgr->page_size);
2129
2130         // skip page info and set rest of page to zero
2131
2132         memset (page+1, 0, mgr->page_size - sizeof(*page));
2133         set->latch->dirty = 1;
2134
2135         page->garbage = 0;
2136         page->act = 0;
2137
2138         // clean up page first by
2139         // removing deleted keys
2140
2141         while( cnt++ < max ) {
2142                 if( cnt == slot )
2143                         newslot = idx + 2;
2144
2145                 if( cnt < max || frame->lvl )
2146                   if( slotptr(frame,cnt)->dead )
2147                         continue;
2148
2149                 // copy the value across
2150
2151                 val = valptr(frame, cnt);
2152                 nxt -= val->len + sizeof(BtVal);
2153                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2154
2155                 // copy the key across
2156
2157                 key = keyptr(frame, cnt);
2158                 nxt -= key->len + sizeof(BtKey);
2159                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2160
2161                 // make a librarian slot
2162
2163                 slotptr(page, ++idx)->off = nxt;
2164                 slotptr(page, idx)->type = Librarian;
2165                 slotptr(page, idx)->dead = 1;
2166
2167                 // set up the slot
2168
2169                 slotptr(page, ++idx)->off = nxt;
2170                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2171
2172                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2173                         page->act++;
2174         }
2175
2176         page->min = nxt;
2177         page->cnt = idx;
2178         free (frame);
2179
2180         //      see if page has enough space now, or does it need splitting?
2181
2182         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2183                 return newslot;
2184
2185         return 0;
2186 }
2187
2188 // split the root and raise the height of the btree
2189
2190 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2191 {  
2192 unsigned char leftkey[BT_keyarray];
2193 uint nxt = mgr->page_size;
2194 unsigned char value[BtId];
2195 BtPageSet left[1];
2196 uid left_page_no;
2197 BtPage frame;
2198 BtKey *ptr;
2199 BtVal *val;
2200
2201         frame = malloc (mgr->page_size);
2202         memcpy (frame, root->page, mgr->page_size);
2203
2204         //      save left page fence key for new root
2205
2206         ptr = keyptr(root->page, root->page->cnt);
2207         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2208
2209         //  Obtain an empty page to use, and copy the current
2210         //  root contents into it, e.g. lower keys
2211
2212         if( bt_newpage(mgr, left, frame, page_no) )
2213                 return mgr->err;
2214
2215         left_page_no = left->latch->page_no;
2216         bt_unpinlatch (mgr, left->latch);
2217         free (frame);
2218
2219         // preserve the page info at the bottom
2220         // of higher keys and set rest to zero
2221
2222         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2223
2224         // insert stopper key at top of newroot page
2225         // and increase the root height
2226
2227         nxt -= BtId + sizeof(BtVal);
2228         bt_putid (value, right->page_no);
2229         val = (BtVal *)((unsigned char *)root->page + nxt);
2230         memcpy (val->value, value, BtId);
2231         val->len = BtId;
2232
2233         nxt -= 2 + sizeof(BtKey);
2234         slotptr(root->page, 2)->off = nxt;
2235         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2236         ptr->len = 2;
2237         ptr->key[0] = 0xff;
2238         ptr->key[1] = 0xff;
2239
2240         // insert lower keys page fence key on newroot page as first key
2241
2242         nxt -= BtId + sizeof(BtVal);
2243         bt_putid (value, left_page_no);
2244         val = (BtVal *)((unsigned char *)root->page + nxt);
2245         memcpy (val->value, value, BtId);
2246         val->len = BtId;
2247
2248         ptr = (BtKey *)leftkey;
2249         nxt -= ptr->len + sizeof(BtKey);
2250         slotptr(root->page, 1)->off = nxt;
2251         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2252         
2253         bt_putid(root->page->right, 0);
2254         root->page->min = nxt;          // reset lowest used offset and key count
2255         root->page->cnt = 2;
2256         root->page->act = 2;
2257         root->page->lvl++;
2258
2259         mgr->pagezero->alloc->lvl = root->page->lvl;
2260
2261         // release and unpin root pages
2262
2263         bt_unlockpage(BtLockWrite, root->latch);
2264         bt_unpinlatch (mgr, root->latch);
2265
2266         bt_unpinlatch (mgr, right);
2267         return 0;
2268 }
2269
2270 //  split already locked full node
2271 //      leave it locked.
2272 //      return pool entry for new right
2273 //      page, unlocked
2274
2275 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2276 {
2277 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2278 BtPage frame = malloc (mgr->page_size);
2279 uint lvl = set->page->lvl;
2280 BtPageSet right[1];
2281 BtKey *key, *ptr;
2282 BtVal *val, *src;
2283 uid right2;
2284 uint prev;
2285
2286         //  split higher half of keys to frame
2287
2288         memset (frame, 0, mgr->page_size);
2289         max = set->page->cnt;
2290         cnt = max / 2;
2291         idx = 0;
2292
2293         while( cnt++ < max ) {
2294                 if( cnt < max || set->page->lvl )
2295                   if( slotptr(set->page, cnt)->dead )
2296                         continue;
2297
2298                 src = valptr(set->page, cnt);
2299                 nxt -= src->len + sizeof(BtVal);
2300                 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2301
2302                 key = keyptr(set->page, cnt);
2303                 nxt -= key->len + sizeof(BtKey);
2304                 ptr = (BtKey*)((unsigned char *)frame + nxt);
2305                 memcpy (ptr, key, key->len + sizeof(BtKey));
2306
2307                 //      add librarian slot
2308
2309                 slotptr(frame, ++idx)->off = nxt;
2310                 slotptr(frame, idx)->type = Librarian;
2311                 slotptr(frame, idx)->dead = 1;
2312
2313                 //  add actual slot
2314
2315                 slotptr(frame, ++idx)->off = nxt;
2316                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2317
2318                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2319                         frame->act++;
2320         }
2321
2322         frame->bits = mgr->page_bits;
2323         frame->min = nxt;
2324         frame->cnt = idx;
2325         frame->lvl = lvl;
2326
2327         // link right node
2328
2329         if( set->latch->page_no > ROOT_page )
2330                 bt_putid (frame->right, bt_getid (set->page->right));
2331
2332         //      get new free page and write higher keys to it.
2333
2334         if( bt_newpage(mgr, right, frame, thread_no) )
2335                 return 0;
2336
2337         // process lower keys
2338
2339         memcpy (frame, set->page, mgr->page_size);
2340         memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2341         set->latch->dirty = 1;
2342
2343         nxt = mgr->page_size;
2344         set->page->garbage = 0;
2345         set->page->act = 0;
2346         max /= 2;
2347         cnt = 0;
2348         idx = 0;
2349
2350         if( slotptr(frame, max)->type == Librarian )
2351                 max--;
2352
2353         //  assemble page of smaller keys
2354
2355         while( cnt++ < max ) {
2356                 if( slotptr(frame, cnt)->dead )
2357                         continue;
2358                 val = valptr(frame, cnt);
2359                 nxt -= val->len + sizeof(BtVal);
2360                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2361
2362                 key = keyptr(frame, cnt);
2363                 nxt -= key->len + sizeof(BtKey);
2364                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2365
2366                 //      add librarian slot
2367
2368                 slotptr(set->page, ++idx)->off = nxt;
2369                 slotptr(set->page, idx)->type = Librarian;
2370                 slotptr(set->page, idx)->dead = 1;
2371
2372                 //      add actual slot
2373
2374                 slotptr(set->page, ++idx)->off = nxt;
2375                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2376                 set->page->act++;
2377         }
2378
2379         bt_putid(set->page->right, right->latch->page_no);
2380         set->page->min = nxt;
2381         set->page->cnt = idx;
2382         free(frame);
2383
2384         return right->latch - mgr->latchsets;
2385 }
2386
2387 //      fix keys for newly split page
2388 //      call with page locked, return
2389 //      unlocked
2390
2391 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2392 {
2393 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2394 unsigned char value[BtId];
2395 uint lvl = set->page->lvl;
2396 BtPage page;
2397 BtKey *ptr;
2398
2399         // if current page is the root page, split it
2400
2401         if( set->latch->page_no == ROOT_page )
2402                 return bt_splitroot (mgr, set, right, thread_no);
2403
2404         ptr = keyptr(set->page, set->page->cnt);
2405         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2406
2407         page = bt_mappage (mgr, right);
2408
2409         ptr = keyptr(page, page->cnt);
2410         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2411
2412         // insert new fences in their parent pages
2413
2414         bt_lockpage (BtLockParent, right, thread_no);
2415
2416         bt_lockpage (BtLockParent, set->latch, thread_no);
2417         bt_unlockpage (BtLockWrite, set->latch);
2418
2419         // insert new fence for reformulated left block of smaller keys
2420
2421         bt_putid (value, set->latch->page_no);
2422         ptr = (BtKey *)leftkey;
2423
2424         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2425                 return mgr->err;
2426
2427         // switch fence for right block of larger keys to new right page
2428
2429         bt_putid (value, right->page_no);
2430         ptr = (BtKey *)rightkey;
2431
2432         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2433                 return mgr->err;
2434
2435         bt_unlockpage (BtLockParent, set->latch);
2436         bt_unpinlatch (mgr, set->latch);
2437
2438         bt_unlockpage (BtLockParent, right);
2439         bt_unpinlatch (mgr, right);
2440         return 0;
2441 }
2442
2443 //      install new key and value onto page
2444 //      page must already be checked for
2445 //      adequate space
2446
2447 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2448 {
2449 uint idx, librarian;
2450 BtSlot *node;
2451 BtKey *ptr;
2452 BtVal *val;
2453 int rate;
2454
2455         //      if found slot > desired slot and previous slot
2456         //      is a librarian slot, use it
2457
2458         if( slot > 1 )
2459           if( slotptr(set->page, slot-1)->type == Librarian )
2460                 slot--;
2461
2462         // copy value onto page
2463
2464         set->page->min -= vallen + sizeof(BtVal);
2465         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2466         memcpy (val->value, value, vallen);
2467         val->len = vallen;
2468
2469         // copy key onto page
2470
2471         set->page->min -= keylen + sizeof(BtKey);
2472         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2473         memcpy (ptr->key, key, keylen);
2474         ptr->len = keylen;
2475         
2476         //      find first empty slot
2477
2478         for( idx = slot; idx < set->page->cnt; idx++ )
2479           if( slotptr(set->page, idx)->dead )
2480                 break;
2481
2482         // now insert key into array before slot,
2483         //      adding as many librarian slots as
2484         //      makes sense.
2485
2486         if( idx == set->page->cnt ) {
2487         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2488
2489                 librarian = ++idx - slot;
2490                 avail /= sizeof(BtSlot);
2491
2492                 if( avail < 0 )
2493                         avail = 0;
2494
2495                 if( librarian > avail )
2496                         librarian = avail;
2497
2498                 if( librarian ) {
2499                         rate = (idx - slot) / librarian;
2500                         set->page->cnt += librarian;
2501                         idx += librarian;
2502                 } else
2503                         rate = 0;
2504         } else
2505                 librarian = 0, rate = 0;
2506
2507         while( idx > slot ) {
2508                 //  transfer slot
2509                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2510                 idx--;
2511
2512                 //      add librarian slot per rate
2513
2514                 if( librarian )
2515                  if( (idx - slot + 1)/2 <= librarian * rate ) {
2516 //                if( rate && !(idx % rate) ) {
2517                         node = slotptr(set->page, idx--);
2518                         node->off = node[1].off;
2519                         node->type = Librarian;
2520                         node->dead = 1;
2521                         librarian--;
2522                   }
2523         }
2524 if(librarian)
2525 abort();
2526         set->latch->dirty = 1;
2527         set->page->act++;
2528
2529         //      fill in new slot
2530
2531         node = slotptr(set->page, slot);
2532         node->off = set->page->min;
2533         node->type = type;
2534         node->dead = 0;
2535
2536         if( release ) {
2537                 bt_unlockpage (BtLockWrite, set->latch);
2538                 bt_unpinlatch (mgr, set->latch);
2539         }
2540
2541         return 0;
2542 }
2543
2544 //  Insert new key into the btree at given level.
2545 //      either add a new key or update/add an existing one
2546
2547 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2548 {
2549 uint slot, idx, len, entry;
2550 BtPageSet set[1];
2551 BtSlot *node;
2552 BtKey *ptr;
2553 BtVal *val;
2554
2555   while( 1 ) { // find the page and slot for the current key
2556         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2557                 node = slotptr(set->page, slot);
2558                 ptr = keyptr(set->page, slot);
2559         } else {
2560                 if( !mgr->err )
2561                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2562                 return mgr->err;
2563         }
2564
2565         // if librarian slot == found slot, advance to real slot
2566
2567         if( node->type == Librarian )
2568           if( !keycmp (ptr, key, keylen) ) {
2569                 ptr = keyptr(set->page, ++slot);
2570                 node = slotptr(set->page, slot);
2571           }
2572
2573         //  if inserting a duplicate key or unique
2574         //      key that doesn't exist on the page,
2575         //      check for adequate space on the page
2576         //      and insert the new key before slot.
2577
2578         switch( type ) {
2579         case Unique:
2580         case Duplicate:
2581           if( keycmp (ptr, key, keylen) )
2582            if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2583             return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2584            else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2585                 return mgr->err;
2586            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2587                 return mgr->err;
2588            else
2589                 continue;
2590
2591           // if key already exists, update value and return
2592
2593           val = valptr(set->page, slot);
2594
2595           if( val->len >= vallen ) {
2596                 if( slotptr(set->page, slot)->dead )
2597                         set->page->act++;
2598                 node->type = type;
2599                 node->dead = 0;
2600
2601                 set->page->garbage += val->len - vallen;
2602                 set->latch->dirty = 1;
2603                 val->len = vallen;
2604                 memcpy (val->value, value, vallen);
2605                 bt_unlockpage(BtLockWrite, set->latch);
2606                 bt_unpinlatch (mgr, set->latch);
2607                 return 0;
2608           }
2609
2610           //  new update value doesn't fit in existing value area
2611           //    make sure page has room
2612
2613           if( !node->dead )
2614                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2615           else
2616                 set->page->act++;
2617
2618           node->type = type;
2619           node->dead = 0;
2620
2621           if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2622            if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2623                 return mgr->err;
2624            else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2625                 return mgr->err;
2626            else
2627                 continue;
2628
2629           //  copy key and value onto page and update slot
2630
2631           set->page->min -= vallen + sizeof(BtVal);
2632           val = (BtVal*)((unsigned char *)set->page + set->page->min);
2633           memcpy (val->value, value, vallen);
2634           val->len = vallen;
2635
2636           set->latch->dirty = 1;
2637           set->page->min -= keylen + sizeof(BtKey);
2638           ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2639           memcpy (ptr->key, key, keylen);
2640           ptr->len = keylen;
2641         
2642           node->off = set->page->min;
2643           bt_unlockpage(BtLockWrite, set->latch);
2644           bt_unpinlatch (mgr, set->latch);
2645           return 0;
2646     }
2647   }
2648   return 0;
2649 }
2650
2651 //      determine actual page where key is located
2652 //  return slot number
2653
2654 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2655 {
2656 BtKey *key = keyptr(source,src);
2657 uint slot = locks[src].slot;
2658 uint entry;
2659
2660         if( src > 1 && locks[src].reuse )
2661           entry = locks[src-1].entry, slot = 0;
2662         else
2663           entry = locks[src].entry;
2664
2665         if( slot ) {
2666                 set->latch = mgr->latchsets + entry;
2667                 set->page = bt_mappage (mgr, set->latch);
2668                 return slot;
2669         }
2670
2671         //      is locks->reuse set? or was slot zeroed?
2672         //      if so, find where our key is located 
2673         //      on current page or pages split on
2674         //      same page txn operations.
2675
2676         do {
2677                 set->latch = mgr->latchsets + entry;
2678                 set->page = bt_mappage (mgr, set->latch);
2679
2680                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2681                   if( slotptr(set->page, slot)->type == Librarian )
2682                         slot++;
2683                   if( locks[src].reuse )
2684                         locks[src].entry = entry;
2685                   return slot;
2686                 }
2687         } while( entry = set->latch->split );
2688
2689         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2690         return 0;
2691 }
2692
2693 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2694 {
2695 BtKey *key = keyptr(source, src);
2696 BtVal *val = valptr(source, src);
2697 BtLatchSet *latch;
2698 BtPageSet set[1];
2699 uint entry, slot;
2700
2701   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2702         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2703           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2704                 return mgr->err;
2705           set->page->lsn = lsn;
2706           return 0;
2707         }
2708
2709         if( entry = bt_splitpage (mgr, set, thread_no) )
2710           latch = mgr->latchsets + entry;
2711         else
2712           return mgr->err;
2713
2714         //      splice right page into split chain
2715         //      and WriteLock it.
2716
2717         bt_lockpage(BtLockWrite, latch, thread_no);
2718         latch->split = set->latch->split;
2719         set->latch->split = entry;
2720         locks[src].slot = 0;
2721   }
2722
2723   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2724 }
2725
2726 //      perform delete from smaller btree
2727 //  insert a delete slot if not found there
2728
2729 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2730 {
2731 BtKey *key = keyptr(source, src);
2732 BtPageSet set[1];
2733 uint idx, slot;
2734 BtSlot *node;
2735 BtKey *ptr;
2736 BtVal *val;
2737
2738         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2739           node = slotptr(set->page, slot);
2740           ptr = keyptr(set->page, slot);
2741           val = valptr(set->page, slot);
2742         } else
2743           return mgr->line = __LINE__, mgr->err = BTERR_struct;
2744
2745         //  if slot is not found, insert a delete slot
2746
2747         if( keycmp (ptr, key->key, key->len) )
2748           return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2749
2750         //      if node is already dead,
2751         //      ignore the request.
2752
2753         if( node->dead )
2754                 return 0;
2755
2756         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2757         set->latch->dirty = 1;
2758         set->page->lsn = lsn;
2759         set->page->act--;
2760
2761         node->dead = 0;
2762         __sync_fetch_and_add(&mgr->found, 1);
2763         return 0;
2764 }
2765
2766 //      delete an empty master page for a transaction
2767
2768 //      note that the far right page never empties because
2769 //      it always contains (at least) the infinite stopper key
2770 //      and that all pages that don't contain any keys are
2771 //      deleted, or are being held under Atomic lock.
2772
2773 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2774 {
2775 BtPageSet right[1], temp[1];
2776 unsigned char value[BtId];
2777 uid right_page_no;
2778 BtKey *ptr;
2779
2780         bt_lockpage(BtLockWrite, prev->latch, thread_no);
2781
2782         //      grab the right sibling
2783
2784         if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2785                 right->page = bt_mappage (mgr, right->latch);
2786         else
2787                 return mgr->err;
2788
2789         bt_lockpage(BtLockAtomic, right->latch, thread_no);
2790         bt_lockpage(BtLockWrite, right->latch, thread_no);
2791
2792         //      and pull contents over empty page
2793         //      while preserving master's left link
2794
2795         memcpy (right->page->left, prev->page->left, BtId);
2796         memcpy (prev->page, right->page, mgr->page_size);
2797
2798         //      forward seekers to old right sibling
2799         //      to new page location in set
2800
2801         bt_putid (right->page->right, prev->latch->page_no);
2802         right->latch->dirty = 1;
2803         right->page->kill = 1;
2804
2805         //      remove pointer to right page for searchers by
2806         //      changing right fence key to point to the master page
2807
2808         ptr = keyptr(right->page,right->page->cnt);
2809         bt_putid (value, prev->latch->page_no);
2810
2811         if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
2812                 return mgr->err;
2813
2814         //  now that master page is in good shape we can
2815         //      remove its locks.
2816
2817         bt_unlockpage (BtLockAtomic, prev->latch);
2818         bt_unlockpage (BtLockWrite, prev->latch);
2819
2820         //  fix master's right sibling's left pointer
2821         //      to remove scanner's poiner to the right page
2822
2823         if( right_page_no = bt_getid (prev->page->right) ) {
2824           if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2825                 temp->page = bt_mappage (mgr, temp->latch);
2826           else
2827                 return mgr->err;
2828
2829           bt_lockpage (BtLockWrite, temp->latch, thread_no);
2830           bt_putid (temp->page->left, prev->latch->page_no);
2831           temp->latch->dirty = 1;
2832
2833           bt_unlockpage (BtLockWrite, temp->latch);
2834           bt_unpinlatch (mgr, temp->latch);
2835         } else {        // master is now the far right page
2836           bt_mutexlock (mgr->lock);
2837           bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2838           bt_releasemutex(mgr->lock);
2839         }
2840
2841         //      now that there are no pointers to the right page
2842         //      we can delete it after the last read access occurs
2843
2844         bt_unlockpage (BtLockWrite, right->latch);
2845         bt_unlockpage (BtLockAtomic, right->latch);
2846         bt_lockpage (BtLockDelete, right->latch, thread_no);
2847         bt_lockpage (BtLockWrite, right->latch, thread_no);
2848         bt_freepage (mgr, right);
2849         return 0;
2850 }
2851
2852 //      atomic modification of a batch of keys.
2853
2854 //      return -1 if BTERR is set
2855 //      otherwise return slot number
2856 //      causing the key constraint violation
2857 //      or zero on successful completion.
2858
2859 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2860 {
2861 uint src, idx, slot, samepage, entry, que = 0;
2862 BtKey *key, *ptr, *key2;
2863 int result = 0;
2864 BtSlot temp[1];
2865 logseqno lsn;
2866 int type;
2867
2868   // stable sort the list of keys into order to
2869   //    prevent deadlocks between threads.
2870
2871   for( src = 1; src++ < source->cnt; ) {
2872         *temp = *slotptr(source,src);
2873         key = keyptr (source,src);
2874
2875         for( idx = src; --idx; ) {
2876           key2 = keyptr (source,idx);
2877           if( keycmp (key, key2->key, key2->len) < 0 ) {
2878                 *slotptr(source,idx+1) = *slotptr(source,idx);
2879                 *slotptr(source,idx) = *temp;
2880           } else
2881                 break;
2882         }
2883   }
2884
2885   //  add entries to redo log
2886
2887   if( bt->mgr->pagezero->redopages )
2888         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2889   else
2890         lsn = 0;
2891
2892   //  perform the individual actions in the transaction
2893
2894   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2895         return bt->mgr->err;
2896
2897   // if number of active pages
2898   // is greater than the buffer pool
2899   // promote page into larger btree
2900
2901   if( bt->main )
2902    while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
2903         if( bt_txnpromote (bt) )
2904           return bt->mgr->err;
2905
2906   // return success
2907
2908   return 0;
2909 }
2910
2911 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2912 {
2913 uint src, idx, slot, samepage, entry, que = 0;
2914 AtomicKey *head, *tail, *leaf;
2915 BtPageSet set[1], prev[1];
2916 unsigned char value[BtId];
2917 BtLatchSet *latch;
2918 AtomicTxn *locks;
2919 BtKey *key, *ptr;
2920 BtPage page;
2921 BtVal *val;
2922 uid right;
2923
2924   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2925
2926   head = NULL;
2927   tail = NULL;
2928
2929   // Load the leaf page for each key
2930   // group same page references with reuse bit
2931   // and determine any constraint violations
2932
2933   for( src = 0; src++ < source->cnt; ) {
2934         key = keyptr(source, src);
2935         slot = 0;
2936
2937         // first determine if this modification falls
2938         // on the same page as the previous modification
2939         //      note that the far right leaf page is a special case
2940
2941         if( samepage = src > 1 )
2942           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2943                 slot = bt_findslot(set->page, key->key, key->len);
2944           else
2945                 bt_unlockpage(BtLockRead, set->latch); 
2946
2947         if( !slot )
2948           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, thread_no) )
2949                 set->latch->split = 0;
2950           else
2951                 return mgr->err;
2952
2953         if( slotptr(set->page, slot)->type == Librarian )
2954           ptr = keyptr(set->page, ++slot);
2955         else
2956           ptr = keyptr(set->page, slot);
2957
2958         if( !samepage ) {
2959           locks[src].entry = set->latch - mgr->latchsets;
2960           locks[src].slot = slot;
2961           locks[src].reuse = 0;
2962         } else {
2963           locks[src].entry = 0;
2964           locks[src].slot = 0;
2965           locks[src].reuse = 1;
2966         }
2967
2968         //      capture current lsn for master page
2969
2970         locks[src].reqlsn = set->page->lsn;
2971   }
2972
2973   //  unlock last loadpage lock
2974
2975   if( source->cnt )
2976         bt_unlockpage(BtLockRead, set->latch);
2977
2978   //  obtain write lock for each master page
2979   //  sync flushed pages to disk
2980
2981   for( src = 0; src++ < source->cnt; ) {
2982         if( locks[src].reuse )
2983           continue;
2984
2985         set->latch = mgr->latchsets + locks[src].entry;
2986         bt_lockpage (BtLockWrite, set->latch, thread_no);
2987   }
2988
2989   // insert or delete each key
2990   // process any splits or merges
2991   // release Write & Atomic latches
2992   // set ParentModifications and build
2993   // queue of keys to insert for split pages
2994   // or delete for deleted pages.
2995
2996   // run through txn list backwards
2997
2998   samepage = source->cnt + 1;
2999
3000   for( src = source->cnt; src; src-- ) {
3001         if( locks[src].reuse )
3002           continue;
3003
3004         //  perform the txn operations
3005         //      from smaller to larger on
3006         //  the same page
3007
3008         for( idx = src; idx < samepage; idx++ )
3009          switch( slotptr(source,idx)->type ) {
3010          case Delete:
3011           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3012                 return mgr->err;
3013           break;
3014
3015         case Duplicate:
3016         case Unique:
3017           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
3018                 return mgr->err;
3019           break;
3020
3021         default:
3022           bt_atomicpage (mgr, source, locks, idx, set);
3023           break;
3024         }
3025
3026         //      after the same page operations have finished,
3027         //  process master page for splits or deletion.
3028
3029         latch = prev->latch = mgr->latchsets + locks[src].entry;
3030         prev->page = bt_mappage (mgr, prev->latch);
3031         samepage = src;
3032
3033         //  pick-up all splits from master page
3034         //      each one is already WriteLocked.
3035
3036         entry = prev->latch->split;
3037
3038         while( entry ) {
3039           set->latch = mgr->latchsets + entry;
3040           set->page = bt_mappage (mgr, set->latch);
3041           entry = set->latch->split;
3042
3043           // delete empty master page by undoing its split
3044           //  (this is potentially another empty page)
3045           //  note that there are no new left pointers yet
3046
3047           if( !prev->page->act ) {
3048                 memcpy (set->page->left, prev->page->left, BtId);
3049                 memcpy (prev->page, set->page, mgr->page_size);
3050                 bt_lockpage (BtLockDelete, set->latch, thread_no);
3051                 bt_freepage (mgr, set);
3052
3053                 prev->latch->split = set->latch->split;
3054                 prev->latch->dirty = 1;
3055                 continue;
3056           }
3057
3058           // remove empty page from the split chain
3059           // and return it to the free list.
3060
3061           if( !set->page->act ) {
3062                 memcpy (prev->page->right, set->page->right, BtId);
3063                 prev->latch->split = set->latch->split;
3064                 bt_lockpage (BtLockDelete, set->latch, thread_no);
3065                 bt_freepage (mgr, set);
3066                 continue;
3067           }
3068
3069           //  schedule prev fence key update
3070
3071           ptr = keyptr(prev->page,prev->page->cnt);
3072           leaf = calloc (sizeof(AtomicKey), 1), que++;
3073
3074           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3075           leaf->entry = prev->latch - mgr->latchsets;
3076           leaf->page_no = prev->latch->page_no;
3077           leaf->type = 0;
3078
3079           if( tail )
3080                 tail->next = leaf;
3081           else
3082                 head = leaf;
3083
3084           tail = leaf;
3085
3086           // splice in the left link into the split page
3087
3088           bt_putid (set->page->left, prev->latch->page_no);
3089           bt_lockpage(BtLockParent, prev->latch, thread_no);
3090           bt_unlockpage(BtLockWrite, prev->latch);
3091           if( lsm )
3092                 bt_syncpage (mgr, prev->page, prev->latch);
3093           *prev = *set;
3094         }
3095
3096         //  update left pointer in next right page from last split page
3097         //      (if all splits were reversed, latch->split == 0)
3098
3099         if( latch->split ) {
3100           //  fix left pointer in master's original (now split)
3101           //  far right sibling or set rightmost page in page zero
3102
3103           if( right = bt_getid (prev->page->right) ) {
3104                 if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) )
3105                   set->page = bt_mappage (mgr, set->latch);
3106                 else
3107                   return mgr->err;
3108
3109             bt_lockpage (BtLockWrite, set->latch, thread_no);
3110             bt_putid (set->page->left, prev->latch->page_no);
3111                 set->latch->dirty = 1;
3112             bt_unlockpage (BtLockWrite, set->latch);
3113                 bt_unpinlatch (mgr, set->latch);
3114           } else {      // prev is rightmost page
3115             bt_mutexlock (mgr->lock);
3116                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3117             bt_releasemutex(mgr->lock);
3118           }
3119
3120           //  Process last page split in chain
3121
3122           ptr = keyptr(prev->page,prev->page->cnt);
3123           leaf = calloc (sizeof(AtomicKey), 1), que++;
3124
3125           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3126           leaf->entry = prev->latch - mgr->latchsets;
3127           leaf->page_no = prev->latch->page_no;
3128           leaf->type = 0;
3129   
3130           if( tail )
3131                 tail->next = leaf;
3132           else
3133                 head = leaf;
3134
3135           tail = leaf;
3136
3137           bt_lockpage(BtLockParent, prev->latch, thread_no);
3138           bt_unlockpage(BtLockWrite, prev->latch);
3139
3140           if( lsm )
3141                 bt_syncpage (mgr, prev->page, prev->latch);
3142
3143           //  remove atomic lock on master page
3144
3145           bt_unlockpage(BtLockAtomic, latch);
3146           continue;
3147         }
3148
3149         //  finished if prev page occupied (either master or final split)
3150
3151         if( prev->page->act ) {
3152           bt_unlockpage(BtLockWrite, latch);
3153           bt_unlockpage(BtLockAtomic, latch);
3154
3155           if( lsm )
3156                 bt_syncpage (mgr, prev->page, latch);
3157
3158           bt_unpinlatch(mgr, latch);
3159           continue;
3160         }
3161
3162         // any and all splits were reversed, and the
3163         // master page located in prev is empty, delete it
3164         // by pulling over master's right sibling.
3165
3166         // Remove empty master's fence key
3167
3168         ptr = keyptr(prev->page,prev->page->cnt);
3169
3170         if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
3171                 return mgr->err;
3172
3173         //      perform the remainder of the delete
3174         //      from the FIFO queue
3175
3176         leaf = calloc (sizeof(AtomicKey), 1), que++;
3177
3178         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3179         leaf->entry = prev->latch - mgr->latchsets;
3180         leaf->page_no = prev->latch->page_no;
3181         leaf->nounlock = 1;
3182         leaf->type = 2;
3183   
3184         if( tail )
3185           tail->next = leaf;
3186         else
3187           head = leaf;
3188
3189         tail = leaf;
3190
3191         //      leave atomic lock in place until
3192         //      deletion completes in next phase.
3193
3194         bt_unlockpage(BtLockWrite, prev->latch);
3195
3196         if( lsm )
3197           bt_syncpage (mgr, prev->page, prev->latch);
3198
3199   }
3200
3201   //  add & delete keys for any pages split or merged during transaction
3202
3203   if( leaf = head )
3204     do {
3205           set->latch = mgr->latchsets + leaf->entry;
3206           set->page = bt_mappage (mgr, set->latch);
3207
3208           bt_putid (value, leaf->page_no);
3209           ptr = (BtKey *)leaf->leafkey;
3210
3211           switch( leaf->type ) {
3212           case 0:       // insert key
3213             if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3214                   return mgr->err;
3215
3216                 break;
3217
3218           case 1:       // delete key
3219                 if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
3220                   return mgr->err;
3221
3222                 break;
3223
3224           case 2:       // free page
3225                 if( bt_atomicfree (mgr, set, thread_no) )
3226                   return mgr->err;
3227
3228                 break;
3229           }
3230
3231           if( !leaf->nounlock )
3232             bt_unlockpage (BtLockParent, set->latch);
3233
3234           bt_unpinlatch (mgr, set->latch);
3235           tail = leaf->next;
3236           free (leaf);
3237         } while( leaf = tail );
3238
3239   free (locks);
3240   return 0;
3241 }
3242
3243 //  promote a page into the larger btree
3244
3245 BTERR bt_txnpromote (BtDb *bt)
3246 {
3247 uint entry, slot, idx;
3248 BtPageSet set[1];
3249 BtSlot *node;
3250 BtKey *ptr;
3251 BtVal *val;
3252
3253   while( 1 ) {
3254 #ifdef unix
3255         entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3256 #else
3257         entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3258 #endif
3259         entry %= bt->mgr->latchtotal;
3260
3261         if( !entry )
3262                 continue;
3263
3264         set->latch = bt->mgr->latchsets + entry;
3265         idx = set->latch->page_no % bt->mgr->latchhash;
3266
3267         if( !bt_mutextry(set->latch->modify) )
3268                 continue;
3269
3270         //  skip this entry if it is pinned
3271
3272         if( set->latch->pin & ~CLOCK_bit ) {
3273           bt_releasemutex(set->latch->modify);
3274           continue;
3275         }
3276
3277         set->page = bt_mappage (bt->mgr, set->latch);
3278
3279         // entry never used or has no right sibling
3280
3281         if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3282           bt_releasemutex(set->latch->modify);
3283           continue;
3284         }
3285
3286         bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
3287         bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3288
3289         // entry interiour node or being or was killed
3290
3291         if( set->page->lvl || set->page->free || set->page->kill ) {
3292           bt_releasemutex(set->latch->modify);
3293       bt_unlockpage(BtLockAtomic, set->latch);
3294       bt_unlockpage(BtLockWrite, set->latch);
3295           continue;
3296         }
3297
3298         //  pin the page for our useage
3299
3300         set->latch->pin++;
3301         bt_releasemutex(set->latch->modify);
3302
3303         // transfer slots in our selected page to larger btree
3304 if( !(set->latch->page_no % 100) )
3305 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
3306
3307         if( bt_atomicexec (bt->main, set->page, 0, 1, bt->thread_no) )
3308                 return bt->main->err;
3309
3310     bt_unlockpage(BtLockAtomic, set->latch);
3311
3312         //  now delete the page
3313
3314         if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3315                 return bt->mgr->err;
3316
3317         return 0;
3318   }
3319 }
3320
3321 //      set cursor to highest slot on highest page
3322
3323 uint bt_lastkey (BtDb *bt)
3324 {
3325 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3326 BtPageSet set[1];
3327
3328         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3329                 set->page = bt_mappage (bt->mgr, set->latch);
3330         else
3331                 return 0;
3332
3333     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3334         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3335     bt_unlockpage(BtLockRead, set->latch);
3336         bt_unpinlatch (bt->mgr, set->latch);
3337         return bt->cursor->cnt;
3338 }
3339
3340 //      return previous slot on cursor page
3341
3342 uint bt_prevkey (BtDb *bt, uint slot)
3343 {
3344 uid cursor_page = bt->cursor->page_no;
3345 uid ourright, next, us = cursor_page;
3346 BtPageSet set[1];
3347
3348         if( --slot )
3349                 return slot;
3350
3351         ourright = bt_getid(bt->cursor->right);
3352
3353 goleft:
3354         if( !(next = bt_getid(bt->cursor->left)) )
3355                 return 0;
3356
3357 findourself:
3358         cursor_page = next;
3359
3360         if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3361                 set->page = bt_mappage (bt->mgr, set->latch);
3362         else
3363                 return 0;
3364
3365     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3366         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3367         bt_unlockpage(BtLockRead, set->latch);
3368         bt_unpinlatch (bt->mgr, set->latch);
3369         
3370         next = bt_getid (bt->cursor->right);
3371
3372         if( bt->cursor->kill )
3373                 goto findourself;
3374
3375         if( next != us )
3376           if( next == ourright )
3377                 goto goleft;
3378           else
3379                 goto findourself;
3380
3381         return bt->cursor->cnt;
3382 }
3383
3384 //  return next slot on cursor page
3385 //  or slide cursor right into next page
3386
3387 uint bt_nextkey (BtDb *bt, uint slot)
3388 {
3389 BtPageSet set[1];
3390 uid right;
3391
3392   do {
3393         right = bt_getid(bt->cursor->right);
3394
3395         while( slot++ < bt->cursor->cnt )
3396           if( slotptr(bt->cursor,slot)->dead )
3397                 continue;
3398           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3399                 return slot;
3400           else
3401                 break;
3402
3403         if( !right )
3404                 break;
3405
3406         if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3407                 set->page = bt_mappage (bt->mgr, set->latch);
3408         else
3409                 return 0;
3410
3411     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3412         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3413         bt_unlockpage(BtLockRead, set->latch);
3414         bt_unpinlatch (bt->mgr, set->latch);
3415         slot = 0;
3416
3417   } while( 1 );
3418
3419   return bt->mgr->err = 0;
3420 }
3421
3422 //  cache page of keys into cursor and return starting slot for given key
3423
3424 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3425 {
3426 BtPageSet set[1];
3427 uint slot;
3428
3429         // cache page for retrieval
3430
3431         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3432           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3433         else
3434           return 0;
3435
3436         bt_unlockpage(BtLockRead, set->latch);
3437         bt_unpinlatch (bt->mgr, set->latch);
3438         return slot;
3439 }
3440
3441 #ifdef STANDALONE
3442
3443 #ifndef unix
3444 double getCpuTime(int type)
3445 {
3446 FILETIME crtime[1];
3447 FILETIME xittime[1];
3448 FILETIME systime[1];
3449 FILETIME usrtime[1];
3450 SYSTEMTIME timeconv[1];
3451 double ans = 0;
3452
3453         memset (timeconv, 0, sizeof(SYSTEMTIME));
3454
3455         switch( type ) {
3456         case 0:
3457                 GetSystemTimeAsFileTime (xittime);
3458                 FileTimeToSystemTime (xittime, timeconv);
3459                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3460                 break;
3461         case 1:
3462                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3463                 FileTimeToSystemTime (usrtime, timeconv);
3464                 break;
3465         case 2:
3466                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3467                 FileTimeToSystemTime (systime, timeconv);
3468                 break;
3469         }
3470
3471         ans += (double)timeconv->wHour * 3600;
3472         ans += (double)timeconv->wMinute * 60;
3473         ans += (double)timeconv->wSecond;
3474         ans += (double)timeconv->wMilliseconds / 1000;
3475         return ans;
3476 }
3477 #else
3478 #include <time.h>
3479 #include <sys/resource.h>
3480
3481 double getCpuTime(int type)
3482 {
3483 struct rusage used[1];
3484 struct timeval tv[1];
3485
3486         switch( type ) {
3487         case 0:
3488                 gettimeofday(tv, NULL);
3489                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3490
3491         case 1:
3492                 getrusage(RUSAGE_SELF, used);
3493                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3494
3495         case 2:
3496                 getrusage(RUSAGE_SELF, used);
3497                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3498         }
3499
3500         return 0;
3501 }
3502 #endif
3503
3504 void bt_poolaudit (BtMgr *mgr)
3505 {
3506 BtLatchSet *latch;
3507 uint entry = 0;
3508
3509         while( ++entry < mgr->latchtotal ) {
3510                 latch = mgr->latchsets + entry;
3511
3512                 if( *latch->readwr->rin & MASK )
3513                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3514
3515                 if( *latch->access->rin & MASK )
3516                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3517
3518 //              if( *latch->parent->xcl->value )
3519 //                      fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3520
3521 //              if( *latch->atomic->xcl->value )
3522 //                      fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3523
3524 //              if( *latch->modify->value )
3525 //                      fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3526
3527                 if( latch->pin & ~CLOCK_bit )
3528                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3529         }
3530 }
3531
3532 typedef struct {
3533         char idx;
3534         char *type;
3535         char *infile;
3536         BtMgr *main;
3537         BtMgr *mgr;
3538         int num;
3539 } ThreadArg;
3540
3541 //  standalone program to index file of keys
3542 //  then list them onto std-out
3543
3544 #ifdef unix
3545 void *index_file (void *arg)
3546 #else
3547 uint __stdcall index_file (void *arg)
3548 #endif
3549 {
3550 int line = 0, found = 0, cnt = 0, idx;
3551 uid next, page_no = LEAF_page;  // start on first page of leaves
3552 int ch, len = 0, slot, type = 0;
3553 unsigned char key[BT_maxkey];
3554 unsigned char txn[65536];
3555 ThreadArg *args = arg;
3556 BtPage page, frame;
3557 BtPageSet set[1];
3558 uint nxt = 65536;
3559 BtKey *ptr;
3560 BtVal *val;
3561 BtDb *bt;
3562 FILE *in;
3563
3564         bt = bt_open (args->mgr, args->main);
3565         page = (BtPage)txn;
3566
3567         if( args->idx < strlen (args->type) )
3568                 ch = args->type[args->idx];
3569         else
3570                 ch = args->type[strlen(args->type) - 1];
3571
3572         switch(ch | 0x20)
3573         {
3574         case 'd':
3575                 type = Delete;
3576
3577         case 'p':
3578                 if( !type )
3579                         type = Unique;
3580
3581                 if( args->num )
3582                  if( type == Delete )
3583                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3584                  else
3585                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3586                 else
3587                  if( type == Delete )
3588                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3589                  else
3590                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3591
3592                 if( in = fopen (args->infile, "rb") )
3593                   while( ch = getc(in), ch != EOF )
3594                         if( ch == '\n' )
3595                         {
3596                           line++;
3597
3598                           if( !args->num ) {
3599                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3600                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3601                             len = 0;
3602                                 continue;
3603                           }
3604
3605                           nxt -= len - 10;
3606                           memcpy (txn + nxt, key + 10, len - 10);
3607                           nxt -= 1;
3608                           txn[nxt] = len - 10;
3609                           nxt -= 10;
3610                           memcpy (txn + nxt, key, 10);
3611                           nxt -= 1;
3612                           txn[nxt] = 10;
3613                           slotptr(page,++cnt)->off  = nxt;
3614                           slotptr(page,cnt)->type = type;
3615                           len = 0;
3616
3617                           if( cnt < args->num )
3618                                 continue;
3619
3620                           page->cnt = cnt;
3621                           page->act = cnt;
3622                           page->min = nxt;
3623
3624                           if( bt_atomictxn (bt, page) )
3625                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3626                           nxt = sizeof(txn);
3627                           cnt = 0;
3628
3629                         }
3630                         else if( len < BT_maxkey )
3631                                 key[len++] = ch;
3632                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3633                 break;
3634
3635         case 'w':
3636                 fprintf(stderr, "started indexing for %s\n", args->infile);
3637                 if( in = fopen (args->infile, "r") )
3638                   while( ch = getc(in), ch != EOF )
3639                         if( ch == '\n' )
3640                         {
3641                           line++;
3642
3643                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3644                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3645                           len = 0;
3646                         }
3647                         else if( len < BT_maxkey )
3648                                 key[len++] = ch;
3649                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3650                 break;
3651
3652         case 'f':
3653                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3654                 if( in = fopen (args->infile, "rb") )
3655                   while( ch = getc(in), ch != EOF )
3656                         if( ch == '\n' )
3657                         {
3658                           line++;
3659                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3660                                 found++;
3661                           else if( bt->mgr->err )
3662                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3663                           len = 0;
3664                         }
3665                         else if( len < BT_maxkey )
3666                                 key[len++] = ch;
3667                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3668                 break;
3669
3670         case 's':
3671                 fprintf(stderr, "started scanning\n");
3672                 
3673                 do {
3674                         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3675                                 set->page = bt_mappage (bt->mgr, set->latch);
3676                         else
3677                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3678
3679                         bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3680                         next = bt_getid (set->page->right);
3681
3682                         for( slot = 0; slot++ < set->page->cnt; )
3683                          if( next || slot < set->page->cnt )
3684                           if( !slotptr(set->page, slot)->dead ) {
3685                                 ptr = keyptr(set->page, slot);
3686                                 len = ptr->len;
3687
3688                             if( slotptr(set->page, slot)->type == Duplicate )
3689                                         len -= BtId;
3690
3691                                 fwrite (ptr->key, len, 1, stdout);
3692                                 val = valptr(set->page, slot);
3693                                 fwrite (val->value, val->len, 1, stdout);
3694                                 fputc ('\n', stdout);
3695                                 cnt++;
3696                            }
3697
3698                         bt_unlockpage (BtLockRead, set->latch);
3699                         bt_unpinlatch (bt->mgr, set->latch);
3700                 } while( page_no = next );
3701
3702                 fprintf(stderr, " Total keys read %d\n", cnt);
3703                 break;
3704
3705         case 'r':
3706                 fprintf(stderr, "started reverse scan\n");
3707                 if( slot = bt_lastkey (bt) )
3708                    while( slot = bt_prevkey (bt, slot) ) {
3709                         if( slotptr(bt->cursor, slot)->dead )
3710                           continue;
3711
3712                         ptr = keyptr(bt->cursor, slot);
3713                         len = ptr->len;
3714
3715                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3716                                 len -= BtId;
3717
3718                         fwrite (ptr->key, len, 1, stdout);
3719                         val = valptr(bt->cursor, slot);
3720                         fwrite (val->value, val->len, 1, stdout);
3721                         fputc ('\n', stdout);
3722                         cnt++;
3723                   }
3724
3725                 fprintf(stderr, " Total keys read %d\n", cnt);
3726                 break;
3727
3728         case 'c':
3729 #ifdef unix
3730                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3731 #endif
3732                 fprintf(stderr, "started counting\n");
3733                 next = REDO_page + bt->mgr->pagezero->redopages;
3734
3735                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3736                         if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3737                                 break;
3738
3739                         if( !bt->cursor->free && !bt->cursor->lvl )
3740                                 cnt += bt->cursor->act;
3741
3742                         bt->mgr->reads++;
3743                         page_no = next++;
3744                 }
3745                 
3746                 cnt--;  // remove stopper key
3747                 fprintf(stderr, " Total keys counted %d\n", cnt);
3748                 break;
3749         }
3750
3751         bt_close (bt);
3752 #ifdef unix
3753         return NULL;
3754 #else
3755         return 0;
3756 #endif
3757 }
3758
3759 typedef struct timeval timer;
3760
3761 int main (int argc, char **argv)
3762 {
3763 int idx, cnt, len, slot, err;
3764 int segsize, bits = 16;
3765 double start, stop;
3766 #ifdef unix
3767 pthread_t *threads;
3768 #else
3769 HANDLE *threads;
3770 #endif
3771 ThreadArg *args;
3772 uint redopages = 0;
3773 uint poolsize = 0;
3774 uint mainpool = 0;
3775 uint mainbits = 0;
3776 float elapsed;
3777 int num = 0;
3778 char key[1];
3779 BtMgr *main;
3780 BtMgr *mgr;
3781 BtKey *ptr;
3782
3783         if( argc < 3 ) {
3784                 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3785                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3786                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3787                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3788                 fprintf (stderr, "  page_bits is the page size in bits for the cache btree\n");
3789                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3790                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3791                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3792                 fprintf (stderr, "  main_bits is the page size of the main btree in bits\n");
3793                 fprintf (stderr, "  main_pool is the number of main pages in the main buffer pool\n");
3794                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3795                 exit(0);
3796         }
3797
3798         start = getCpuTime(0);
3799
3800         if( argc > 4 )
3801                 bits = atoi(argv[4]);
3802
3803         if( argc > 5 )
3804                 poolsize = atoi(argv[5]);
3805
3806         if( !poolsize )
3807                 fprintf (stderr, "Warning: no mapped_pool\n");
3808
3809         if( argc > 6 )
3810                 num = atoi(argv[6]);
3811
3812         if( argc > 7 )
3813                 redopages = atoi(argv[7]);
3814
3815         if( redopages + REDO_page > 65535 )
3816                 fprintf (stderr, "Warning: Recovery buffer too large\n");
3817
3818         if( argc > 8 )
3819                 mainbits = atoi(argv[8]);
3820
3821         if( argc > 9 )
3822                 mainpool = atoi(argv[9]);
3823
3824         cnt = argc - 10;
3825 #ifdef unix
3826         threads = malloc (cnt * sizeof(pthread_t));
3827 #else
3828         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3829 #endif
3830         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3831
3832         mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3833
3834         if( !mgr ) {
3835                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3836                 exit (1);
3837         }
3838
3839         if( mainbits ) {
3840                 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3841
3842                 if( !main ) {
3843                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3844                         exit (1);
3845                 }
3846         } else
3847                 main = NULL;
3848
3849         //      fire off threads
3850
3851         if( cnt )
3852           for( idx = 0; idx < cnt; idx++ ) {
3853                 args[idx].infile = argv[idx + 10];
3854                 args[idx].type = argv[3];
3855                 args[idx].main = main;
3856                 args[idx].mgr = mgr;
3857                 args[idx].num = num;
3858                 args[idx].idx = idx;
3859 #ifdef unix
3860                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3861                         fprintf(stderr, "Error creating thread %d\n", err);
3862 #else
3863                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3864 #endif
3865           }
3866         else {
3867                 args[0].infile = argv[idx + 10];
3868                 args[0].type = argv[3];
3869                 args[0].main = main;
3870                 args[0].mgr = mgr;
3871                 args[0].num = num;
3872                 args[0].idx = 0;
3873                 index_file (args);
3874         }
3875
3876         //      wait for termination
3877
3878 #ifdef unix
3879         for( idx = 0; idx < cnt; idx++ )
3880                 pthread_join (threads[idx], NULL);
3881 #else
3882         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3883
3884         for( idx = 0; idx < cnt; idx++ )
3885                 CloseHandle(threads[idx]);
3886 #endif
3887         bt_poolaudit(mgr);
3888
3889         if( main )
3890                 bt_poolaudit(main);
3891
3892         fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3893
3894         if( main )
3895                 bt_mgrclose (main);
3896         bt_mgrclose (mgr);
3897
3898         elapsed = getCpuTime(0) - start;
3899         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3900         elapsed = getCpuTime(1);
3901         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3902         elapsed = getCpuTime(2);
3903         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3904 }
3905
3906 #endif  //STANDALONE