]> pd.if.org Git - btree/blob - threadskv10f.c
Threadskv10f.c -- add finding keys in addition to penny-sort insert to LSM btree...
[btree] / threadskv10f.c
1 // btree version threadskv10f futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      LSM B-trees for write optimization
11 //      larger sized leaf pages than non-leaf
12 //      and LSM B-tree find operations
13
14 // 28 OCT 2014
15
16 // author: karl malbrain, malbrain@cal.berkeley.edu
17
18 /*
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
21
22 The orginal author is Karl Malbrain.
23
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
30 */
31
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
34
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
37
38 #ifdef linux
39 #define _GNU_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
42 #define SYS_futex 202
43 #endif
44
45 #ifdef unix
46 #include <unistd.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <fcntl.h>
50 #include <sys/time.h>
51 #include <sys/mman.h>
52 #include <errno.h>
53 #include <pthread.h>
54 #include <limits.h>
55 #else
56 #define WIN32_LEAN_AND_MEAN
57 #include <windows.h>
58 #include <stdio.h>
59 #include <stdlib.h>
60 #include <time.h>
61 #include <fcntl.h>
62 #include <process.h>
63 #include <intrin.h>
64 #endif
65
66 #include <memory.h>
67 #include <string.h>
68 #include <stddef.h>
69
70 typedef unsigned long long      uid;
71 typedef unsigned long long      logseqno;
72
73 #ifndef unix
74 typedef unsigned long long      off64_t;
75 typedef unsigned short          ushort;
76 typedef unsigned int            uint;
77 #endif
78
79 #define BT_ro 0x6f72    // ro
80 #define BT_rw 0x7772    // rw
81
82 #define BT_maxbits              26                                      // maximum page size in bits
83 #define BT_minbits              9                                       // minimum page size in bits
84 #define BT_minpage              (1 << BT_minbits)       // minimum page size
85 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
86
87 //  BTree page number constants
88 #define ALLOC_page              0       // allocation page
89 #define ROOT_page               1       // root of the btree
90 #define LEAF_page               2       // first page of leaves
91
92 //      Number of levels to create in a new BTree
93
94 #define MIN_lvl                 2
95
96 /*
97 There are six lock types for each node in four independent sets: 
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification. 
104 */
105
106 typedef enum{
107         BtLockAccess = 1,
108         BtLockDelete = 2,
109         BtLockRead   = 4,
110         BtLockWrite  = 8,
111         BtLockParent = 16,
112         BtLockLink   = 32
113 } BtLock;
114
115 typedef struct {
116   union {
117         struct {
118           volatile unsigned char xcl[1];
119           volatile unsigned char filler;
120           volatile ushort waiters[1];
121         } bits[1];
122         uint value[1];
123   };
124 } MutexLatch;
125
126 //      definition for reader/writer reentrant lock implementation
127
128 typedef struct {
129   MutexLatch xcl[1];
130   MutexLatch wrt[1];
131   ushort readers;
132   ushort dup;           // re-entrant locks
133   ushort tid;           // owner thread-no
134   ushort line;          // owner line #
135 } RWLock;
136
137 //  hash table entries
138
139 typedef struct {
140         MutexLatch latch[1];
141         uint entry;             // Latch table entry at head of chain
142 } BtHashEntry;
143
144 //      latch manager table structure
145
146 typedef struct {
147         uid page_no;                    // latch set page number
148         RWLock readwr[1];               // read/write page lock
149         RWLock access[1];               // Access Intent/Page delete
150         RWLock parent[1];               // Posting of fence key in parent
151         RWLock link[1];                 // left link update in progress
152         MutexLatch modify[1];   // modify entry lite latch
153         uint split;                             // right split page atomic insert
154         uint next;                              // next entry in hash table chain
155         uint prev;                              // prev entry in hash table chain
156         volatile ushort pin;    // number of accessing threads
157         unsigned char dirty;    // page in cache is dirty
158         unsigned char leaf;             // page in cache is a leaf
159 } BtLatchSet;
160
161 //      Define the length of the page record numbers
162
163 #define BtId 6
164
165 //      Page key slot definition.
166
167 //      Keys are marked dead, but remain on the page until
168 //      it cleanup is called. The fence key (highest key) for
169 //      a leaf page is always present, even after cleanup.
170
171 //      Slot types
172
173 //      In addition to the Unique keys that occupy slots
174 //      there are Librarian and Duplicate key
175 //      slots occupying the key slot array.
176
177 //      The Librarian slots are dead keys that
178 //      serve as filler, available to add new Unique
179 //      or Dup slots that are inserted into the B-tree.
180
181 //      The Duplicate slots have had their key bytes extended
182 //      by 6 bytes to contain a binary duplicate key uniqueifier.
183
184 typedef enum {
185         Unique,
186         Update,
187         Librarian,
188         Duplicate,
189         Delete
190 } BtSlotType;
191
192 typedef struct {
193         uint off:BT_maxbits;    // page offset for key start
194         uint type:3;                    // type of slot
195         uint dead:1;                    // set for deleted slot
196 } BtSlot;
197
198 //      The key structure occupies space at the upper end of
199 //      each page.  It's a length byte followed by the key
200 //      bytes.
201
202 typedef struct {
203         unsigned char len;              // this can be changed to a ushort or uint
204         unsigned char key[0];
205 } BtKey;
206
207 //      the value structure also occupies space at the upper
208 //      end of the page. Each key is immediately followed by a value.
209
210 typedef struct {
211         unsigned char len;              // this can be changed to a ushort or uint
212         unsigned char value[0];
213 } BtVal;
214
215 #define BT_maxkey       255             // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
217
218 //      The first part of an index page.
219 //      It is immediately followed
220 //      by the BtSlot array of keys.
221
222 typedef struct BtPage_ {
223         uint cnt;                                       // count of keys in page
224         uint act;                                       // count of active keys
225         uint min;                                       // next key offset
226         uint garbage;                           // page garbage in bytes
227         unsigned char lvl;                      // level of page
228         unsigned char free;                     // page is on free chain
229         unsigned char kill;                     // page is being deleted
230         unsigned char nopromote;        // page is being constructed
231         unsigned char filler1[6];       // padding to multiple of 8 bytes
232         unsigned char right[BtId];      // page number to right
233         unsigned char left[BtId];       // page number to left
234         unsigned char filler2[2];       // padding to multiple of 8 bytes
235         logseqno lsn;                           // log sequence number applied
236         uid page_no;                            // this page number
237 } *BtPage;
238
239 //  The loadpage interface object
240
241 typedef struct {
242         BtPage page;            // current page pointer
243         BtLatchSet *latch;      // current page latch set
244 } BtPageSet;
245
246 //      structure for latch manager on ALLOC_page
247
248 typedef struct {
249         struct BtPage_ alloc[1];                // next page_no in right ptr
250         unsigned char freechain[BtId];  // head of free page_nos chain
251         unsigned char leafchain[BtId];  // head of leaf page_nos chain
252         unsigned long long leafpages;   // number of active leaf pages
253         unsigned long long upperpages;  // number of active upper pages
254         uint redopages;                                 // number of redo pages in file
255         unsigned char leaf_xtra;                // leaf page size in xtra bits
256         unsigned char page_bits;                // base page size in bits
257 } BtPageZero;
258
259 //      The object structure for Btree access
260
261 typedef struct {
262         uint page_size;                         // base page size       
263         uint page_bits;                         // base page size in bits       
264         uint leaf_xtra;                         // leaf xtra bits       
265 #ifdef unix
266         int idx;
267 #else
268         HANDLE idx;
269 #endif
270         BtPageZero *pagezero;           // mapped allocation page
271         BtHashEntry *hashtable;         // the buffer pool hash table entries
272         BtHashEntry *leaftable;         // the buffer pool hash table entries
273         BtLatchSet *latchsets;          // mapped latch set from buffer pool
274         BtLatchSet *leafsets;           // mapped latch set from buffer pool
275         unsigned char *pagepool;        // mapped to the buffer pool pages
276         unsigned char *leafpool;        // mapped to the leaf pool pages
277         unsigned char *redobuff;        // mapped recovery buffer pointer
278         logseqno lsn, flushlsn;         // current & first lsn flushed
279         MutexLatch redo[1];                     // redo area lite latch
280         MutexLatch lock[1];                     // allocation area lite latch
281         MutexLatch maps[1];                     // mapping segments lite latch
282         ushort thread_no[1];            // highest thread number issued
283         ushort err_thread;                      // error thread number
284         uint nlatchpage;                        // size of buffer pool & latchsets
285         uint latchtotal;                        // number of page latch entries
286         uint latchhash;                         // number of latch hash table slots
287         uint latchvictim;                       // next latch entry to examine
288         uint nleafpage;                         // size of leaf pool & leafsets
289         uint leaftotal;                         // number of leaf latch entries
290         uint leafhash;                          // number of leaf hash table slots
291         uint leafvictim;                        // next leaf entry to examine
292         uint leafpromote;                       // next leaf entry to promote
293         uint redopage;                          // page number of redo buffer
294         uint redolast;                          // last msync size of recovery buff
295         uint redoend;                           // eof/end element in recovery buff
296         int err;                                        // last error
297         int line;                                       // last error line no
298         int found;                                      // number of keys found by delete
299         int reads, writes;                      // number of reads and writes
300 #ifndef unix
301         HANDLE halloc;                          // allocation handle
302         HANDLE hpool;                           // buffer pool handle
303 #endif
304         volatile uint segments;         // number of memory mapped segments
305         unsigned char *pages[64000];// memory mapped segments of b-tree
306 } BtMgr;
307
308 typedef struct {
309         BtMgr *mgr;                                     // buffer manager for entire process
310         BtMgr *main;                            // buffer manager for main btree
311         BtPage cachecursor;                     // cached page frame for cache btree
312         BtPage maincursor;                      // cached page frame for main btree
313         ushort thread_no;                       // thread number
314         unsigned char key[BT_keyarray]; // last found complete key
315 } BtDb;
316
317 //      atomic txn structures
318
319 typedef struct {
320         logseqno reqlsn;        // redo log seq no required
321         uint entry:31;          // latch table entry number
322         uint reuse:1;           // reused previous page
323         uint slot;                      // slot on page
324 } AtomicTxn;
325
326 //      Catastrophic errors
327
328 typedef enum {
329         BTERR_ok = 0,
330         BTERR_struct,
331         BTERR_ovflw,
332         BTERR_lock,
333         BTERR_map,
334         BTERR_read,
335         BTERR_wrt,
336         BTERR_atomic,
337         BTERR_recovery
338 } BTERR;
339
340 #define CLOCK_bit 0x8000
341
342 // recovery manager entry types
343
344 typedef enum {
345         BTRM_eof = 0,   // rest of buffer is emtpy
346         BTRM_add,               // add a unique key-value to btree
347         BTRM_dup,               // add a duplicate key-value to btree
348         BTRM_del,               // delete a key-value from btree
349         BTRM_upd,               // update a key with a new value
350         BTRM_new,               // allocate a new empty page
351         BTRM_old                // reuse an old empty page
352 } BTRM;
353
354 // recovery manager entry
355 //      structure followed by BtKey & BtVal
356
357 typedef struct {
358         logseqno reqlsn;        // log sequence number required
359         logseqno lsn;           // log sequence number for entry
360         uint len;                       // length of entry
361         unsigned char type;     // type of entry
362         unsigned char lvl;      // level of btree entry pertains to
363 } BtLogHdr;
364
365 // B-Tree functions
366
367 extern void bt_close (BtDb *bt);
368 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
369 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
370 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
371 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
372 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
373 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
374 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
375
376 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
377
378 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
379 extern uint bt_nextkey   (BtDb *bt, uint slot);
380 extern uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no);
381 extern uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no);
382
383 //      manager functions
384 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
385 extern void bt_mgrclose (BtMgr *mgr);
386 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
387 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
388
389 //      atomic transaction functions
390 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
391 BTERR bt_promote (BtDb *bt);
392
393 //  The page is allocated from low and hi ends.
394 //  The key slots are allocated from the bottom,
395 //      while the text and value of the key
396 //  are allocated from the top.  When the two
397 //  areas meet, the page is split into two.
398
399 //  A key consists of a length byte, two bytes of
400 //  index number (0 - 65535), and up to 253 bytes
401 //  of key value.
402
403 //  Associated with each key is a value byte string
404 //      containing any value desired.
405
406 //  The b-tree root is always located at page 1.
407 //      The first leaf page of level zero is always
408 //      located on page 2.
409
410 //      The b-tree pages are linked with next
411 //      pointers to facilitate enumerators,
412 //      and provide for concurrency.
413
414 //      When to root page fills, it is split in two and
415 //      the tree height is raised by a new root at page
416 //      one with two keys.
417
418 //      Deleted keys are marked with a dead bit until
419 //      page cleanup. The fence key for a leaf node is
420 //      always present
421
422 //  To achieve maximum concurrency one page is locked at a time
423 //  as the tree is traversed to find leaf key in question. The right
424 //  page numbers are used in cases where the page is being split,
425 //      or consolidated.
426
427 //  Page 0 is dedicated to lock for new page extensions,
428 //      and chains empty pages together for reuse. It also
429 //      contains the latch manager hash table.
430
431 //      The ParentModification lock on a node is obtained to serialize posting
432 //      or changing the fence key for a node.
433
434 //      Empty pages are chained together through the ALLOC page and reused.
435
436 //      Access macros to address slot and key values from the page
437 //      Page slots use 1 based indexing.
438
439 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
440 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
441 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
442
443 void bt_putid(unsigned char *dest, uid id)
444 {
445 int i = BtId;
446
447         while( i-- )
448                 dest[i] = (unsigned char)id, id >>= 8;
449 }
450
451 uid bt_getid(unsigned char *src)
452 {
453 uid id = 0;
454 int i;
455
456         for( i = 0; i < BtId; i++ )
457                 id <<= 8, id |= *src++; 
458
459         return id;
460 }
461
462 //      lite weight spin lock Latch Manager
463
464 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
465 {
466         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
467 }
468
469 void bt_mutexlock(MutexLatch *latch)
470 {
471 uint idx, waited = 0;
472 MutexLatch prev[1];
473
474  while( 1 ) {
475   for( idx = 0; idx < 100; idx++ ) {
476         *prev->value = __sync_fetch_and_or (latch->value, 1);
477         if( !*prev->bits->xcl ) {
478           if( waited )
479                 __sync_fetch_and_sub (latch->bits->waiters, 1);
480           return;
481         }
482   }
483
484   if( !waited ) {
485         __sync_fetch_and_add (latch->bits->waiters, 1);
486         *prev->bits->waiters += 1;
487         waited++;
488   }
489
490   sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
491  }
492 }
493
494 int bt_mutextry(MutexLatch *latch)
495 {
496         return !__sync_lock_test_and_set (latch->bits->xcl, 1);
497 }
498
499 void bt_releasemutex(MutexLatch *latch)
500 {
501 MutexLatch prev[1];
502
503         *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
504
505         if( *prev->bits->waiters )
506                 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
507 }
508
509 //      reader/writer lock implementation
510
511 void WriteLock (RWLock *lock, ushort tid, uint line)
512 {
513         if( lock->tid == tid ) {
514                 lock->dup++;
515                 return;
516         }
517         bt_mutexlock (lock->xcl);
518         bt_mutexlock (lock->wrt);
519         bt_releasemutex (lock->xcl);
520
521         lock->line = line;
522         lock->tid = tid;
523 }
524
525 void WriteRelease (RWLock *lock)
526 {
527         if( lock->dup ) {
528                 lock->dup--;
529                 return;
530         }
531         lock->tid = 0;
532         bt_releasemutex (lock->wrt);
533 }
534
535 void ReadLock (RWLock *lock, ushort tid)
536 {
537         bt_mutexlock (lock->xcl);
538
539         if( !__sync_fetch_and_add (&lock->readers, 1) )
540                 bt_mutexlock (lock->wrt);
541
542         bt_releasemutex (lock->xcl);
543 }
544
545 void ReadRelease (RWLock *lock)
546 {
547         if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
548                 bt_releasemutex (lock->wrt);
549 }
550
551 //      recovery manager -- flush dirty pages
552
553 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
554 {
555 uint cnt3 = 0, cnt2 = 0, cnt = 0;
556 uint entry, segment;
557 BtLatchSet *latch;
558 BtPage page;
559
560   //    flush dirty pool pages to the btree
561
562 fprintf(stderr, "Start flushlsn  ");
563   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
564         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
565         latch = mgr->latchsets + entry;
566         bt_mutexlock (latch->modify);
567         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
568
569         if( latch->dirty ) {
570           bt_writepage(mgr, page, latch->page_no, 0);
571           latch->dirty = 0, cnt++;
572         }
573 if( latch->pin & ~CLOCK_bit )
574 cnt2++;
575         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
576         bt_releasemutex (latch->modify);
577   }
578   for( entry = 1; entry < mgr->leaftotal; entry++ ) {
579         page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
580         latch = mgr->leafsets + entry;
581         bt_mutexlock (latch->modify);
582         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
583
584         if( latch->dirty ) {
585           bt_writepage(mgr, page, latch->page_no, 1);
586           latch->dirty = 0, cnt++;
587         }
588 if( latch->pin & ~CLOCK_bit )
589 cnt2++;
590         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
591         bt_releasemutex (latch->modify);
592   }
593 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
594 fprintf(stderr, "begin sync");
595         for( segment = 0; segment < mgr->segments; segment++ )
596           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
597                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
598 fprintf(stderr, " end sync\n");
599 }
600
601 //      recovery manager -- process current recovery buff on startup
602 //      this won't do much if previous session was properly closed.
603
604 BTERR bt_recoveryredo (BtMgr *mgr)
605 {
606 BtLogHdr *hdr, *eof;
607 uint offset = 0;
608 BtKey *key;
609 BtVal *val;
610
611   hdr = (BtLogHdr *)mgr->redobuff;
612   mgr->flushlsn = hdr->lsn;
613
614   while( 1 ) {
615         hdr = (BtLogHdr *)(mgr->redobuff + offset);
616         switch( hdr->type ) {
617         case BTRM_eof:
618                 mgr->lsn = hdr->lsn;
619                 return 0;
620         case BTRM_add:          // add a unique key-value to btree
621         
622         case BTRM_dup:          // add a duplicate key-value to btree
623         case BTRM_del:          // delete a key-value from btree
624         case BTRM_upd:          // update a key with a new value
625         case BTRM_new:          // allocate a new empty page
626         case BTRM_old:          // reuse an old empty page
627                 return 0;
628         }
629   }
630 }
631
632 //      recovery manager -- append new entry to recovery log
633 //      flush dirty pages to disk when it overflows.
634
635 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
636 {
637 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
638 uint amt = sizeof(BtLogHdr);
639 BtLogHdr *hdr, *eof;
640 uint last, end;
641
642         bt_mutexlock (mgr->redo);
643
644         if( key )
645           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
646
647         //      see if new entry fits in buffer
648         //      flush and reset if it doesn't
649
650         if( amt > size - mgr->redoend ) {
651           mgr->flushlsn = mgr->lsn;
652           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
653                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
654           mgr->redolast = 0;
655           mgr->redoend = 0;
656           bt_flushlsn(mgr, thread_no);
657         }
658
659         //      fill in new entry & either eof or end block
660
661         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
662
663         hdr->len = amt;
664         hdr->type = type;
665         hdr->lvl = lvl;
666         hdr->lsn = ++mgr->lsn;
667
668         mgr->redoend += amt;
669
670         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
671         memset (eof, 0, sizeof(BtLogHdr));
672
673         //  fill in key and value
674
675         if( key ) {
676           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
677           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
678         }
679
680         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
681         memset (eof, 0, sizeof(BtLogHdr));
682         eof->lsn = mgr->lsn;
683
684         last = mgr->redolast & ~0xfff;
685         end = mgr->redoend;
686
687         if( end - last + sizeof(BtLogHdr) >= 32768 )
688           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
689                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
690           else
691                 mgr->redolast = end;
692
693         bt_releasemutex(mgr->redo);
694         return hdr->lsn;
695 }
696
697 //      recovery manager -- append transaction to recovery log
698 //      flush dirty pages to disk when it overflows.
699
700 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
701 {
702 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
703 uint amt = 0, src, type;
704 BtLogHdr *hdr, *eof;
705 uint last, end;
706 logseqno lsn;
707 BtKey *key;
708 BtVal *val;
709
710         //      determine amount of redo recovery log space required
711
712         for( src = 0; src++ < source->cnt; ) {
713           key = keyptr(source,src);
714           val = valptr(source,src);
715           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
716           amt += sizeof(BtLogHdr);
717         }
718
719         bt_mutexlock (mgr->redo);
720
721         //      see if new entry fits in buffer
722         //      flush and reset if it doesn't
723
724         if( amt > size - mgr->redoend ) {
725           mgr->flushlsn = mgr->lsn;
726           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
727                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
728           mgr->redolast = 0;
729           mgr->redoend = 0;
730           bt_flushlsn (mgr, thread_no);
731         }
732
733         //      assign new lsn to transaction
734
735         lsn = ++mgr->lsn;
736
737         //      fill in new entries
738
739         for( src = 0; src++ < source->cnt; ) {
740           key = keyptr(source, src);
741           val = valptr(source, src);
742
743           switch( slotptr(source, src)->type ) {
744           case Unique:
745                 type = BTRM_add;
746                 break;
747           case Duplicate:
748                 type = BTRM_dup;
749                 break;
750           case Delete:
751                 type = BTRM_del;
752                 break;
753           }
754
755           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
756           amt += sizeof(BtLogHdr);
757
758           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
759           hdr->len = amt;
760           hdr->type = type;
761           hdr->lsn = lsn;
762           hdr->lvl = 0;
763
764           //  fill in key and value
765
766           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
767           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
768
769           mgr->redoend += amt;
770         }
771
772         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
773         memset (eof, 0, sizeof(BtLogHdr));
774         eof->lsn = lsn;
775
776         last = mgr->redolast & ~0xfff;
777         end = mgr->redoend;
778
779         if( end - last + sizeof(BtLogHdr) >= 32768 )
780           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
781                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
782           else
783                 mgr->redolast = end;
784
785         bt_releasemutex(mgr->redo);
786         return lsn;
787 }
788
789 //      sync a single btree page to disk
790
791 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
792 {
793 uint segment = latch->page_no >> 16;
794 uint page_size = mgr->page_size;
795 BtPage perm;
796
797         if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
798                 return mgr->err;
799
800         if( !page->lvl )
801                 page_size <<= mgr->leaf_xtra;
802
803         perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
804
805         if( msync (perm, page_size, MS_SYNC) < 0 )
806           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
807
808         latch->dirty = 0;
809         return 0;
810 }
811
812 //      read page into buffer pool from permanent location in Btree file
813
814 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
815 {
816 int flag = PROT_READ | PROT_WRITE;
817 uint page_size = mgr->page_size;
818 uint off = 0, segment, fragment;
819 unsigned char *perm;
820
821   if( leaf )
822         page_size <<= mgr->leaf_xtra;
823
824   fragment = page_no & 0xffff;
825   segment = page_no >> 16;
826   mgr->reads++;
827
828   while( off < page_size ) {
829         if( fragment >> 16 )
830                 segment++, fragment = 0;
831
832         if( segment < mgr->segments ) {
833           perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
834
835           memcpy ((unsigned char *)page + off, perm, mgr->page_size);
836           off += mgr->page_size;
837           fragment++;
838           continue;
839         }
840
841         bt_mutexlock (mgr->maps);
842
843         if( segment < mgr->segments ) {
844                 bt_releasemutex (mgr->maps);
845                 continue;
846         }
847
848         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
849         mgr->segments++;
850
851         bt_releasemutex (mgr->maps);
852   }
853
854   return 0;
855 }
856
857 //      write page to permanent location in Btree file
858
859 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
860 {
861 int flag = PROT_READ | PROT_WRITE;
862 uint page_size = mgr->page_size;
863 uint off = 0, segment, fragment;
864 unsigned char *perm;
865
866   if( leaf )
867         page_size <<= mgr->leaf_xtra;
868
869   fragment = page_no & 0xffff;
870   segment = page_no >> 16;
871   mgr->writes++;
872
873   while( off < page_size ) {
874         if( fragment >> 16 )
875                 segment++, fragment = 0;
876
877         if( segment < mgr->segments ) {
878           perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
879           memcpy (perm, (unsigned char *)page + off, mgr->page_size);
880           off += mgr->page_size;
881           fragment++;
882           continue;
883         }
884
885         bt_mutexlock (mgr->maps);
886
887         if( segment < mgr->segments ) {
888                 bt_releasemutex (mgr->maps);
889                 continue;
890         }
891
892         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
893         mgr->segments++;
894         bt_releasemutex (mgr->maps);
895   }
896
897   return 0;
898 }
899
900 //      set CLOCK bit in latch
901 //      decrement pin count
902
903 int value;
904
905 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
906 {
907         bt_mutexlock(latch->modify);
908         latch->pin |= CLOCK_bit;
909         latch->pin--;
910
911         bt_releasemutex(latch->modify);
912 }
913
914 //  return the btree cached page address
915
916 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
917 {
918 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
919 BtPage page;
920
921   if( latch->leaf )
922         page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
923   else
924         page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
925
926   return page;
927 }
928
929 //  return next available leaf entry
930 //        and with latch entry locked
931
932 uint bt_availleaf (BtMgr *mgr)
933 {
934 BtLatchSet *latch;
935 uint entry;
936
937   while( 1 ) {
938 #ifdef unix
939         entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
940 #else
941         entry = _InterlockedIncrement (&mgr->leafvictim);
942 #endif
943         entry %= mgr->leaftotal;
944
945         if( !entry )
946                 continue;
947
948         latch = mgr->leafsets + entry;
949
950         if( !bt_mutextry(latch->modify) )
951                 continue;
952
953         //  return this entry if it is not pinned
954
955         if( !latch->pin )
956                 return entry;
957
958         //      if the CLOCK bit is set
959         //      reset it to zero.
960
961         latch->pin &= ~CLOCK_bit;
962         bt_releasemutex(latch->modify);
963   }
964 }
965
966 //  return next available latch entry
967 //        and with latch entry locked
968
969 uint bt_availnext (BtMgr *mgr)
970 {
971 BtLatchSet *latch;
972 uint entry;
973
974   while( 1 ) {
975 #ifdef unix
976         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
977 #else
978         entry = _InterlockedIncrement (&mgr->latchvictim);
979 #endif
980         entry %= mgr->latchtotal;
981
982         if( !entry )
983                 continue;
984
985         latch = mgr->latchsets + entry;
986
987         if( !bt_mutextry(latch->modify) )
988                 continue;
989
990         //  return this entry if it is not pinned
991
992         if( !latch->pin )
993                 return entry;
994
995         //      if the CLOCK bit is set
996         //      reset it to zero.
997
998         latch->pin &= ~CLOCK_bit;
999         bt_releasemutex(latch->modify);
1000   }
1001 }
1002
1003 //      pin leaf in leaf buffer pool
1004 //      return with latchset pinned
1005
1006 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
1007 {
1008 uint hashidx = page_no % mgr->leafhash;
1009 BtLatchSet *latch;
1010 uint entry, idx;
1011 BtPage page;
1012
1013   //  try to find our entry
1014
1015   bt_mutexlock(mgr->leaftable[hashidx].latch);
1016
1017   if( entry = mgr->leaftable[hashidx].entry )
1018    do {
1019         latch = mgr->leafsets + entry;
1020
1021         if( page_no == latch->page_no )
1022                 break;
1023    } while( entry = latch->next );
1024
1025   //  found our entry: increment pin
1026
1027   if( entry ) {
1028         latch = mgr->leafsets + entry;
1029         bt_mutexlock(latch->modify);
1030         latch->pin |= CLOCK_bit;
1031         latch->pin++;
1032
1033         bt_releasemutex(latch->modify);
1034         bt_releasemutex(mgr->leaftable[hashidx].latch);
1035         return latch;
1036   }
1037
1038   //  find and reuse unpinned entry
1039
1040 trynext:
1041   entry = bt_availleaf (mgr);
1042   latch = mgr->leafsets + entry;
1043
1044   idx = latch->page_no % mgr->leafhash;
1045
1046   //  if latch is on a different hash chain
1047   //    unlink from the old page_no chain
1048
1049   if( latch->page_no )
1050    if( idx != hashidx ) {
1051
1052         //  skip over this entry if latch not available
1053
1054     if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
1055           bt_releasemutex(latch->modify);
1056           goto trynext;
1057         }
1058
1059         if( latch->prev )
1060           mgr->leafsets[latch->prev].next = latch->next;
1061         else
1062           mgr->leaftable[idx].entry = latch->next;
1063
1064         if( latch->next )
1065           mgr->leafsets[latch->next].prev = latch->prev;
1066
1067     bt_releasemutex (mgr->leaftable[idx].latch);
1068    }
1069
1070   page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1071
1072   //  update permanent page area in btree from buffer pool
1073   //    no read-lock is required since page is not pinned.
1074
1075   if( latch->dirty )
1076         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
1077           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1078         else
1079           latch->dirty = 0;
1080
1081   if( bt_readpage (mgr, page, page_no, 1) )
1082         return mgr->line = __LINE__, NULL;
1083
1084   //  link page as head of hash table chain
1085   //  if this is a never before used entry,
1086   //  or it was previously on a different
1087   //  hash table chain. Otherwise, just
1088   //  leave it in its current hash table
1089   //  chain position.
1090
1091   if( !latch->page_no || hashidx != idx ) {
1092         if( latch->next = mgr->leaftable[hashidx].entry )
1093           mgr->leafsets[latch->next].prev = entry;
1094
1095         mgr->leaftable[hashidx].entry = entry;
1096     latch->prev = 0;
1097   }
1098
1099   //  fill in latch structure
1100
1101   latch->pin = CLOCK_bit | 1;
1102   latch->page_no = page_no;
1103   latch->leaf = 1;
1104
1105   bt_releasemutex (latch->modify);
1106   bt_releasemutex (mgr->leaftable[hashidx].latch);
1107   return latch;
1108 }
1109
1110 //      pin page in non-leaf buffer pool
1111 //      return with latchset pinned
1112
1113 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1114 {
1115 uint hashidx = page_no % mgr->latchhash;
1116 BtLatchSet *latch;
1117 uint entry, idx;
1118 BtPage page;
1119
1120   //  try to find our entry
1121
1122   bt_mutexlock(mgr->hashtable[hashidx].latch);
1123
1124   if( entry = mgr->hashtable[hashidx].entry ) do
1125   {
1126         latch = mgr->latchsets + entry;
1127
1128         if( page_no == latch->page_no )
1129                 break;
1130   } while( entry = latch->next );
1131
1132   //  found our entry: increment pin
1133
1134   if( entry ) {
1135         latch = mgr->latchsets + entry;
1136         bt_mutexlock(latch->modify);
1137         latch->pin |= CLOCK_bit;
1138         latch->pin++;
1139         bt_releasemutex(latch->modify);
1140         bt_releasemutex(mgr->hashtable[hashidx].latch);
1141         return latch;
1142   }
1143
1144   //  find and reuse unpinned entry
1145
1146 trynext:
1147   entry = bt_availnext (mgr);
1148   latch = mgr->latchsets + entry;
1149
1150   idx = latch->page_no % mgr->latchhash;
1151
1152   //  if latch is on a different hash chain
1153   //    unlink from the old page_no chain
1154
1155   if( latch->page_no )
1156    if( idx != hashidx ) {
1157
1158         //  skip over this entry if latch not available
1159
1160     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1161           bt_releasemutex(latch->modify);
1162           goto trynext;
1163         }
1164
1165         if( latch->prev )
1166           mgr->latchsets[latch->prev].next = latch->next;
1167         else
1168           mgr->hashtable[idx].entry = latch->next;
1169
1170         if( latch->next )
1171           mgr->latchsets[latch->next].prev = latch->prev;
1172
1173     bt_releasemutex (mgr->hashtable[idx].latch);
1174    }
1175
1176   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1177
1178   //  update permanent page area in btree from buffer pool
1179   //    no read-lock is required since page is not pinned.
1180
1181   if( latch->dirty )
1182         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1183           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1184         else
1185           latch->dirty = 0;
1186
1187   if( bt_readpage (mgr, page, page_no, 0) )
1188         return mgr->line = __LINE__, NULL;
1189
1190   //  link page as head of hash table chain
1191   //  if this is a never before used entry,
1192   //  or it was previously on a different
1193   //  hash table chain. Otherwise, just
1194   //  leave it in its current hash table
1195   //  chain position.
1196
1197   if( !latch->page_no || hashidx != idx ) {
1198         if( latch->next = mgr->hashtable[hashidx].entry )
1199           mgr->latchsets[latch->next].prev = entry;
1200
1201         mgr->hashtable[hashidx].entry = entry;
1202     latch->prev = 0;
1203   }
1204
1205   //  fill in latch structure
1206
1207   latch->pin = CLOCK_bit | 1;
1208   latch->page_no = page_no;
1209   latch->leaf = 0;
1210
1211   bt_releasemutex (latch->modify);
1212   bt_releasemutex (mgr->hashtable[hashidx].latch);
1213   return latch;
1214 }
1215   
1216 void bt_mgrclose (BtMgr *mgr)
1217 {
1218 BtLatchSet *latch;
1219 BtLogHdr *eof;
1220 uint num = 0;
1221 BtPage page;
1222 uint slot;
1223
1224         //      flush previously written dirty pages
1225         //      and write recovery buffer to disk
1226
1227         fdatasync (mgr->idx);
1228
1229         if( mgr->redoend ) {
1230                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1231                 memset (eof, 0, sizeof(BtLogHdr));
1232         }
1233
1234         //      write remaining dirty pool pages to the btree
1235
1236         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1237                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1238                 latch = mgr->latchsets + slot;
1239
1240                 if( latch->dirty ) {
1241                         bt_writepage(mgr, page, latch->page_no, 0);
1242                         latch->dirty = 0, num++;
1243                 }
1244         }
1245
1246         //      write remaining dirty leaf pages to the btree
1247
1248         for( slot = 1; slot < mgr->leaftotal; slot++ ) {
1249                 page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1250                 latch = mgr->leafsets + slot;
1251
1252                 if( latch->dirty ) {
1253                         bt_writepage(mgr, page, latch->page_no, 1);
1254                         latch->dirty = 0, num++;
1255                 }
1256         }
1257
1258         //      clear redo recovery buffer on disk.
1259
1260         if( mgr->pagezero->redopages ) {
1261                 eof = (BtLogHdr *)mgr->redobuff;
1262                 memset (eof, 0, sizeof(BtLogHdr));
1263                 eof->lsn = mgr->lsn;
1264                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1265                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1266         }
1267
1268         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1269
1270 #ifdef unix
1271         while( mgr->segments )
1272                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1273
1274         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1275         munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1276         munmap (mgr->pagezero, mgr->page_size);
1277 #else
1278         FlushViewOfFile(mgr->pagezero, 0);
1279         UnmapViewOfFile(mgr->pagezero);
1280         UnmapViewOfFile(mgr->pagepool);
1281         CloseHandle(mgr->halloc);
1282         CloseHandle(mgr->hpool);
1283 #endif
1284 #ifdef unix
1285         close (mgr->idx);
1286         free (mgr);
1287 #else
1288         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1289         FlushFileBuffers(mgr->idx);
1290         CloseHandle(mgr->idx);
1291         GlobalFree (mgr);
1292 #endif
1293 }
1294
1295 //      close and release memory
1296
1297 void bt_close (BtDb *bt)
1298 {
1299 #ifdef unix
1300         if( bt->cachecursor )
1301                 free (bt->cachecursor);
1302         if( bt->maincursor )
1303                 free (bt->maincursor);
1304 #else
1305         if( bt->cachecursor)
1306                 VirtualFree (bt->cachecursor, 0, MEM_RELEASE);
1307         if( bt->maincursor)
1308                 VirtualFree (bt->maincursor, 0, MEM_RELEASE);
1309 #endif
1310         free (bt);
1311 }
1312
1313 //  open/create new btree buffer manager
1314
1315 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1316 //              size of page pool (e.g. 262144) and leaf pool
1317
1318 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1319 {
1320 uint lvl, attr, last, slot, idx;
1321 uint nlatchpage, latchhash;
1322 uint nleafpage, leafhash;
1323 unsigned char value[BtId];
1324 int flag, initit = 0;
1325 BtPageZero *pagezero;
1326 BtLatchSet *latch;
1327 off64_t size;
1328 uint amt[1];
1329 BtMgr* mgr;
1330 BtKey* key;
1331 BtVal *val;
1332
1333         // determine sanity of page size and buffer pool
1334
1335         if( leafxtra + pagebits > BT_maxbits )
1336                 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1337
1338         if( pagebits < BT_minbits )
1339                 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1340
1341 #ifdef unix
1342         mgr = calloc (1, sizeof(BtMgr));
1343
1344         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1345
1346         if( mgr->idx == -1 ) {
1347                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1348                 return free(mgr), NULL;
1349         }
1350 #else
1351         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1352         attr = FILE_ATTRIBUTE_NORMAL;
1353         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1354
1355         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1356                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1357                 return GlobalFree(mgr), NULL;
1358         }
1359 #endif
1360
1361 #ifdef unix
1362         pagezero = valloc (BT_maxpage);
1363         *amt = 0;
1364
1365         // read minimum page size to get root info
1366         //      to support raw disk partition files
1367         //      check if page_bits == 0 on the disk.
1368
1369         if( size = lseek (mgr->idx, 0L, 2) )
1370                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1371                         if( pagezero->page_bits ) {
1372                                 pagebits = pagezero->page_bits;
1373                                 leafxtra = pagezero->leaf_xtra;
1374                         } else
1375                                 initit = 1;
1376                 else
1377                         return free(mgr), free(pagezero), NULL;
1378         else
1379                 initit = 1;
1380 #else
1381         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1382         size = GetFileSize(mgr->idx, amt);
1383
1384         if( size || *amt ) {
1385                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1386                         return bt_mgrclose (mgr), NULL;
1387                 pagebits = pagezero->page_bits;
1388                 leafxtra = pagezero->leaf_xtra;
1389         } else
1390                 initit = 1;
1391 #endif
1392
1393         mgr->page_size = 1 << pagebits;
1394         mgr->page_bits = pagebits;
1395         mgr->leaf_xtra = leafxtra;
1396
1397         //  calculate number of latch hash table entries
1398
1399         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1400
1401         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1402         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1403         mgr->latchtotal = nodemax;
1404
1405         //  calculate number of leaf hash table entries
1406
1407         mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1408
1409         mgr->nleafpage += leafmax << leafxtra;          // size of the leaf pool in pages
1410         mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1411         mgr->leaftotal = leafmax;
1412
1413         mgr->redopage = LEAF_page + (1 << leafxtra);
1414
1415         if( !initit )
1416                 goto mgrlatch;
1417
1418         // initialize an empty b-tree with latch page, root page, page of leaves
1419         // and page(s) of latches and page pool cache
1420
1421         ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1422         memset (pagezero, 0, 1 << pagebits);
1423         pagezero->alloc->lvl = MIN_lvl - 1;
1424         pagezero->redopages = redopages;
1425         pagezero->page_bits = mgr->page_bits;
1426         pagezero->leaf_xtra = leafxtra;
1427
1428         bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1429         pagezero->upperpages = 1;
1430         pagezero->leafpages = 1;
1431
1432         //  initialize left-most LEAF page in
1433         //      alloc->left and count of active leaf pages.
1434
1435         bt_putid (pagezero->alloc->left, LEAF_page);
1436
1437         if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1438                 fprintf (stderr, "Unable to create btree page zero\n");
1439                 return bt_mgrclose (mgr), NULL;
1440         }
1441
1442         memset (pagezero, 0, 1 << pagebits);
1443
1444         for( lvl=MIN_lvl; lvl--; ) {
1445         BtSlot *node = slotptr(pagezero->alloc, 1);
1446                 node->off = mgr->page_size;
1447
1448                 if( !lvl )
1449                         node->off <<= mgr->leaf_xtra;
1450
1451                 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1452                 node->type = Librarian;
1453                 node++->dead = 1;
1454
1455                 node->off = node[-1].off;
1456                 key = keyptr(pagezero->alloc, 2);
1457                 key = keyptr(pagezero->alloc, 1);
1458                 key->len = 2;           // create stopper key
1459                 key->key[0] = 0xff;
1460                 key->key[1] = 0xff;
1461
1462                 bt_putid(value, MIN_lvl - lvl + 1);
1463                 val = valptr(pagezero->alloc, 1);
1464                 val->len = lvl ? BtId : 0;
1465                 memcpy (val->value, value, val->len);
1466
1467                 pagezero->alloc->min = node->off;
1468                 pagezero->alloc->lvl = lvl;
1469                 pagezero->alloc->cnt = 2;
1470                 pagezero->alloc->act = 1;
1471                 pagezero->alloc->page_no = MIN_lvl - lvl;
1472
1473                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1474                         fprintf (stderr, "Unable to create btree page\n");
1475                         return bt_mgrclose (mgr), NULL;
1476                 }
1477         }
1478
1479 mgrlatch:
1480 #ifdef unix
1481         free (pagezero);
1482 #else
1483         VirtualFree (pagezero, 0, MEM_RELEASE);
1484 #endif
1485
1486         // mlock the first segment of 64K pages
1487
1488         flag = PROT_READ | PROT_WRITE;
1489         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1490         mgr->segments = 1;
1491
1492         if( mgr->pages[0] == MAP_FAILED ) {
1493                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1494                 return bt_mgrclose (mgr), NULL;
1495         }
1496
1497         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1498         mlock (mgr->pagezero, mgr->page_size);
1499
1500         mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
1501         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1502
1503         //      allocate pool buffers
1504
1505         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1506         if( mgr->pagepool == MAP_FAILED ) {
1507                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1508                 return bt_mgrclose (mgr), NULL;
1509         }
1510
1511         mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1512         if( mgr->leafpool == MAP_FAILED ) {
1513                 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1514                 return bt_mgrclose (mgr), NULL;
1515         }
1516
1517         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1518         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1519         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1520
1521         mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1522         mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1523         mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1524
1525         return mgr;
1526 }
1527
1528 //      open BTree access method
1529 //      based on buffer manager
1530
1531 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1532 {
1533 BtDb *bt = malloc (sizeof(*bt));
1534
1535         memset (bt, 0, sizeof(*bt));
1536         bt->main = main;
1537         bt->mgr = mgr;
1538 #ifdef unix
1539         bt->cachecursor = valloc (mgr->page_size << mgr->leaf_xtra);
1540         bt->maincursor = valloc (mgr->page_size << mgr->leaf_xtra);
1541 #else
1542         bt->cachecursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
1543         bt->maincursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
1544 #endif
1545 #ifdef unix
1546         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1547 #else
1548         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1549 #endif
1550         return bt;
1551 }
1552
1553 //  compare two keys, return > 0, = 0, or < 0
1554 //  =0: keys are same
1555 //  -1: key2 > key1
1556 //  +1: key2 < key1
1557 //  as the comparison value
1558
1559 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1560 {
1561 uint len1 = key1->len;
1562 int ans;
1563
1564         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1565                 return ans;
1566
1567         if( len1 > len2 )
1568                 return 1;
1569         if( len1 < len2 )
1570                 return -1;
1571
1572         return 0;
1573 }
1574
1575 // place write, read, or parent lock on requested page_no.
1576
1577 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1578 {
1579         switch( mode ) {
1580         case BtLockRead:
1581                 ReadLock (latch->readwr, thread_no);
1582                 break;
1583         case BtLockWrite:
1584                 WriteLock (latch->readwr, thread_no, line);
1585                 break;
1586         case BtLockAccess:
1587                 ReadLock (latch->access, thread_no);
1588                 break;
1589         case BtLockDelete:
1590                 WriteLock (latch->access, thread_no, line);
1591                 break;
1592         case BtLockParent:
1593                 WriteLock (latch->parent, thread_no, line);
1594                 break;
1595         case BtLockLink:
1596                 WriteLock (latch->link, thread_no, line);
1597                 break;
1598         }
1599 }
1600
1601 // remove write, read, or parent lock on requested page
1602
1603 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1604 {
1605         switch( mode ) {
1606         case BtLockRead:
1607                 ReadRelease (latch->readwr);
1608                 break;
1609         case BtLockWrite:
1610                 WriteRelease (latch->readwr);
1611                 break;
1612         case BtLockAccess:
1613                 ReadRelease (latch->access);
1614                 break;
1615         case BtLockDelete:
1616                 WriteRelease (latch->access);
1617                 break;
1618         case BtLockParent:
1619                 WriteRelease (latch->parent);
1620                 break;
1621         case BtLockLink:
1622                 WriteRelease (latch->link);
1623                 break;
1624         }
1625 }
1626
1627 //      allocate a new page
1628 //      return with page latched, but unlocked.
1629
1630 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1631 {
1632 uint page_size = mgr->page_size, page_xtra = 0;
1633 unsigned char *freechain;
1634 uid page_no;
1635
1636         if( contents->lvl ) {
1637                 freechain = mgr->pagezero->freechain;
1638                 mgr->pagezero->upperpages++;
1639         } else {
1640                 freechain = mgr->pagezero->leafchain;
1641                 mgr->pagezero->leafpages++;
1642                 page_xtra = mgr->leaf_xtra;
1643                 page_size <<= page_xtra;
1644         }
1645
1646         //      lock allocation page
1647
1648         bt_mutexlock(mgr->lock);
1649
1650         // use empty chain first
1651         // else allocate new page
1652
1653         if( page_no = bt_getid(freechain) ) {
1654                 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1655                         set->page = bt_mappage (mgr, set->latch);
1656                 else
1657                         return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1658
1659                 bt_putid(freechain, bt_getid(set->page->right));
1660
1661                 //  the page is currently nopromote and this
1662                 //  will keep bt_promote out.
1663
1664                 //      contents will replace this bit
1665                 //  and pin will keep bt_promote out
1666
1667                 contents->page_no = page_no;
1668                 contents->nopromote = 0;
1669                 set->latch->dirty = 1;
1670
1671                 memcpy (set->page, contents, page_size);
1672
1673 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1674 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1675
1676                 bt_releasemutex(mgr->lock);
1677                 return 0;
1678         }
1679
1680         page_no = bt_getid(mgr->pagezero->alloc->right);
1681         bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
1682
1683         // unlock allocation latch and
1684         //      extend file into new page.
1685
1686 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1687 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1688         bt_releasemutex(mgr->lock);
1689
1690         //      keep bt_promote out of this page
1691         contents->nopromote = 1;
1692         contents->page_no = page_no;
1693         if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1694                 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1695         if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1696                 set->page = bt_mappage (mgr, set->latch);
1697         else
1698                 return mgr->err_thread = thread_no, mgr->err;
1699
1700         // now pin will keep bt_promote out
1701
1702         set->page->nopromote = 0;
1703         set->latch->dirty = 1;
1704         return 0;
1705 }
1706
1707 //  find slot in page for given key at a given level
1708
1709 int bt_findslot (BtPage page, unsigned char *key, uint len)
1710 {
1711 uint diff, higher = page->cnt, low = 1, slot;
1712 uint good = 0;
1713
1714         //        make stopper key an infinite fence value
1715
1716         if( bt_getid (page->right) )
1717                 higher++;
1718         else
1719                 good++;
1720
1721         //      low is the lowest candidate.
1722         //  loop ends when they meet
1723
1724         //  higher is already
1725         //      tested as .ge. the passed key.
1726
1727         while( diff = higher - low ) {
1728                 slot = low + ( diff >> 1 );
1729                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1730                         low = slot + 1;
1731                 else
1732                         higher = slot, good++;
1733         }
1734
1735         //      return zero if key is on right link page
1736
1737         return good ? higher : 0;
1738 }
1739
1740 //  find and load page at given level for given key
1741 //      leave page rd or wr locked as requested
1742
1743 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1744 {
1745 uid page_no = ROOT_page, prevpage_no = 0;
1746 uint drill = 0xff, slot;
1747 BtLatchSet *prevlatch;
1748 uint mode, prevmode;
1749 BtPage prevpage;
1750 BtVal *val;
1751 BtKey *ptr;
1752
1753   //  start at root of btree and drill down
1754
1755   do {
1756         // determine lock mode of drill level
1757         mode = (drill == lvl) ? lock : BtLockRead; 
1758
1759         if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1760           set->page = bt_mappage (mgr, set->latch);
1761         else
1762           return 0;
1763
1764         // obtain access lock using lock chaining with Access mode
1765
1766         if( page_no > ROOT_page )
1767           bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1768
1769         //      release & unpin parent or left sibling page
1770
1771         if( prevpage_no ) {
1772           bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1773           bt_unpinlatch (prevlatch, thread_no, __LINE__);
1774           prevpage_no = 0;
1775         }
1776
1777         // obtain mode lock using lock coupling through AccessLock
1778
1779         bt_lockpage(mode, set->latch, thread_no, __LINE__);
1780
1781         // grab our fence key
1782
1783         ptr=keyptr(set->page,set->page->cnt);
1784
1785         if( set->page->free )
1786                 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1787
1788         if( page_no > ROOT_page )
1789           bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1790
1791         // re-read and re-lock root after determining actual level of root
1792
1793         if( set->page->lvl != drill) {
1794                 if( set->latch->page_no != ROOT_page )
1795                         return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1796                         
1797                 drill = set->page->lvl;
1798
1799                 if( lock != BtLockRead && drill == lvl ) {
1800                   bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1801                   bt_unpinlatch (set->latch, thread_no, __LINE__);
1802                   continue;
1803                 }
1804         }
1805
1806         prevpage_no = set->latch->page_no;
1807         prevlatch = set->latch;
1808         prevpage = set->page;
1809         prevmode = mode;
1810
1811         //  if requested key is beyond our fence,
1812         //      slide to the right
1813
1814         if( keycmp (ptr, key, len) < 0 )
1815           if( page_no = bt_getid(set->page->right) )
1816                 continue;
1817
1818         //  if page is part of a delete operation,
1819         //      slide to the left;
1820
1821         if( set->page->kill ) {
1822           bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1823           page_no = bt_getid(set->page->left);
1824           bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1825           continue;
1826         }
1827
1828         //  find key on page at this level
1829         //  and descend to requested level
1830
1831         if( slot = bt_findslot (set->page, key, len) ) {
1832           if( drill == lvl )
1833                 return slot;
1834
1835           // find next non-dead slot -- the fence key if nothing else
1836
1837           while( slotptr(set->page, slot)->dead )
1838                 if( slot++ < set->page->cnt )
1839                   continue;
1840                 else
1841                   return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1842
1843           val = valptr(set->page, slot);
1844
1845           if( val->len == BtId )
1846                 page_no = bt_getid(val->value);
1847           else
1848                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1849
1850           drill--;
1851           continue;
1852          }
1853
1854         //  slide right into next page
1855
1856         page_no = bt_getid(set->page->right);
1857
1858   } while( page_no );
1859
1860   // return error on end of right chain
1861
1862   mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1863   return 0;     // return error
1864 }
1865
1866 //      return page to free list
1867 //      page must be delete & write locked
1868 //      and have no keys pointing to it.
1869
1870 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1871 {
1872 unsigned char *freechain;
1873
1874         if( set->page->lvl ) {
1875                 freechain = mgr->pagezero->freechain;
1876                 mgr->pagezero->upperpages--;
1877         } else {
1878                 freechain = mgr->pagezero->leafchain;
1879                 mgr->pagezero->leafpages--;
1880         }
1881
1882         //      lock allocation page
1883
1884         bt_mutexlock (mgr->lock);
1885
1886         //      store chain
1887
1888         memcpy(set->page->right, freechain, BtId);
1889         bt_putid(freechain, set->latch->page_no);
1890         set->latch->dirty = 1;
1891         set->page->free = 1;
1892
1893 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1894 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1895
1896         // unlock released page
1897         // and unlock allocation page
1898
1899         bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1900         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1901         bt_unpinlatch (set->latch, thread_no, __LINE__);
1902         bt_releasemutex (mgr->lock);
1903 }
1904
1905 //      a fence key was deleted from a page
1906 //      push new fence value upwards
1907
1908 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1909 {
1910 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1911 unsigned char value[BtId];
1912 BtKey* ptr;
1913 uint idx;
1914
1915         //      remove the old fence value
1916
1917         ptr = keyptr(set->page, set->page->cnt);
1918         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1919         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1920         set->latch->dirty = 1;
1921
1922         //  cache new fence value
1923
1924         ptr = keyptr(set->page, set->page->cnt);
1925         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1926
1927         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1928         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1929
1930         //      insert new (now smaller) fence key
1931
1932         bt_putid (value, set->latch->page_no);
1933         ptr = (BtKey*)leftkey;
1934
1935         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1936           return mgr->err_thread = thread_no, mgr->err;
1937
1938         //      now delete old fence key
1939
1940         ptr = (BtKey*)rightkey;
1941
1942         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1943                 return mgr->err_thread = thread_no, mgr->err;
1944
1945         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1946         bt_unpinlatch(set->latch, thread_no, __LINE__);
1947         return 0;
1948 }
1949
1950 //      root has a single child
1951 //      collapse a level from the tree
1952
1953 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1954 {
1955 BtPageSet child[1];
1956 uid page_no;
1957 BtVal *val;
1958 uint idx;
1959
1960   // find the child entry and promote as new root contents
1961
1962   do {
1963         for( idx = 0; idx++ < root->page->cnt; )
1964           if( !slotptr(root->page, idx)->dead )
1965                 break;
1966
1967         val = valptr(root->page, idx);
1968
1969         if( val->len == BtId )
1970                 page_no = bt_getid (valptr(root->page, idx)->value);
1971         else
1972                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1973
1974         if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1975                 child->page = bt_mappage (mgr, child->latch);
1976         else
1977                 return mgr->err_thread = thread_no, mgr->err;
1978
1979         bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1980         bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1981
1982         memcpy (root->page, child->page, mgr->page_size);
1983         root->latch->dirty = 1;
1984
1985         bt_freepage (mgr, child, thread_no);
1986
1987   } while( root->page->lvl > 1 && root->page->act == 1 );
1988
1989   bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1990   bt_unpinlatch (root->latch, thread_no, __LINE__);
1991   return 0;
1992 }
1993
1994 //  delete a page and manage key
1995 //  call with page writelocked
1996
1997 //      returns with page unpinned
1998 //      from the page pool.
1999
2000 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
2001 {
2002 unsigned char lowerfence[BT_keyarray];
2003 uint page_size = mgr->page_size;
2004 BtPageSet right[1], temp[1];
2005 unsigned char value[BtId];
2006 uid page_no, right2;
2007 BtKey *ptr;
2008
2009         if( !lvl )
2010                 page_size <<= mgr->leaf_xtra;
2011
2012         //  cache copy of original fence key
2013         //      that is being deleted.
2014
2015         ptr = keyptr(set->page, set->page->cnt);
2016         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2017
2018         //      obtain locks on right page
2019
2020         page_no = bt_getid(set->page->right);
2021
2022         if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
2023                 right->page = bt_mappage (mgr, right->latch);
2024         else
2025                 return 0;
2026
2027         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2028
2029         if( right->page->kill )
2030                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2031
2032         // pull contents of right peer into our empty page
2033         //      preserving our left page number, and its right page number.
2034
2035         bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
2036         page_no = bt_getid(set->page->left);
2037         memcpy (set->page, right->page, page_size);
2038         bt_putid (set->page->left, page_no);
2039         bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
2040
2041         set->page->page_no = set->latch->page_no;
2042         set->latch->dirty = 1;
2043
2044         //  fix left link from far right page
2045
2046         if( right2 = bt_getid (right->page->right) ) {
2047           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2048                 temp->page = bt_mappage (mgr, temp->latch);
2049           else
2050                 return 0;
2051
2052       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2053           bt_putid (temp->page->left, set->latch->page_no);
2054           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2055           bt_unpinlatch (temp->latch, thread_no, __LINE__);
2056         } else {        // our page is now rightmost
2057           bt_mutexlock (mgr->lock);
2058           bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
2059           bt_releasemutex(mgr->lock);
2060         }
2061
2062         // mark right page deleted and release lock
2063
2064         right->latch->dirty = 1;
2065         right->page->kill = 1;
2066         bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2067
2068         // redirect higher key directly to our new node contents
2069
2070         ptr = keyptr(right->page, right->page->cnt);
2071         bt_putid (value, set->latch->page_no);
2072
2073         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
2074           return mgr->err;
2075
2076         //      delete our orignal fence key in parent
2077         //      and unlock our page.
2078
2079         ptr = (BtKey *)lowerfence;
2080
2081         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2082           return mgr->err;
2083
2084         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2085         bt_unpinlatch (set->latch, thread_no, __LINE__);
2086
2087         //      obtain delete and write locks to right node
2088
2089         bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2090         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2091         bt_freepage (mgr, right, thread_no);
2092         return 0;
2093 }
2094
2095 //  find and delete key on page by marking delete flag bit
2096 //  if page becomes empty, delete it from the btree
2097
2098 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2099 {
2100 uint slot, idx, found, fence;
2101 BtPageSet set[1];
2102 BtSlot *node;
2103 BtKey *ptr;
2104 BtVal *val;
2105
2106         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2107                 node = slotptr(set->page, slot);
2108                 ptr = keyptr(set->page, slot);
2109         } else
2110                 return mgr->err_thread = thread_no, mgr->err;
2111
2112         // if librarian slot, advance to real slot
2113
2114         if( node->type == Librarian ) {
2115                 ptr = keyptr(set->page, ++slot);
2116                 node = slotptr(set->page, slot);
2117         }
2118
2119         fence = slot == set->page->cnt;
2120
2121         // delete the key, ignore request if already dead
2122
2123         if( found = !keycmp (ptr, key, len) )
2124           if( found = node->dead == 0 ) {
2125                 val = valptr(set->page,slot);
2126                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2127                 set->page->act--;
2128
2129                 //  mark node type as delete
2130
2131                 node->type = Delete;
2132                 node->dead = 1;
2133
2134                 // collapse empty slots beneath the fence
2135                 // on interiour nodes
2136
2137                 if( lvl )
2138                  while( idx = set->page->cnt - 1 )
2139                   if( slotptr(set->page, idx)->dead ) {
2140                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2141                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2142                   } else
2143                         break;
2144           }
2145
2146         if( !found )
2147                 return 0;
2148
2149         //      did we delete a fence key in an upper level?
2150
2151         if( lvl && set->page->act && fence )
2152           if( bt_fixfence (mgr, set, lvl, thread_no) )
2153                 return mgr->err_thread = thread_no, mgr->err;
2154           else
2155                 return 0;
2156
2157         //      do we need to collapse root?
2158
2159         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2160           if( bt_collapseroot (mgr, set, thread_no) )
2161                 return mgr->err_thread = thread_no, mgr->err;
2162           else
2163                 return 0;
2164
2165         //      delete empty page
2166
2167         if( !set->page->act )
2168           return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2169
2170         set->latch->dirty = 1;
2171         bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2172         bt_unpinlatch (set->latch, thread_no, __LINE__);
2173         return 0;
2174 }
2175
2176 //      check page for space available,
2177 //      clean if necessary and return
2178 //      0 - page needs splitting
2179 //      >0  new slot value
2180
2181 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2182 {
2183 uint page_size = mgr->page_size;
2184 BtPage page = set->page, frame;
2185 uint cnt = 0, idx = 0;
2186 uint max = page->cnt;
2187 uint newslot = max;
2188 BtKey *key;
2189 BtVal *val;
2190
2191         if( !set->page->lvl )
2192                 page_size <<= mgr->leaf_xtra;
2193
2194         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2195                 return slot;
2196
2197         //      skip cleanup and proceed to split
2198         //      if there's not enough garbage
2199         //      to bother with.
2200
2201         if( page->garbage < page_size / 5 )
2202                 return 0;
2203
2204         frame = malloc (page_size);
2205         memcpy (frame, page, page_size);
2206
2207         // skip page info and set rest of page to zero
2208
2209         memset (page+1, 0, page_size - sizeof(*page));
2210         set->latch->dirty = 1;
2211
2212         page->min = page_size;
2213         page->garbage = 0;
2214         page->act = 0;
2215
2216         // clean up page first by
2217         // removing deleted keys
2218
2219         while( cnt++ < max ) {
2220                 if( cnt == slot )
2221                         newslot = idx + 2;
2222
2223                 if( cnt < max || frame->lvl )
2224                   if( slotptr(frame,cnt)->dead )
2225                         continue;
2226
2227                 // copy the value across
2228
2229                 val = valptr(frame, cnt);
2230                 page->min -= val->len + sizeof(BtVal);
2231                 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2232
2233                 // copy the key across
2234
2235                 key = keyptr(frame, cnt);
2236                 page->min -= key->len + sizeof(BtKey);
2237                 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2238
2239                 // make a librarian slot
2240
2241                 slotptr(page, ++idx)->off = page->min;
2242                 slotptr(page, idx)->type = Librarian;
2243                 slotptr(page, idx)->dead = 1;
2244
2245                 // set up the slot
2246
2247                 slotptr(page, ++idx)->off = page->min;
2248                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2249
2250                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2251                         page->act++;
2252         }
2253
2254         page->cnt = idx;
2255         free (frame);
2256
2257         //      see if page has enough space now, or does it need splitting?
2258
2259         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2260                 return newslot;
2261
2262         return 0;
2263 }
2264
2265 // split the root and raise the height of the btree
2266
2267 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2268 {  
2269 unsigned char leftkey[BT_keyarray];
2270 uint nxt = mgr->page_size;
2271 unsigned char value[BtId];
2272 BtPage frame, page;
2273 BtPageSet left[1];
2274 uid left_page_no;
2275 BtKey *ptr;
2276 BtVal *val;
2277
2278         frame = malloc (mgr->page_size);
2279         memcpy (frame, root->page, mgr->page_size);
2280
2281         //      save left page fence key for new root
2282
2283         ptr = keyptr(root->page, root->page->cnt);
2284         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2285
2286         //  Obtain an empty page to use, and copy the current
2287         //  root contents into it, e.g. lower keys
2288
2289         if( bt_newpage(mgr, left, frame, thread_no) )
2290                 return mgr->err_thread = thread_no, mgr->err;
2291
2292         left_page_no = left->latch->page_no;
2293         bt_unpinlatch (left->latch, thread_no, __LINE__);
2294         free (frame);
2295
2296         //      left link the pages together
2297
2298         page = bt_mappage (mgr, right);
2299         bt_putid (page->left, left_page_no);
2300
2301         // preserve the page info at the bottom
2302         // of higher keys and set rest to zero
2303
2304         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2305
2306         // insert stopper key at top of newroot page
2307         // and increase the root height
2308
2309         nxt -= BtId + sizeof(BtVal);
2310         bt_putid (value, right->page_no);
2311         val = (BtVal *)((unsigned char *)root->page + nxt);
2312         memcpy (val->value, value, BtId);
2313         val->len = BtId;
2314
2315         nxt -= 2 + sizeof(BtKey);
2316         slotptr(root->page, 2)->off = nxt;
2317         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2318         ptr->len = 2;
2319         ptr->key[0] = 0xff;
2320         ptr->key[1] = 0xff;
2321
2322         // insert lower keys page fence key on newroot page as first key
2323
2324         nxt -= BtId + sizeof(BtVal);
2325         bt_putid (value, left_page_no);
2326         val = (BtVal *)((unsigned char *)root->page + nxt);
2327         memcpy (val->value, value, BtId);
2328         val->len = BtId;
2329
2330         ptr = (BtKey *)leftkey;
2331         nxt -= ptr->len + sizeof(BtKey);
2332         slotptr(root->page, 1)->off = nxt;
2333         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2334         
2335         bt_putid(root->page->right, 0);
2336         root->page->min = nxt;          // reset lowest used offset and key count
2337         root->page->cnt = 2;
2338         root->page->act = 2;
2339         root->page->lvl++;
2340
2341         mgr->pagezero->alloc->lvl = root->page->lvl;
2342
2343         // release and unpin root pages
2344
2345         bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2346         bt_unpinlatch (root->latch, thread_no, __LINE__);
2347
2348         bt_unpinlatch (right, thread_no, __LINE__);
2349         return 0;
2350 }
2351
2352 //  split already locked full node
2353 //      leave it locked.
2354 //      return pool entry for new right
2355 //      page, pinned & unlocked
2356
2357 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2358 {
2359 uint page_size = mgr->page_size;
2360 BtPageSet right[1], temp[1];
2361 uint cnt = 0, idx = 0, max;
2362 uint lvl = set->page->lvl;
2363 BtPage frame;
2364 BtKey *key;
2365 BtVal *val;
2366 uid right2;
2367 uint entry;
2368 uint prev;
2369
2370         if( !set->page->lvl )
2371                 page_size <<= mgr->leaf_xtra;
2372
2373         //  split higher half of keys to frame
2374
2375         frame = malloc (page_size);
2376         memset (frame, 0, page_size);
2377         frame->min = page_size;
2378         max = set->page->cnt;
2379         cnt = max / 2;
2380         idx = 0;
2381
2382         while( cnt++ < max ) {
2383                 if( cnt < max || set->page->lvl )
2384                   if( slotptr(set->page, cnt)->dead )
2385                         continue;
2386
2387                 val = valptr(set->page, cnt);
2388                 frame->min -= val->len + sizeof(BtVal);
2389                 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2390
2391                 key = keyptr(set->page, cnt);
2392                 frame->min -= key->len + sizeof(BtKey);
2393                 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2394
2395                 //      add librarian slot
2396
2397                 slotptr(frame, ++idx)->off = frame->min;
2398                 slotptr(frame, idx)->type = Librarian;
2399                 slotptr(frame, idx)->dead = 1;
2400
2401                 //  add actual slot
2402
2403                 slotptr(frame, ++idx)->off = frame->min;
2404                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2405
2406                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2407                         frame->act++;
2408         }
2409
2410         frame->cnt = idx;
2411         frame->lvl = lvl;
2412
2413         // link right node
2414
2415         if( set->latch->page_no > ROOT_page ) {
2416                 right2 = bt_getid (set->page->right);
2417                 bt_putid (frame->right, right2);
2418
2419                 if( linkleft )
2420                         bt_putid (frame->left, set->latch->page_no);
2421         }
2422
2423         //      get new free page and write higher keys to it.
2424
2425         if( bt_newpage(mgr, right, frame, thread_no) )
2426                 return 0;
2427
2428         //      link far right's left pointer to new page
2429
2430         if( linkleft && set->latch->page_no > ROOT_page )
2431          if( right2 ) {
2432           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2433                 temp->page = bt_mappage (mgr, temp->latch);
2434           else
2435                 return 0;
2436
2437       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2438           bt_putid (temp->page->left, right->latch->page_no);
2439           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2440           bt_unpinlatch (temp->latch, thread_no, __LINE__);
2441          } else {       // page is rightmost
2442           bt_mutexlock (mgr->lock);
2443           bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2444           bt_releasemutex(mgr->lock);
2445          }
2446
2447         // process lower keys
2448
2449         memcpy (frame, set->page, page_size);
2450         memset (set->page+1, 0, page_size - sizeof(*set->page));
2451         set->latch->dirty = 1;
2452
2453         set->page->min = page_size;
2454         set->page->garbage = 0;
2455         set->page->act = 0;
2456         max /= 2;
2457         cnt = 0;
2458         idx = 0;
2459
2460         //  assemble page of smaller keys
2461
2462         while( cnt++ < max ) {
2463                 if( slotptr(frame, cnt)->dead )
2464                         continue;
2465                 val = valptr(frame, cnt);
2466                 set->page->min -= val->len + sizeof(BtVal);
2467                 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2468
2469                 key = keyptr(frame, cnt);
2470                 set->page->min -= key->len + sizeof(BtKey);
2471                 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2472
2473                 //      add librarian slot
2474
2475                 slotptr(set->page, ++idx)->off = set->page->min;
2476                 slotptr(set->page, idx)->type = Librarian;
2477                 slotptr(set->page, idx)->dead = 1;
2478
2479                 //      add actual slot
2480
2481                 slotptr(set->page, ++idx)->off = set->page->min;
2482                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2483                 set->page->act++;
2484         }
2485
2486         bt_putid(set->page->right, right->latch->page_no);
2487         set->page->cnt = idx;
2488         free(frame);
2489
2490         entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2491         return entry;
2492 }
2493
2494 //      fix keys for newly split page
2495 //      call with both pages pinned & locked
2496 //  return unlocked and unpinned
2497
2498 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2499 {
2500 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2501 unsigned char value[BtId];
2502 uint lvl = set->page->lvl;
2503 BtPageSet temp[1];
2504 BtPage page;
2505 BtKey *ptr;
2506 uid right2;
2507
2508         // if current page is the root page, split it
2509
2510         if( set->latch->page_no == ROOT_page )
2511                 return bt_splitroot (mgr, set, right, thread_no);
2512
2513         ptr = keyptr(set->page, set->page->cnt);
2514         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2515
2516         page = bt_mappage (mgr, right);
2517
2518         ptr = keyptr(page, page->cnt);
2519         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2520
2521         //      splice in far right page's left page_no
2522
2523         if( right2 = bt_getid (page->right) ) {
2524           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2525                 temp->page = bt_mappage (mgr, temp->latch);
2526           else
2527                 return 0;
2528
2529       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2530           bt_putid (temp->page->left, right->page_no);
2531           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2532           bt_unpinlatch (temp->latch, thread_no, __LINE__);
2533         } else {  // right page is far right page
2534           bt_mutexlock (mgr->lock);
2535           bt_putid (mgr->pagezero->alloc->left, right->page_no);
2536           bt_releasemutex(mgr->lock);
2537         }
2538         // insert new fences in their parent pages
2539
2540         bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2541
2542         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2543         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2544
2545         // insert new fence for reformulated left block of smaller keys
2546
2547         bt_putid (value, set->latch->page_no);
2548         ptr = (BtKey *)leftkey;
2549
2550         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2551                 return mgr->err_thread = thread_no, mgr->err;
2552
2553         // switch fence for right block of larger keys to new right page
2554
2555         bt_putid (value, right->page_no);
2556         ptr = (BtKey *)rightkey;
2557
2558         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2559                 return mgr->err_thread = thread_no, mgr->err;
2560
2561         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2562         bt_unpinlatch (set->latch, thread_no, __LINE__);
2563
2564         bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2565         bt_unpinlatch (right, thread_no, __LINE__);
2566         return 0;
2567 }
2568
2569 //      install new key and value onto page
2570 //      page must already be checked for
2571 //      adequate space
2572
2573 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2574 {
2575 uint idx, librarian;
2576 BtSlot *node;
2577 BtKey *ptr;
2578 BtVal *val;
2579 int rate;
2580
2581         //      if previous slot is a librarian slot, use it
2582
2583         if( slot > 1 )
2584           if( slotptr(set->page, slot-1)->type == Librarian )
2585                 slot--;
2586
2587         // copy value onto page
2588
2589         set->page->min -= vallen + sizeof(BtVal);
2590         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2591         memcpy (val->value, value, vallen);
2592         val->len = vallen;
2593
2594         // copy key onto page
2595
2596         set->page->min -= keylen + sizeof(BtKey);
2597         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2598         memcpy (ptr->key, key, keylen);
2599         ptr->len = keylen;
2600         
2601         //      find first empty slot at or above our insert slot
2602
2603         for( idx = slot; idx < set->page->cnt; idx++ )
2604           if( slotptr(set->page, idx)->dead )
2605                 break;
2606
2607         // now insert key into array before slot.
2608
2609         //      if we're going all the way to the top,
2610         //      add as many librarian slots as
2611         //      makes sense.
2612
2613         if( idx == set->page->cnt ) {
2614         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2615
2616                 librarian = ++idx - slot;
2617                 avail /= sizeof(BtSlot);
2618
2619                 if( avail < 0 )
2620                         avail = 0;
2621
2622                 if( librarian > avail )
2623                         librarian = avail;
2624
2625                 if( librarian ) {
2626                         rate = (idx - slot) / librarian;
2627                         set->page->cnt += librarian;
2628                         idx += librarian;
2629                 } else
2630                         rate = 0;
2631         } else
2632                 librarian = 0, rate = 0;
2633
2634         //  transfer slots and add librarian slots
2635
2636         while( idx > slot ) {
2637                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2638
2639                 //      add librarian slot per rate
2640
2641                 if( librarian )
2642                  if( (idx - slot)/2 <= librarian * rate ) {
2643                         node = slotptr(set->page, --idx);
2644                         node->off = node[1].off;
2645                         node->type = Librarian;
2646                         node->dead = 1;
2647                         librarian--;
2648                   }
2649
2650                 --idx;
2651         }
2652
2653         set->latch->dirty = 1;
2654         set->page->act++;
2655
2656         //      fill in new slot
2657
2658         node = slotptr(set->page, slot);
2659         node->off = set->page->min;
2660         node->type = type;
2661         node->dead = 0;
2662         return 0;
2663 }
2664
2665 //  Insert new key into the btree at given level.
2666 //      either add a new key or update/add an existing one
2667
2668 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2669 {
2670 uint slot, idx, len, entry;
2671 BtPageSet set[1];
2672 BtSlot *node;
2673 BtKey *ptr;
2674 BtVal *val;
2675
2676   while( 1 ) { // find the page and slot for the current key
2677         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2678                 node = slotptr(set->page, slot);
2679                 ptr = keyptr(set->page, slot);
2680         } else
2681                 return mgr->err;
2682
2683         // if librarian slot == found slot, advance to real slot
2684
2685         if( node->type == Librarian ) {
2686                 node = slotptr(set->page, ++slot);
2687                 ptr = keyptr(set->page, slot);
2688         }
2689
2690         //  if inserting a duplicate key or unique
2691         //      key that doesn't exist on the page,
2692         //      check for adequate space on the page
2693         //      and insert the new key before slot.
2694
2695         switch( type ) {
2696         case Unique:
2697         case Duplicate:
2698           if( !keycmp (ptr, key, keylen) )
2699                 break;
2700
2701           if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2702             if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2703                   return mgr->err;
2704                 else
2705                   goto insxit;
2706
2707           if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2708             if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2709                   continue;
2710
2711           return mgr->err_thread = thread_no, mgr->err;
2712
2713         case Update:
2714           if( keycmp (ptr, key, keylen) )
2715                 goto insxit;
2716
2717           break;
2718         }
2719
2720         // if key already exists, update value and return
2721
2722         val = valptr(set->page, slot);
2723
2724         if( val->len >= vallen ) {
2725           if( node->dead )
2726                 set->page->act++;
2727           node->type = type;
2728           node->dead = 0;
2729
2730           set->page->garbage += val->len - vallen;
2731           set->latch->dirty = 1;
2732           val->len = vallen;
2733           memcpy (val->value, value, vallen);
2734           goto insxit;
2735         }
2736
2737         //  new update value doesn't fit in existing value area
2738         //      make sure page has room
2739
2740         if( !node->dead )
2741           set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2742         else
2743           set->page->act++;
2744
2745         node->type = type;
2746         node->dead = 0;
2747
2748         if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2749           break;
2750
2751         if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2752           if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2753                 continue;
2754
2755         return mgr->err_thread = thread_no, mgr->err;
2756   }
2757
2758   //  copy key and value onto page and update slot
2759
2760   set->page->min -= vallen + sizeof(BtVal);
2761   val = (BtVal*)((unsigned char *)set->page + set->page->min);
2762   memcpy (val->value, value, vallen);
2763   val->len = vallen;
2764
2765   set->latch->dirty = 1;
2766   set->page->min -= keylen + sizeof(BtKey);
2767   ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2768   memcpy (ptr->key, key, keylen);
2769   ptr->len = keylen;
2770         
2771   slotptr(set->page,slot)->off = set->page->min;
2772
2773 insxit:
2774   bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2775   bt_unpinlatch (set->latch, thread_no, __LINE__);
2776   return 0;
2777 }
2778
2779 //      determine actual page where key is located
2780 //  return slot number
2781
2782 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2783 {
2784 BtKey *key = keyptr(source,src), *ptr;
2785 uint slot = locks[src].slot;
2786 uint entry;
2787
2788         if( locks[src].reuse )
2789           entry = locks[src-1].entry;
2790         else
2791           entry = locks[src].entry;
2792
2793         if( slot ) {
2794                 set->latch = mgr->leafsets + entry;
2795                 set->page = bt_mappage (mgr, set->latch);
2796                 return slot;
2797         }
2798
2799         //      find where our key is located 
2800         //      on current page or pages split on
2801         //      same page txn operations.
2802
2803         do {
2804                 set->latch = mgr->leafsets + entry;
2805                 set->page = bt_mappage (mgr, set->latch);
2806
2807                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2808                   if( slotptr(set->page, slot)->type == Librarian )
2809                         slot++;
2810                   if( locks[src].reuse )
2811                         locks[src].entry = entry;
2812                   return slot;
2813                 }
2814         } while( entry = set->latch->split );
2815
2816         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2817         return 0;
2818 }
2819
2820 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2821 {
2822 BtKey *key = keyptr(source, src);
2823 BtVal *val = valptr(source, src);
2824 BtLatchSet *latch;
2825 BtPageSet set[1];
2826 uint entry, slot;
2827
2828   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2829         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2830           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2831                 return mgr->err_thread = thread_no, mgr->err;
2832
2833           set->page->lsn = lsn;
2834           return 0;
2835         }
2836
2837         //  split page
2838
2839         if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2840           latch = mgr->leafsets + entry;
2841         else
2842           return mgr->err;
2843
2844         //      splice right page into split chain
2845         //      and WriteLock it
2846
2847         bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2848         latch->split = set->latch->split;
2849         set->latch->split = entry;
2850
2851         // clear slot number for atomic page
2852
2853         locks[src].slot = 0;
2854   }
2855
2856   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2857 }
2858
2859 //      perform delete from smaller btree
2860 //  insert a delete slot if not found there
2861
2862 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2863 {
2864 BtKey *key = keyptr(source, src);
2865 BtPageSet set[1];
2866 uint idx, slot;
2867 BtSlot *node;
2868 BtKey *ptr;
2869 BtVal *val;
2870
2871         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2872           node = slotptr(set->page, slot);
2873           ptr = keyptr(set->page, slot);
2874           val = valptr(set->page, slot);
2875         } else
2876           return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2877
2878         //  if slot is not found, insert a delete slot
2879
2880         if( keycmp (ptr, key->key, key->len) )
2881           if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2882                 return mgr->err;
2883
2884         //      if node is already dead,
2885         //      ignore the request.
2886
2887         if( node->dead )
2888                 return 0;
2889
2890         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2891         set->latch->dirty = 1;
2892         set->page->lsn = lsn;
2893         set->page->act--;
2894
2895         node->dead = 0;
2896         __sync_fetch_and_add(&mgr->found, 1);
2897         return 0;
2898 }
2899
2900 //      release master's splits from right to left
2901
2902 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2903 {
2904 BtLatchSet *latch = mgr->leafsets + entry;
2905
2906         if( latch->split )
2907                 bt_atomicrelease (mgr, latch->split, thread_no);
2908
2909         latch->split = 0;
2910         bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2911         bt_unpinlatch(latch, thread_no, __LINE__);
2912 }
2913
2914 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2915 {
2916 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2917 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2918
2919         return keycmp (key1, key2->key, key2->len);
2920 }
2921 //      atomic modification of a batch of keys.
2922
2923 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2924 {
2925 uint src, idx, slot, samepage, entry, que = 0;
2926 BtKey *key, *ptr, *key2;
2927 int result = 0;
2928 BtSlot temp[1];
2929 logseqno lsn;
2930 int type;
2931
2932   // stable sort the list of keys into order to
2933   //    prevent deadlocks between threads.
2934 /*
2935   for( src = 1; src++ < source->cnt; ) {
2936         *temp = *slotptr(source,src);
2937         key = keyptr (source,src);
2938
2939         for( idx = src; --idx; ) {
2940           key2 = keyptr (source,idx);
2941           if( keycmp (key, key2->key, key2->len) < 0 ) {
2942                 *slotptr(source,idx+1) = *slotptr(source,idx);
2943                 *slotptr(source,idx) = *temp;
2944           } else
2945                 break;
2946         }
2947   }
2948 */
2949         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2950   //  add entries to redo log
2951
2952   if( bt->mgr->pagezero->redopages )
2953         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2954   else
2955         lsn = 0;
2956
2957   //  perform the individual actions in the transaction
2958
2959   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2960         return bt->mgr->err;
2961
2962   // if number of active pages
2963   // is greater than the buffer pool
2964   // promote page into larger btree
2965
2966   if( bt->main )
2967    while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2968         if( bt_promote (bt) )
2969           return bt->mgr->err;
2970
2971   // return success
2972
2973   return 0;
2974 }
2975
2976 //      execute the source list of inserts/deletes
2977
2978 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2979 {
2980 uint slot, src, idx, samepage, entry;
2981 BtPageSet set[1], prev[1];
2982 unsigned char value[BtId];
2983 BtLatchSet *latch;
2984 uid right_page_no;
2985 AtomicTxn *locks;
2986 BtKey *key, *ptr;
2987 BtPage page;
2988 BtVal *val;
2989
2990   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2991
2992   // Load the leaf page for each key
2993   // group same page references with reuse bit
2994
2995   for( src = 0; src++ < source->cnt; ) {
2996         key = keyptr(source, src);
2997
2998         // first determine if this modification falls
2999         // on the same page as the previous modification
3000         //      note that the far right leaf page is a special case
3001
3002         if( samepage = src > 1 )
3003           samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
3004
3005         if( !samepage )
3006           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
3007                 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
3008           else
3009                 return mgr->err;
3010         else
3011           slot = 0;
3012
3013         if( slot )
3014          if( slotptr(set->page, slot)->type == Librarian )
3015           slot++;
3016
3017         entry = set->latch - mgr->leafsets;
3018         locks[src].reuse = samepage;
3019         locks[src].entry = entry;
3020         locks[src].slot = slot;
3021
3022         //      capture current lsn for master page
3023
3024         locks[src].reqlsn = set->page->lsn;
3025   }
3026
3027   // insert or delete each key
3028   // process any splits or merges
3029   // run through txn list backwards
3030
3031   samepage = source->cnt + 1;
3032
3033   for( src = source->cnt; src; src-- ) {
3034         if( locks[src].reuse )
3035           continue;
3036
3037         //  perform the txn operations
3038         //      from smaller to larger on
3039         //  the same page
3040
3041         for( idx = src; idx < samepage; idx++ )
3042          switch( slotptr(source,idx)->type ) {
3043          case Delete:
3044           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3045                 return mgr->err;
3046           break;
3047
3048         case Duplicate:
3049         case Unique:
3050           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
3051                 return mgr->err;
3052           break;
3053
3054         default:
3055           bt_atomicpage (mgr, source, locks, idx, set);
3056           break;
3057         }
3058
3059         //      after the same page operations have finished,
3060         //  process master page for splits or deletion.
3061
3062         latch = prev->latch = mgr->leafsets + locks[src].entry;
3063         prev->page = bt_mappage (mgr, prev->latch);
3064         samepage = src;
3065
3066         //  pick-up all splits from master page
3067         //      each one is already pinned & WriteLocked.
3068
3069         while( entry = prev->latch->split ) {
3070           set->latch = mgr->leafsets + entry;
3071           set->page = bt_mappage (mgr, set->latch);
3072
3073           // delete empty master page by undoing its split
3074           //  (this is potentially another empty page)
3075           //  note that there are no pointers to it yet
3076
3077           if( !prev->page->act ) {
3078                 memcpy (set->page->left, prev->page->left, BtId);
3079                 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3080                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3081                 prev->latch->split = set->latch->split;
3082                 prev->latch->dirty = 1;
3083                 bt_freepage (mgr, set, thread_no);
3084                 continue;
3085           }
3086
3087           // remove empty split page from the split chain
3088           // and return it to the free list. No other
3089           // thread has its page number yet.
3090
3091           if( !set->page->act ) {
3092                 memcpy (prev->page->right, set->page->right, BtId);
3093                 prev->latch->split = set->latch->split;
3094
3095                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3096                 bt_freepage (mgr, set, thread_no);
3097                 continue;
3098           }
3099
3100           //  update prev's fence key
3101
3102           ptr = keyptr(prev->page,prev->page->cnt);
3103           bt_putid (value, prev->latch->page_no);
3104
3105           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3106                 return mgr->err;
3107
3108           // splice in the left link into the split page
3109
3110           bt_putid (set->page->left, prev->latch->page_no);
3111
3112           if( lsm )
3113                 bt_syncpage (mgr, prev->page, prev->latch);
3114
3115           *prev = *set;
3116         }
3117
3118         //  update left pointer in next right page from last split page
3119         //      (if all splits were reversed or none occurred, latch->split == 0)
3120
3121         if( latch->split ) {
3122           //  fix left pointer in master's original (now split)
3123           //  far right sibling or set rightmost page in page zero
3124
3125           if( right_page_no = bt_getid (prev->page->right) ) {
3126                 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3127                   set->page = bt_mappage (mgr, set->latch);
3128                 else
3129                   return mgr->err;
3130
3131             bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3132             bt_putid (set->page->left, prev->latch->page_no);
3133                 set->latch->dirty = 1;
3134
3135             bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3136                 bt_unpinlatch (set->latch, thread_no, __LINE__);
3137           } else {      // prev is rightmost page
3138             bt_mutexlock (mgr->lock);
3139                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3140             bt_releasemutex(mgr->lock);
3141           }
3142
3143           //  switch the original fence key from the
3144           //  master page to the last split page.
3145
3146           ptr = keyptr(prev->page,prev->page->cnt);
3147           bt_putid (value, prev->latch->page_no);
3148
3149           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3150                 return mgr->err;
3151
3152           if( lsm )
3153                 bt_syncpage (mgr, prev->page, prev->latch);
3154
3155           //  unlock and unpin the split pages
3156
3157           bt_atomicrelease (mgr, latch->split, thread_no);
3158
3159           //  unlock and unpin the master page
3160
3161           latch->split = 0;
3162           bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3163           bt_unpinlatch(latch, thread_no, __LINE__);
3164           continue;
3165         }
3166
3167         //      since there are no splits remaining, we're
3168         //  finished if master page occupied
3169
3170         if( prev->page->act ) {
3171           bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3172
3173           if( lsm )
3174                 bt_syncpage (mgr, prev->page, prev->latch);
3175
3176           bt_unpinlatch(prev->latch, thread_no, __LINE__);
3177           continue;
3178         }
3179
3180         // any and all splits were reversed, and the
3181         // master page located in prev is empty, delete it
3182
3183         if( bt_deletepage (mgr, prev, thread_no, 0) )
3184                 return mgr->err;
3185   }
3186
3187   free (locks);
3188   return 0;
3189 }
3190
3191 //  pick & promote a page into the larger btree
3192
3193 BTERR bt_promote (BtDb *bt)
3194 {
3195 uint entry, slot, idx;
3196 BtPageSet set[1];
3197 BtSlot *node;
3198 BtKey *ptr;
3199 BtVal *val;
3200
3201   while( 1 ) {
3202 #ifdef unix
3203         entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3204 #else
3205         entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3206 #endif
3207         entry %= bt->mgr->leaftotal;
3208
3209         if( !entry )
3210                 continue;
3211
3212         set->latch = bt->mgr->leafsets + entry;
3213
3214         //  skip this entry if it has never been used
3215
3216         if( !set->latch->page_no )
3217           continue;
3218
3219         if( !bt_mutextry(set->latch->modify) )
3220                 continue;
3221
3222         //  skip this entry if it is pinned
3223
3224         if( set->latch->pin & ~CLOCK_bit ) {
3225           bt_releasemutex(set->latch->modify);
3226           continue;
3227         }
3228
3229         set->page = bt_mappage (bt->mgr, set->latch);
3230
3231         // entry has no right sibling
3232
3233         if( !bt_getid (set->page->right) ) {
3234           bt_releasemutex(set->latch->modify);
3235           continue;
3236         }
3237
3238         // entry interiour node or being killed or constructed
3239
3240         if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3241           bt_releasemutex(set->latch->modify);
3242           continue;
3243         }
3244
3245         //  pin the page for our access
3246         //      and leave it locked for the
3247         //      duration of the promotion.
3248
3249         set->latch->pin++;
3250         bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3251         bt_releasemutex(set->latch->modify);
3252
3253         // transfer slots in our selected page to the main btree
3254 if( !(entry % 100) )
3255 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3256
3257         if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3258                 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3259                 return bt->main->err;
3260         }
3261
3262         //  now delete the page
3263
3264         if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3265                 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3266
3267         return bt->mgr->err;
3268   }
3269 }
3270
3271 //      find unique key == given key, or first duplicate key in
3272 //      leaf level and return number of value bytes
3273 //      or (-1) if not found.
3274
3275 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
3276 {
3277 int ret = -1, type;
3278 BtPageSet set[1];
3279 BtSlot *node;
3280 BtKey *ptr;
3281 BtVal *val;
3282 uint slot;
3283
3284  for( type = 0; type < 2; type++ )
3285   if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3286         node = slotptr(set->page, slot);
3287
3288         //      skip librarian slot place holder
3289
3290         if( node->type == Librarian )
3291           node = slotptr(set->page, ++slot);
3292
3293         ptr = keyptr(set->page, slot);
3294
3295         //      not there if we reach the stopper key
3296         //  or the key doesn't match what's on the page.
3297
3298         if( slot == set->page->cnt )
3299           if( !bt_getid (set->page->right) ) {
3300                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3301                 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3302                 continue;
3303         }
3304
3305         if( keycmp (ptr, key, keylen) ) {
3306                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3307                 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3308                 continue;
3309         }
3310
3311         // key matches, return >= 0 value bytes copied
3312         //      or -1 if not there.
3313
3314         if( node->type == Delete || node->dead ) {
3315                 ret = -1;
3316                 goto findxit;
3317         }
3318
3319         val = valptr (set->page,slot);
3320
3321         if( valmax > val->len )
3322                 valmax = val->len;
3323
3324         memcpy (value, val->value, valmax);
3325         ret = valmax;
3326         goto findxit;
3327   }
3328
3329   ret = -1;
3330
3331 findxit:
3332   if( type < 2 ) {
3333         bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3334         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3335   }
3336   return ret;
3337 }
3338
3339 //      set cursor to highest slot on highest page
3340
3341 uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no)
3342 {
3343 uid page_no = bt_getid (mgr->pagezero->alloc->left);
3344 BtPageSet set[1];
3345
3346         if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3347                 set->page = bt_mappage (mgr, set->latch);
3348         else
3349                 return 0;
3350
3351     bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3352         memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
3353     bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3354         bt_unpinlatch (set->latch, thread_no, __LINE__);
3355         return cursor->cnt;
3356 }
3357
3358 //      return previous slot on cursor page
3359
3360 uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no)
3361 {
3362 uid cursor_page = cursor->page_no;
3363 uid ourright, next, us = cursor_page;
3364 BtPageSet set[1];
3365
3366         if( --slot )
3367                 return slot;
3368
3369         ourright = bt_getid(cursor->right);
3370
3371 goleft:
3372         if( !(next = bt_getid(cursor->left)) )
3373                 return 0;
3374
3375 findourself:
3376         cursor_page = next;
3377
3378         if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3379                 set->page = bt_mappage (mgr, set->latch);
3380         else
3381                 return 0;
3382
3383     bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3384         memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
3385         bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3386         bt_unpinlatch (set->latch, thread_no, __LINE__);
3387         
3388         next = bt_getid (cursor->right);
3389
3390         if( cursor->kill )
3391                 goto findourself;
3392
3393         if( next != us )
3394           if( next == ourright )
3395                 goto goleft;
3396           else
3397                 goto findourself;
3398
3399         return cursor->cnt;
3400 }
3401
3402 //  return next slot on cursor page
3403 //  or slide cursor right into next page
3404
3405 uint bt_nextkey (BtDb *bt, uint slot)
3406 {
3407 BtPageSet set[1];
3408 uid right;
3409
3410   do {
3411         right = bt_getid(bt->cachecursor->right);
3412
3413         while( slot++ < bt->cachecursor->cnt )
3414           if( slotptr(bt->cachecursor,slot)->dead )
3415                 continue;
3416           else if( right || (slot < bt->cachecursor->cnt) ) // skip infinite stopper
3417                 return slot;
3418           else
3419                 break;
3420
3421         if( !right )
3422                 break;
3423
3424         if( set->latch = bt_pinleaf (bt->mgr, right, bt->thread_no) )
3425                 set->page = bt_mappage (bt->mgr, set->latch);
3426         else
3427                 return 0;
3428
3429     bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3430         memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3431         bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3432         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3433         slot = 0;
3434
3435   } while( 1 );
3436
3437   return bt->mgr->err = 0;
3438 }
3439
3440 //  cache page of keys into cursor and return starting slot for given key
3441
3442 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3443 {
3444 BtPageSet set[1];
3445 uint slot;
3446
3447         // cache page for retrieval
3448
3449         if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3450           memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3451         else
3452           return 0;
3453
3454         bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3455         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3456         return slot;
3457 }
3458
3459 #ifdef STANDALONE
3460
3461 #ifndef unix
3462 double getCpuTime(int type)
3463 {
3464 FILETIME crtime[1];
3465 FILETIME xittime[1];
3466 FILETIME systime[1];
3467 FILETIME usrtime[1];
3468 SYSTEMTIME timeconv[1];
3469 double ans = 0;
3470
3471         memset (timeconv, 0, sizeof(SYSTEMTIME));
3472
3473         switch( type ) {
3474         case 0:
3475                 GetSystemTimeAsFileTime (xittime);
3476                 FileTimeToSystemTime (xittime, timeconv);
3477                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3478                 break;
3479         case 1:
3480                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3481                 FileTimeToSystemTime (usrtime, timeconv);
3482                 break;
3483         case 2:
3484                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3485                 FileTimeToSystemTime (systime, timeconv);
3486                 break;
3487         }
3488
3489         ans += (double)timeconv->wHour * 3600;
3490         ans += (double)timeconv->wMinute * 60;
3491         ans += (double)timeconv->wSecond;
3492         ans += (double)timeconv->wMilliseconds / 1000;
3493         return ans;
3494 }
3495 #else
3496 #include <time.h>
3497 #include <sys/resource.h>
3498
3499 double getCpuTime(int type)
3500 {
3501 struct rusage used[1];
3502 struct timeval tv[1];
3503
3504         switch( type ) {
3505         case 0:
3506                 gettimeofday(tv, NULL);
3507                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3508
3509         case 1:
3510                 getrusage(RUSAGE_SELF, used);
3511                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3512
3513         case 2:
3514                 getrusage(RUSAGE_SELF, used);
3515                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3516         }
3517
3518         return 0;
3519 }
3520 #endif
3521
3522 void bt_poolaudit (BtMgr *mgr, char *type)
3523 {
3524 BtLatchSet *latch, test[1];
3525 uint entry;
3526
3527         memset (test, 0, sizeof(test));
3528
3529         if( memcmp (test, mgr->latchsets, sizeof(test)) )
3530                 fprintf(stderr, "%s latchset zero overwritten\n", type);
3531
3532         if( memcmp (test, mgr->leafsets, sizeof(test)) )
3533                 fprintf(stderr, "%s leafset zero overwritten\n", type);
3534
3535         for( entry = 0; ++entry < mgr->latchtotal; ) {
3536                 latch = mgr->latchsets + entry;
3537
3538                 if( *latch->modify->value )
3539                         fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3540
3541                 if( latch->pin & ~CLOCK_bit )
3542                         fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3543         }
3544
3545         for( entry = 0; ++entry < mgr->leaftotal; ) {
3546                 latch = mgr->leafsets + entry;
3547
3548                 if( *latch->modify->value )
3549                         fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3550
3551                 if( latch->pin & ~CLOCK_bit )
3552                         fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3553         }
3554 }
3555
3556 typedef struct {
3557         char idx;
3558         char *type;
3559         char *infile;
3560         BtMgr *main;
3561         BtMgr *mgr;
3562         int num;
3563 } ThreadArg;
3564
3565 //  standalone program to index file of keys
3566 //  then list them onto std-out
3567
3568 #ifdef unix
3569 void *index_file (void *arg)
3570 #else
3571 uint __stdcall index_file (void *arg)
3572 #endif
3573 {
3574 int line = 0, found = 0, cnt = 0, idx;
3575 uid next, page_no = LEAF_page;  // start on first page of leaves
3576 int ch, len = 0, slot, type = 0;
3577 unsigned char key[BT_maxkey];
3578 unsigned char txn[65536];
3579 ThreadArg *args = arg;
3580 BtPage page, frame;
3581 BtPageSet set[1];
3582 uint nxt = 65536;
3583 BtKey *ptr;
3584 BtVal *val;
3585 BtDb *bt;
3586 FILE *in;
3587
3588         bt = bt_open (args->mgr, args->main);
3589         page = (BtPage)txn;
3590
3591         if( args->idx < strlen (args->type) )
3592                 ch = args->type[args->idx];
3593         else
3594                 ch = args->type[strlen(args->type) - 1];
3595
3596         switch(ch | 0x20)
3597         {
3598         case 'd':
3599                 type = Delete;
3600
3601         case 'p':
3602                 if( !type )
3603                         type = Unique;
3604
3605                 if( args->num )
3606                  if( type == Delete )
3607                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3608                  else
3609                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3610                 else
3611                  if( type == Delete )
3612                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3613                  else
3614                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3615
3616                 if( in = fopen (args->infile, "rb") )
3617                   while( ch = getc(in), ch != EOF )
3618                         if( ch == '\n' )
3619                         {
3620                           line++;
3621
3622                           if( !args->num ) {
3623                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3624                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3625                             len = 0;
3626                                 continue;
3627                           }
3628
3629                           nxt -= len - 10;
3630                           memcpy (txn + nxt, key + 10, len - 10);
3631                           nxt -= 1;
3632                           txn[nxt] = len - 10;
3633                           nxt -= 10;
3634                           memcpy (txn + nxt, key, 10);
3635                           nxt -= 1;
3636                           txn[nxt] = 10;
3637                           slotptr(page,++cnt)->off  = nxt;
3638                           slotptr(page,cnt)->type = type;
3639                           len = 0;
3640
3641                           if( cnt < args->num )
3642                                 continue;
3643
3644                           page->cnt = cnt;
3645                           page->act = cnt;
3646                           page->min = nxt;
3647
3648                           if( bt_atomictxn (bt, page) )
3649                                 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3650                           nxt = sizeof(txn);
3651                           cnt = 0;
3652
3653                         }
3654                         else if( len < BT_maxkey )
3655                                 key[len++] = ch;
3656                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3657                 break;
3658
3659         case 'w':
3660                 fprintf(stderr, "started indexing for %s\n", args->infile);
3661                 if( in = fopen (args->infile, "r") )
3662                   while( ch = getc(in), ch != EOF )
3663                         if( ch == '\n' )
3664                         {
3665                           line++;
3666
3667                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3668                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3669                           len = 0;
3670                         }
3671                         else if( len < BT_maxkey )
3672                                 key[len++] = ch;
3673                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3674                 break;
3675
3676         case 'f':
3677                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3678                 if( in = fopen (args->infile, "rb") )
3679                   while( ch = getc(in), ch != EOF )
3680                         if( ch == '\n' )
3681                         {
3682                           line++;
3683                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3684                                 found++;
3685                           else if( bt->mgr->err )
3686                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3687                           len = 0;
3688                         }
3689                         else if( len < BT_maxkey )
3690                                 key[len++] = ch;
3691                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3692                 break;
3693
3694         case 's':
3695                 fprintf(stderr, "started forward scan\n");
3696                 if( slot = bt_startkey (bt, NULL, 0) )
3697                   do {
3698                         if( slotptr(bt->cachecursor, slot)->type == Librarian )
3699                           slot++;
3700
3701                         ptr = keyptr(bt->cachecursor, slot);
3702                         len = ptr->len;
3703
3704                         if( slotptr(bt->cachecursor, slot)->type == Duplicate )
3705                                 len -= BtId;
3706
3707                         fwrite (ptr->key, len, 1, stdout);
3708                         val = valptr(bt->cachecursor, slot);
3709                         fwrite (val->value, val->len, 1, stdout);
3710                         fputc ('\n', stdout);
3711                         cnt++;
3712                  } while( slot = bt_nextkey (bt, slot) );
3713
3714                 fprintf(stderr, " Total keys read %d\n", cnt);
3715                 break;
3716
3717         case 'r':
3718                 fprintf(stderr, "started reverse scan\n");
3719                 if( slot = bt_lastkey (bt->mgr, bt->cachecursor, bt->thread_no) )
3720                    while( slot = bt_prevkey (bt->mgr, bt->cachecursor, slot, bt->thread_no) ) {
3721                         if( slotptr(bt->cachecursor, slot)->dead )
3722                           continue;
3723
3724                         ptr = keyptr(bt->cachecursor, slot);
3725                         len = ptr->len;
3726
3727                         if( slotptr(bt->cachecursor, slot)->type == Duplicate )
3728                                 len -= BtId;
3729
3730                         fwrite (ptr->key, len, 1, stdout);
3731                         val = valptr(bt->cachecursor, slot);
3732                         fwrite (val->value, val->len, 1, stdout);
3733                         fputc ('\n', stdout);
3734                         cnt++;
3735                   }
3736
3737                 fprintf(stderr, " Total keys read %d\n", cnt);
3738                 break;
3739         }
3740
3741         bt_close (bt);
3742 #ifdef unix
3743         return NULL;
3744 #else
3745         return 0;
3746 #endif
3747 }
3748
3749 typedef struct timeval timer;
3750
3751 int main (int argc, char **argv)
3752 {
3753 int idx, cnt, len, slot, err;
3754 double start, stop;
3755 #ifdef unix
3756 pthread_t *threads;
3757 #else
3758 HANDLE *threads;
3759 #endif
3760 ThreadArg *args;
3761 uint mainleafpool = 0;
3762 uint mainleafxtra = 0;
3763 uint redopages = 0;
3764 uint poolsize = 0;
3765 uint leafpool = 0;
3766 uint leafxtra = 0;
3767 uint mainpool = 0;
3768 uint mainbits = 0;
3769 int bits = 16;
3770 float elapsed;
3771 int num = 0;
3772 char key[1];
3773 BtMgr *main;
3774 BtMgr *mgr;
3775 BtKey *ptr;
3776
3777         if( argc < 3 ) {
3778                 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3779                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3780                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3781                 fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
3782                 fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
3783                 fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
3784                 fprintf (stderr, "  poolsize is the number of pages in buffer pool for the cache btree\n");
3785                 fprintf (stderr, "  leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3786                 fprintf (stderr, "  txnsize = n to block transactions into n units, or zero for no transactions\n");
3787                 fprintf (stderr, "  redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3788                 fprintf (stderr, "  mainbits is the page size of the main btree in bits\n");
3789                 fprintf (stderr, "  mainpool is the number of main pages in the main buffer pool\n");
3790                 fprintf (stderr, "  mainleaf is the number of main pages in the main leaf pool\n");
3791                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3792                 exit(0);
3793         }
3794
3795         start = getCpuTime(0);
3796
3797         if( argc > 4 )
3798                 bits = atoi(argv[4]);
3799
3800         if( argc > 5 )
3801                 leafxtra = atoi(argv[5]);
3802
3803         if( argc > 6 )
3804                 poolsize = atoi(argv[6]);
3805
3806         if( !poolsize )
3807                 fprintf (stderr, "no page pool\n"), exit(1);
3808
3809         if( argc > 7 )
3810                 leafpool = atoi(argv[7]);
3811
3812         if( argc > 8 )
3813                 num = atoi(argv[8]);
3814
3815         if( argc > 9 )
3816                 redopages = atoi(argv[9]);
3817
3818         if( redopages > 65535 )
3819                 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3820
3821         if( argc > 10 )
3822                 mainbits = atoi(argv[10]);
3823
3824         if( argc > 11 )
3825                 mainleafxtra = atoi(argv[11]);
3826
3827         if( argc > 12 )
3828                 mainpool = atoi(argv[12]);
3829
3830         if( argc > 13 )
3831                 mainleafpool = atoi(argv[13]);
3832
3833         cnt = argc - 14;
3834 #ifdef unix
3835         threads = malloc (cnt * sizeof(pthread_t));
3836 #else
3837         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3838 #endif
3839         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3840
3841         mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3842
3843         if( !mgr ) {
3844                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3845                 exit (1);
3846         }
3847
3848         if( mainbits ) {
3849                 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3850
3851                 if( !main ) {
3852                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3853                         exit (1);
3854                 }
3855         } else
3856                 main = NULL;
3857
3858         //      fire off threads
3859
3860         if( cnt )
3861           for( idx = 0; idx < cnt; idx++ ) {
3862                 args[idx].infile = argv[idx + 14];
3863                 args[idx].type = argv[3];
3864                 args[idx].main = main;
3865                 args[idx].mgr = mgr;
3866                 args[idx].num = num;
3867                 args[idx].idx = idx;
3868 #ifdef unix
3869                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3870                         fprintf(stderr, "Error creating thread %d\n", err);
3871 #else
3872                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3873 #endif
3874           }
3875         else {
3876                 args[0].infile = argv[idx + 10];
3877                 args[0].type = argv[3];
3878                 args[0].main = main;
3879                 args[0].mgr = mgr;
3880                 args[0].num = num;
3881                 args[0].idx = 0;
3882                 index_file (args);
3883         }
3884
3885         //      wait for termination
3886
3887 #ifdef unix
3888         for( idx = 0; idx < cnt; idx++ )
3889                 pthread_join (threads[idx], NULL);
3890 #else
3891         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3892
3893         for( idx = 0; idx < cnt; idx++ )
3894                 CloseHandle(threads[idx]);
3895 #endif
3896         bt_poolaudit(mgr, "cache");
3897
3898         if( main )
3899                 bt_poolaudit(main, "main");
3900
3901         fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
3902         if( main )
3903                 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
3904
3905         if( main )
3906                 bt_mgrclose (main);
3907         bt_mgrclose (mgr);
3908
3909         elapsed = getCpuTime(0) - start;
3910         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3911         elapsed = getCpuTime(1);
3912         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3913         elapsed = getCpuTime(2);
3914         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3915 }
3916
3917 BtKey *bt_key (BtPage page, uint slot)
3918 {
3919 return keyptr(page,slot);
3920 }
3921
3922 BtSlot *bt_slot (BtPage page, uint slot)
3923 {
3924 return slotptr(page,slot);
3925 }
3926 #endif  //STANDALONE