]> pd.if.org Git - btree/blob - btree2u.c
Remove debug code.
[btree] / btree2u.c
1 // btree version 2u
2 //      with combined latch & pool manager
3 // 26 FEB 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #endif
30
31 #ifdef unix
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <io.h>
47 #endif
48
49 #include <memory.h>
50 #include <string.h>
51
52 typedef unsigned long long      uid;
53
54 #ifndef unix
55 typedef unsigned long long      off64_t;
56 typedef unsigned short          ushort;
57 typedef unsigned int            uint;
58 #endif
59
60 #define BT_ro 0x6f72    // ro
61 #define BT_rw 0x7772    // rw
62 #define BT_fl 0x6c66    // fl
63
64 #define BT_maxbits              15                                      // maximum page size in bits
65 #define BT_minbits              12                                      // minimum page size in bits
66 #define BT_minpage              (1 << BT_minbits)       // minimum page size
67 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
68
69 //  BTree page number constants
70 #define ALLOC_page              0
71 #define ROOT_page               1
72 #define LEAF_page               2
73 #define LATCH_page              3
74
75 //      Number of levels to create in a new BTree
76
77 #define MIN_lvl                 2
78 #define MAX_lvl                 15
79
80 /*
81 There are five lock types for each node in three independent sets: 
82 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
83 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
84 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
85 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
86 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
87 */
88
89 typedef enum{
90         BtLockAccess,
91         BtLockDelete,
92         BtLockRead,
93         BtLockWrite,
94         BtLockParent
95 }BtLock;
96
97 //      definition for latch implementation
98
99 // exclusive is set for write access
100 // share is count of read accessors
101 // grant write lock when share == 0
102
103 volatile typedef struct {
104         unsigned char mutex[1];
105         unsigned char exclusive:1;
106         unsigned char pending:1;
107         ushort share;
108 } BtSpinLatch;
109
110 //      Define the length of the page and key pointers
111
112 #define BtId 6
113
114 //      Page key slot definition.
115
116 //      If BT_maxbits is 15 or less, you can save 2 bytes
117 //      for each key stored by making the first two uints
118 //      into ushorts.  You can also save 4 bytes by removing
119 //      the tod field from the key.
120
121 //      Keys are marked dead, but remain on the page until
122 //      cleanup is called. The fence key (highest key) for
123 //      the page is always present, even if dead.
124
125 typedef struct {
126 #ifdef USETOD
127         uint tod;                                       // time-stamp for key
128 #endif
129         ushort off:BT_maxbits;          // page offset for key start
130         ushort dead:1;                          // set for deleted key
131         unsigned char id[BtId];         // id associated with key
132 } BtSlot;
133
134 //      The key structure occupies space at the upper end of
135 //      each page.  It's a length byte followed by the value
136 //      bytes.
137
138 typedef struct {
139         unsigned char len;
140         unsigned char key[0];
141 } *BtKey;
142
143 //      The first part of an index page.
144 //      It is immediately followed
145 //      by the BtSlot array of keys.
146
147 typedef struct BtPage_ {
148         uint cnt;                                       // count of keys in page
149         uint act;                                       // count of active keys
150         uint min;                                       // next key offset
151         unsigned char bits:6;           // page size in bits
152         unsigned char free:1;           // page is on free list
153         unsigned char dirty:1;          // page is dirty in cache
154         unsigned char lvl:6;            // level of page
155         unsigned char kill:1;           // page is being deleted
156         unsigned char clean:1;          // page needs cleaning
157         unsigned char right[BtId];      // page number to right
158 } *BtPage;
159
160 typedef struct {
161         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
162         BtSpinLatch lock[1];            // allocation area lite latch
163         volatile uint latchdeployed;// highest number of latch entries deployed
164         volatile uint nlatchpage;       // number of latch pages at BT_latch
165         volatile uint latchtotal;       // number of page latch entries
166         volatile uint latchhash;        // number of latch hash table slots
167         volatile uint latchvictim;      // next latch hash entry to examine
168         volatile uint safelevel;        // safe page level in cache
169         volatile uint cache[MAX_lvl];// cache census counts by btree level
170 } BtLatchMgr;
171
172 //  latch hash table entries
173
174 typedef struct {
175         volatile uint slot;             // Latch table entry at head of collision chain
176         BtSpinLatch latch[1];   // lock for the collision chain
177 } BtHashEntry;
178
179 //      latch manager table structure
180
181 typedef struct {
182         volatile uid page_no;           // latch set page number on disk
183         BtSpinLatch readwr[1];          // read/write page lock
184         BtSpinLatch access[1];          // Access Intent/Page delete
185         BtSpinLatch parent[1];          // Posting of fence key in parent
186         volatile ushort pin;            // number of pins/level/clock bits
187         volatile uint next;                     // next entry in hash table chain
188         volatile uint prev;                     // prev entry in hash table chain
189 } BtLatchSet;
190
191 #define CLOCK_mask 0xe000
192 #define CLOCK_unit 0x2000
193 #define PIN_mask 0x07ff
194 #define LVL_mask 0x1800
195 #define LVL_shift 11
196
197 //      The object structure for Btree access
198
199 typedef struct _BtDb {
200         uint page_size;         // each page size       
201         uint page_bits;         // each page size in bits       
202         uid page_no;            // current page number  
203         uid cursor_page;        // current cursor page number   
204         int  err;
205         uint mode;                      // read-write mode
206         BtPage cursor;          // cached frame for start/next (never mapped)
207         BtPage frame;           // spare frame for the page split (never mapped)
208         BtPage page;            // current mapped page in buffer pool
209         BtLatchSet *latch;                      // current page latch
210         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
211         BtLatchSet *latchsets;          // mapped latch set from latch pages
212         unsigned char *pagepool;        // cached page pool set
213         BtHashEntry *table;     // the hash table
214 #ifdef unix
215         int idx;
216 #else
217         HANDLE idx;
218         HANDLE halloc;          // allocation and latch table handle
219 #endif
220         unsigned char *mem;     // frame, cursor, memory buffers
221         uint found;                     // last deletekey found key
222 } BtDb;
223
224 typedef enum {
225 BTERR_ok = 0,
226 BTERR_notfound,
227 BTERR_struct,
228 BTERR_ovflw,
229 BTERR_read,
230 BTERR_lock,
231 BTERR_hash,
232 BTERR_kill,
233 BTERR_map,
234 BTERR_wrt,
235 BTERR_eof
236 } BTERR;
237
238 // B-Tree functions
239 extern void bt_close (BtDb *bt);
240 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
241 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
242 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
243 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
244 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
245 extern uint bt_nextkey   (BtDb *bt, uint slot);
246
247 //      internal functions
248 void bt_update (BtDb *bt, BtPage page);
249 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
250 //  Helper functions to return slot values
251
252 extern BtKey bt_key (BtDb *bt, uint slot);
253 extern uid bt_uid (BtDb *bt, uint slot);
254 #ifdef USETOD
255 extern uint bt_tod (BtDb *bt, uint slot);
256 #endif
257
258 //  The page is allocated from low and hi ends.
259 //  The key offsets and row-id's are allocated
260 //  from the bottom, while the text of the key
261 //  is allocated from the top.  When the two
262 //  areas meet, the page is split into two.
263
264 //  A key consists of a length byte, two bytes of
265 //  index number (0 - 65534), and up to 253 bytes
266 //  of key value.  Duplicate keys are discarded.
267 //  Associated with each key is a 48 bit row-id.
268
269 //  The b-tree root is always located at page 1.
270 //      The first leaf page of level zero is always
271 //      located on page 2.
272
273 //      The b-tree pages are linked with right
274 //      pointers to facilitate enumerators,
275 //      and provide for concurrency.
276
277 //      When to root page fills, it is split in two and
278 //      the tree height is raised by a new root at page
279 //      one with two keys.
280
281 //      Deleted keys are marked with a dead bit until
282 //      page cleanup The fence key for a node is always
283 //      present, even after deletion and cleanup.
284
285 //  Deleted leaf pages are reclaimed  on a free list.
286 //      The upper levels of the btree are fixed on creation.
287
288 //  To achieve maximum concurrency one page is locked at a time
289 //  as the tree is traversed to find leaf key in question. The right
290 //  page numbers are used in cases where the page is being split,
291 //      or consolidated.
292
293 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
294 //      and chains empty leaf pages together for reuse.
295
296 //      Parent locks are obtained to prevent resplitting or deleting a node
297 //      before its fence is posted into its upper level.
298
299 //      A special open mode of BT_fl is provided to safely access files on
300 //      WIN32 networks. WIN32 network operations should not use memory mapping.
301 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
302 //      to prevent local caching of network file contents.
303
304 //      Access macros to address slot and key values from the page.
305 //      Page slots use 1 based indexing.
306
307 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
308 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
309
310 void bt_putid(unsigned char *dest, uid id)
311 {
312 int i = BtId;
313
314         while( i-- )
315                 dest[i] = (unsigned char)id, id >>= 8;
316 }
317
318 uid bt_getid(unsigned char *src)
319 {
320 uid id = 0;
321 int i;
322
323         for( i = 0; i < BtId; i++ )
324                 id <<= 8, id |= *src++; 
325
326         return id;
327 }
328
329 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
330 {
331 BtKey ptr;
332
333         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
334         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
335         ptr = keyptr(page, page->cnt);
336         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
337         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
338         return bt->err = err;
339 }
340
341 //      Latch Manager
342
343 //      wait until write lock mode is clear
344 //      and add 1 to the share count
345
346 void bt_spinreadlock(BtSpinLatch *latch)
347 {
348 ushort prev;
349
350   do {
351         //      obtain latch mutex
352 #ifdef unix
353         if( __sync_lock_test_and_set(latch->mutex, 1) )
354                 continue;
355 #else
356         if( _InterlockedExchange8(latch->mutex, 1) )
357                 continue;
358 #endif
359         //  see if exclusive request is granted or pending
360
361         if( prev = !(latch->exclusive | latch->pending) )
362                 latch->share++;
363
364 #ifdef unix
365         *latch->mutex = 0;
366 #else
367         _InterlockedExchange8(latch->mutex, 0);
368 #endif
369
370         if( prev )
371                 return;
372
373 #ifdef  unix
374   } while( sched_yield(), 1 );
375 #else
376   } while( SwitchToThread(), 1 );
377 #endif
378 }
379
380 //      wait for other read and write latches to relinquish
381
382 void bt_spinwritelock(BtSpinLatch *latch)
383 {
384 uint prev;
385
386   do {
387 #ifdef  unix
388         if( __sync_lock_test_and_set(latch->mutex, 1) )
389                 continue;
390 #else
391         if( _InterlockedExchange8(latch->mutex, 1) )
392                 continue;
393 #endif
394         if( prev = !(latch->share | latch->exclusive) )
395                 latch->exclusive = 1, latch->pending = 0;
396         else
397                 latch->pending = 1;
398 #ifdef unix
399         *latch->mutex = 0;
400 #else
401         _InterlockedExchange8(latch->mutex, 0);
402 #endif
403         if( prev )
404                 return;
405 #ifdef  unix
406   } while( sched_yield(), 1 );
407 #else
408   } while( SwitchToThread(), 1 );
409 #endif
410 }
411
412 //      try to obtain write lock
413
414 //      return 1 if obtained,
415 //              0 otherwise
416
417 int bt_spinwritetry(BtSpinLatch *latch)
418 {
419 uint prev;
420
421 #ifdef unix
422         if( __sync_lock_test_and_set(latch->mutex, 1) )
423                 return 0;
424 #else
425         if( _InterlockedExchange8(latch->mutex, 1) )
426                 return 0;
427 #endif
428         //      take write access if all bits are clear
429
430         if( prev = !(latch->exclusive | latch->share) )
431                 latch->exclusive = 1;
432
433 #ifdef unix
434         *latch->mutex = 0;
435 #else
436         _InterlockedExchange8(latch->mutex, 0);
437 #endif
438         return prev;
439 }
440
441 //      clear write mode
442
443 void bt_spinreleasewrite(BtSpinLatch *latch)
444 {
445 #ifdef unix
446         while( __sync_lock_test_and_set(latch->mutex, 1) )
447                 sched_yield();
448 #else
449         while( _InterlockedExchange8(latch->mutex, 1) )
450                 SwitchToThread();
451 #endif
452         latch->exclusive = 0;
453 #ifdef unix
454         *latch->mutex = 0;
455 #else
456         _InterlockedExchange8(latch->mutex, 0);
457 #endif
458 }
459
460 //      decrement reader count
461
462 void bt_spinreleaseread(BtSpinLatch *latch)
463 {
464 #ifdef unix
465         while( __sync_lock_test_and_set(latch->mutex, 1) )
466                 sched_yield();
467 #else
468         while( _InterlockedExchange8(latch->mutex, 1) )
469                 SwitchToThread();
470 #endif
471         latch->share--;
472 #ifdef unix
473         *latch->mutex = 0;
474 #else
475         _InterlockedExchange8(latch->mutex, 0);
476 #endif
477 }
478
479 //      read page from permanent location in Btree file
480
481 BTERR bt_readpage (BtDb *bt, BtPage page, uid page_no)
482 {
483 off64_t off = page_no << bt->page_bits;
484
485 #ifdef unix
486         if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size ) {
487                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
488                 return bt->err = BTERR_read;
489         }
490 #else
491 OVERLAPPED ovl[1];
492 uint amt[1];
493
494         memset (ovl, 0, sizeof(OVERLAPPED));
495         ovl->Offset = off;
496         ovl->OffsetHigh = off >> 32;
497
498         if( !ReadFile(bt->idx, page, bt->page_size, amt, ovl)) {
499                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
500                 return bt->err = BTERR_read;
501         }
502         if( *amt <  bt->page_size ) {
503                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
504                 return bt->err = BTERR_read;
505         }
506 #endif
507         return 0;
508 }
509
510 //      write page to permanent location in Btree file
511 //      clear the dirty bit
512
513 BTERR bt_writepage (BtDb *bt, BtPage page, uid page_no)
514 {
515 off64_t off = page_no << bt->page_bits;
516
517 #ifdef unix
518         page->dirty = 0;
519
520         if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
521                 return bt->err = BTERR_wrt;
522 #else
523 OVERLAPPED ovl[1];
524 uint amt[1];
525
526         memset (ovl, 0, sizeof(OVERLAPPED));
527         ovl->Offset = off;
528         ovl->OffsetHigh = off >> 32;
529         page->dirty = 0;
530
531         if( !WriteFile(bt->idx, page, bt->page_size, amt, ovl) )
532                 return bt->err = BTERR_wrt;
533
534         if( *amt <  bt->page_size )
535                 return bt->err = BTERR_wrt;
536 #endif
537         return 0;
538 }
539
540 //      link latch table entry into head of latch hash table
541
542 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
543 {
544 BtPage page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
545 BtLatchSet *latch = bt->latchsets + slot;
546 int lvl;
547
548         if( latch->next = bt->table[hashidx].slot )
549                 bt->latchsets[latch->next].prev = slot;
550
551         bt->table[hashidx].slot = slot;
552         latch->page_no = page_no;
553         latch->prev = 0;
554         latch->pin = 1;
555
556         if( bt_readpage (bt, page, page_no) )
557                 return bt->err;
558
559         lvl = page->lvl << LVL_shift;
560         if( lvl > LVL_mask )
561                 lvl = LVL_mask;
562         latch->pin |= lvl;              // store lvl
563         latch->pin |= lvl << 3; // initialize clock
564
565 #ifdef unix
566         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], 1);
567 #else
568         _InterlockedAdd(&bt->latchmgr->cache[page->lvl], 1);
569 #endif
570         return bt->err = 0;
571 }
572
573 //      release latch pin
574
575 void bt_unpinlatch (BtLatchSet *latch)
576 {
577 #ifdef unix
578         __sync_fetch_and_add(&latch->pin, -1);
579 #else
580         _InterlockedDecrement16 (&latch->pin);
581 #endif
582 }
583
584 //      find existing latchset or inspire new one
585 //      return with latchset pinned
586
587 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
588 {
589 uint hashidx = page_no % bt->latchmgr->latchhash;
590 BtLatchSet *latch;
591 uint slot, idx;
592 uint lvl, cnt;
593 off64_t off;
594 uint amt[1];
595 BtPage page;
596
597   //  try to find our entry
598
599   bt_spinwritelock(bt->table[hashidx].latch);
600
601   if( slot = bt->table[hashidx].slot ) do
602   {
603         latch = bt->latchsets + slot;
604         if( page_no == latch->page_no )
605                 break;
606   } while( slot = latch->next );
607
608   //  found our entry
609   //  increment clock
610
611   if( slot ) {
612         latch = bt->latchsets + slot;
613         lvl = (latch->pin & LVL_mask) >> LVL_shift;
614         lvl *= CLOCK_unit * 2;
615         lvl |= CLOCK_unit;
616 #ifdef unix
617         __sync_fetch_and_add(&latch->pin, 1);
618         __sync_fetch_and_or(&latch->pin, lvl);
619 #else
620         _InterlockedIncrement16 (&latch->pin);
621         _InterlockedOr16 (&latch->pin, lvl);
622 #endif
623         bt_spinreleasewrite(bt->table[hashidx].latch);
624         return latch;
625   }
626
627         //  see if there are any unused pool entries
628 #ifdef unix
629         slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
630 #else
631         slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
632 #endif
633
634         if( slot < bt->latchmgr->latchtotal ) {
635                 latch = bt->latchsets + slot;
636                 if( bt_latchlink (bt, hashidx, slot, page_no) )
637                         return NULL;
638                 bt_spinreleasewrite (bt->table[hashidx].latch);
639                 return latch;
640         }
641
642 #ifdef unix
643         __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
644 #else
645         _InterlockedDecrement (&bt->latchmgr->latchdeployed);
646 #endif
647   //  find and reuse previous entry on victim
648
649   while( 1 ) {
650 #ifdef unix
651         slot = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
652 #else
653         slot = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
654 #endif
655         // try to get write lock on hash chain
656         //      skip entry if not obtained
657         //      or has outstanding pins
658
659         slot %= bt->latchmgr->latchtotal;
660
661         //      on slot wraparound, check census
662         //      count and increment safe level
663
664         cnt = bt->latchmgr->cache[bt->latchmgr->safelevel];
665
666         if( !slot ) {
667           if( cnt < bt->latchmgr->latchtotal / 10 )
668 #ifdef unix
669                 __sync_fetch_and_add(&bt->latchmgr->safelevel, 1);
670 #else
671                 _InterlockedIncrement (&bt->latchmgr->safelevel);
672 #endif
673           continue;
674         }
675
676         latch = bt->latchsets + slot;
677         idx = latch->page_no % bt->latchmgr->latchhash;
678         lvl = (latch->pin & LVL_mask) >> LVL_shift;
679
680         //      see if we are evicting this level yet
681
682         if( lvl > bt->latchmgr->safelevel )
683                 continue;
684
685         if( !bt_spinwritetry (bt->table[idx].latch) )
686                 continue;
687
688         if( latch->pin & ~LVL_mask ) {
689           if( latch->pin & CLOCK_mask )
690 #ifdef unix
691                 __sync_fetch_and_add(&latch->pin, -CLOCK_unit);
692 #else
693                 _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_unit);
694 #endif
695           bt_spinreleasewrite (bt->table[idx].latch);
696           continue;
697         }
698
699         //  update permanent page area in btree
700
701         page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
702 #ifdef unix
703         posix_fadvise (bt->idx, page_no << bt->page_bits, bt->page_size, POSIX_FADV_WILLNEED);
704         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], -1);
705 #else
706         _InterlockedAdd(&bt->latchmgr->cache[page->lvl], -1);
707 #endif
708         if( page->dirty )
709           if( bt_writepage (bt, page, latch->page_no) )
710                 return NULL;
711
712         //  unlink our available slot from its hash chain
713
714         if( latch->prev )
715                 bt->latchsets[latch->prev].next = latch->next;
716         else
717                 bt->table[idx].slot = latch->next;
718
719         if( latch->next )
720                 bt->latchsets[latch->next].prev = latch->prev;
721
722         bt_spinreleasewrite (bt->table[idx].latch);
723
724         if( bt_latchlink (bt, hashidx, slot, page_no) )
725                 return NULL;
726
727         bt_spinreleasewrite (bt->table[hashidx].latch);
728         return latch;
729   }
730 }
731
732 //      close and release memory
733
734 void bt_close (BtDb *bt)
735 {
736 #ifdef unix
737         munmap (bt->table, bt->latchmgr->nlatchpage * bt->page_size);
738         munmap (bt->latchmgr, bt->page_size);
739 #else
740         FlushViewOfFile(bt->latchmgr, 0);
741         UnmapViewOfFile(bt->latchmgr);
742         CloseHandle(bt->halloc);
743 #endif
744 #ifdef unix
745         if( bt->mem )
746                 free (bt->mem);
747         close (bt->idx);
748         free (bt);
749 #else
750         if( bt->mem)
751                 VirtualFree (bt->mem, 0, MEM_RELEASE);
752         FlushFileBuffers(bt->idx);
753         CloseHandle(bt->idx);
754         GlobalFree (bt);
755 #endif
756 }
757 //  open/create new btree
758
759 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
760 //              size of mapped page pool (e.g. 8192)
761
762 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
763 {
764 uint lvl, attr, last, slot, idx;
765 uint nlatchpage, latchhash;
766 BtLatchMgr *latchmgr;
767 off64_t size, off;
768 uint amt[1];
769 BtKey key;
770 BtDb* bt;
771 int flag;
772
773 #ifndef unix
774 OVERLAPPED ovl[1];
775 #else
776 struct flock lock[1];
777 #endif
778
779         // determine sanity of page size and buffer pool
780
781         if( bits > BT_maxbits )
782                 bits = BT_maxbits;
783         else if( bits < BT_minbits )
784                 bits = BT_minbits;
785
786         if( mode == BT_ro ) {
787                 fprintf(stderr, "ReadOnly mode not supported: %s\n", name);
788                 return NULL;
789         }
790 #ifdef unix
791         bt = calloc (1, sizeof(BtDb));
792
793         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
794         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_RANDOM);
795  
796         if( bt->idx == -1 ) {
797                 fprintf(stderr, "unable to open %s\n", name);
798                 return free(bt), NULL;
799         }
800 #else
801         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
802         attr = FILE_ATTRIBUTE_NORMAL;
803         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
804
805         if( bt->idx == INVALID_HANDLE_VALUE ) {
806                 fprintf(stderr, "unable to open %s\n", name);
807                 return GlobalFree(bt), NULL;
808         }
809 #endif
810 #ifdef unix
811         memset (lock, 0, sizeof(lock));
812         lock->l_len = sizeof(struct BtPage_);
813         lock->l_type = F_WRLCK;
814
815         if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) {
816                 fprintf(stderr, "unable to lock record zero %s\n", name);
817                 return bt_close (bt), NULL;
818         }
819 #else
820         memset (ovl, 0, sizeof(ovl));
821
822         //      use large offsets to
823         //      simulate advisory locking
824
825         ovl->OffsetHigh |= 0x80000000;
826
827         if( !LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) ) {
828                 fprintf(stderr, "unable to lock record zero %s, GetLastError = %d\n", name, GetLastError());
829                 return bt_close (bt), NULL;
830         }
831 #endif 
832
833 #ifdef unix
834         latchmgr = valloc (BT_maxpage);
835         *amt = 0;
836
837         // read minimum page size to get root info
838
839         if( size = lseek (bt->idx, 0L, 2) ) {
840                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
841                         bits = latchmgr->alloc->bits;
842                 else {
843                         fprintf(stderr, "Unable to read page zero\n");
844                         return free(bt), free(latchmgr), NULL;
845                 }
846         }
847 #else
848         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
849         size = GetFileSize(bt->idx, amt);
850
851         if( size || *amt ) {
852                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) {
853                         fprintf(stderr, "Unable to read page zero\n");
854                         return bt_close (bt), NULL;
855                 } else
856                         bits = latchmgr->alloc->bits;
857         }
858 #endif
859
860         bt->page_size = 1 << bits;
861         bt->page_bits = bits;
862
863         bt->mode = mode;
864
865         if( size || *amt ) {
866                 nlatchpage = latchmgr->nlatchpage;
867                 goto btlatch;
868         }
869
870         if( nodemax < 16 ) {
871                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
872                 return bt_close(bt), NULL;
873         }
874
875         // initialize an empty b-tree with latch page, root page, page of leaves
876         // and page(s) of latches and page pool cache
877
878         memset (latchmgr, 0, 1 << bits);
879         latchmgr->alloc->bits = bt->page_bits;
880
881         //  calculate number of latch hash table entries
882
883         nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
884         latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
885
886         nlatchpage += nodemax;          // size of the buffer pool in pages
887         nlatchpage += (sizeof(BtLatchSet) * nodemax + bt->page_size - 1)/bt->page_size;
888
889         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
890         latchmgr->nlatchpage = nlatchpage;
891         latchmgr->latchtotal = nodemax;
892         latchmgr->latchhash = latchhash;
893
894         if( bt_writepage (bt, latchmgr->alloc, 0) ) {
895                 fprintf (stderr, "Unable to create btree page zero\n");
896                 return bt_close (bt), NULL;
897         }
898
899         memset (latchmgr, 0, 1 << bits);
900         latchmgr->alloc->bits = bt->page_bits;
901
902         for( lvl=MIN_lvl; lvl--; ) {
903                 last = MIN_lvl - lvl;   // page number
904                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
905                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? last + 1 : 0);
906                 key = keyptr(latchmgr->alloc, 1);
907                 key->len = 2;           // create stopper key
908                 key->key[0] = 0xff;
909                 key->key[1] = 0xff;
910
911                 latchmgr->alloc->min = bt->page_size - 3;
912                 latchmgr->alloc->lvl = lvl;
913                 latchmgr->alloc->cnt = 1;
914                 latchmgr->alloc->act = 1;
915
916                 if( bt_writepage (bt, latchmgr->alloc, last) ) {
917                         fprintf (stderr, "Unable to create btree page %.8x\n", last);
918                         return bt_close (bt), NULL;
919                 }
920         }
921
922         // clear out buffer pool pages
923
924         memset(latchmgr, 0, bt->page_size);
925         last = MIN_lvl + nlatchpage;
926
927         if( bt_writepage (bt, latchmgr->alloc, last) ) {
928                 fprintf (stderr, "Unable to write buffer pool page %.8x\n", last);
929                 return bt_close (bt), NULL;
930         }
931                 
932 #ifdef unix
933         free (latchmgr);
934 #else
935         VirtualFree (latchmgr, 0, MEM_RELEASE);
936 #endif
937
938 btlatch:
939 #ifdef unix
940         lock->l_type = F_UNLCK;
941         if( fcntl (bt->idx, F_SETLK, lock) < 0 ) {
942                 fprintf (stderr, "Unable to unlock page zero\n");
943                 return bt_close (bt), NULL;
944         }
945 #else
946         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) {
947                 fprintf (stderr, "Unable to unlock page zero, GetLastError = %d\n", GetLastError());
948                 return bt_close (bt), NULL;
949         }
950 #endif
951 #ifdef unix
952         flag = PROT_READ | PROT_WRITE;
953         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
954         if( bt->latchmgr == MAP_FAILED ) {
955                 fprintf (stderr, "Unable to mmap page zero, errno = %d", errno);
956                 return bt_close (bt), NULL;
957         }
958         bt->table = (void *)mmap (0, (uid)nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
959         if( bt->table == MAP_FAILED ) {
960                 fprintf (stderr, "Unable to mmap buffer pool, errno = %d", errno);
961                 return bt_close (bt), NULL;
962         }
963         madvise (bt->table, (uid)nlatchpage << bt->page_bits, MADV_RANDOM | MADV_WILLNEED);
964 #else
965         flag = PAGE_READWRITE;
966         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size, NULL);
967         if( !bt->halloc ) {
968                 fprintf (stderr, "Unable to create file mapping for buffer pool mgr, GetLastError = %d\n", GetLastError());
969                 return bt_close (bt), NULL;
970         }
971
972         flag = FILE_MAP_WRITE;
973         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size);
974         if( !bt->latchmgr ) {
975                 fprintf (stderr, "Unable to map buffer pool, GetLastError = %d\n", GetLastError());
976                 return bt_close (bt), NULL;
977         }
978
979         bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
980 #endif
981         bt->pagepool = (unsigned char *)bt->table + (uid)(nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
982         bt->latchsets = (BtLatchSet *)(bt->pagepool - (uid)bt->latchmgr->latchtotal * sizeof(BtLatchSet));
983
984 #ifdef unix
985         bt->mem = valloc (2 * bt->page_size);
986 #else
987         bt->mem = VirtualAlloc(NULL, 2 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
988 #endif
989         bt->frame = (BtPage)bt->mem;
990         bt->cursor = (BtPage)(bt->mem + bt->page_size);
991         return bt;
992 }
993
994 // place write, read, or parent lock on requested page_no.
995
996 void bt_lockpage(BtLock mode, BtLatchSet *latch)
997 {
998         switch( mode ) {
999         case BtLockRead:
1000                 bt_spinreadlock (latch->readwr);
1001                 break;
1002         case BtLockWrite:
1003                 bt_spinwritelock (latch->readwr);
1004                 break;
1005         case BtLockAccess:
1006                 bt_spinreadlock (latch->access);
1007                 break;
1008         case BtLockDelete:
1009                 bt_spinwritelock (latch->access);
1010                 break;
1011         case BtLockParent:
1012                 bt_spinwritelock (latch->parent);
1013                 break;
1014         }
1015 }
1016
1017 // remove write, read, or parent lock on requested page
1018
1019 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1020 {
1021         switch( mode ) {
1022         case BtLockRead:
1023                 bt_spinreleaseread (latch->readwr);
1024                 break;
1025         case BtLockWrite:
1026                 bt_spinreleasewrite (latch->readwr);
1027                 break;
1028         case BtLockAccess:
1029                 bt_spinreleaseread (latch->access);
1030                 break;
1031         case BtLockDelete:
1032                 bt_spinreleasewrite (latch->access);
1033                 break;
1034         case BtLockParent:
1035                 bt_spinreleasewrite (latch->parent);
1036                 break;
1037         }
1038 }
1039
1040 //      allocate a new page and write page into it
1041
1042 uid bt_newpage(BtDb *bt, BtPage page)
1043 {
1044 BtLatchSet *latch;
1045 uid new_page;
1046 BtPage temp;
1047
1048         //      lock allocation page
1049
1050         bt_spinwritelock(bt->latchmgr->lock);
1051
1052         // use empty chain first
1053         // else allocate empty page
1054
1055         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
1056           if( latch = bt_pinlatch (bt, new_page) )
1057                 temp = bt_mappage (bt, latch);
1058           else
1059                 return 0;
1060
1061           bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
1062           bt_spinreleasewrite(bt->latchmgr->lock);
1063           memcpy (temp, page, bt->page_size);
1064
1065           bt_update (bt, temp);
1066           bt_unpinlatch (latch);
1067           return new_page;
1068         } else {
1069           new_page = bt_getid(bt->latchmgr->alloc->right);
1070           bt_putid(bt->latchmgr->alloc->right, new_page+1);
1071           bt_spinreleasewrite(bt->latchmgr->lock);
1072
1073           if( bt_writepage (bt, page, new_page) )
1074                 return 0;
1075         }
1076
1077         bt_update (bt, bt->latchmgr->alloc);
1078         return new_page;
1079 }
1080
1081 //  compare two keys, returning > 0, = 0, or < 0
1082 //  as the comparison value
1083
1084 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1085 {
1086 uint len1 = key1->len;
1087 int ans;
1088
1089         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1090                 return ans;
1091
1092         if( len1 > len2 )
1093                 return 1;
1094         if( len1 < len2 )
1095                 return -1;
1096
1097         return 0;
1098 }
1099
1100 //  Update current page of btree by
1101 //      flushing mapped area to disk backing of cache pool.
1102 //      mark page as dirty for rewrite to permanent location
1103
1104 void bt_update (BtDb *bt, BtPage page)
1105 {
1106 #ifdef unix
1107         msync (page, bt->page_size, MS_ASYNC);
1108 #else
1109 //      FlushViewOfFile (page, bt->page_size);
1110 #endif
1111         page->dirty = 1;
1112 }
1113
1114 //  map the btree cached page onto current page
1115
1116 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1117 {
1118         return (BtPage)((uid)(latch - bt->latchsets) * bt->page_size + bt->pagepool);
1119 }
1120
1121 //      deallocate a deleted page 
1122 //      place on free chain out of allocator page
1123 //      call with page latched for Writing and Deleting
1124
1125 BTERR bt_freepage(BtDb *bt, uid page_no, BtLatchSet *latch)
1126 {
1127 BtPage page = bt_mappage (bt, latch);
1128
1129         //      lock allocation page
1130
1131         bt_spinwritelock (bt->latchmgr->lock);
1132
1133         //      store chain in second right
1134         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1135         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1136
1137         page->free = 1;
1138         bt_update(bt, page);
1139
1140         // unlock released page
1141
1142         bt_unlockpage (BtLockDelete, latch);
1143         bt_unlockpage (BtLockWrite, latch);
1144         bt_unpinlatch (latch);
1145
1146         // unlock allocation page
1147
1148         bt_spinreleasewrite (bt->latchmgr->lock);
1149         bt_update (bt, bt->latchmgr->alloc);
1150         return 0;
1151 }
1152
1153 //  find slot in page for given key at a given level
1154
1155 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1156 {
1157 uint diff, higher = bt->page->cnt, low = 1, slot;
1158 uint good = 0;
1159
1160         //      make stopper key an infinite fence value
1161
1162         if( bt_getid (bt->page->right) )
1163                 higher++;
1164         else
1165                 good++;
1166
1167         //      low is the lowest candidate, higher is already
1168         //      tested as .ge. the given key, loop ends when they meet
1169
1170         while( diff = higher - low ) {
1171                 slot = low + ( diff >> 1 );
1172                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1173                         low = slot + 1;
1174                 else
1175                         higher = slot, good++;
1176         }
1177
1178         //      return zero if key is on right link page
1179
1180         return good ? higher : 0;
1181 }
1182
1183 //  find and load page at given level for given key
1184 //      leave page rd or wr locked as requested
1185
1186 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1187 {
1188 uid page_no = ROOT_page, prevpage = 0;
1189 uint drill = 0xff, slot;
1190 BtLatchSet *prevlatch;
1191 uint mode, prevmode;
1192
1193   //  start at root of btree and drill down
1194
1195   do {
1196         // determine lock mode of drill level
1197         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1198
1199         if( bt->latch = bt_pinlatch(bt, page_no) )
1200                 bt->page_no = page_no;
1201         else
1202                 return 0;
1203
1204         // obtain access lock using lock chaining
1205
1206         if( page_no > ROOT_page )
1207                 bt_lockpage(BtLockAccess, bt->latch);
1208
1209         if( prevpage ) {
1210                 bt_unlockpage(prevmode, prevlatch);
1211                 bt_unpinlatch(prevlatch);
1212                 prevpage = 0;
1213         }
1214
1215         // obtain read lock using lock chaining
1216
1217         bt_lockpage(mode, bt->latch);
1218
1219         if( page_no > ROOT_page )
1220                 bt_unlockpage(BtLockAccess, bt->latch);
1221
1222         //      map/obtain page contents
1223
1224         bt->page = bt_mappage (bt, bt->latch);
1225
1226         // re-read and re-lock root after determining actual level of root
1227
1228         if( bt->page->lvl != drill) {
1229                 if( bt->page_no != ROOT_page )
1230                         return bt->err = BTERR_struct, 0;
1231                         
1232                 drill = bt->page->lvl;
1233
1234                 if( lock != BtLockRead && drill == lvl ) {
1235                         bt_unlockpage(mode, bt->latch);
1236                         bt_unpinlatch(bt->latch);
1237                         continue;
1238                 }
1239         }
1240
1241         prevpage = bt->page_no;
1242         prevlatch = bt->latch;
1243         prevmode = mode;
1244
1245         //  find key on page at this level
1246         //  and descend to requested level
1247
1248         if( !bt->page->kill )
1249          if( slot = bt_findslot (bt, key, len) ) {
1250           if( drill == lvl )
1251                 return slot;
1252
1253           while( slotptr(bt->page, slot)->dead )
1254                 if( slot++ < bt->page->cnt )
1255                         continue;
1256                 else
1257                         goto slideright;
1258
1259           page_no = bt_getid(slotptr(bt->page, slot)->id);
1260           drill--;
1261           continue;
1262          }
1263
1264         //  or slide right into next page
1265
1266 slideright:
1267         page_no = bt_getid(bt->page->right);
1268
1269   } while( page_no );
1270
1271   // return error on end of right chain
1272
1273   bt->err = BTERR_eof;
1274   return 0;     // return error
1275 }
1276
1277 //      a fence key was deleted from a page
1278 //      push new fence value upwards
1279
1280 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1281 {
1282 unsigned char leftkey[256], rightkey[256];
1283 BtLatchSet *latch = bt->latch;
1284 BtKey ptr;
1285
1286         // remove deleted key, the old fence value
1287
1288         ptr = keyptr(bt->page, bt->page->cnt);
1289         memcpy(rightkey, ptr, ptr->len + 1);
1290
1291         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1292         bt->page->clean = 1;
1293
1294         ptr = keyptr(bt->page, bt->page->cnt);
1295         memcpy(leftkey, ptr, ptr->len + 1);
1296
1297         bt_update (bt, bt->page);
1298         bt_lockpage (BtLockParent, latch);
1299         bt_unlockpage (BtLockWrite, latch);
1300
1301         //  insert new (now smaller) fence key
1302
1303         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1304                 return bt->err;
1305
1306         //  remove old (larger) fence key
1307
1308         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1309                 return bt->err;
1310
1311         bt_unlockpage (BtLockParent, latch);
1312         bt_unpinlatch (latch);
1313         return 0;
1314 }
1315
1316 //      root has a single child
1317 //      collapse a level from the btree
1318 //      call with root locked in bt->page
1319
1320 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1321 {
1322 BtLatchSet *latch;
1323 BtPage temp;
1324 uid child;
1325 uint idx;
1326
1327         // find the child entry
1328         //      and promote to new root
1329
1330   do {
1331         for( idx = 0; idx++ < root->cnt; )
1332           if( !slotptr(root, idx)->dead )
1333                 break;
1334
1335         child = bt_getid (slotptr(root, idx)->id);
1336         if( latch = bt_pinlatch (bt, child) )
1337                 temp = bt_mappage (bt, latch);
1338         else
1339                 return bt->err;
1340
1341         bt_lockpage (BtLockDelete, latch);
1342         bt_lockpage (BtLockWrite, latch);
1343         memcpy (root, temp, bt->page_size);
1344
1345         bt_update (bt, root);
1346
1347         if( bt_freepage (bt, child, latch) )
1348                 return bt->err;
1349
1350   } while( root->lvl > 1 && root->act == 1 );
1351
1352   bt_unlockpage (BtLockWrite, bt->latch);
1353   bt_unpinlatch (bt->latch);
1354   return 0;
1355 }
1356
1357 //  find and delete key on page by marking delete flag bit
1358 //  when page becomes empty, delete it
1359
1360 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1361 {
1362 unsigned char lowerkey[256], higherkey[256];
1363 uint slot, dirty = 0, idx, fence, found;
1364 BtLatchSet *latch, *rlatch;
1365 uid page_no, right;
1366 BtPage temp;
1367 BtKey ptr;
1368
1369         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1370                 ptr = keyptr(bt->page, slot);
1371         else
1372                 return bt->err;
1373
1374         // are we deleting a fence slot?
1375
1376         fence = slot == bt->page->cnt;
1377
1378         // if key is found delete it, otherwise ignore request
1379
1380         if( found = !keycmp (ptr, key, len) )
1381           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1382                 dirty = slotptr(bt->page,slot)->dead = 1;
1383                 bt->page->clean = 1;
1384                 bt->page->act--;
1385
1386                 // collapse empty slots
1387
1388                 while( idx = bt->page->cnt - 1 )
1389                   if( slotptr(bt->page, idx)->dead ) {
1390                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1391                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1392                   } else
1393                         break;
1394           }
1395
1396         right = bt_getid(bt->page->right);
1397         page_no = bt->page_no;
1398         latch = bt->latch;
1399
1400         if( !dirty ) {
1401           if( lvl )
1402                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1403           bt_unlockpage(BtLockWrite, latch);
1404           bt_unpinlatch (latch);
1405           return bt->found = found, 0;
1406         }
1407
1408         //      did we delete a fence key in an upper level?
1409
1410         if( lvl && bt->page->act && fence )
1411           if( bt_fixfence (bt, page_no, lvl) )
1412                 return bt->err;
1413           else
1414                 return bt->found = found, 0;
1415
1416         //      is this a collapsed root?
1417
1418         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1419           if( bt_collapseroot (bt, bt->page) )
1420                 return bt->err;
1421           else
1422                 return bt->found = found, 0;
1423
1424         // return if page is not empty
1425
1426         if( bt->page->act ) {
1427           bt_update(bt, bt->page);
1428           bt_unlockpage(BtLockWrite, latch);
1429           bt_unpinlatch (latch);
1430           return bt->found = found, 0;
1431         }
1432
1433         // cache copy of fence key
1434         //      in order to find parent
1435
1436         ptr = keyptr(bt->page, bt->page->cnt);
1437         memcpy(lowerkey, ptr, ptr->len + 1);
1438
1439         // obtain lock on right page
1440
1441         if( rlatch = bt_pinlatch (bt, right) )
1442                 temp = bt_mappage (bt, rlatch);
1443         else
1444                 return bt->err;
1445
1446         bt_lockpage(BtLockWrite, rlatch);
1447
1448         if( temp->kill ) {
1449                 bt_abort(bt, temp, right, 0);
1450                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1451         }
1452
1453         // pull contents of next page into current empty page 
1454
1455         memcpy (bt->page, temp, bt->page_size);
1456
1457         //      cache copy of key to update
1458
1459         ptr = keyptr(temp, temp->cnt);
1460         memcpy(higherkey, ptr, ptr->len + 1);
1461
1462         //  Mark right page as deleted and point it to left page
1463         //      until we can post updates at higher level.
1464
1465         bt_putid(temp->right, page_no);
1466         temp->kill = 1;
1467
1468         bt_update(bt, bt->page);
1469         bt_update(bt, temp);
1470
1471         bt_lockpage(BtLockParent, latch);
1472         bt_unlockpage(BtLockWrite, latch);
1473
1474         bt_lockpage(BtLockParent, rlatch);
1475         bt_unlockpage(BtLockWrite, rlatch);
1476
1477         //  redirect higher key directly to consolidated node
1478
1479         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1480                 return bt->err;
1481
1482         //  delete old lower key to consolidated node
1483
1484         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1485                 return bt->err;
1486
1487         //  obtain write & delete lock on deleted node
1488         //      add right block to free chain
1489
1490         bt_lockpage(BtLockDelete, rlatch);
1491         bt_lockpage(BtLockWrite, rlatch);
1492         bt_unlockpage(BtLockParent, rlatch);
1493
1494         if( bt_freepage (bt, right, rlatch) )
1495                 return bt->err;
1496
1497         bt_unlockpage(BtLockParent, latch);
1498         bt_unpinlatch(latch);
1499         return 0;
1500 }
1501
1502 //      find key in leaf level and return row-id
1503
1504 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1505 {
1506 uint  slot;
1507 BtKey ptr;
1508 uid id;
1509
1510         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1511                 ptr = keyptr(bt->page, slot);
1512         else
1513                 return 0;
1514
1515         // if key exists, return row-id
1516         //      otherwise return 0
1517
1518         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1519                 id = bt_getid(slotptr(bt->page,slot)->id);
1520         else
1521                 id = 0;
1522
1523         bt_unlockpage (BtLockRead, bt->latch);
1524         bt_unpinlatch (bt->latch);
1525         return id;
1526 }
1527
1528 //      check page for space available,
1529 //      clean if necessary and return
1530 //      0 - page needs splitting
1531 //      >0 - go ahead with new slot
1532  
1533 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1534 {
1535 uint nxt = bt->page_size;
1536 BtPage page = bt->page;
1537 uint cnt = 0, idx = 0;
1538 uint max = page->cnt;
1539 uint newslot = slot;
1540 BtKey key;
1541 int ret;
1542
1543         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1544                 return slot;
1545
1546         //      skip cleanup if nothing to reclaim
1547
1548         if( !page->clean )
1549                 return 0;
1550
1551         memcpy (bt->frame, page, bt->page_size);
1552
1553         // skip page info and set rest of page to zero
1554
1555         memset (page+1, 0, bt->page_size - sizeof(*page));
1556         page->act = 0;
1557
1558         while( cnt++ < max ) {
1559                 if( cnt == slot )
1560                         newslot = idx + 1;
1561                 // always leave fence key in list
1562                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1563                         continue;
1564
1565                 // copy key
1566                 key = keyptr(bt->frame, cnt);
1567                 nxt -= key->len + 1;
1568                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1569
1570                 // copy slot
1571                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1572                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1573                         page->act++;
1574 #ifdef USETOD
1575                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1576 #endif
1577                 slotptr(page, idx)->off = nxt;
1578         }
1579
1580         page->min = nxt;
1581         page->cnt = idx;
1582
1583         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1584                 return newslot;
1585
1586         return 0;
1587 }
1588
1589 // split the root and raise the height of the btree
1590
1591 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1592 {
1593 uint nxt = bt->page_size;
1594 BtPage root = bt->page;
1595 uid right;
1596
1597         //  Obtain an empty page to use, and copy the current
1598         //  root contents into it
1599
1600         if( !(right = bt_newpage(bt, root)) )
1601                 return bt->err;
1602
1603         // preserve the page info at the bottom
1604         // and set rest to zero
1605
1606         memset(root+1, 0, bt->page_size - sizeof(*root));
1607
1608         // insert first key on newroot page
1609
1610         nxt -= *leftkey + 1;
1611         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1612         bt_putid(slotptr(root, 1)->id, right);
1613         slotptr(root, 1)->off = nxt;
1614         
1615         // insert second key on newroot page
1616         // and increase the root height
1617
1618         nxt -= 3;
1619         ((unsigned char *)root)[nxt] = 2;
1620         ((unsigned char *)root)[nxt+1] = 0xff;
1621         ((unsigned char *)root)[nxt+2] = 0xff;
1622         bt_putid(slotptr(root, 2)->id, page_no2);
1623         slotptr(root, 2)->off = nxt;
1624
1625         bt_putid(root->right, 0);
1626         root->min = nxt;                // reset lowest used offset and key count
1627         root->cnt = 2;
1628         root->act = 2;
1629         root->lvl++;
1630
1631         // update and release root (bt->page)
1632
1633         bt_update(bt, root);
1634
1635         bt_unlockpage(BtLockWrite, bt->latch);
1636         bt_unpinlatch(bt->latch);
1637         return 0;
1638 }
1639
1640 //  split already locked full node
1641 //      return unlocked.
1642
1643 BTERR bt_splitpage (BtDb *bt)
1644 {
1645 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1646 unsigned char fencekey[256], rightkey[256];
1647 uid page_no = bt->page_no, right;
1648 BtLatchSet *latch, *rlatch;
1649 BtPage page = bt->page;
1650 uint lvl = page->lvl;
1651 BtKey key;
1652
1653         latch = bt->latch;
1654
1655         //  split higher half of keys to bt->frame
1656         //      the last key (fence key) might be dead
1657
1658         memset (bt->frame, 0, bt->page_size);
1659         max = page->cnt;
1660         cnt = max / 2;
1661         idx = 0;
1662
1663         while( cnt++ < max ) {
1664                 key = keyptr(page, cnt);
1665                 nxt -= key->len + 1;
1666                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1667                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1668                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1669                         bt->frame->act++;
1670 #ifdef USETOD
1671                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1672 #endif
1673                 slotptr(bt->frame, idx)->off = nxt;
1674         }
1675
1676         //      remember fence key for new right page
1677
1678         memcpy (rightkey, key, key->len + 1);
1679
1680         bt->frame->bits = bt->page_bits;
1681         bt->frame->min = nxt;
1682         bt->frame->cnt = idx;
1683         bt->frame->lvl = lvl;
1684
1685         // link right node
1686
1687         if( page_no > ROOT_page )
1688                 memcpy (bt->frame->right, page->right, BtId);
1689
1690         //      get new free page and write frame to it.
1691
1692         if( !(right = bt_newpage(bt, bt->frame)) )
1693                 return bt->err;
1694
1695         //      update lower keys to continue in old page
1696
1697         memcpy (bt->frame, page, bt->page_size);
1698         memset (page+1, 0, bt->page_size - sizeof(*page));
1699         nxt = bt->page_size;
1700         page->clean = 0;
1701         page->act = 0;
1702         cnt = 0;
1703         idx = 0;
1704
1705         //  assemble page of smaller keys
1706         //      (they're all active keys)
1707
1708         while( cnt++ < max / 2 ) {
1709                 key = keyptr(bt->frame, cnt);
1710                 nxt -= key->len + 1;
1711                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1712                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1713 #ifdef USETOD
1714                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1715 #endif
1716                 slotptr(page, idx)->off = nxt;
1717                 page->act++;
1718         }
1719
1720         // remember fence key for smaller page
1721
1722         memcpy (fencekey, key, key->len + 1);
1723
1724         bt_putid(page->right, right);
1725         page->min = nxt;
1726         page->cnt = idx;
1727
1728         // if current page is the root page, split it
1729
1730         if( page_no == ROOT_page )
1731                 return bt_splitroot (bt, fencekey, right);
1732
1733         //      lock right page
1734
1735         if( rlatch = bt_pinlatch (bt, right) )
1736                 bt_lockpage (BtLockParent, rlatch);
1737         else
1738                 return bt->err;
1739
1740         // update left (containing) node
1741
1742         bt_update(bt, page);
1743
1744         bt_lockpage (BtLockParent, latch);
1745         bt_unlockpage (BtLockWrite, latch);
1746
1747         // insert new fence for reformulated left block
1748
1749         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1750                 return bt->err;
1751
1752         //      switch fence for right block of larger keys to new right page
1753
1754         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1755                 return bt->err;
1756
1757         bt_unlockpage (BtLockParent, latch);
1758         bt_unlockpage (BtLockParent, rlatch);
1759
1760         bt_unpinlatch (rlatch);
1761         bt_unpinlatch (latch);
1762         return 0;
1763 }
1764
1765 //  Insert new key into the btree at requested level.
1766 //  Pages are unlocked at exit.
1767
1768 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1769 {
1770 uint slot, idx;
1771 BtPage page;
1772 BtKey ptr;
1773
1774   while( 1 ) {
1775         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1776                 ptr = keyptr(bt->page, slot);
1777         else
1778         {
1779                 if( !bt->err )
1780                         bt->err = BTERR_ovflw;
1781                 return bt->err;
1782         }
1783
1784         // if key already exists, update id and return
1785
1786         page = bt->page;
1787
1788         if( !keycmp (ptr, key, len) ) {
1789           if( slotptr(page, slot)->dead )
1790                 page->act++;
1791           slotptr(page, slot)->dead = 0;
1792 #ifdef USETOD
1793           slotptr(page, slot)->tod = tod;
1794 #endif
1795           bt_putid(slotptr(page,slot)->id, id);
1796           bt_update(bt, bt->page);
1797           bt_unlockpage(BtLockWrite, bt->latch);
1798           bt_unpinlatch (bt->latch);
1799           return 0;
1800         }
1801
1802         // check if page has enough space
1803
1804         if( slot = bt_cleanpage (bt, len, slot) )
1805                 break;
1806
1807         if( bt_splitpage (bt) )
1808                 return bt->err;
1809   }
1810
1811   // calculate next available slot and copy key into page
1812
1813   page->min -= len + 1; // reset lowest used offset
1814   ((unsigned char *)page)[page->min] = len;
1815   memcpy ((unsigned char *)page + page->min +1, key, len );
1816
1817   for( idx = slot; idx < page->cnt; idx++ )
1818         if( slotptr(page, idx)->dead )
1819                 break;
1820
1821   // now insert key into array before slot
1822   // preserving the fence slot
1823
1824   if( idx == page->cnt )
1825         idx++, page->cnt++;
1826
1827   page->act++;
1828
1829   while( idx > slot )
1830         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1831
1832   bt_putid(slotptr(page,slot)->id, id);
1833   slotptr(page, slot)->off = page->min;
1834 #ifdef USETOD
1835   slotptr(page, slot)->tod = tod;
1836 #endif
1837   slotptr(page, slot)->dead = 0;
1838
1839   bt_update(bt, bt->page);
1840
1841   bt_unlockpage(BtLockWrite, bt->latch);
1842   bt_unpinlatch(bt->latch);
1843   return 0;
1844 }
1845
1846 //  cache page of keys into cursor and return starting slot for given key
1847
1848 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1849 {
1850 uint slot;
1851
1852         // cache page for retrieval
1853
1854         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1855           memcpy (bt->cursor, bt->page, bt->page_size);
1856         else
1857           return 0;
1858
1859         bt_unlockpage(BtLockRead, bt->latch);
1860         bt->cursor_page = bt->page_no;
1861         bt_unpinlatch (bt->latch);
1862         return slot;
1863 }
1864
1865 //  return next slot for cursor page
1866 //  or slide cursor right into next page
1867
1868 uint bt_nextkey (BtDb *bt, uint slot)
1869 {
1870 BtLatchSet *latch;
1871 off64_t right;
1872
1873   do {
1874         right = bt_getid(bt->cursor->right);
1875
1876         while( slot++ < bt->cursor->cnt )
1877           if( slotptr(bt->cursor,slot)->dead )
1878                 continue;
1879           else if( right || (slot < bt->cursor->cnt))
1880                 return slot;
1881           else
1882                 break;
1883
1884         if( !right )
1885                 break;
1886
1887         bt->cursor_page = right;
1888
1889         if( latch = bt_pinlatch (bt, right) )
1890         bt_lockpage(BtLockRead, latch);
1891         else
1892                 return 0;
1893
1894         bt->page = bt_mappage (bt, latch);
1895         memcpy (bt->cursor, bt->page, bt->page_size);
1896         bt_unlockpage(BtLockRead, latch);
1897         bt_unpinlatch (latch);
1898         slot = 0;
1899   } while( 1 );
1900
1901   return bt->err = 0;
1902 }
1903
1904 BtKey bt_key(BtDb *bt, uint slot)
1905 {
1906         return keyptr(bt->cursor, slot);
1907 }
1908
1909 uid bt_uid(BtDb *bt, uint slot)
1910 {
1911         return bt_getid(slotptr(bt->cursor,slot)->id);
1912 }
1913
1914 #ifdef USETOD
1915 uint bt_tod(BtDb *bt, uint slot)
1916 {
1917         return slotptr(bt->cursor,slot)->tod;
1918 }
1919 #endif
1920
1921 #ifdef STANDALONE
1922
1923 uint bt_audit (BtDb *bt)
1924 {
1925 uint idx, hashidx;
1926 uid next, page_no;
1927 BtLatchSet *latch;
1928 uint blks[64];
1929 uint cnt = 0;
1930 BtPage page;
1931 uint amt[1];
1932 BtKey ptr;
1933
1934 #ifdef unix
1935         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
1936 #endif
1937         if( *(ushort *)(bt->latchmgr->lock) )
1938                 fprintf(stderr, "Alloc page locked\n");
1939         *(ushort *)(bt->latchmgr->lock) = 0;
1940
1941         memset (blks, 0, sizeof(blks));
1942
1943         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
1944                 latch = bt->latchsets + idx;
1945                 if( *(ushort *)latch->readwr )
1946                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
1947                 *(ushort *)latch->readwr = 0;
1948
1949                 if( *(ushort *)latch->access )
1950                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
1951                 *(ushort *)latch->access = 0;
1952
1953                 if( *(ushort *)latch->parent )
1954                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
1955                 *(ushort *)latch->parent = 0;
1956
1957                 if( latch->pin & PIN_mask ) {
1958                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
1959                         latch->pin = 0;
1960                 }
1961                 page = (BtPage)((uid)idx * bt->page_size + bt->pagepool);
1962                 blks[page->lvl]++;
1963
1964             if( page->dirty )
1965                  if( bt_writepage (bt, page, latch->page_no) )
1966                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1967         }
1968
1969         for( idx = 0; blks[idx]; idx++ )
1970                 fprintf(stderr, "cache: %d lvl %d blocks\n", blks[idx], idx);
1971
1972         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
1973           if( *(ushort *)(bt->table[hashidx].latch) )
1974                         fprintf(stderr, "hash entry %d locked\n", hashidx);
1975
1976           *(ushort *)(bt->table[hashidx].latch) = 0;
1977         }
1978
1979         memset (blks, 0, sizeof(blks));
1980
1981         next = bt->latchmgr->nlatchpage + LATCH_page;
1982         page_no = LEAF_page;
1983
1984         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
1985                 if( bt_readpage (bt, bt->frame, page_no) )
1986                   fprintf(stderr, "page %.8x unreadable\n", page_no);
1987                 if( !bt->frame->free ) {
1988                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
1989                   ptr = keyptr(bt->frame, idx+1);
1990                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
1991                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
1992                  }
1993                  if( !bt->frame->lvl )
1994                         cnt += bt->frame->act;
1995                  blks[bt->frame->lvl]++;
1996                 }
1997
1998                 if( page_no > LEAF_page )
1999                         next = page_no + 1;
2000                 page_no = next;
2001         }
2002
2003         for( idx = 0; blks[idx]; idx++ )
2004                 fprintf(stderr, "btree: %d lvl %d blocks\n", blks[idx], idx);
2005
2006         return cnt - 1;
2007 }
2008
2009 #ifndef unix
2010 double getCpuTime(int type)
2011 {
2012 FILETIME crtime[1];
2013 FILETIME xittime[1];
2014 FILETIME systime[1];
2015 FILETIME usrtime[1];
2016 SYSTEMTIME timeconv[1];
2017 double ans = 0;
2018
2019         memset (timeconv, 0, sizeof(SYSTEMTIME));
2020
2021         switch( type ) {
2022         case 0:
2023                 GetSystemTimeAsFileTime (xittime);
2024                 FileTimeToSystemTime (xittime, timeconv);
2025                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2026                 break;
2027         case 1:
2028                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2029                 FileTimeToSystemTime (usrtime, timeconv);
2030                 break;
2031         case 2:
2032                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2033                 FileTimeToSystemTime (systime, timeconv);
2034                 break;
2035         }
2036
2037         ans += (double)timeconv->wHour * 3600;
2038         ans += (double)timeconv->wMinute * 60;
2039         ans += (double)timeconv->wSecond;
2040         ans += (double)timeconv->wMilliseconds / 1000;
2041         return ans;
2042 }
2043 #else
2044 #include <time.h>
2045 #include <sys/resource.h>
2046
2047 double getCpuTime(int type)
2048 {
2049 struct rusage used[1];
2050 struct timeval tv[1];
2051
2052         switch( type ) {
2053         case 0:
2054                 gettimeofday(tv, NULL);
2055                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2056
2057         case 1:
2058                 getrusage(RUSAGE_SELF, used);
2059                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2060
2061         case 2:
2062                 getrusage(RUSAGE_SELF, used);
2063                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2064         }
2065
2066         return 0;
2067 }
2068 #endif
2069
2070 //  standalone program to index file of keys
2071 //  then list them onto std-out
2072
2073 int main (int argc, char **argv)
2074 {
2075 uint slot, line = 0, off = 0, found = 0;
2076 int ch, cnt = 0, bits = 12, idx;
2077 unsigned char key[256];
2078 double done, start;
2079 uid next, page_no;
2080 BtLatchSet *latch;
2081 float elapsed;
2082 time_t tod[1];
2083 uint scan = 0;
2084 uint len = 0;
2085 uint map = 0;
2086 BtPage page;
2087 BtKey ptr;
2088 BtDb *bt;
2089 FILE *in;
2090
2091 #ifdef WIN32
2092         _setmode (1, _O_BINARY);
2093 #endif
2094         if( argc < 4 ) {
2095                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]);
2096                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2097                 fprintf (stderr, "  mapped_pool_pages: number of pages in buffer pool\n");
2098                 exit(0);
2099         }
2100
2101         start = getCpuTime(0);
2102         time(tod);
2103
2104         if( argc > 4 )
2105                 bits = atoi(argv[4]);
2106
2107         if( argc > 5 )
2108                 map = atoi(argv[5]);
2109
2110         if( argc > 6 )
2111                 off = atoi(argv[6]);
2112
2113         bt = bt_open ((argv[1]), BT_rw, bits, map);
2114
2115         if( !bt ) {
2116                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2117                 exit (1);
2118         }
2119
2120         switch(argv[3][0]| 0x20)
2121         {
2122         case 'p':       // display page
2123                 if( latch = bt_pinlatch (bt, off) )
2124                         page = bt_mappage (bt, latch);
2125                 else
2126                         fprintf(stderr, "unable to read page %.8x\n", off);
2127
2128                 write (1, page, bt->page_size);
2129                 break;
2130
2131         case 'a':       // buffer pool audit
2132                 fprintf(stderr, "started audit for %s\n", argv[1]);
2133                 cnt = bt_audit (bt);
2134                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[1], cnt);
2135                 break;
2136
2137         case 'w':       // write keys
2138                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2139                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2140 #ifdef unix
2141                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2142 #endif
2143                   while( ch = getc(in), ch != EOF )
2144                         if( ch == '\n' )
2145                         {
2146                           if( off )
2147                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2148
2149                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2150                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2151                           len = 0;
2152                         }
2153                         else if( len < 245 )
2154                                 key[len++] = ch;
2155                   }
2156                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2157                 break;
2158
2159         case 'd':       // delete keys
2160                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2161                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2162 #ifdef unix
2163                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2164 #endif
2165                   while( ch = getc(in), ch != EOF )
2166                         if( ch == '\n' )
2167                         {
2168                           if( off )
2169                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2170                           line++;
2171                           if( bt_deletekey (bt, key, len, 0) )
2172                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2173                           len = 0;
2174                         }
2175                         else if( len < 245 )
2176                                 key[len++] = ch;
2177                   }
2178                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2179                 break;
2180
2181         case 'f':       // find keys
2182                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2183                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2184 #ifdef unix
2185                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2186 #endif
2187                   while( ch = getc(in), ch != EOF )
2188                         if( ch == '\n' )
2189                         {
2190                           if( off )
2191                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2192                           line++;
2193                           if( bt_findkey (bt, key, len) )
2194                                 found++;
2195                           else if( bt->err )
2196                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2197                           len = 0;
2198                         }
2199                         else if( len < 245 )
2200                                 key[len++] = ch;
2201                   }
2202                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2203                 break;
2204
2205         case 's':       // scan and print keys
2206                 fprintf(stderr, "started scaning\n");
2207                 cnt = len = key[0] = 0;
2208
2209                 if( slot = bt_startkey (bt, key, len) )
2210                   slot--;
2211                 else
2212                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2213
2214                 while( slot = bt_nextkey (bt, slot) ) {
2215                         ptr = bt_key(bt, slot);
2216                         fwrite (ptr->key, ptr->len, 1, stdout);
2217                         fputc ('\n', stdout);
2218                         cnt++;
2219                 }
2220
2221                 fprintf(stderr, " Total keys read %d\n", cnt - 1);
2222                 break;
2223
2224         case 'c':       // count keys
2225           fprintf(stderr, "started counting\n");
2226           cnt = 0;
2227
2228           next = bt->latchmgr->nlatchpage + LATCH_page;
2229           page_no = LEAF_page;
2230
2231           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2232                 if( latch = bt_pinlatch (bt, page_no) )
2233                         page = bt_mappage (bt, latch);
2234                 if( !page->free && !page->lvl )
2235                         cnt += page->act;
2236                 if( page_no > LEAF_page )
2237                         next = page_no + 1;
2238                 if( scan )
2239                  for( idx = 0; idx++ < page->cnt; ) {
2240                   if( slotptr(page, idx)->dead )
2241                         continue;
2242                   ptr = keyptr(page, idx);
2243                   if( idx != page->cnt && bt_getid (page->right) ) {
2244                         fwrite (ptr->key, ptr->len, 1, stdout);
2245                         fputc ('\n', stdout);
2246                   }
2247                  }
2248                 bt_unpinlatch (latch);
2249                 page_no = next;
2250           }
2251                 
2252           cnt--;        // remove stopper key
2253           fprintf(stderr, " Total keys read %d\n", cnt);
2254           break;
2255         }
2256
2257         done = getCpuTime(0);
2258         elapsed = (float)(done - start);
2259         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2260         elapsed = getCpuTime(1);
2261         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2262         elapsed = getCpuTime(2);
2263         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2264         return 0;
2265 }
2266
2267 #endif  //STANDALONE