]> pd.if.org Git - btree/blob - threadskv10.c
Fix futex locking for linux
[btree] / threadskv10.c
1 // btree version threadskv10 FUTEX version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      and dual B-trees for write optimization
11
12 // 09 OCT 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <linux/futex.h>
39 #define SYS_futex 202
40 #endif
41
42 #ifdef unix
43 #include <unistd.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <fcntl.h>
47 #include <sys/time.h>
48 #include <sys/mman.h>
49 #include <errno.h>
50 #include <pthread.h>
51 #include <limits.h>
52 #else
53 #define WIN32_LEAN_AND_MEAN
54 #include <windows.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <time.h>
58 #include <fcntl.h>
59 #include <process.h>
60 #include <intrin.h>
61 #endif
62
63 #include <memory.h>
64 #include <string.h>
65 #include <stddef.h>
66
67 typedef unsigned long long      uid;
68 typedef unsigned long long      logseqno;
69
70 #ifndef unix
71 typedef unsigned long long      off64_t;
72 typedef unsigned short          ushort;
73 typedef unsigned int            uint;
74 #endif
75
76 #define BT_ro 0x6f72    // ro
77 #define BT_rw 0x7772    // rw
78
79 #define BT_maxbits              26                                      // maximum page size in bits
80 #define BT_minbits              9                                       // minimum page size in bits
81 #define BT_minpage              (1 << BT_minbits)       // minimum page size
82 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
83
84 //  BTree page number constants
85 #define ALLOC_page              0       // allocation page
86 #define ROOT_page               1       // root of the btree
87 #define LEAF_page               2       // first page of leaves
88 #define REDO_page               3       // first page of redo buffer
89
90 //      Number of levels to create in a new BTree
91
92 #define MIN_lvl                 2
93
94 /*
95 There are six lock types for each node in four independent sets: 
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
102 */
103
104 typedef enum{
105         BtLockAccess = 1,
106         BtLockDelete = 2,
107         BtLockRead   = 4,
108         BtLockWrite  = 8,
109         BtLockParent = 16,
110         BtLockAtomic = 32
111 } BtLock;
112
113 //      definition for phase-fair reader/writer lock implementation
114
115 typedef struct {
116         volatile ushort rin[1];
117         volatile ushort rout[1];
118         volatile ushort ticket[1];
119         volatile ushort serving[1];
120         ushort tid;
121         ushort dup;
122 } RWLock;
123
124 //      write only lock
125
126 typedef struct {
127         volatile uint exclusive[1];
128         ushort tid;
129         ushort dup;
130 } WOLock;
131
132 #define PHID 0x1
133 #define PRES 0x2
134 #define MASK 0x3
135 #define RINC 0x4
136
137 //      lite weight mutex
138
139 // exclusive is set for write access
140
141 typedef struct {
142         volatile uint exclusive[1];
143 } BtMutexLatch;
144
145 #define XCL 1
146
147 //      mode & definition for lite latch implementation
148
149 enum {
150         QueRd = 1,              // reader queue
151         QueWr = 2               // writer queue
152 } RWQueue;
153
154 //  hash table entries
155
156 typedef struct {
157         uint entry;             // Latch table entry at head of chain
158         BtMutexLatch latch[1];
159 } BtHashEntry;
160
161 //      latch manager table structure
162
163 typedef struct {
164         uid page_no;                    // latch set page number
165         RWLock readwr[1];               // read/write page lock
166         RWLock access[1];               // Access Intent/Page delete
167         WOLock parent[1];               // Posting of fence key in parent
168         WOLock atomic[1];               // Atomic update in progress
169         uint split;                             // right split page atomic insert
170         uint entry;                             // entry slot in latch table
171         uint next;                              // next entry in hash table chain
172         uint prev;                              // prev entry in hash table chain
173         ushort pin;                             // number of accessing threads
174         unsigned char dirty;    // page in cache is dirty (atomic set)
175         unsigned char avail;    // page is an available entry
176         BtMutexLatch modify[1]; // modify entry lite latch
177 } BtLatchSet;
178
179 //      Define the length of the page record numbers
180
181 #define BtId 6
182
183 //      Page key slot definition.
184
185 //      Keys are marked dead, but remain on the page until
186 //      it cleanup is called. The fence key (highest key) for
187 //      a leaf page is always present, even after cleanup.
188
189 //      Slot types
190
191 //      In addition to the Unique keys that occupy slots
192 //      there are Librarian and Duplicate key
193 //      slots occupying the key slot array.
194
195 //      The Librarian slots are dead keys that
196 //      serve as filler, available to add new Unique
197 //      or Dup slots that are inserted into the B-tree.
198
199 //      The Duplicate slots have had their key bytes extended
200 //      by 6 bytes to contain a binary duplicate key uniqueifier.
201
202 typedef enum {
203         Unique,
204         Librarian,
205         Duplicate,
206         Delete,
207         Update
208 } BtSlotType;
209
210 typedef struct {
211         uint off:BT_maxbits;    // page offset for key start
212         uint type:3;                    // type of slot
213         uint dead:1;                    // set for deleted slot
214 } BtSlot;
215
216 //      The key structure occupies space at the upper end of
217 //      each page.  It's a length byte followed by the key
218 //      bytes.
219
220 typedef struct {
221         unsigned char len;              // this can be changed to a ushort or uint
222         unsigned char key[0];
223 } BtKey;
224
225 //      the value structure also occupies space at the upper
226 //      end of the page. Each key is immediately followed by a value.
227
228 typedef struct {
229         unsigned char len;              // this can be changed to a ushort or uint
230         unsigned char value[0];
231 } BtVal;
232
233 #define BT_maxkey       255             // maximum number of bytes in a key
234 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
235
236 //      The first part of an index page.
237 //      It is immediately followed
238 //      by the BtSlot array of keys.
239
240 //      note that this structure size
241 //      must be a multiple of 8 bytes
242 //      in order to place dups correctly.
243
244 typedef struct BtPage_ {
245         uint cnt;                                       // count of keys in page
246         uint act;                                       // count of active keys
247         uint min;                                       // next key offset
248         uint garbage;                           // page garbage in bytes
249         unsigned char bits:7;           // page size in bits
250         unsigned char free:1;           // page is on free chain
251         unsigned char lvl:7;            // level of page
252         unsigned char kill:1;           // page is being deleted
253         unsigned char right[BtId];      // page number to right
254         unsigned char left[BtId];       // page number to left
255         unsigned char filler[2];        // padding to multiple of 8
256         logseqno lsn;                           // log sequence number applied
257         uid page_no;                            // this page number
258 } *BtPage;
259
260 //  The loadpage interface object
261
262 typedef struct {
263         BtPage page;            // current page pointer
264         BtLatchSet *latch;      // current page latch set
265 } BtPageSet;
266
267 //      structure for latch manager on ALLOC_page
268
269 typedef struct {
270         struct BtPage_ alloc[1];        // next page_no in right ptr
271         unsigned long long dups[1];     // global duplicate key uniqueifier
272         unsigned char chain[BtId];      // head of free page_nos chain
273 } BtPageZero;
274
275 //      The object structure for Btree access
276
277 typedef struct {
278         uint page_size;                         // page size    
279         uint page_bits;                         // page size in bits    
280 #ifdef unix
281         int idx;
282 #else
283         HANDLE idx;
284 #endif
285         BtPageZero *pagezero;           // mapped allocation page
286         BtHashEntry *hashtable;         // the buffer pool hash table entries
287         BtLatchSet *latchsets;          // mapped latch set from buffer pool
288         unsigned char *pagepool;        // mapped to the buffer pool pages
289         unsigned char *redobuff;        // mapped recovery buffer pointer
290         logseqno lsn, flushlsn;         // current & first lsn flushed
291         BtMutexLatch dump[1];           // redo dump lite latch
292         BtMutexLatch redo[1];           // redo area lite latch
293         BtMutexLatch lock[1];           // allocation area lite latch
294         ushort thread_no[1];            // next thread number
295         uint nlatchpage;                        // number of latch pages at BT_latch
296         uint latchtotal;                        // number of page latch entries
297         uint latchhash;                         // number of latch hash table slots
298         uint latchvictim;                       // next latch entry to examine
299         uint latchavail;                        // next available latch entry
300         uint availlock[1];                      // latch available commitments
301         uint available;                         // number of available latches
302         uint redopages;                         // size of recovery buff in pages
303         uint redoend;                           // eof/end element in recovery buff
304         int err;                                        // last error
305         int line;                                       // last error line no
306         int found;                                      // number of keys found by delete
307         int reads, writes;                      // number of reads and writes
308 #ifndef unix
309         HANDLE halloc;                          // allocation handle
310         HANDLE hpool;                           // buffer pool handle
311 #endif
312 } BtMgr;
313
314 typedef struct {
315         BtMgr *mgr;                                     // buffer manager for entire process
316         BtMgr *main;                            // buffer manager for main btree
317         BtPage cursor;                          // cached page frame for start/next
318         ushort thread_no;                       // thread number
319         unsigned char key[BT_keyarray]; // last found complete key
320 } BtDb;
321
322 //      Catastrophic errors
323
324 typedef enum {
325         BTERR_ok = 0,
326         BTERR_struct,
327         BTERR_ovflw,
328         BTERR_lock,
329         BTERR_map,
330         BTERR_read,
331         BTERR_wrt,
332         BTERR_atomic,
333         BTERR_recovery,
334         BTERR_avail
335 } BTERR;
336
337 #define CLOCK_bit 0x8000
338
339 // recovery manager entry types
340
341 typedef enum {
342         BTRM_eof = 0,   // rest of buffer is emtpy
343         BTRM_add,               // add a unique key-value to btree
344         BTRM_dup,               // add a duplicate key-value to btree
345         BTRM_del,               // delete a key-value from btree
346         BTRM_upd,               // update a key with a new value
347         BTRM_new,               // allocate a new empty page
348         BTRM_old                // reuse an old empty page
349 } BTRM;
350
351 // recovery manager entry
352 //      structure followed by BtKey & BtVal
353
354 typedef struct {
355         logseqno reqlsn;        // log sequence number required
356         logseqno lsn;           // log sequence number for entry
357         uint len;                       // length of entry
358         unsigned char type;     // type of entry
359         unsigned char lvl;      // level of btree entry pertains to
360 } BtLogHdr;
361
362 // B-Tree functions
363
364 extern void bt_close (BtDb *bt);
365 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
366 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
367 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
368 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
369 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
370 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update, ushort thread_no);
371 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
372
373 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
374
375 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
376 extern uint bt_nextkey   (BtDb *bt, uint slot);
377 extern uint bt_prevkey (BtDb *db, uint slot);
378 extern uint bt_lastkey (BtDb *db);
379
380 //      manager functions
381 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
382 extern void bt_mgrclose (BtMgr *mgr);
383 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
384 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
385
386 //  The page is allocated from low and hi ends.
387 //  The key slots are allocated from the bottom,
388 //      while the text and value of the key
389 //  are allocated from the top.  When the two
390 //  areas meet, the page is split into two.
391
392 //  A key consists of a length byte, two bytes of
393 //  index number (0 - 65535), and up to 253 bytes
394 //  of key value.
395
396 //  Associated with each key is a value byte string
397 //      containing any value desired.
398
399 //  The b-tree root is always located at page 1.
400 //      The first leaf page of level zero is always
401 //      located on page 2.
402
403 //      The b-tree pages are linked with next
404 //      pointers to facilitate enumerators,
405 //      and provide for concurrency.
406
407 //      When to root page fills, it is split in two and
408 //      the tree height is raised by a new root at page
409 //      one with two keys.
410
411 //      Deleted keys are marked with a dead bit until
412 //      page cleanup. The fence key for a leaf node is
413 //      always present
414
415 //  To achieve maximum concurrency one page is locked at a time
416 //  as the tree is traversed to find leaf key in question. The right
417 //  page numbers are used in cases where the page is being split,
418 //      or consolidated.
419
420 //  Page 0 is dedicated to lock for new page extensions,
421 //      and chains empty pages together for reuse. It also
422 //      contains the latch manager hash table.
423
424 //      The ParentModification lock on a node is obtained to serialize posting
425 //      or changing the fence key for a node.
426
427 //      Empty pages are chained together through the ALLOC page and reused.
428
429 //      Access macros to address slot and key values from the page
430 //      Page slots use 1 based indexing.
431
432 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
433 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
434 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
435
436 void bt_putid(unsigned char *dest, uid id)
437 {
438 int i = BtId;
439
440         while( i-- )
441                 dest[i] = (unsigned char)id, id >>= 8;
442 }
443
444 uid bt_getid(unsigned char *src)
445 {
446 uid id = 0;
447 int i;
448
449         for( i = 0; i < BtId; i++ )
450                 id <<= 8, id |= *src++; 
451
452         return id;
453 }
454
455 uid bt_newdup (BtMgr *mgr)
456 {
457 #ifdef unix
458         return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
459 #else
460         return _InterlockedIncrement64(mgr->pagezero->dups, 1);
461 #endif
462 }
463
464 //      Write-Only Queue Lock
465
466 void WriteOLock (WOLock *lock, ushort tid)
467 {
468 uint prev;
469
470         if( lock->tid == tid ) {
471                 lock->dup++;
472                 return;
473         }
474
475         while( 1 ) {
476 #ifdef unix
477           prev = __sync_fetch_and_or (lock->exclusive, 1);
478 #else
479           prev = _InterlockedExchangeOr (lock->exclusive, 1);
480 #endif
481           if( !(prev & XCL) ) {
482                 lock->tid = tid;
483                 return;
484           }
485 #ifdef unix
486           sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
487 #else
488           SwitchToThread ();
489 #endif
490         }
491 }
492
493 void WriteORelease (WOLock *lock)
494 {
495         if( lock->dup ) {
496                 lock->dup--;
497                 return;
498         }
499
500         *lock->exclusive = 0;
501         lock->tid = 0;
502 #ifdef linux
503         sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
504 #endif
505 }
506
507 //      Phase-Fair reader/writer lock implementation
508
509 void WriteLock (RWLock *lock, ushort tid)
510 {
511 ushort w, r, tix;
512
513         if( lock->tid == tid ) {
514                 lock->dup++;
515                 return;
516         }
517 #ifdef unix
518         tix = __sync_fetch_and_add (lock->ticket, 1);
519 #else
520         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
521 #endif
522         // wait for our ticket to come up
523
524         while( tix != lock->serving[0] )
525 #ifdef unix
526                 sched_yield();
527 #else
528                 SwitchToThread ();
529 #endif
530
531         w = PRES | (tix & PHID);
532 #ifdef  unix
533         r = __sync_fetch_and_add (lock->rin, w);
534 #else
535         r = _InterlockedExchangeAdd16 (lock->rin, w);
536 #endif
537         while( r != *lock->rout )
538 #ifdef unix
539                 sched_yield();
540 #else
541                 SwitchToThread();
542 #endif
543         lock->tid = tid;
544 }
545
546 void WriteRelease (RWLock *lock)
547 {
548         if( lock->dup ) {
549                 lock->dup--;
550                 return;
551         }
552
553         lock->tid = 0;
554 #ifdef unix
555         __sync_fetch_and_and (lock->rin, ~MASK);
556 #else
557         _InterlockedAnd16 (lock->rin, ~MASK);
558 #endif
559         lock->serving[0]++;
560 }
561
562 //      try to obtain read lock
563 //  return 1 if successful
564
565 int ReadTry (RWLock *lock, ushort tid)
566 {
567 ushort w;
568
569         //  OK if write lock already held by same thread
570
571         if( lock->tid == tid ) {
572                 lock->dup++;
573                 return 1;
574         }
575 #ifdef unix
576         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
577 #else
578         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
579 #endif
580         if( w )
581           if( w == (*lock->rin & MASK) ) {
582 #ifdef unix
583                 __sync_fetch_and_add (lock->rin, -RINC);
584 #else
585                 _InterlockedExchangeAdd16 (lock->rin, -RINC);
586 #endif
587                 return 0;
588           }
589
590         return 1;
591 }
592
593 void ReadLock (RWLock *lock, ushort tid)
594 {
595 ushort w;
596         if( lock->tid == tid ) {
597                 lock->dup++;
598                 return;
599         }
600 #ifdef unix
601         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
602 #else
603         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
604 #endif
605         if( w )
606           while( w == (*lock->rin & MASK) )
607 #ifdef unix
608                 sched_yield ();
609 #else
610                 SwitchToThread ();
611 #endif
612 }
613
614 void ReadRelease (RWLock *lock)
615 {
616         if( lock->dup ) {
617                 lock->dup--;
618                 return;
619         }
620
621 #ifdef unix
622         __sync_fetch_and_add (lock->rout, RINC);
623 #else
624         _InterlockedExchangeAdd16 (lock->rout, RINC);
625 #endif
626 }
627
628 //      lite weight spin lock Latch Manager
629
630 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
631 {
632         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
633 }
634
635 void bt_mutexlock(BtMutexLatch *latch)
636 {
637 uint prev;
638
639   while( 1 ) {
640 #ifdef  unix
641         prev = __sync_fetch_and_or(latch->exclusive, XCL);
642 #else
643         prev = _InterlockedOr(latch->exclusive, XCL);
644 #endif
645         if( !(prev & XCL) )
646                 return;
647 #ifdef  unix
648         sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
649 #else
650         SwitchToThread();
651 #endif
652   }
653 }
654
655 //      try to obtain write lock
656
657 //      return 1 if obtained,
658 //              0 otherwise
659
660 int bt_mutextry(BtMutexLatch *latch)
661 {
662 uint prev;
663
664 #ifdef  unix
665         prev = __sync_fetch_and_or(latch->exclusive, XCL);
666 #else
667         prev = _InterlockedOr(latch->exclusive, XCL);
668 #endif
669         //      take write access if exclusive bit is clear
670
671         return !(prev & XCL);
672 }
673
674 //      clear write mode
675
676 void bt_releasemutex(BtMutexLatch *latch)
677 {
678         *latch->exclusive = 0;
679 #ifdef unix
680         sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
681 #endif
682 }
683
684 //      recovery manager -- flush dirty pages
685
686 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
687 {
688 uint cnt3 = 0, cnt2 = 0, cnt = 0;
689 BtLatchSet *latch;
690 BtPage page;
691 uint entry;
692
693   //    flush dirty pool pages to the btree
694
695 fprintf(stderr, "Start flushlsn\n");
696   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
697         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
698         latch = mgr->latchsets + entry;
699         bt_mutexlock (latch->modify);
700         bt_lockpage(BtLockRead, latch, thread_no);
701
702         if( latch->dirty ) {
703           bt_writepage(mgr, page, latch->page_no);
704           latch->dirty = 0, cnt++;
705         }
706 if( latch->avail )
707 cnt3++;
708 if( latch->pin & ~CLOCK_bit )
709 cnt2++;
710         bt_unlockpage(BtLockRead, latch);
711         bt_releasemutex (latch->modify);
712   }
713 fprintf(stderr, "End flushlsn %d pages  %d pinned  %d available\n", cnt, cnt2, cnt3);
714 }
715
716 //      recovery manager -- process current recovery buff on startup
717 //      this won't do much if previous session was properly closed.
718
719 BTERR bt_recoveryredo (BtMgr *mgr)
720 {
721 BtLogHdr *hdr, *eof;
722 uint offset = 0;
723 BtKey *key;
724 BtVal *val;
725
726   pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
727
728   hdr = (BtLogHdr *)mgr->redobuff;
729   mgr->flushlsn = hdr->lsn;
730
731   while( 1 ) {
732         hdr = (BtLogHdr *)(mgr->redobuff + offset);
733         switch( hdr->type ) {
734         case BTRM_eof:
735                 mgr->lsn = hdr->lsn;
736                 return 0;
737         case BTRM_add:          // add a unique key-value to btree
738         
739         case BTRM_dup:          // add a duplicate key-value to btree
740         case BTRM_del:          // delete a key-value from btree
741         case BTRM_upd:          // update a key with a new value
742         case BTRM_new:          // allocate a new empty page
743         case BTRM_old:          // reuse an old empty page
744                 return 0;
745         }
746   }
747 }
748
749 //      recovery manager -- dump current recovery buff & flush dirty pages
750 //              in preparation for next recovery buffer.
751
752 BTERR bt_dumpredo (BtMgr *mgr)
753 {
754 BtLogHdr *eof;
755 fprintf(stderr, "Flush pages ");
756
757         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
758         memset (eof, 0, sizeof(BtLogHdr));
759
760         //  flush pages written at beginning of this redo buffer
761         //      then write the redo buffer out to disk
762
763         fdatasync (mgr->idx);
764
765 fprintf(stderr, "Dump ReDo: %d bytes\n", mgr->redoend);
766         pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
767
768         sync_file_range (mgr->idx, REDO_page << mgr->page_bits, mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
769
770         mgr->flushlsn = mgr->lsn;
771         mgr->redoend = 0;
772
773         eof = (BtLogHdr *)(mgr->redobuff);
774         memset (eof, 0, sizeof(BtLogHdr));
775         eof->lsn = mgr->lsn;
776         return 0;
777 }
778
779 //      recovery manager -- append new entry to recovery log
780 //      flush to disk when it overflows.
781
782 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
783 {
784 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
785 uint amt = sizeof(BtLogHdr);
786 BtLogHdr *hdr, *eof;
787 uint flush;
788
789         bt_mutexlock (mgr->redo);
790
791         if( key )
792           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
793
794         //      see if new entry fits in buffer
795         //      flush and reset if it doesn't
796
797         if( flush = amt > size - mgr->redoend ) {
798           bt_mutexlock (mgr->dump);
799
800           if( bt_dumpredo (mgr) )
801                 return 0;
802         }
803
804         //      fill in new entry & either eof or end block
805
806         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
807
808         hdr->len = amt;
809         hdr->type = type;
810         hdr->lvl = lvl;
811         hdr->lsn = ++mgr->lsn;
812
813         mgr->redoend += amt;
814
815         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
816         memset (eof, 0, sizeof(BtLogHdr));
817
818         //  fill in key and value
819
820         if( key ) {
821           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
822           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
823         }
824
825         bt_releasemutex(mgr->redo);
826
827         if( flush ) {
828           bt_flushlsn (mgr, thread_no);
829           bt_releasemutex(mgr->dump);
830         }
831
832         return hdr->lsn;
833 }
834
835 //      recovery manager -- append transaction to recovery log
836 //      flush to disk when it overflows.
837
838 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
839 {
840 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
841 uint amt = 0, src, type;
842 BtLogHdr *hdr, *eof;
843 logseqno lsn;
844 uint flush;
845 BtKey *key;
846 BtVal *val;
847
848         //      determine amount of redo recovery log space required
849
850         for( src = 0; src++ < source->cnt; ) {
851           key = keyptr(source,src);
852           val = valptr(source,src);
853           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
854           amt += sizeof(BtLogHdr);
855         }
856
857         bt_mutexlock (mgr->redo);
858
859         //      see if new entry fits in buffer
860         //      flush and reset if it doesn't
861
862         if( flush = amt > size - mgr->redoend ) {
863           bt_mutexlock (mgr->dump);
864
865           if( bt_dumpredo (mgr) )
866                 return 0;
867         }
868
869         //      assign new lsn to transaction
870
871         lsn = ++mgr->lsn;
872
873         //      fill in new entries
874
875         for( src = 0; src++ < source->cnt; ) {
876           key = keyptr(source, src);
877           val = valptr(source, src);
878
879           switch( slotptr(source, src)->type ) {
880           case Unique:
881                 type = BTRM_add;
882                 break;
883           case Duplicate:
884                 type = BTRM_dup;
885                 break;
886           case Delete:
887                 type = BTRM_del;
888                 break;
889           case Update:
890                 type = BTRM_upd;
891                 break;
892           }
893
894           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
895           amt += sizeof(BtLogHdr);
896
897           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
898           hdr->len = amt;
899           hdr->type = type;
900           hdr->lsn = lsn;
901           hdr->lvl = 0;
902
903           //  fill in key and value
904
905           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
906           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
907
908           mgr->redoend += amt;
909         }
910
911         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
912         memset (eof, 0, sizeof(BtLogHdr));
913
914         bt_releasemutex(mgr->redo);
915
916         if( flush ) {
917           bt_flushlsn (mgr, thread_no);
918           bt_releasemutex(mgr->dump);
919         }
920
921         return lsn;
922 }
923
924 //      read page into buffer pool from permanent location in Btree file
925
926 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
927 {
928 off64_t off = page_no << mgr->page_bits;
929
930 #ifdef unix
931         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
932                 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
933                 return mgr->err = BTERR_read;
934         }
935 #else
936 OVERLAPPED ovl[1];
937 uint amt[1];
938
939         memset (ovl, 0, sizeof(OVERLAPPED));
940         ovl->Offset = off;
941         ovl->OffsetHigh = off >> 32;
942
943         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
944                 fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
945                 return BTERR_read;
946         }
947         if( *amt <  mgr->page_size ) {
948                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
949                 return BTERR_read;
950         }
951 #endif
952         mgr->reads++;
953         return 0;
954 }
955
956 //      write page to permanent location in Btree file
957 //      clear the dirty bit
958
959 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
960 {
961 off64_t off = page_no << mgr->page_bits;
962
963         page->page_no = page_no;
964
965 #ifdef unix
966         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
967                 return BTERR_wrt;
968 #else
969 OVERLAPPED ovl[1];
970 uint amt[1];
971
972         memset (ovl, 0, sizeof(OVERLAPPED));
973         ovl->Offset = off;
974         ovl->OffsetHigh = off >> 32;
975
976         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
977                 return BTERR_wrt;
978
979         if( *amt <  mgr->page_size )
980                 return BTERR_wrt;
981 #endif
982         mgr->writes++;
983         return 0;
984 }
985
986 //      set CLOCK bit in latch
987 //      decrement pin count
988
989 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
990 {
991         bt_mutexlock(latch->modify);
992         latch->pin |= CLOCK_bit;
993         latch->pin--;
994
995         // if not doing redo recovery
996         //      set latch available
997
998         if( mgr->redopages )
999           if( !(latch->pin & ~CLOCK_bit) ) {
1000                 latch->avail = 1;
1001 #ifdef unix
1002                 __sync_fetch_and_add (&mgr->available, 1);
1003 #else
1004                 _InterlockedIncrement(&mgr->available);
1005 #endif
1006         }
1007
1008         bt_releasemutex(latch->modify);
1009 }
1010
1011 //  return the btree cached page address
1012 //      if page is dirty and has not yet been
1013 //      flushed to disk for the current redo
1014 //      recovery buffer, write it out.
1015
1016 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
1017 {
1018 BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
1019
1020   return page;
1021 }
1022
1023 //  return next available latch entry
1024 //        and with latch entry locked
1025
1026 uint bt_availnext (BtMgr *mgr, ushort thread_id)
1027 {
1028 BtLatchSet *latch;
1029 uint entry;
1030
1031   while( 1 ) {
1032
1033   //  flush dirty pages if none are available
1034   //    and we aren't doing redo recovery
1035
1036   if( !mgr->redopages )
1037         if( !mgr->available )
1038           bt_flushlsn (mgr, thread_id);
1039
1040 #ifdef unix
1041         entry = __sync_fetch_and_add (&mgr->latchavail, 1) + 1;
1042 #else
1043         entry = _InterlockedIncrement (&mgr->latchavail);
1044 #endif
1045         entry %= mgr->latchtotal;
1046
1047         if( !entry )
1048                 continue;
1049
1050         latch = mgr->latchsets + entry;
1051
1052         if( !latch->avail )
1053                 continue;
1054
1055         bt_mutexlock(latch->modify);
1056
1057         if( !latch->avail ) {
1058                 bt_releasemutex(latch->modify);
1059                 continue;
1060         }
1061
1062         return entry;
1063   }
1064 }
1065
1066 //      find available latchset
1067 //      return with latchset pinned
1068
1069 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage loadit, ushort thread_id)
1070 {
1071 uint hashidx = page_no % mgr->latchhash;
1072 BtLatchSet *latch;
1073 uint entry, idx;
1074 BtPage page;
1075
1076   //  try to find our entry
1077
1078   bt_mutexlock(mgr->hashtable[hashidx].latch);
1079
1080   if( entry = mgr->hashtable[hashidx].entry ) do
1081   {
1082         latch = mgr->latchsets + entry;
1083         if( page_no == latch->page_no )
1084                 break;
1085   } while( entry = latch->next );
1086
1087   //  found our entry: increment pin
1088   //  remove from available status
1089
1090   if( entry ) {
1091         latch = mgr->latchsets + entry;
1092         bt_mutexlock(latch->modify);
1093         if( latch->avail )
1094 #ifdef unix
1095           __sync_fetch_and_add (&mgr->available, -1);
1096 #else
1097           _InterlockedDecrement(&mgr->available);
1098 #endif
1099         latch->avail = 0;
1100         latch->pin |= CLOCK_bit;
1101         latch->pin++;
1102
1103         bt_releasemutex(latch->modify);
1104         bt_releasemutex(mgr->hashtable[hashidx].latch);
1105         return latch;
1106   }
1107
1108   //  find and reuse entry from available set
1109
1110 trynext:
1111
1112   if( entry = bt_availnext (mgr, thread_id) )
1113         latch = mgr->latchsets + entry;
1114   else
1115         return mgr->line = __LINE__, mgr->err = BTERR_avail, NULL;
1116
1117   idx = latch->page_no % mgr->latchhash;
1118
1119   //  if latch is on a different hash chain
1120   //    unlink from the old page_no chain
1121
1122   if( latch->page_no )
1123    if( idx != hashidx ) {
1124
1125         //  skip over this entry if latch not available
1126
1127     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1128           bt_releasemutex(latch->modify);
1129           goto trynext;
1130         }
1131
1132         if( latch->prev )
1133           mgr->latchsets[latch->prev].next = latch->next;
1134         else
1135           mgr->hashtable[idx].entry = latch->next;
1136
1137         if( latch->next )
1138           mgr->latchsets[latch->next].prev = latch->prev;
1139
1140     bt_releasemutex (mgr->hashtable[idx].latch);
1141    }
1142
1143   //  remove available status
1144
1145   latch->avail = 0;
1146 #ifdef unix
1147   __sync_fetch_and_add (&mgr->available, -1);
1148 #else
1149   _InterlockedDecrement(&mgr->available);
1150 #endif
1151   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1152
1153   if( latch->dirty )
1154         if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1155           return mgr->line = __LINE__, NULL;
1156         else
1157           latch->dirty = 0;
1158
1159   if( loadit ) {
1160         memcpy (page, loadit, mgr->page_size);
1161         latch->dirty = 1;
1162   } else
1163         if( bt_readpage (mgr, page, page_no) )
1164           return mgr->line = __LINE__, NULL;
1165
1166   //  link page as head of hash table chain
1167   //  if this is a never before used entry,
1168   //  or it was previously on a different
1169   //  hash table chain. Otherwise, just
1170   //  leave it in its current hash table
1171   //  chain position.
1172
1173   if( !latch->page_no || hashidx != idx ) {
1174         if( latch->next = mgr->hashtable[hashidx].entry )
1175           mgr->latchsets[latch->next].prev = entry;
1176
1177         mgr->hashtable[hashidx].entry = entry;
1178     latch->prev = 0;
1179   }
1180
1181   //  fill in latch structure
1182
1183   latch->pin = CLOCK_bit | 1;
1184   latch->page_no = page_no;
1185   latch->entry = entry;
1186   latch->split = 0;
1187
1188   bt_releasemutex (latch->modify);
1189   bt_releasemutex (mgr->hashtable[hashidx].latch);
1190   return latch;
1191 }
1192   
1193 void bt_mgrclose (BtMgr *mgr)
1194 {
1195 BtLatchSet *latch;
1196 BtLogHdr *eof;
1197 uint num = 0;
1198 BtPage page;
1199 uint slot;
1200
1201         //      flush previously written dirty pages
1202         //      and write recovery buffer to disk
1203
1204         fdatasync (mgr->idx);
1205
1206         if( mgr->redoend ) {
1207                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1208                 memset (eof, 0, sizeof(BtLogHdr));
1209
1210                 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1211         }
1212
1213         //      write remaining dirty pool pages to the btree
1214
1215         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1216                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1217                 latch = mgr->latchsets + slot;
1218
1219                 if( latch->dirty ) {
1220                         bt_writepage(mgr, page, latch->page_no);
1221                         latch->dirty = 0, num++;
1222                 }
1223         }
1224
1225         //      flush last batch to disk and clear
1226         //      redo recovery buffer on disk.
1227
1228         fdatasync (mgr->idx);
1229
1230         if( mgr->redopages ) {
1231                 eof = (BtLogHdr *)mgr->redobuff;
1232                 memset (eof, 0, sizeof(BtLogHdr));
1233                 eof->lsn = mgr->lsn;
1234
1235                 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1236
1237                 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1238         }
1239
1240         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1241
1242 #ifdef unix
1243         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1244         munmap (mgr->pagezero, mgr->page_size);
1245 #else
1246         FlushViewOfFile(mgr->pagezero, 0);
1247         UnmapViewOfFile(mgr->pagezero);
1248         UnmapViewOfFile(mgr->pagepool);
1249         CloseHandle(mgr->halloc);
1250         CloseHandle(mgr->hpool);
1251 #endif
1252 #ifdef unix
1253         free (mgr->redobuff);
1254         close (mgr->idx);
1255         free (mgr);
1256 #else
1257         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1258         FlushFileBuffers(mgr->idx);
1259         CloseHandle(mgr->idx);
1260         GlobalFree (mgr);
1261 #endif
1262 }
1263
1264 //      close and release memory
1265
1266 void bt_close (BtDb *bt)
1267 {
1268 #ifdef unix
1269         if( bt->cursor )
1270                 free (bt->cursor);
1271 #else
1272         if( bt->cursor)
1273                 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1274 #endif
1275         free (bt);
1276 }
1277
1278 //  open/create new btree buffer manager
1279
1280 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1281 //              size of page pool (e.g. 262144)
1282
1283 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1284 {
1285 uint lvl, attr, last, slot, idx;
1286 uint nlatchpage, latchhash;
1287 unsigned char value[BtId];
1288 int flag, initit = 0;
1289 BtPageZero *pagezero;
1290 BtLatchSet *latch;
1291 off64_t size;
1292 uint amt[1];
1293 BtMgr* mgr;
1294 BtKey* key;
1295 BtVal *val;
1296
1297         // determine sanity of page size and buffer pool
1298
1299         if( bits > BT_maxbits )
1300                 bits = BT_maxbits;
1301         else if( bits < BT_minbits )
1302                 bits = BT_minbits;
1303 #ifdef unix
1304         mgr = calloc (1, sizeof(BtMgr));
1305
1306         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1307
1308         if( mgr->idx == -1 ) {
1309                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1310                 return free(mgr), NULL;
1311         }
1312 #else
1313         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1314         attr = FILE_ATTRIBUTE_NORMAL;
1315         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1316
1317         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1318                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1319                 return GlobalFree(mgr), NULL;
1320         }
1321 #endif
1322
1323 #ifdef unix
1324         pagezero = valloc (BT_maxpage);
1325         *amt = 0;
1326
1327         // read minimum page size to get root info
1328         //      to support raw disk partition files
1329         //      check if bits == 0 on the disk.
1330
1331         if( size = lseek (mgr->idx, 0L, 2) )
1332                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1333                         if( pagezero->alloc->bits )
1334                                 bits = pagezero->alloc->bits;
1335                         else
1336                                 initit = 1;
1337                 else
1338                         return free(mgr), free(pagezero), NULL;
1339         else
1340                 initit = 1;
1341 #else
1342         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1343         size = GetFileSize(mgr->idx, amt);
1344
1345         if( size || *amt ) {
1346                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1347                         return bt_mgrclose (mgr), NULL;
1348                 bits = pagezero->alloc->bits;
1349         } else
1350                 initit = 1;
1351 #endif
1352
1353         mgr->page_size = 1 << bits;
1354         mgr->page_bits = bits;
1355
1356         //  calculate number of latch hash table entries
1357
1358         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1359
1360         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1361         mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1362         mgr->latchtotal = nodemax;
1363
1364         if( !initit )
1365                 goto mgrlatch;
1366
1367         // initialize an empty b-tree with latch page, root page, page of leaves
1368         // and page(s) of latches and page pool cache
1369
1370         memset (pagezero, 0, 1 << bits);
1371         pagezero->alloc->bits = mgr->page_bits;
1372         bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1373
1374         //  initialize left-most LEAF page in
1375         //      alloc->left.
1376
1377         bt_putid (pagezero->alloc->left, LEAF_page);
1378
1379         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1380                 fprintf (stderr, "Unable to create btree page zero\n");
1381                 return bt_mgrclose (mgr), NULL;
1382         }
1383
1384         memset (pagezero, 0, 1 << bits);
1385         pagezero->alloc->bits = mgr->page_bits;
1386
1387         for( lvl=MIN_lvl; lvl--; ) {
1388                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1389                 key = keyptr(pagezero->alloc, 1);
1390                 key->len = 2;           // create stopper key
1391                 key->key[0] = 0xff;
1392                 key->key[1] = 0xff;
1393
1394                 bt_putid(value, MIN_lvl - lvl + 1);
1395                 val = valptr(pagezero->alloc, 1);
1396                 val->len = lvl ? BtId : 0;
1397                 memcpy (val->value, value, val->len);
1398
1399                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1400                 pagezero->alloc->lvl = lvl;
1401                 pagezero->alloc->cnt = 1;
1402                 pagezero->alloc->act = 1;
1403
1404                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1405                         fprintf (stderr, "Unable to create btree page zero\n");
1406                         return bt_mgrclose (mgr), NULL;
1407                 }
1408         }
1409
1410 mgrlatch:
1411 #ifdef unix
1412         free (pagezero);
1413 #else
1414         VirtualFree (pagezero, 0, MEM_RELEASE);
1415 #endif
1416 #ifdef unix
1417         // mlock the pagezero page
1418
1419         flag = PROT_READ | PROT_WRITE;
1420         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1421         if( mgr->pagezero == MAP_FAILED ) {
1422                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1423                 return bt_mgrclose (mgr), NULL;
1424         }
1425         mlock (mgr->pagezero, mgr->page_size);
1426
1427         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1428         if( mgr->pagepool == MAP_FAILED ) {
1429                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1430                 return bt_mgrclose (mgr), NULL;
1431         }
1432         if( mgr->redopages = redopages )
1433                 mgr->redobuff = valloc (redopages * mgr->page_size);
1434 #else
1435         flag = PAGE_READWRITE;
1436         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1437         if( !mgr->halloc ) {
1438                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1439                 return bt_mgrclose (mgr), NULL;
1440         }
1441
1442         flag = FILE_MAP_WRITE;
1443         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1444         if( !mgr->pagezero ) {
1445                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1446                 return bt_mgrclose (mgr), NULL;
1447         }
1448
1449         flag = PAGE_READWRITE;
1450         size = (uid)mgr->nlatchpage << mgr->page_bits;
1451         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1452         if( !mgr->hpool ) {
1453                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1454                 return bt_mgrclose (mgr), NULL;
1455         }
1456
1457         flag = FILE_MAP_WRITE;
1458         mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1459         if( !mgr->pagepool ) {
1460                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1461                 return bt_mgrclose (mgr), NULL;
1462         }
1463         if( mgr->redopages = redopages )
1464                 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1465 #endif
1466
1467         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1468         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1469         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1470
1471         //      mark all pool entries as available
1472
1473         for( idx = 1; idx < mgr->latchtotal; idx++ ) {
1474                 latch = mgr->latchsets + idx;
1475                 latch->avail = 1;
1476                 mgr->available++;
1477         }
1478
1479         return mgr;
1480 }
1481
1482 //      open BTree access method
1483 //      based on buffer manager
1484
1485 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1486 {
1487 BtDb *bt = malloc (sizeof(*bt));
1488
1489         memset (bt, 0, sizeof(*bt));
1490         bt->main = main;
1491         bt->mgr = mgr;
1492 #ifdef unix
1493         bt->cursor = valloc (mgr->page_size);
1494 #else
1495         bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1496 #endif
1497 #ifdef unix
1498         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1499 #else
1500         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1501 #endif
1502         return bt;
1503 }
1504
1505 //  compare two keys, return > 0, = 0, or < 0
1506 //  =0: keys are same
1507 //  -1: key2 > key1
1508 //  +1: key2 < key1
1509 //  as the comparison value
1510
1511 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1512 {
1513 uint len1 = key1->len;
1514 int ans;
1515
1516         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1517                 return ans;
1518
1519         if( len1 > len2 )
1520                 return 1;
1521         if( len1 < len2 )
1522                 return -1;
1523
1524         return 0;
1525 }
1526
1527 // place write, read, or parent lock on requested page_no.
1528
1529 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1530 {
1531         switch( mode ) {
1532         case BtLockRead:
1533                 ReadLock (latch->readwr, thread_no);
1534                 break;
1535         case BtLockWrite:
1536                 WriteLock (latch->readwr, thread_no);
1537                 break;
1538         case BtLockAccess:
1539                 ReadLock (latch->access, thread_no);
1540                 break;
1541         case BtLockDelete:
1542                 WriteLock (latch->access, thread_no);
1543                 break;
1544         case BtLockParent:
1545                 WriteOLock (latch->parent, thread_no);
1546                 break;
1547         case BtLockAtomic:
1548                 WriteOLock (latch->atomic, thread_no);
1549                 break;
1550         case BtLockAtomic | BtLockRead:
1551                 WriteOLock (latch->atomic, thread_no);
1552                 ReadLock (latch->readwr, thread_no);
1553                 break;
1554         }
1555 }
1556
1557 // remove write, read, or parent lock on requested page
1558
1559 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1560 {
1561         switch( mode ) {
1562         case BtLockRead:
1563                 ReadRelease (latch->readwr);
1564                 break;
1565         case BtLockWrite:
1566                 WriteRelease (latch->readwr);
1567                 break;
1568         case BtLockAccess:
1569                 ReadRelease (latch->access);
1570                 break;
1571         case BtLockDelete:
1572                 WriteRelease (latch->access);
1573                 break;
1574         case BtLockParent:
1575                 WriteORelease (latch->parent);
1576                 break;
1577         case BtLockAtomic:
1578                 WriteORelease (latch->atomic);
1579                 break;
1580         case BtLockAtomic | BtLockRead:
1581                 WriteORelease (latch->atomic);
1582                 ReadRelease (latch->readwr);
1583                 break;
1584         }
1585 }
1586
1587 //      allocate a new page
1588 //      return with page latched, but unlocked.
1589
1590 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1591 {
1592 uid page_no;
1593 int blk;
1594
1595         //      lock allocation page
1596
1597         bt_mutexlock(mgr->lock);
1598
1599         // use empty chain first
1600         // else allocate empty page
1601
1602         if( page_no = bt_getid(mgr->pagezero->chain) ) {
1603                 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1604                         set->page = bt_mappage (mgr, set->latch);
1605                 else
1606                         return mgr->err = BTERR_struct, mgr->line = __LINE__, -1;
1607
1608                 bt_putid(mgr->pagezero->chain, bt_getid(set->page->right));
1609                 bt_releasemutex(mgr->lock);
1610
1611                 memcpy (set->page, contents, mgr->page_size);
1612                 set->latch->dirty = 1;
1613                 return 0;
1614         }
1615
1616         page_no = bt_getid(mgr->pagezero->alloc->right);
1617         bt_putid(mgr->pagezero->alloc->right, page_no+1);
1618
1619         // unlock allocation latch and
1620         //      extend file into new page.
1621
1622         bt_releasemutex(mgr->lock);
1623
1624         //      don't load cache from btree page
1625
1626         if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1627                 set->page = bt_mappage (mgr, set->latch);
1628         else
1629                 return mgr->err;
1630
1631         set->latch->dirty = 1;
1632         return 0;
1633 }
1634
1635 //  find slot in page for given key at a given level
1636
1637 int bt_findslot (BtPage page, unsigned char *key, uint len)
1638 {
1639 uint diff, higher = page->cnt, low = 1, slot;
1640 uint good = 0;
1641
1642         //        make stopper key an infinite fence value
1643
1644         if( bt_getid (page->right) )
1645                 higher++;
1646         else
1647                 good++;
1648
1649         //      low is the lowest candidate.
1650         //  loop ends when they meet
1651
1652         //  higher is already
1653         //      tested as .ge. the passed key.
1654
1655         while( diff = higher - low ) {
1656                 slot = low + ( diff >> 1 );
1657                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1658                         low = slot + 1;
1659                 else
1660                         higher = slot, good++;
1661         }
1662
1663         //      return zero if key is on right link page
1664
1665         return good ? higher : 0;
1666 }
1667
1668 //  find and load page at given level for given key
1669 //      leave page rd or wr locked as requested
1670
1671 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1672 {
1673 uid page_no = ROOT_page, prevpage = 0;
1674 uint drill = 0xff, slot;
1675 BtLatchSet *prevlatch;
1676 uint mode, prevmode;
1677 BtVal *val;
1678
1679   //  start at root of btree and drill down
1680
1681   do {
1682         // determine lock mode of drill level
1683         mode = (drill == lvl) ? lock : BtLockRead; 
1684
1685         if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1686           return 0;
1687
1688         // obtain access lock using lock chaining with Access mode
1689
1690         if( page_no > ROOT_page )
1691           bt_lockpage(BtLockAccess, set->latch, thread_no);
1692
1693         set->page = bt_mappage (mgr, set->latch);
1694
1695         //      release & unpin parent or left sibling page
1696
1697         if( prevpage ) {
1698           bt_unlockpage(prevmode, prevlatch);
1699           bt_unpinlatch (mgr, prevlatch);
1700           prevpage = 0;
1701         }
1702
1703         // obtain mode lock using lock chaining through AccessLock
1704
1705         bt_lockpage(mode, set->latch, thread_no);
1706
1707         if( set->page->free )
1708                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1709
1710         if( page_no > ROOT_page )
1711           bt_unlockpage(BtLockAccess, set->latch);
1712
1713         // re-read and re-lock root after determining actual level of root
1714
1715         if( set->page->lvl != drill) {
1716                 if( set->latch->page_no != ROOT_page )
1717                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1718                         
1719                 drill = set->page->lvl;
1720
1721                 if( lock != BtLockRead && drill == lvl ) {
1722                   bt_unlockpage(mode, set->latch);
1723                   bt_unpinlatch (mgr, set->latch);
1724                   continue;
1725                 }
1726         }
1727
1728         prevpage = set->latch->page_no;
1729         prevlatch = set->latch;
1730         prevmode = mode;
1731
1732         //  find key on page at this level
1733         //  and descend to requested level
1734
1735         if( !set->page->kill )
1736          if( slot = bt_findslot (set->page, key, len) ) {
1737           if( drill == lvl )
1738                 return slot;
1739
1740           // find next non-dead slot -- the fence key if nothing else
1741
1742           while( slotptr(set->page, slot)->dead )
1743                 if( slot++ < set->page->cnt )
1744                   continue;
1745                 else
1746                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1747
1748           val = valptr(set->page, slot);
1749
1750           if( val->len == BtId )
1751                 page_no = bt_getid(valptr(set->page, slot)->value);
1752           else
1753                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1754
1755           drill--;
1756           continue;
1757          }
1758
1759         //  slide right into next page
1760
1761         page_no = bt_getid(set->page->right);
1762   } while( page_no );
1763
1764   // return error on end of right chain
1765
1766   mgr->line = __LINE__, mgr->err = BTERR_struct;
1767   return 0;     // return error
1768 }
1769
1770 //      return page to free list
1771 //      page must be delete & write locked
1772
1773 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1774 {
1775         //      lock allocation page
1776
1777         bt_mutexlock (mgr->lock);
1778
1779         //      store chain
1780
1781         memcpy(set->page->right, mgr->pagezero->chain, BtId);
1782         bt_putid(mgr->pagezero->chain, set->latch->page_no);
1783         set->latch->dirty = 1;
1784         set->page->free = 1;
1785
1786         // unlock released page
1787
1788         bt_unlockpage (BtLockDelete, set->latch);
1789         bt_unlockpage (BtLockWrite, set->latch);
1790         bt_unpinlatch (mgr, set->latch);
1791
1792         // unlock allocation page
1793
1794         bt_releasemutex (mgr->lock);
1795 }
1796
1797 //      a fence key was deleted from a page
1798 //      push new fence value upwards
1799
1800 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1801 {
1802 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1803 unsigned char value[BtId];
1804 BtKey* ptr;
1805 uint idx;
1806
1807         //      remove the old fence value
1808
1809         ptr = keyptr(set->page, set->page->cnt);
1810         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1811         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1812         set->latch->dirty = 1;
1813
1814         //  cache new fence value
1815
1816         ptr = keyptr(set->page, set->page->cnt);
1817         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1818
1819         bt_lockpage (BtLockParent, set->latch, thread_no);
1820         bt_unlockpage (BtLockWrite, set->latch);
1821
1822         //      insert new (now smaller) fence key
1823
1824         bt_putid (value, set->latch->page_no);
1825         ptr = (BtKey*)leftkey;
1826
1827         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
1828           return mgr->err;
1829
1830         //      now delete old fence key
1831
1832         ptr = (BtKey*)rightkey;
1833
1834         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1835                 return mgr->err;
1836
1837         bt_unlockpage (BtLockParent, set->latch);
1838         bt_unpinlatch(mgr, set->latch);
1839         return 0;
1840 }
1841
1842 //      root has a single child
1843 //      collapse a level from the tree
1844
1845 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1846 {
1847 BtPageSet child[1];
1848 uid page_no;
1849 BtVal *val;
1850 uint idx;
1851
1852   // find the child entry and promote as new root contents
1853
1854   do {
1855         for( idx = 0; idx++ < root->page->cnt; )
1856           if( !slotptr(root->page, idx)->dead )
1857                 break;
1858
1859         val = valptr(root->page, idx);
1860
1861         if( val->len == BtId )
1862                 page_no = bt_getid (valptr(root->page, idx)->value);
1863         else
1864                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1865
1866         if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1867                 child->page = bt_mappage (mgr, child->latch);
1868         else
1869                 return mgr->err;
1870
1871         bt_lockpage (BtLockDelete, child->latch, thread_no);
1872         bt_lockpage (BtLockWrite, child->latch, thread_no);
1873
1874         memcpy (root->page, child->page, mgr->page_size);
1875         root->latch->dirty = 1;
1876
1877         bt_freepage (mgr, child);
1878
1879   } while( root->page->lvl > 1 && root->page->act == 1 );
1880
1881   bt_unlockpage (BtLockWrite, root->latch);
1882   bt_unpinlatch (mgr, root->latch);
1883   return 0;
1884 }
1885
1886 //  delete a page and manage keys
1887 //  call with page writelocked
1888 //      returns with page unpinned
1889
1890 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1891 {
1892 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1893 unsigned char value[BtId];
1894 uint lvl = set->page->lvl;
1895 BtPageSet right[1];
1896 uid page_no;
1897 BtKey *ptr;
1898
1899         //      cache copy of fence key
1900         //      to post in parent
1901
1902         ptr = keyptr(set->page, set->page->cnt);
1903         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1904
1905         //      obtain lock on right page
1906
1907         page_no = bt_getid(set->page->right);
1908
1909         if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1910                 right->page = bt_mappage (mgr, right->latch);
1911         else
1912                 return 0;
1913
1914         bt_lockpage (BtLockWrite, right->latch, thread_no);
1915
1916         // cache copy of key to update
1917
1918         ptr = keyptr(right->page, right->page->cnt);
1919         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1920
1921         if( right->page->kill )
1922                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1923
1924         // pull contents of right peer into our empty page
1925
1926         memcpy (set->page, right->page, mgr->page_size);
1927         set->latch->dirty = 1;
1928
1929         // mark right page deleted and point it to left page
1930         //      until we can post parent updates that remove access
1931         //      to the deleted page.
1932
1933         bt_putid (right->page->right, set->latch->page_no);
1934         right->latch->dirty = 1;
1935         right->page->kill = 1;
1936
1937         bt_lockpage (BtLockParent, right->latch, thread_no);
1938         bt_unlockpage (BtLockWrite, right->latch);
1939
1940         bt_lockpage (BtLockParent, set->latch, thread_no);
1941         bt_unlockpage (BtLockWrite, set->latch);
1942
1943         // redirect higher key directly to our new node contents
1944
1945         bt_putid (value, set->latch->page_no);
1946         ptr = (BtKey*)higherfence;
1947
1948         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
1949           return mgr->err;
1950
1951         //      delete old lower key to our node
1952
1953         ptr = (BtKey*)lowerfence;
1954
1955         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1956           return mgr->err;
1957
1958         //      obtain delete and write locks to right node
1959
1960         bt_unlockpage (BtLockParent, right->latch);
1961         bt_lockpage (BtLockDelete, right->latch, thread_no);
1962         bt_lockpage (BtLockWrite, right->latch, thread_no);
1963         bt_freepage (mgr, right);
1964
1965         bt_unlockpage (BtLockParent, set->latch);
1966         bt_unpinlatch (mgr, set->latch);
1967         return 0;
1968 }
1969
1970 //  find and delete key on page by marking delete flag bit
1971 //  if page becomes empty, delete it from the btree
1972
1973 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1974 {
1975 uint slot, idx, found, fence;
1976 BtPageSet set[1];
1977 BtKey *ptr;
1978 BtVal *val;
1979
1980         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) )
1981                 ptr = keyptr(set->page, slot);
1982         else
1983                 return mgr->err;
1984
1985         // if librarian slot, advance to real slot
1986
1987         if( slotptr(set->page, slot)->type == Librarian )
1988                 ptr = keyptr(set->page, ++slot);
1989
1990         fence = slot == set->page->cnt;
1991
1992         // if key is found delete it, otherwise ignore request
1993
1994         if( found = !keycmp (ptr, key, len) )
1995           if( found = slotptr(set->page, slot)->dead == 0 ) {
1996                 val = valptr(set->page,slot);
1997                 slotptr(set->page, slot)->dead = 1;
1998                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1999                 set->page->act--;
2000
2001                 // collapse empty slots beneath the fence
2002
2003                 while( idx = set->page->cnt - 1 )
2004                   if( slotptr(set->page, idx)->dead ) {
2005                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2006                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2007                   } else
2008                         break;
2009           }
2010
2011         //      did we delete a fence key in an upper level?
2012
2013         if( found && lvl && set->page->act && fence )
2014           if( bt_fixfence (mgr, set, lvl, thread_no) )
2015                 return mgr->err;
2016           else
2017                 return 0;
2018
2019         //      do we need to collapse root?
2020
2021         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2022           if( bt_collapseroot (mgr, set, thread_no) )
2023                 return mgr->err;
2024           else
2025                 return 0;
2026
2027         //      delete empty page
2028
2029         if( !set->page->act )
2030                 return bt_deletepage (mgr, set, thread_no);
2031
2032         set->latch->dirty = 1;
2033         bt_unlockpage(BtLockWrite, set->latch);
2034         bt_unpinlatch (mgr, set->latch);
2035         return 0;
2036 }
2037
2038 //      advance to next slot
2039
2040 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2041 {
2042 BtLatchSet *prevlatch;
2043 uid page_no;
2044
2045         if( slot < set->page->cnt )
2046                 return slot + 1;
2047
2048         prevlatch = set->latch;
2049
2050         if( page_no = bt_getid(set->page->right) )
2051           if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2052                 set->page = bt_mappage (bt->mgr, set->latch);
2053           else
2054                 return 0;
2055         else
2056           return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2057
2058         // obtain access lock using lock chaining with Access mode
2059
2060         bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2061
2062         bt_unlockpage(BtLockRead, prevlatch);
2063         bt_unpinlatch (bt->mgr, prevlatch);
2064
2065         bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2066         bt_unlockpage(BtLockAccess, set->latch);
2067         return 1;
2068 }
2069
2070 //      find unique key == given key, or first duplicate key in
2071 //      leaf level and return number of value bytes
2072 //      or (-1) if not found.
2073
2074 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2075 {
2076 BtPageSet set[1];
2077 uint len, slot;
2078 int ret = -1;
2079 BtKey *ptr;
2080 BtVal *val;
2081
2082   if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2083    do {
2084         ptr = keyptr(set->page, slot);
2085
2086         //      skip librarian slot place holder
2087
2088         if( slotptr(set->page, slot)->type == Librarian )
2089                 ptr = keyptr(set->page, ++slot);
2090
2091         //      return actual key found
2092
2093         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2094         len = ptr->len;
2095
2096         if( slotptr(set->page, slot)->type == Duplicate )
2097                 len -= BtId;
2098
2099         //      not there if we reach the stopper key
2100
2101         if( slot == set->page->cnt )
2102           if( !bt_getid (set->page->right) )
2103                 break;
2104
2105         // if key exists, return >= 0 value bytes copied
2106         //      otherwise return (-1)
2107
2108         if( slotptr(set->page, slot)->dead )
2109                 continue;
2110
2111         if( keylen == len )
2112           if( !memcmp (ptr->key, key, len) ) {
2113                 val = valptr (set->page,slot);
2114                 if( valmax > val->len )
2115                         valmax = val->len;
2116                 memcpy (value, val->value, valmax);
2117                 ret = valmax;
2118           }
2119
2120         break;
2121
2122    } while( slot = bt_findnext (bt, set, slot) );
2123
2124   bt_unlockpage (BtLockRead, set->latch);
2125   bt_unpinlatch (bt->mgr, set->latch);
2126   return ret;
2127 }
2128
2129 //      check page for space available,
2130 //      clean if necessary and return
2131 //      0 - page needs splitting
2132 //      >0  new slot value
2133
2134 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2135 {
2136 BtPage page = set->page, frame;
2137 uint nxt = mgr->page_size;
2138 uint cnt = 0, idx = 0;
2139 uint max = page->cnt;
2140 uint newslot = max;
2141 BtKey *key;
2142 BtVal *val;
2143
2144         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2145                 return slot;
2146
2147         //      skip cleanup and proceed to split
2148         //      if there's not enough garbage
2149         //      to bother with.
2150
2151         if( page->garbage < nxt / 5 )
2152                 return 0;
2153
2154         frame = malloc (mgr->page_size);
2155         memcpy (frame, page, mgr->page_size);
2156
2157         // skip page info and set rest of page to zero
2158
2159         memset (page+1, 0, mgr->page_size - sizeof(*page));
2160         set->latch->dirty = 1;
2161
2162         page->garbage = 0;
2163         page->act = 0;
2164
2165         // clean up page first by
2166         // removing deleted keys
2167
2168         while( cnt++ < max ) {
2169                 if( cnt == slot )
2170                         newslot = idx + 2;
2171
2172                 if( cnt < max || frame->lvl )
2173                   if( slotptr(frame,cnt)->dead )
2174                         continue;
2175
2176                 // copy the value across
2177
2178                 val = valptr(frame, cnt);
2179                 nxt -= val->len + sizeof(BtVal);
2180                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2181
2182                 // copy the key across
2183
2184                 key = keyptr(frame, cnt);
2185                 nxt -= key->len + sizeof(BtKey);
2186                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2187
2188                 // make a librarian slot
2189
2190                 slotptr(page, ++idx)->off = nxt;
2191                 slotptr(page, idx)->type = Librarian;
2192                 slotptr(page, idx)->dead = 1;
2193
2194                 // set up the slot
2195
2196                 slotptr(page, ++idx)->off = nxt;
2197                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2198
2199                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2200                         page->act++;
2201         }
2202
2203         page->min = nxt;
2204         page->cnt = idx;
2205         free (frame);
2206
2207         //      see if page has enough space now, or does it need splitting?
2208
2209         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2210                 return newslot;
2211
2212         return 0;
2213 }
2214
2215 // split the root and raise the height of the btree
2216
2217 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2218 {  
2219 unsigned char leftkey[BT_keyarray];
2220 uint nxt = mgr->page_size;
2221 unsigned char value[BtId];
2222 BtPageSet left[1];
2223 uid left_page_no;
2224 BtKey *ptr;
2225 BtVal *val;
2226
2227         //      save left page fence key for new root
2228
2229         ptr = keyptr(root->page, root->page->cnt);
2230         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2231
2232         //  Obtain an empty page to use, and copy the current
2233         //  root contents into it, e.g. lower keys
2234
2235         if( bt_newpage(mgr, left, root->page, page_no) )
2236                 return mgr->err;
2237
2238         left_page_no = left->latch->page_no;
2239         bt_unpinlatch (mgr, left->latch);
2240
2241         // preserve the page info at the bottom
2242         // of higher keys and set rest to zero
2243
2244         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2245
2246         // insert stopper key at top of newroot page
2247         // and increase the root height
2248
2249         nxt -= BtId + sizeof(BtVal);
2250         bt_putid (value, right->page_no);
2251         val = (BtVal *)((unsigned char *)root->page + nxt);
2252         memcpy (val->value, value, BtId);
2253         val->len = BtId;
2254
2255         nxt -= 2 + sizeof(BtKey);
2256         slotptr(root->page, 2)->off = nxt;
2257         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2258         ptr->len = 2;
2259         ptr->key[0] = 0xff;
2260         ptr->key[1] = 0xff;
2261
2262         // insert lower keys page fence key on newroot page as first key
2263
2264         nxt -= BtId + sizeof(BtVal);
2265         bt_putid (value, left_page_no);
2266         val = (BtVal *)((unsigned char *)root->page + nxt);
2267         memcpy (val->value, value, BtId);
2268         val->len = BtId;
2269
2270         ptr = (BtKey *)leftkey;
2271         nxt -= ptr->len + sizeof(BtKey);
2272         slotptr(root->page, 1)->off = nxt;
2273         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2274         
2275         bt_putid(root->page->right, 0);
2276         root->page->min = nxt;          // reset lowest used offset and key count
2277         root->page->cnt = 2;
2278         root->page->act = 2;
2279         root->page->lvl++;
2280
2281         // release and unpin root pages
2282
2283         bt_unlockpage(BtLockWrite, root->latch);
2284         bt_unpinlatch (mgr, root->latch);
2285
2286         bt_unpinlatch (mgr, right);
2287         return 0;
2288 }
2289
2290 //  split already locked full node
2291 //      leave it locked.
2292 //      return pool entry for new right
2293 //      page, unlocked
2294
2295 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2296 {
2297 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2298 BtPage frame = malloc (mgr->page_size);
2299 uint lvl = set->page->lvl;
2300 BtPageSet right[1];
2301 BtKey *key, *ptr;
2302 BtVal *val, *src;
2303 uid right2;
2304 uint prev;
2305
2306         //  split higher half of keys to frame
2307
2308         memset (frame, 0, mgr->page_size);
2309         max = set->page->cnt;
2310         cnt = max / 2;
2311         idx = 0;
2312
2313         while( cnt++ < max ) {
2314                 if( cnt < max || set->page->lvl )
2315                   if( slotptr(set->page, cnt)->dead )
2316                         continue;
2317
2318                 src = valptr(set->page, cnt);
2319                 nxt -= src->len + sizeof(BtVal);
2320                 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2321
2322                 key = keyptr(set->page, cnt);
2323                 nxt -= key->len + sizeof(BtKey);
2324                 ptr = (BtKey*)((unsigned char *)frame + nxt);
2325                 memcpy (ptr, key, key->len + sizeof(BtKey));
2326
2327                 //      add librarian slot
2328
2329                 slotptr(frame, ++idx)->off = nxt;
2330                 slotptr(frame, idx)->type = Librarian;
2331                 slotptr(frame, idx)->dead = 1;
2332
2333                 //  add actual slot
2334
2335                 slotptr(frame, ++idx)->off = nxt;
2336                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2337
2338                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2339                         frame->act++;
2340         }
2341
2342         frame->bits = mgr->page_bits;
2343         frame->min = nxt;
2344         frame->cnt = idx;
2345         frame->lvl = lvl;
2346
2347         // link right node
2348
2349         if( set->latch->page_no > ROOT_page )
2350                 bt_putid (frame->right, bt_getid (set->page->right));
2351
2352         //      get new free page and write higher keys to it.
2353
2354         if( bt_newpage(mgr, right, frame, thread_no) )
2355                 return 0;
2356
2357         // process lower keys
2358
2359         memcpy (frame, set->page, mgr->page_size);
2360         memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2361         set->latch->dirty = 1;
2362
2363         nxt = mgr->page_size;
2364         set->page->garbage = 0;
2365         set->page->act = 0;
2366         max /= 2;
2367         cnt = 0;
2368         idx = 0;
2369
2370         if( slotptr(frame, max)->type == Librarian )
2371                 max--;
2372
2373         //  assemble page of smaller keys
2374
2375         while( cnt++ < max ) {
2376                 if( slotptr(frame, cnt)->dead )
2377                         continue;
2378                 val = valptr(frame, cnt);
2379                 nxt -= val->len + sizeof(BtVal);
2380                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2381
2382                 key = keyptr(frame, cnt);
2383                 nxt -= key->len + sizeof(BtKey);
2384                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2385
2386                 //      add librarian slot
2387
2388                 slotptr(set->page, ++idx)->off = nxt;
2389                 slotptr(set->page, idx)->type = Librarian;
2390                 slotptr(set->page, idx)->dead = 1;
2391
2392                 //      add actual slot
2393
2394                 slotptr(set->page, ++idx)->off = nxt;
2395                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2396                 set->page->act++;
2397         }
2398
2399         bt_putid(set->page->right, right->latch->page_no);
2400         set->page->min = nxt;
2401         set->page->cnt = idx;
2402
2403         return right->latch->entry;
2404 }
2405
2406 //      fix keys for newly split page
2407 //      call with page locked, return
2408 //      unlocked
2409
2410 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2411 {
2412 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2413 unsigned char value[BtId];
2414 uint lvl = set->page->lvl;
2415 BtPage page;
2416 BtKey *ptr;
2417
2418         // if current page is the root page, split it
2419
2420         if( set->latch->page_no == ROOT_page )
2421                 return bt_splitroot (mgr, set, right, thread_no);
2422
2423         ptr = keyptr(set->page, set->page->cnt);
2424         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2425
2426         page = bt_mappage (mgr, right);
2427
2428         ptr = keyptr(page, page->cnt);
2429         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2430
2431         // insert new fences in their parent pages
2432
2433         bt_lockpage (BtLockParent, right, thread_no);
2434
2435         bt_lockpage (BtLockParent, set->latch, thread_no);
2436         bt_unlockpage (BtLockWrite, set->latch);
2437
2438         // insert new fence for reformulated left block of smaller keys
2439
2440         bt_putid (value, set->latch->page_no);
2441         ptr = (BtKey *)leftkey;
2442
2443         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
2444                 return mgr->err;
2445
2446         // switch fence for right block of larger keys to new right page
2447
2448         bt_putid (value, right->page_no);
2449         ptr = (BtKey *)rightkey;
2450
2451         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
2452                 return mgr->err;
2453
2454         bt_unlockpage (BtLockParent, set->latch);
2455         bt_unpinlatch (mgr, set->latch);
2456
2457         bt_unlockpage (BtLockParent, right);
2458         bt_unpinlatch (mgr, right);
2459         return 0;
2460 }
2461
2462 //      install new key and value onto page
2463 //      page must already be checked for
2464 //      adequate space
2465
2466 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2467 {
2468 uint idx, librarian;
2469 BtSlot *node;
2470 BtKey *ptr;
2471 BtVal *val;
2472
2473         //      if found slot > desired slot and previous slot
2474         //      is a librarian slot, use it
2475
2476         if( slot > 1 )
2477           if( slotptr(set->page, slot-1)->type == Librarian )
2478                 slot--;
2479
2480         // copy value onto page
2481
2482         set->page->min -= vallen + sizeof(BtVal);
2483         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2484         memcpy (val->value, value, vallen);
2485         val->len = vallen;
2486
2487         // copy key onto page
2488
2489         set->page->min -= keylen + sizeof(BtKey);
2490         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2491         memcpy (ptr->key, key, keylen);
2492         ptr->len = keylen;
2493         
2494         //      find first empty slot
2495
2496         for( idx = slot; idx < set->page->cnt; idx++ )
2497           if( slotptr(set->page, idx)->dead )
2498                 break;
2499
2500         // now insert key into array before slot
2501
2502         if( idx == set->page->cnt )
2503                 idx += 2, set->page->cnt += 2, librarian = 2;
2504         else
2505                 librarian = 1;
2506
2507         set->latch->dirty = 1;
2508         set->page->act++;
2509
2510         while( idx > slot + librarian - 1 )
2511                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2512
2513         //      add librarian slot
2514
2515         if( librarian > 1 ) {
2516                 node = slotptr(set->page, slot++);
2517                 node->off = set->page->min;
2518                 node->type = Librarian;
2519                 node->dead = 1;
2520         }
2521
2522         //      fill in new slot
2523
2524         node = slotptr(set->page, slot);
2525         node->off = set->page->min;
2526         node->type = type;
2527         node->dead = 0;
2528
2529         if( release ) {
2530                 bt_unlockpage (BtLockWrite, set->latch);
2531                 bt_unpinlatch (mgr, set->latch);
2532         }
2533
2534         return 0;
2535 }
2536
2537 //  Insert new key into the btree at given level.
2538 //      either add a new key or update/add an existing one
2539
2540 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique, ushort thread_no)
2541 {
2542 unsigned char newkey[BT_keyarray];
2543 uint slot, idx, len, entry;
2544 BtPageSet set[1];
2545 BtKey *ptr, *ins;
2546 uid sequence;
2547 BtVal *val;
2548 uint type;
2549
2550   // set up the key we're working on
2551
2552   ins = (BtKey*)newkey;
2553   memcpy (ins->key, key, keylen);
2554   ins->len = keylen;
2555
2556   // is this a non-unique index value?
2557
2558   if( unique )
2559         type = Unique;
2560   else {
2561         type = Duplicate;
2562         sequence = bt_newdup (mgr);
2563         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2564         ins->len += BtId;
2565   }
2566
2567   while( 1 ) { // find the page and slot for the current key
2568         if( slot = bt_loadpage (mgr, set, ins->key, ins->len, lvl, BtLockWrite, thread_no) )
2569                 ptr = keyptr(set->page, slot);
2570         else {
2571                 if( !mgr->err )
2572                         mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2573                 return mgr->err;
2574         }
2575
2576         // if librarian slot == found slot, advance to real slot
2577
2578         if( slotptr(set->page, slot)->type == Librarian )
2579           if( !keycmp (ptr, key, keylen) )
2580                 ptr = keyptr(set->page, ++slot);
2581
2582         len = ptr->len;
2583
2584         if( slotptr(set->page, slot)->type == Duplicate )
2585                 len -= BtId;
2586
2587         //  if inserting a duplicate key or unique key
2588         //      check for adequate space on the page
2589         //      and insert the new key before slot.
2590
2591         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2592           if( !(slot = bt_cleanpage (mgr, set, ins->len, slot, vallen)) )
2593                 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2594                   return mgr->err;
2595                 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2596                   return mgr->err;
2597                 else
2598                   continue;
2599
2600           return bt_insertslot (mgr, set, slot, ins->key, ins->len, value, vallen, type, 1);
2601         }
2602
2603         // if key already exists, update value and return
2604
2605         val = valptr(set->page, slot);
2606
2607         if( val->len >= vallen ) {
2608                 if( slotptr(set->page, slot)->dead )
2609                         set->page->act++;
2610                 set->page->garbage += val->len - vallen;
2611                 set->latch->dirty = 1;
2612                 slotptr(set->page, slot)->dead = 0;
2613                 val->len = vallen;
2614                 memcpy (val->value, value, vallen);
2615                 bt_unlockpage(BtLockWrite, set->latch);
2616                 bt_unpinlatch (mgr, set->latch);
2617                 return 0;
2618         }
2619
2620         //      new update value doesn't fit in existing value area
2621
2622         if( !slotptr(set->page, slot)->dead )
2623                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2624         else {
2625                 slotptr(set->page, slot)->dead = 0;
2626                 set->page->act++;
2627         }
2628
2629         if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2630           if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2631                 return mgr->err;
2632           else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2633                 return mgr->err;
2634           else
2635                 continue;
2636
2637         set->page->min -= vallen + sizeof(BtVal);
2638         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2639         memcpy (val->value, value, vallen);
2640         val->len = vallen;
2641
2642         set->latch->dirty = 1;
2643         set->page->min -= keylen + sizeof(BtKey);
2644         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2645         memcpy (ptr->key, key, keylen);
2646         ptr->len = keylen;
2647         
2648         slotptr(set->page, slot)->off = set->page->min;
2649         bt_unlockpage(BtLockWrite, set->latch);
2650         bt_unpinlatch (mgr, set->latch);
2651         return 0;
2652   }
2653   return 0;
2654 }
2655
2656 typedef struct {
2657         logseqno reqlsn;        // redo log seq no required
2658         logseqno lsn;           // redo log sequence number
2659         uint entry;                     // latch table entry number
2660         uint slot:31;           // page slot number
2661         uint reuse:1;           // reused previous page
2662 } AtomicTxn;
2663
2664 typedef struct {
2665         uid page_no;            // page number for split leaf
2666         void *next;                     // next key to insert
2667         uint entry:29;          // latch table entry number
2668         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2669         uint nounlock:1;        // don't unlock ParentModification
2670         unsigned char leafkey[BT_keyarray];
2671 } AtomicKey;
2672
2673 //      determine actual page where key is located
2674 //  return slot number
2675
2676 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2677 {
2678 BtKey *key = keyptr(source,src);
2679 uint slot = locks[src].slot;
2680 uint entry;
2681
2682         if( src > 1 && locks[src].reuse )
2683           entry = locks[src-1].entry, slot = 0;
2684         else
2685           entry = locks[src].entry;
2686
2687         if( slot ) {
2688                 set->latch = mgr->latchsets + entry;
2689                 set->page = bt_mappage (mgr, set->latch);
2690                 return slot;
2691         }
2692
2693         //      is locks->reuse set? or was slot zeroed?
2694         //      if so, find where our key is located 
2695         //      on current page or pages split on
2696         //      same page txn operations.
2697
2698         do {
2699                 set->latch = mgr->latchsets + entry;
2700                 set->page = bt_mappage (mgr, set->latch);
2701
2702                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2703                   if( slotptr(set->page, slot)->type == Librarian )
2704                         slot++;
2705                   if( locks[src].reuse )
2706                         locks[src].entry = entry;
2707                   return slot;
2708                 }
2709         } while( entry = set->latch->split );
2710
2711         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2712         return 0;
2713 }
2714
2715 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2716 {
2717 BtKey *key = keyptr(source, src);
2718 BtVal *val = valptr(source, src);
2719 BtLatchSet *latch;
2720 BtPageSet set[1];
2721 uint entry, slot;
2722
2723   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2724         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2725           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2726                 return mgr->err;
2727           set->page->lsn = locks[src].lsn;
2728           return 0;
2729         }
2730
2731         if( entry = bt_splitpage (mgr, set, thread_no) )
2732           latch = mgr->latchsets + entry;
2733         else
2734           return mgr->err;
2735
2736         //      splice right page into split chain
2737         //      and WriteLock it.
2738
2739         bt_lockpage(BtLockWrite, latch, thread_no);
2740         latch->split = set->latch->split;
2741         set->latch->split = entry;
2742         locks[src].slot = 0;
2743   }
2744
2745   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2746 }
2747
2748 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2749 {
2750 BtKey *key = keyptr(source, src);
2751 BtPageSet set[1];
2752 uint idx, slot;
2753 BtKey *ptr;
2754 BtVal *val;
2755
2756         if( slot = bt_atomicpage (mgr, source, locks, src, set) )
2757           ptr = keyptr(set->page, slot);
2758         else
2759           return mgr->line = __LINE__, mgr->err = BTERR_struct;
2760
2761         if( !keycmp (ptr, key->key, key->len) )
2762           if( !slotptr(set->page, slot)->dead )
2763                 slotptr(set->page, slot)->dead = 1;
2764           else
2765                 return 0;
2766         else
2767                 return 0;
2768
2769         val = valptr(set->page, slot);
2770         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2771         set->latch->dirty = 1;
2772         set->page->lsn = locks[src].lsn;
2773         set->page->act--;
2774         mgr->found++;
2775         return 0;
2776 }
2777
2778 //      delete an empty master page for a transaction
2779
2780 //      note that the far right page never empties because
2781 //      it always contains (at least) the infinite stopper key
2782 //      and that all pages that don't contain any keys are
2783 //      deleted, or are being held under Atomic lock.
2784
2785 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2786 {
2787 BtPageSet right[1], temp[1];
2788 unsigned char value[BtId];
2789 uid right_page_no;
2790 BtKey *ptr;
2791
2792         bt_lockpage(BtLockWrite, prev->latch, thread_no);
2793
2794         //      grab the right sibling
2795
2796         if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2797                 right->page = bt_mappage (mgr, right->latch);
2798         else
2799                 return mgr->err;
2800
2801         bt_lockpage(BtLockAtomic, right->latch, thread_no);
2802         bt_lockpage(BtLockWrite, right->latch, thread_no);
2803
2804         //      and pull contents over empty page
2805         //      while preserving master's left link
2806
2807         memcpy (right->page->left, prev->page->left, BtId);
2808         memcpy (prev->page, right->page, mgr->page_size);
2809
2810         //      forward seekers to old right sibling
2811         //      to new page location in set
2812
2813         bt_putid (right->page->right, prev->latch->page_no);
2814         right->latch->dirty = 1;
2815         right->page->kill = 1;
2816
2817         //      remove pointer to right page for searchers by
2818         //      changing right fence key to point to the master page
2819
2820         ptr = keyptr(right->page,right->page->cnt);
2821         bt_putid (value, prev->latch->page_no);
2822
2823         if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, 1, thread_no) )
2824                 return mgr->err;
2825
2826         //  now that master page is in good shape we can
2827         //      remove its locks.
2828
2829         bt_unlockpage (BtLockAtomic, prev->latch);
2830         bt_unlockpage (BtLockWrite, prev->latch);
2831
2832         //  fix master's right sibling's left pointer
2833         //      to remove scanner's poiner to the right page
2834
2835         if( right_page_no = bt_getid (prev->page->right) ) {
2836           if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2837                 temp->page = bt_mappage (mgr, temp->latch);
2838
2839           bt_lockpage (BtLockWrite, temp->latch, thread_no);
2840           bt_putid (temp->page->left, prev->latch->page_no);
2841           temp->latch->dirty = 1;
2842
2843           bt_unlockpage (BtLockWrite, temp->latch);
2844           bt_unpinlatch (mgr, temp->latch);
2845         } else {        // master is now the far right page
2846           bt_mutexlock (mgr->lock);
2847           bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2848           bt_releasemutex(mgr->lock);
2849         }
2850
2851         //      now that there are no pointers to the right page
2852         //      we can delete it after the last read access occurs
2853
2854         bt_unlockpage (BtLockWrite, right->latch);
2855         bt_unlockpage (BtLockAtomic, right->latch);
2856         bt_lockpage (BtLockDelete, right->latch, thread_no);
2857         bt_lockpage (BtLockWrite, right->latch, thread_no);
2858         bt_freepage (mgr, right);
2859         return 0;
2860 }
2861
2862 //      find and add the next available latch entry
2863 //      to the queue
2864
2865 BTERR bt_txnavaillatch (BtDb *bt)
2866 {
2867 BtLatchSet *latch;
2868 uint startattempt;
2869 uint cnt, entry;
2870 uint hashidx;
2871 BtPage page;
2872
2873   //  find and reuse previous entry on victim
2874
2875   startattempt = bt->mgr->latchvictim;
2876
2877   while( 1 ) {
2878 #ifdef unix
2879         entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
2880 #else
2881         entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
2882 #endif
2883         //      skip entry if it has outstanding pins
2884
2885         entry %= bt->mgr->latchtotal;
2886
2887         if( !entry )
2888                 continue;
2889
2890         //      only go around one time before
2891         //      flushing redo recovery buffer,
2892         //      and the buffer pool to free up entries.
2893
2894         if( bt->mgr->redopages )
2895           if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
2896             if( bt_mutextry (bt->mgr->dump) ) {
2897                  if( bt_dumpredo (bt->mgr) )
2898                   return bt->mgr->err;
2899                  bt_flushlsn (bt->mgr, bt->thread_no);
2900                 // synchronize the various threads running into this condition
2901                 //      so that only one thread does the dump and flush
2902                 } else
2903                         bt_mutexlock(bt->mgr->dump);
2904
2905                 startattempt = bt->mgr->latchvictim;
2906                 bt_releasemutex(bt->mgr->dump);
2907           }
2908
2909         latch = bt->mgr->latchsets + entry;
2910
2911         if( latch->avail )
2912           continue;
2913
2914         bt_mutexlock(latch->modify);
2915
2916         //      skip if already an available entry
2917
2918         if( latch->avail ) {
2919           bt_releasemutex(latch->modify);
2920           continue;
2921         }
2922
2923         //  skip this entry if it is pinned
2924         //      if the CLOCK bit is set
2925         //      reset it to zero.
2926
2927         if( latch->pin ) {
2928           latch->pin &= ~CLOCK_bit;
2929           bt_releasemutex(latch->modify);
2930           continue;
2931         }
2932
2933         page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
2934
2935         //      if dirty page has lsn >= last redo recovery buffer
2936         //      then hold page in the buffer pool until next redo
2937         //      recovery buffer is being written to disk.
2938
2939         if( latch->dirty )
2940           if( page->lsn >= bt->mgr->flushlsn ) {
2941             bt_releasemutex(latch->modify);
2942                 continue;
2943           }
2944
2945         //      entry is now available
2946 #ifdef unix
2947         __sync_fetch_and_add (&bt->mgr->available, 1);
2948 #else
2949         _InterlockedIncrement(&bt->mgr->available);
2950 #endif
2951         latch->avail = 1;
2952         bt_releasemutex(latch->modify);
2953         return 0;
2954   }
2955 }
2956
2957 //      release available latch requests
2958
2959 void bt_txnavailrelease (BtDb *bt, uint count)
2960 {
2961 #ifdef unix
2962         __sync_fetch_and_add(bt->mgr->availlock, -count);
2963 #else
2964         _InterlockedAdd(bt->mgr->availlock, -count);
2965 #endif
2966 }
2967
2968 //      promote page of keys from first btree
2969 //  into main btree
2970
2971 BTERR bt_txnavailmain (BtDb *bt)
2972 {
2973 BtLatchSet *latch;
2974 uint entry;
2975
2976   while( 1 ) {
2977 #ifdef unix
2978         entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
2979 #else
2980         entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
2981 #endif
2982         //      skip entry if it has outstanding pins
2983
2984         entry %= bt->mgr->latchtotal;
2985
2986         if( !entry )
2987                 continue;
2988
2989         latch = bt->mgr->latchsets + entry;
2990
2991         if( latch->avail )
2992           continue;
2993
2994         bt_mutexlock(latch->modify);
2995
2996         //      skip if already an available entry
2997
2998         if( latch->avail ) {
2999           bt_releasemutex(latch->modify);
3000           continue;
3001         }
3002
3003         //  skip this entry if it is pinned
3004         //      if the CLOCK bit is set
3005         //      reset it to zero.
3006
3007         if( latch->pin ) {
3008           latch->pin &= ~CLOCK_bit;
3009           bt_releasemutex(latch->modify);
3010           continue;
3011         }
3012
3013   }
3014 }
3015
3016 //      commit available pool entries
3017 //      find available entries as required
3018
3019 BTERR bt_txnavailrequest (BtDb *bt, uint count)
3020 {
3021 #ifdef unix
3022         __sync_fetch_and_add(bt->mgr->availlock, count);
3023 #else
3024         _InterlockedAdd(bt->mgr->availlock, count);
3025 #endif
3026
3027         //      find another available pool entry
3028
3029         while( *bt->mgr->availlock > bt->mgr->available )
3030           if( bt->mgr->redopages )
3031                 bt_txnavaillatch (bt);
3032           else
3033                 if( bt_txnavailmain (bt) )
3034                         return bt->mgr->err;
3035 }
3036
3037 //      atomic modification of a batch of keys.
3038
3039 //      return -1 if BTERR is set
3040 //      otherwise return slot number
3041 //      causing the key constraint violation
3042 //      or zero on successful completion.
3043
3044 int bt_atomictxn (BtDb *bt, BtPage source)
3045 {
3046 uint src, idx, slot, samepage, entry, avail, que = 0;
3047 AtomicKey *head, *tail, *leaf;
3048 BtPageSet set[1], prev[1];
3049 unsigned char value[BtId];
3050 BtKey *key, *ptr, *key2;
3051 BtLatchSet *latch;
3052 AtomicTxn *locks;
3053 int result = 0;
3054 BtSlot temp[1];
3055 logseqno lsn;
3056 BtPage page;
3057 BtVal *val;
3058 uid right;
3059 int type;
3060
3061   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
3062   head = NULL;
3063   tail = NULL;
3064
3065   // stable sort the list of keys into order to
3066   //    prevent deadlocks between threads.
3067
3068   for( src = 1; src++ < source->cnt; ) {
3069         *temp = *slotptr(source,src);
3070         key = keyptr (source,src);
3071
3072         for( idx = src; --idx; ) {
3073           key2 = keyptr (source,idx);
3074           if( keycmp (key, key2->key, key2->len) < 0 ) {
3075                 *slotptr(source,idx+1) = *slotptr(source,idx);
3076                 *slotptr(source,idx) = *temp;
3077           } else
3078                 break;
3079         }
3080   }
3081
3082   // reserve enough buffer pool entries
3083
3084   avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
3085
3086   if( bt_txnavailrequest (bt, avail) )
3087         return bt->mgr->err;
3088
3089   // Load the leaf page for each key
3090   // group same page references with reuse bit
3091   // and determine any constraint violations
3092
3093   for( src = 0; src++ < source->cnt; ) {
3094         key = keyptr(source, src);
3095         slot = 0;
3096
3097         // first determine if this modification falls
3098         // on the same page as the previous modification
3099         //      note that the far right leaf page is a special case
3100
3101         if( samepage = src > 1 )
3102           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3103                 slot = bt_findslot(set->page, key->key, key->len);
3104           else
3105                 bt_unlockpage(BtLockRead, set->latch); 
3106
3107         if( !slot )
3108           if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
3109                 set->latch->split = 0;
3110           else
3111                 goto atomicerr;
3112
3113         if( slotptr(set->page, slot)->type == Librarian )
3114           ptr = keyptr(set->page, ++slot);
3115         else
3116           ptr = keyptr(set->page, slot);
3117
3118         if( !samepage ) {
3119           locks[src].entry = set->latch->entry;
3120           locks[src].slot = slot;
3121           locks[src].reuse = 0;
3122         } else {
3123           locks[src].entry = 0;
3124           locks[src].slot = 0;
3125           locks[src].reuse = 1;
3126         }
3127
3128         //      capture current lsn for master page
3129
3130         locks[src].reqlsn = set->page->lsn;
3131
3132         //      perform constraint checks
3133
3134         switch( slotptr(source, src)->type ) {
3135         case Duplicate:
3136         case Unique:
3137           if( !slotptr(set->page, slot)->dead )
3138            if( slot < set->page->cnt || bt_getid (set->page->right) )
3139             if( !keycmp (ptr, key->key, key->len) ) {
3140
3141                   // return constraint violation if key already exists
3142
3143                   bt_unlockpage(BtLockRead, set->latch);
3144                   result = src;
3145
3146                   while( src ) {
3147                         if( locks[src].entry ) {
3148                           set->latch = bt->mgr->latchsets + locks[src].entry;
3149                           bt_unlockpage(BtLockAtomic, set->latch);
3150                           bt_unpinlatch (bt->mgr, set->latch);
3151                         }
3152                         src--;
3153                   }
3154                   free (locks);
3155                   return result;
3156                 }
3157           break;
3158         }
3159   }
3160
3161   //  unlock last loadpage lock
3162
3163   if( source->cnt )
3164         bt_unlockpage(BtLockRead, set->latch);
3165
3166   //  and add entries to redo log
3167
3168   if( bt->mgr->redopages )
3169    if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
3170     for( src = 0; src++ < source->cnt; )
3171          locks[src].lsn = lsn;
3172     else
3173          goto atomicerr;
3174
3175   //  obtain write lock for each master page
3176
3177   for( src = 0; src++ < source->cnt; ) {
3178         if( locks[src].reuse )
3179           continue;
3180         else
3181           bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry, bt->thread_no);
3182   }
3183
3184   // insert or delete each key
3185   // process any splits or merges
3186   // release Write & Atomic latches
3187   // set ParentModifications and build
3188   // queue of keys to insert for split pages
3189   // or delete for deleted pages.
3190
3191   // run through txn list backwards
3192
3193   samepage = source->cnt + 1;
3194
3195   for( src = source->cnt; src; src-- ) {
3196         if( locks[src].reuse )
3197           continue;
3198
3199         //  perform the txn operations
3200         //      from smaller to larger on
3201         //  the same page
3202
3203         for( idx = src; idx < samepage; idx++ )
3204          switch( slotptr(source,idx)->type ) {
3205          case Delete:
3206           if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
3207                 goto atomicerr;
3208           break;
3209
3210         case Duplicate:
3211         case Unique:
3212           if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
3213                 goto atomicerr;
3214           break;
3215         }
3216
3217         //      after the same page operations have finished,
3218         //  process master page for splits or deletion.
3219
3220         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3221         prev->page = bt_mappage (bt->mgr, prev->latch);
3222         samepage = src;
3223
3224         //  pick-up all splits from master page
3225         //      each one is already WriteLocked.
3226
3227         entry = prev->latch->split;
3228
3229         while( entry ) {
3230           set->latch = bt->mgr->latchsets + entry;
3231           set->page = bt_mappage (bt->mgr, set->latch);
3232           entry = set->latch->split;
3233
3234           // delete empty master page by undoing its split
3235           //  (this is potentially another empty page)
3236           //  note that there are no new left pointers yet
3237
3238           if( !prev->page->act ) {
3239                 memcpy (set->page->left, prev->page->left, BtId);
3240                 memcpy (prev->page, set->page, bt->mgr->page_size);
3241                 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3242                 bt_freepage (bt->mgr, set);
3243
3244                 prev->latch->split = set->latch->split;
3245                 prev->latch->dirty = 1;
3246                 continue;
3247           }
3248
3249           // remove empty page from the split chain
3250           // and return it to the free list.
3251
3252           if( !set->page->act ) {
3253                 memcpy (prev->page->right, set->page->right, BtId);
3254                 prev->latch->split = set->latch->split;
3255                 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3256                 bt_freepage (bt->mgr, set);
3257                 continue;
3258           }
3259
3260           //  schedule prev fence key update
3261
3262           ptr = keyptr(prev->page,prev->page->cnt);
3263           leaf = calloc (sizeof(AtomicKey), 1), que++;
3264
3265           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3266           leaf->page_no = prev->latch->page_no;
3267           leaf->entry = prev->latch->entry;
3268           leaf->type = 0;
3269
3270           if( tail )
3271                 tail->next = leaf;
3272           else
3273                 head = leaf;
3274
3275           tail = leaf;
3276
3277           // splice in the left link into the split page
3278
3279           bt_putid (set->page->left, prev->latch->page_no);
3280           bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3281           bt_unlockpage(BtLockWrite, prev->latch);
3282           *prev = *set;
3283         }
3284
3285         //  update left pointer in next right page from last split page
3286         //      (if all splits were reversed, latch->split == 0)
3287
3288         if( latch->split ) {
3289           //  fix left pointer in master's original (now split)
3290           //  far right sibling or set rightmost page in page zero
3291
3292           if( right = bt_getid (prev->page->right) ) {
3293                 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3294                   set->page = bt_mappage (bt->mgr, set->latch);
3295                 else
3296                   goto atomicerr;
3297
3298             bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3299             bt_putid (set->page->left, prev->latch->page_no);
3300                 set->latch->dirty = 1;
3301             bt_unlockpage (BtLockWrite, set->latch);
3302                 bt_unpinlatch (bt->mgr, set->latch);
3303           } else {      // prev is rightmost page
3304             bt_mutexlock (bt->mgr->lock);
3305                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3306             bt_releasemutex(bt->mgr->lock);
3307           }
3308
3309           //  Process last page split in chain
3310
3311           ptr = keyptr(prev->page,prev->page->cnt);
3312           leaf = calloc (sizeof(AtomicKey), 1), que++;
3313
3314           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3315           leaf->page_no = prev->latch->page_no;
3316           leaf->entry = prev->latch->entry;
3317           leaf->type = 0;
3318   
3319           if( tail )
3320                 tail->next = leaf;
3321           else
3322                 head = leaf;
3323
3324           tail = leaf;
3325
3326           bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3327           bt_unlockpage(BtLockWrite, prev->latch);
3328
3329           //  remove atomic lock on master page
3330
3331           bt_unlockpage(BtLockAtomic, latch);
3332           continue;
3333         }
3334
3335         //  finished if prev page occupied (either master or final split)
3336
3337         if( prev->page->act ) {
3338           bt_unlockpage(BtLockWrite, latch);
3339           bt_unlockpage(BtLockAtomic, latch);
3340           bt_unpinlatch(bt->mgr, latch);
3341           continue;
3342         }
3343
3344         // any and all splits were reversed, and the
3345         // master page located in prev is empty, delete it
3346         // by pulling over master's right sibling.
3347
3348         // Remove empty master's fence key
3349
3350         ptr = keyptr(prev->page,prev->page->cnt);
3351
3352         if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3353                 goto atomicerr;
3354
3355         //      perform the remainder of the delete
3356         //      from the FIFO queue
3357
3358         leaf = calloc (sizeof(AtomicKey), 1), que++;
3359
3360         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3361         leaf->page_no = prev->latch->page_no;
3362         leaf->entry = prev->latch->entry;
3363         leaf->nounlock = 1;
3364         leaf->type = 2;
3365   
3366         if( tail )
3367           tail->next = leaf;
3368         else
3369           head = leaf;
3370
3371         tail = leaf;
3372
3373         //      leave atomic lock in place until
3374         //      deletion completes in next phase.
3375
3376         bt_unlockpage(BtLockWrite, prev->latch);
3377   }
3378
3379   bt_txnavailrelease (bt, avail);
3380
3381   que *= bt->mgr->pagezero->alloc->lvl;
3382
3383   if( bt_txnavailrequest (bt, que) )
3384         return bt->mgr->err;
3385
3386   //  add & delete keys for any pages split or merged during transaction
3387
3388   if( leaf = head )
3389     do {
3390           set->latch = bt->mgr->latchsets + leaf->entry;
3391           set->page = bt_mappage (bt->mgr, set->latch);
3392
3393           bt_putid (value, leaf->page_no);
3394           ptr = (BtKey *)leaf->leafkey;
3395
3396           switch( leaf->type ) {
3397           case 0:       // insert key
3398             if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, 1, bt->thread_no) )
3399                   goto atomicerr;
3400
3401                 break;
3402
3403           case 1:       // delete key
3404                 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3405                   goto atomicerr;
3406
3407                 break;
3408
3409           case 2:       // free page
3410                 if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
3411                   goto atomicerr;
3412
3413                 break;
3414           }
3415
3416           if( !leaf->nounlock )
3417             bt_unlockpage (BtLockParent, set->latch);
3418
3419           bt_unpinlatch (bt->mgr, set->latch);
3420           tail = leaf->next;
3421           free (leaf);
3422         } while( leaf = tail );
3423
3424   bt_txnavailrelease (bt, que);
3425
3426   // return success
3427
3428   free (locks);
3429   return 0;
3430 atomicerr:
3431   return -1;
3432 }
3433
3434 //      set cursor to highest slot on highest page
3435
3436 uint bt_lastkey (BtDb *bt)
3437 {
3438 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3439 BtPageSet set[1];
3440
3441         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3442                 set->page = bt_mappage (bt->mgr, set->latch);
3443         else
3444                 return 0;
3445
3446     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3447         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3448     bt_unlockpage(BtLockRead, set->latch);
3449         bt_unpinlatch (bt->mgr, set->latch);
3450         return bt->cursor->cnt;
3451 }
3452
3453 //      return previous slot on cursor page
3454
3455 uint bt_prevkey (BtDb *bt, uint slot)
3456 {
3457 uid cursor_page = bt->cursor->page_no;
3458 uid ourright, next, us = cursor_page;
3459 BtPageSet set[1];
3460
3461         if( --slot )
3462                 return slot;
3463
3464         ourright = bt_getid(bt->cursor->right);
3465
3466 goleft:
3467         if( !(next = bt_getid(bt->cursor->left)) )
3468                 return 0;
3469
3470 findourself:
3471         cursor_page = next;
3472
3473         if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3474                 set->page = bt_mappage (bt->mgr, set->latch);
3475         else
3476                 return 0;
3477
3478     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3479         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3480         bt_unlockpage(BtLockRead, set->latch);
3481         bt_unpinlatch (bt->mgr, set->latch);
3482         
3483         next = bt_getid (bt->cursor->right);
3484
3485         if( bt->cursor->kill )
3486                 goto findourself;
3487
3488         if( next != us )
3489           if( next == ourright )
3490                 goto goleft;
3491           else
3492                 goto findourself;
3493
3494         return bt->cursor->cnt;
3495 }
3496
3497 //  return next slot on cursor page
3498 //  or slide cursor right into next page
3499
3500 uint bt_nextkey (BtDb *bt, uint slot)
3501 {
3502 BtPageSet set[1];
3503 uid right;
3504
3505   do {
3506         right = bt_getid(bt->cursor->right);
3507
3508         while( slot++ < bt->cursor->cnt )
3509           if( slotptr(bt->cursor,slot)->dead )
3510                 continue;
3511           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3512                 return slot;
3513           else
3514                 break;
3515
3516         if( !right )
3517                 break;
3518
3519         if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3520                 set->page = bt_mappage (bt->mgr, set->latch);
3521         else
3522                 return 0;
3523
3524     bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3525         memcpy (bt->cursor, set->page, bt->mgr->page_size);
3526         bt_unlockpage(BtLockRead, set->latch);
3527         bt_unpinlatch (bt->mgr, set->latch);
3528         slot = 0;
3529
3530   } while( 1 );
3531
3532   return bt->mgr->err = 0;
3533 }
3534
3535 //  cache page of keys into cursor and return starting slot for given key
3536
3537 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3538 {
3539 BtPageSet set[1];
3540 uint slot;
3541
3542         // cache page for retrieval
3543
3544         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3545           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3546         else
3547           return 0;
3548
3549         bt_unlockpage(BtLockRead, set->latch);
3550         bt_unpinlatch (bt->mgr, set->latch);
3551         return slot;
3552 }
3553
3554 #ifdef STANDALONE
3555
3556 #ifndef unix
3557 double getCpuTime(int type)
3558 {
3559 FILETIME crtime[1];
3560 FILETIME xittime[1];
3561 FILETIME systime[1];
3562 FILETIME usrtime[1];
3563 SYSTEMTIME timeconv[1];
3564 double ans = 0;
3565
3566         memset (timeconv, 0, sizeof(SYSTEMTIME));
3567
3568         switch( type ) {
3569         case 0:
3570                 GetSystemTimeAsFileTime (xittime);
3571                 FileTimeToSystemTime (xittime, timeconv);
3572                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3573                 break;
3574         case 1:
3575                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3576                 FileTimeToSystemTime (usrtime, timeconv);
3577                 break;
3578         case 2:
3579                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3580                 FileTimeToSystemTime (systime, timeconv);
3581                 break;
3582         }
3583
3584         ans += (double)timeconv->wHour * 3600;
3585         ans += (double)timeconv->wMinute * 60;
3586         ans += (double)timeconv->wSecond;
3587         ans += (double)timeconv->wMilliseconds / 1000;
3588         return ans;
3589 }
3590 #else
3591 #include <time.h>
3592 #include <sys/resource.h>
3593
3594 double getCpuTime(int type)
3595 {
3596 struct rusage used[1];
3597 struct timeval tv[1];
3598
3599         switch( type ) {
3600         case 0:
3601                 gettimeofday(tv, NULL);
3602                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3603
3604         case 1:
3605                 getrusage(RUSAGE_SELF, used);
3606                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3607
3608         case 2:
3609                 getrusage(RUSAGE_SELF, used);
3610                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3611         }
3612
3613         return 0;
3614 }
3615 #endif
3616
3617 void bt_poolaudit (BtMgr *mgr)
3618 {
3619 BtLatchSet *latch;
3620 uint entry = 0;
3621
3622         while( ++entry < mgr->latchtotal ) {
3623                 latch = mgr->latchsets + entry;
3624
3625                 if( *latch->readwr->rin & MASK )
3626                         fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3627
3628                 if( *latch->access->rin & MASK )
3629                         fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3630
3631                 if( *latch->parent->exclusive )
3632                         fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3633
3634                 if( *latch->atomic->exclusive )
3635                         fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3636
3637                 if( *latch->modify->exclusive )
3638                         fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3639
3640                 if( latch->pin & ~CLOCK_bit )
3641                         fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3642         }
3643 }
3644
3645 typedef struct {
3646         char idx;
3647         char *type;
3648         char *infile;
3649         BtMgr *main;
3650         BtMgr *mgr;
3651         int num;
3652 } ThreadArg;
3653
3654 //  standalone program to index file of keys
3655 //  then list them onto std-out
3656
3657 #ifdef unix
3658 void *index_file (void *arg)
3659 #else
3660 uint __stdcall index_file (void *arg)
3661 #endif
3662 {
3663 int line = 0, found = 0, cnt = 0, idx;
3664 uid next, page_no = LEAF_page;  // start on first page of leaves
3665 int ch, len = 0, slot, type = 0;
3666 unsigned char key[BT_maxkey];
3667 unsigned char txn[65536];
3668 ThreadArg *args = arg;
3669 BtPage page, frame;
3670 BtPageSet set[1];
3671 uint nxt = 65536;
3672 BtKey *ptr;
3673 BtVal *val;
3674 BtDb *bt;
3675 FILE *in;
3676
3677         bt = bt_open (args->mgr, args->main);
3678         page = (BtPage)txn;
3679
3680         if( args->idx < strlen (args->type) )
3681                 ch = args->type[args->idx];
3682         else
3683                 ch = args->type[strlen(args->type) - 1];
3684
3685         switch(ch | 0x20)
3686         {
3687         case 'd':
3688                 type = Delete;
3689
3690         case 'p':
3691                 if( !type )
3692                         type = Unique;
3693
3694                 if( args->num )
3695                  if( type == Delete )
3696                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3697                  else
3698                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3699                 else
3700                  if( type == Delete )
3701                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3702                  else
3703                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3704
3705                 if( in = fopen (args->infile, "rb") )
3706                   while( ch = getc(in), ch != EOF )
3707                         if( ch == '\n' )
3708                         {
3709                           line++;
3710
3711                           if( !args->num ) {
3712                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, 1, bt->thread_no) )
3713                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3714                             len = 0;
3715                                 continue;
3716                           }
3717
3718                           nxt -= len - 10;
3719                           memcpy (txn + nxt, key + 10, len - 10);
3720                           nxt -= 1;
3721                           txn[nxt] = len - 10;
3722                           nxt -= 10;
3723                           memcpy (txn + nxt, key, 10);
3724                           nxt -= 1;
3725                           txn[nxt] = 10;
3726                           slotptr(page,++cnt)->off  = nxt;
3727                           slotptr(page,cnt)->type = type;
3728                           len = 0;
3729
3730                           if( cnt < args->num )
3731                                 continue;
3732
3733                           page->cnt = cnt;
3734                           page->act = cnt;
3735                           page->min = nxt;
3736
3737                           if( bt_atomictxn (bt, page) )
3738                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3739                           nxt = sizeof(txn);
3740                           cnt = 0;
3741
3742                         }
3743                         else if( len < BT_maxkey )
3744                                 key[len++] = ch;
3745                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3746                 break;
3747
3748         case 'w':
3749                 fprintf(stderr, "started indexing for %s\n", args->infile);
3750                 if( in = fopen (args->infile, "r") )
3751                   while( ch = getc(in), ch != EOF )
3752                         if( ch == '\n' )
3753                         {
3754                           line++;
3755
3756                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, 1, bt->thread_no) )
3757                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3758                           len = 0;
3759                         }
3760                         else if( len < BT_maxkey )
3761                                 key[len++] = ch;
3762                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3763                 break;
3764
3765         case 'f':
3766                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3767                 if( in = fopen (args->infile, "rb") )
3768                   while( ch = getc(in), ch != EOF )
3769                         if( ch == '\n' )
3770                         {
3771                           line++;
3772                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3773                                 found++;
3774                           else if( bt->mgr->err )
3775                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3776                           len = 0;
3777                         }
3778                         else if( len < BT_maxkey )
3779                                 key[len++] = ch;
3780                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3781                 break;
3782
3783         case 's':
3784                 fprintf(stderr, "started scanning\n");
3785                 
3786                 do {
3787                         if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3788                                 set->page = bt_mappage (bt->mgr, set->latch);
3789                         else
3790                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3791
3792                         bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3793                         next = bt_getid (set->page->right);
3794
3795                         for( slot = 0; slot++ < set->page->cnt; )
3796                          if( next || slot < set->page->cnt )
3797                           if( !slotptr(set->page, slot)->dead ) {
3798                                 ptr = keyptr(set->page, slot);
3799                                 len = ptr->len;
3800
3801                             if( slotptr(set->page, slot)->type == Duplicate )
3802                                         len -= BtId;
3803
3804                                 fwrite (ptr->key, len, 1, stdout);
3805                                 val = valptr(set->page, slot);
3806                                 fwrite (val->value, val->len, 1, stdout);
3807                                 fputc ('\n', stdout);
3808                                 cnt++;
3809                            }
3810
3811                         set->latch->avail = 1;
3812                         bt_unlockpage (BtLockRead, set->latch);
3813                         bt_unpinlatch (bt->mgr, set->latch);
3814                 } while( page_no = next );
3815
3816                 fprintf(stderr, " Total keys read %d\n", cnt);
3817                 break;
3818
3819         case 'r':
3820                 fprintf(stderr, "started reverse scan\n");
3821                 if( slot = bt_lastkey (bt) )
3822                    while( slot = bt_prevkey (bt, slot) ) {
3823                         if( slotptr(bt->cursor, slot)->dead )
3824                           continue;
3825
3826                         ptr = keyptr(bt->cursor, slot);
3827                         len = ptr->len;
3828
3829                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3830                                 len -= BtId;
3831
3832                         fwrite (ptr->key, len, 1, stdout);
3833                         val = valptr(bt->cursor, slot);
3834                         fwrite (val->value, val->len, 1, stdout);
3835                         fputc ('\n', stdout);
3836                         cnt++;
3837                   }
3838
3839                 fprintf(stderr, " Total keys read %d\n", cnt);
3840                 break;
3841
3842         case 'c':
3843 #ifdef unix
3844                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3845 #endif
3846                 fprintf(stderr, "started counting\n");
3847                 page_no = LEAF_page;
3848
3849                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3850                         if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3851                                 break;
3852
3853                         if( !bt->cursor->free && !bt->cursor->lvl )
3854                                 cnt += bt->cursor->act;
3855
3856                         page_no++;
3857                 }
3858                 
3859                 cnt--;  // remove stopper key
3860                 fprintf(stderr, " Total keys counted %d\n", cnt);
3861                 break;
3862         }
3863
3864         fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
3865         bt_close (bt);
3866 #ifdef unix
3867         return NULL;
3868 #else
3869         return 0;
3870 #endif
3871 }
3872
3873 typedef struct timeval timer;
3874
3875 int main (int argc, char **argv)
3876 {
3877 int idx, cnt, len, slot, err;
3878 int segsize, bits = 16;
3879 double start, stop;
3880 #ifdef unix
3881 pthread_t *threads;
3882 #else
3883 HANDLE *threads;
3884 #endif
3885 ThreadArg *args;
3886 uint poolsize = 0;
3887 uint recovery = 0;
3888 uint mainpool = 0;
3889 uint mainbits = 0;
3890 float elapsed;
3891 int num = 0;
3892 char key[1];
3893 BtMgr *main;
3894 BtMgr *mgr;
3895 BtKey *ptr;
3896
3897         if( argc < 3 ) {
3898                 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3899                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3900                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3901                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3902                 fprintf (stderr, "  page_bits is the page size in bits for the cache btree\n");
3903                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3904                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3905                 fprintf (stderr, "  recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3906                 fprintf (stderr, "  main_bits is the page size of the main btree in bits\n");
3907                 fprintf (stderr, "  main_pool is the number of main pages in the main buffer pool\n");
3908                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3909                 exit(0);
3910         }
3911
3912         start = getCpuTime(0);
3913
3914         if( argc > 4 )
3915                 bits = atoi(argv[4]);
3916
3917         if( argc > 5 )
3918                 poolsize = atoi(argv[5]);
3919
3920         if( !poolsize )
3921                 fprintf (stderr, "Warning: no mapped_pool\n");
3922
3923         if( argc > 6 )
3924                 num = atoi(argv[6]);
3925
3926         if( argc > 7 )
3927                 recovery = atoi(argv[7]);
3928
3929         if( argc > 8 )
3930                 mainbits = atoi(argv[8]);
3931
3932         if( argc > 9 )
3933                 mainpool = atoi(argv[9]);
3934
3935         cnt = argc - 10;
3936 #ifdef unix
3937         threads = malloc (cnt * sizeof(pthread_t));
3938 #else
3939         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3940 #endif
3941         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3942
3943         mgr = bt_mgr (argv[1], bits, poolsize, recovery);
3944
3945         if( !mgr ) {
3946                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3947                 exit (1);
3948         }
3949
3950         main = bt_mgr (argv[2], mainbits, mainpool, 0);
3951
3952         if( !main ) {
3953                 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3954                 exit (1);
3955         }
3956
3957         //      fire off threads
3958
3959         if( cnt )
3960           for( idx = 0; idx < cnt; idx++ ) {
3961                 args[idx].infile = argv[idx + 10];
3962                 args[idx].type = argv[3];
3963                 args[idx].main = main;
3964                 args[idx].mgr = mgr;
3965                 args[idx].num = num;
3966                 args[idx].idx = idx;
3967 #ifdef unix
3968                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3969                         fprintf(stderr, "Error creating thread %d\n", err);
3970 #else
3971                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3972 #endif
3973           }
3974         else {
3975                 args[0].infile = argv[idx + 10];
3976                 args[0].type = argv[3];
3977                 args[0].main = main;
3978                 args[0].mgr = mgr;
3979                 args[0].num = num;
3980                 args[0].idx = 0;
3981                 index_file (args);
3982         }
3983
3984         //      wait for termination
3985
3986 #ifdef unix
3987         for( idx = 0; idx < cnt; idx++ )
3988                 pthread_join (threads[idx], NULL);
3989 #else
3990         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3991
3992         for( idx = 0; idx < cnt; idx++ )
3993                 CloseHandle(threads[idx]);
3994 #endif
3995         bt_poolaudit(mgr);
3996         bt_poolaudit(main);
3997
3998         bt_mgrclose (main);
3999         bt_mgrclose (mgr);
4000
4001         elapsed = getCpuTime(0) - start;
4002         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4003         elapsed = getCpuTime(1);
4004         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4005         elapsed = getCpuTime(2);
4006         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4007 }
4008
4009 #endif  //STANDALONE