]> pd.if.org Git - btree/blob - btree2u.c
Implement enhanced GCLOCK page replacement method for buffer pool
[btree] / btree2u.c
1 // btree version 2u
2 //      with combined latch & pool manager
3 // 26 FEB 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #endif
30
31 #ifdef unix
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <io.h>
47 #endif
48
49 #include <memory.h>
50 #include <string.h>
51
52 typedef unsigned long long      uid;
53
54 #ifndef unix
55 typedef unsigned long long      off64_t;
56 typedef unsigned short          ushort;
57 typedef unsigned int            uint;
58 #endif
59
60 #define BT_ro 0x6f72    // ro
61 #define BT_rw 0x7772    // rw
62 #define BT_fl 0x6c66    // fl
63
64 #define BT_maxbits              15                                      // maximum page size in bits
65 #define BT_minbits              12                                      // minimum page size in bits
66 #define BT_minpage              (1 << BT_minbits)       // minimum page size
67 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
68
69 //  BTree page number constants
70 #define ALLOC_page              0
71 #define ROOT_page               1
72 #define LEAF_page               2
73 #define LATCH_page              3
74
75 //      Number of levels to create in a new BTree
76
77 #define MIN_lvl                 2
78 #define MAX_lvl                 15
79
80 /*
81 There are five lock types for each node in three independent sets: 
82 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
83 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
84 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
85 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
86 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
87 */
88
89 typedef enum{
90         BtLockAccess,
91         BtLockDelete,
92         BtLockRead,
93         BtLockWrite,
94         BtLockParent
95 }BtLock;
96
97 //      definition for latch implementation
98
99 // exclusive is set for write access
100 // share is count of read accessors
101 // grant write lock when share == 0
102
103 volatile typedef struct {
104         unsigned char mutex[1];
105         unsigned char exclusive:1;
106         unsigned char pending:1;
107         ushort share;
108 } BtSpinLatch;
109
110 //      Define the length of the page and key pointers
111
112 #define BtId 6
113
114 //      Page key slot definition.
115
116 //      If BT_maxbits is 15 or less, you can save 2 bytes
117 //      for each key stored by making the first two uints
118 //      into ushorts.  You can also save 4 bytes by removing
119 //      the tod field from the key.
120
121 //      Keys are marked dead, but remain on the page until
122 //      cleanup is called. The fence key (highest key) for
123 //      the page is always present, even if dead.
124
125 typedef struct {
126 #ifdef USETOD
127         uint tod;                                       // time-stamp for key
128 #endif
129         ushort off:BT_maxbits;          // page offset for key start
130         ushort dead:1;                          // set for deleted key
131         unsigned char id[BtId];         // id associated with key
132 } BtSlot;
133
134 //      The key structure occupies space at the upper end of
135 //      each page.  It's a length byte followed by the value
136 //      bytes.
137
138 typedef struct {
139         unsigned char len;
140         unsigned char key[0];
141 } *BtKey;
142
143 //      The first part of an index page.
144 //      It is immediately followed
145 //      by the BtSlot array of keys.
146
147 typedef struct BtPage_ {
148         uint cnt;                                       // count of keys in page
149         uint act;                                       // count of active keys
150         uint min;                                       // next key offset
151         unsigned char bits:6;           // page size in bits
152         unsigned char free:1;           // page is on free list
153         unsigned char dirty:1;          // page is dirty in cache
154         unsigned char lvl:6;            // level of page
155         unsigned char kill:1;           // page is being deleted
156         unsigned char clean:1;          // page needs cleaning
157         unsigned char right[BtId];      // page number to right
158 } *BtPage;
159
160 typedef struct {
161         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
162         BtSpinLatch lock[1];            // allocation area lite latch
163         volatile uint latchdeployed;// highest number of latch entries deployed
164         volatile uint nlatchpage;       // number of latch pages at BT_latch
165         volatile uint latchtotal;       // number of page latch entries
166         volatile uint latchhash;        // number of latch hash table slots
167         volatile uint latchvictim;      // next latch hash entry to examine
168         volatile uint safelevel;        // safe page level in cache
169         volatile uint cache[MAX_lvl];// cache census counts by btree level
170 } BtLatchMgr;
171
172 //  latch hash table entries
173
174 typedef struct {
175         volatile uint slot;             // Latch table entry at head of collision chain
176         BtSpinLatch latch[1];   // lock for the collision chain
177 } BtHashEntry;
178
179 //      latch manager table structure
180
181 typedef struct {
182         volatile uid page_no;           // latch set page number on disk
183         BtSpinLatch readwr[1];          // read/write page lock
184         BtSpinLatch access[1];          // Access Intent/Page delete
185         BtSpinLatch parent[1];          // Posting of fence key in parent
186         volatile ushort pin;            // number of pins/level/clock bits
187         volatile uint next;                     // next entry in hash table chain
188         volatile uint prev;                     // prev entry in hash table chain
189 } BtLatchSet;
190
191 #define CLOCK_mask 0xe000
192 #define CLOCK_unit 0x2000
193 #define PIN_mask 0x07ff
194 #define LVL_mask 0x1800
195 #define LVL_shift 11
196
197 //      The object structure for Btree access
198
199 typedef struct _BtDb {
200         uint page_size;         // each page size       
201         uint page_bits;         // each page size in bits       
202         uid page_no;            // current page number  
203         uid cursor_page;        // current cursor page number   
204         int  err;
205         uint mode;                      // read-write mode
206         BtPage cursor;          // cached frame for start/next (never mapped)
207         BtPage frame;           // spare frame for the page split (never mapped)
208         BtPage page;            // current mapped page in buffer pool
209         BtLatchSet *latch;                      // current page latch
210         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
211         BtLatchSet *latchsets;          // mapped latch set from latch pages
212         unsigned char *pagepool;        // cached page pool set
213         BtHashEntry *table;     // the hash table
214 #ifdef unix
215         int idx;
216 #else
217         HANDLE idx;
218         HANDLE halloc;          // allocation and latch table handle
219 #endif
220         unsigned char *mem;     // frame, cursor, memory buffers
221         uint found;                     // last deletekey found key
222 } BtDb;
223
224 typedef enum {
225 BTERR_ok = 0,
226 BTERR_notfound,
227 BTERR_struct,
228 BTERR_ovflw,
229 BTERR_read,
230 BTERR_lock,
231 BTERR_hash,
232 BTERR_kill,
233 BTERR_map,
234 BTERR_wrt,
235 BTERR_eof
236 } BTERR;
237
238 // B-Tree functions
239 extern void bt_close (BtDb *bt);
240 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
241 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
242 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
243 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
244 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
245 extern uint bt_nextkey   (BtDb *bt, uint slot);
246
247 //      internal functions
248 void bt_update (BtDb *bt, BtPage page);
249 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
250 //  Helper functions to return slot values
251
252 extern BtKey bt_key (BtDb *bt, uint slot);
253 extern uid bt_uid (BtDb *bt, uint slot);
254 #ifdef USETOD
255 extern uint bt_tod (BtDb *bt, uint slot);
256 #endif
257
258 //  The page is allocated from low and hi ends.
259 //  The key offsets and row-id's are allocated
260 //  from the bottom, while the text of the key
261 //  is allocated from the top.  When the two
262 //  areas meet, the page is split into two.
263
264 //  A key consists of a length byte, two bytes of
265 //  index number (0 - 65534), and up to 253 bytes
266 //  of key value.  Duplicate keys are discarded.
267 //  Associated with each key is a 48 bit row-id.
268
269 //  The b-tree root is always located at page 1.
270 //      The first leaf page of level zero is always
271 //      located on page 2.
272
273 //      The b-tree pages are linked with right
274 //      pointers to facilitate enumerators,
275 //      and provide for concurrency.
276
277 //      When to root page fills, it is split in two and
278 //      the tree height is raised by a new root at page
279 //      one with two keys.
280
281 //      Deleted keys are marked with a dead bit until
282 //      page cleanup The fence key for a node is always
283 //      present, even after deletion and cleanup.
284
285 //  Deleted leaf pages are reclaimed  on a free list.
286 //      The upper levels of the btree are fixed on creation.
287
288 //  To achieve maximum concurrency one page is locked at a time
289 //  as the tree is traversed to find leaf key in question. The right
290 //  page numbers are used in cases where the page is being split,
291 //      or consolidated.
292
293 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
294 //      and chains empty leaf pages together for reuse.
295
296 //      Parent locks are obtained to prevent resplitting or deleting a node
297 //      before its fence is posted into its upper level.
298
299 //      A special open mode of BT_fl is provided to safely access files on
300 //      WIN32 networks. WIN32 network operations should not use memory mapping.
301 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
302 //      to prevent local caching of network file contents.
303
304 //      Access macros to address slot and key values from the page.
305 //      Page slots use 1 based indexing.
306
307 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
308 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
309
310 void bt_putid(unsigned char *dest, uid id)
311 {
312 int i = BtId;
313
314         while( i-- )
315                 dest[i] = (unsigned char)id, id >>= 8;
316 }
317
318 uid bt_getid(unsigned char *src)
319 {
320 uid id = 0;
321 int i;
322
323         for( i = 0; i < BtId; i++ )
324                 id <<= 8, id |= *src++; 
325
326         return id;
327 }
328
329 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
330 {
331 BtKey ptr;
332
333         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
334         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
335         ptr = keyptr(page, page->cnt);
336         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
337         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
338         return bt->err = err;
339 }
340
341 //      Latch Manager
342
343 //      wait until write lock mode is clear
344 //      and add 1 to the share count
345
346 void bt_spinreadlock(BtSpinLatch *latch)
347 {
348 ushort prev;
349
350   do {
351         //      obtain latch mutex
352 #ifdef unix
353         if( __sync_lock_test_and_set(latch->mutex, 1) )
354                 continue;
355 #else
356         if( _InterlockedExchange8(latch->mutex, 1) )
357                 continue;
358 #endif
359         //  see if exclusive request is granted or pending
360
361         if( prev = !(latch->exclusive | latch->pending) )
362                 latch->share++;
363
364 #ifdef unix
365         *latch->mutex = 0;
366 #else
367         _InterlockedExchange8(latch->mutex, 0);
368 #endif
369
370         if( prev )
371                 return;
372
373 #ifdef  unix
374   } while( sched_yield(), 1 );
375 #else
376   } while( SwitchToThread(), 1 );
377 #endif
378 }
379
380 //      wait for other read and write latches to relinquish
381
382 void bt_spinwritelock(BtSpinLatch *latch)
383 {
384 uint prev;
385
386   do {
387 #ifdef  unix
388         if( __sync_lock_test_and_set(latch->mutex, 1) )
389                 continue;
390 #else
391         if( _InterlockedExchange8(latch->mutex, 1) )
392                 continue;
393 #endif
394         if( prev = !(latch->share | latch->exclusive) )
395                 latch->exclusive = 1, latch->pending = 0;
396         else
397                 latch->pending = 1;
398 #ifdef unix
399         *latch->mutex = 0;
400 #else
401         _InterlockedExchange8(latch->mutex, 0);
402 #endif
403         if( prev )
404                 return;
405 #ifdef  unix
406   } while( sched_yield(), 1 );
407 #else
408   } while( SwitchToThread(), 1 );
409 #endif
410 }
411
412 //      try to obtain write lock
413
414 //      return 1 if obtained,
415 //              0 otherwise
416
417 int bt_spinwritetry(BtSpinLatch *latch)
418 {
419 uint prev;
420
421 #ifdef unix
422         if( __sync_lock_test_and_set(latch->mutex, 1) )
423                 return 0;
424 #else
425         if( _InterlockedExchange8(latch->mutex, 1) )
426                 return 0;
427 #endif
428         //      take write access if all bits are clear
429
430         if( prev = !(latch->exclusive | latch->share) )
431                 latch->exclusive = 1;
432
433 #ifdef unix
434         *latch->mutex = 0;
435 #else
436         _InterlockedExchange8(latch->mutex, 0);
437 #endif
438         return prev;
439 }
440
441 //      clear write mode
442
443 void bt_spinreleasewrite(BtSpinLatch *latch)
444 {
445 #ifdef unix
446         while( __sync_lock_test_and_set(latch->mutex, 1) )
447                 sched_yield();
448 #else
449         while( _InterlockedExchange8(latch->mutex, 1) )
450                 SwitchToThread();
451 #endif
452         latch->exclusive = 0;
453 #ifdef unix
454         *latch->mutex = 0;
455 #else
456         _InterlockedExchange8(latch->mutex, 0);
457 #endif
458 }
459
460 //      decrement reader count
461
462 void bt_spinreleaseread(BtSpinLatch *latch)
463 {
464 #ifdef unix
465         while( __sync_lock_test_and_set(latch->mutex, 1) )
466                 sched_yield();
467 #else
468         while( _InterlockedExchange8(latch->mutex, 1) )
469                 SwitchToThread();
470 #endif
471         latch->share--;
472 #ifdef unix
473         *latch->mutex = 0;
474 #else
475         _InterlockedExchange8(latch->mutex, 0);
476 #endif
477 }
478
479 //      read page from permanent location in Btree file
480
481 BTERR bt_readpage (BtDb *bt, BtPage page, uid page_no)
482 {
483 off64_t off = page_no << bt->page_bits;
484
485 #ifdef unix
486         if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size ) {
487                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
488                 return bt->err = BTERR_read;
489         }
490 #else
491 OVERLAPPED ovl[1];
492 uint amt[1];
493
494         memset (ovl, 0, sizeof(OVERLAPPED));
495         ovl->Offset = off;
496         ovl->OffsetHigh = off >> 32;
497
498         if( !ReadFile(bt->idx, page, bt->page_size, amt, ovl)) {
499                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
500                 return bt->err = BTERR_read;
501         }
502         if( *amt <  bt->page_size ) {
503                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
504                 return bt->err = BTERR_read;
505         }
506 #endif
507         return 0;
508 }
509
510 //      write page to permanent location in Btree file
511 //      clear the dirty bit
512
513 BTERR bt_writepage (BtDb *bt, BtPage page, uid page_no)
514 {
515 off64_t off = page_no << bt->page_bits;
516
517 #ifdef unix
518         page->dirty = 0;
519
520         if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
521                 return bt->err = BTERR_wrt;
522 #else
523 OVERLAPPED ovl[1];
524 uint amt[1];
525
526         memset (ovl, 0, sizeof(OVERLAPPED));
527         ovl->Offset = off;
528         ovl->OffsetHigh = off >> 32;
529         page->dirty = 0;
530
531         if( !WriteFile(bt->idx, page, bt->page_size, amt, ovl) )
532                 return bt->err = BTERR_wrt;
533
534         if( *amt <  bt->page_size )
535                 return bt->err = BTERR_wrt;
536 #endif
537         return 0;
538 }
539
540 //      link latch table entry into head of latch hash table
541
542 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
543 {
544 BtPage page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
545 BtLatchSet *latch = bt->latchsets + slot;
546 int lvl;
547
548         if( latch->next = bt->table[hashidx].slot )
549                 bt->latchsets[latch->next].prev = slot;
550
551         bt->table[hashidx].slot = slot;
552         latch->page_no = page_no;
553         latch->prev = 0;
554         latch->pin = 1;
555
556         if( bt_readpage (bt, page, page_no) )
557                 return bt->err;
558
559         lvl = page->lvl << LVL_shift;
560         if( lvl > LVL_mask )
561                 lvl = LVL_mask;
562         latch->pin |= lvl;              // store lvl
563         latch->pin |= lvl << 3; // initialize clock
564
565 #ifdef unix
566         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], 1);
567 #else
568         _InterlockedAdd(&bt->latchmgr->cache[page->lvl], 1);
569 #endif
570         return bt->err = 0;
571 }
572
573 //      release latch pin
574
575 void bt_unpinlatch (BtLatchSet *latch)
576 {
577 #ifdef unix
578         __sync_fetch_and_add(&latch->pin, -1);
579 #else
580         _InterlockedDecrement16 (&latch->pin);
581 #endif
582 }
583
584 //      find existing latchset or inspire new one
585 //      return with latchset pinned
586
587 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
588 {
589 uint hashidx = page_no % bt->latchmgr->latchhash;
590 BtLatchSet *latch;
591 uint slot, idx;
592 uint lvl, cnt;
593 off64_t off;
594 uint amt[1];
595 BtPage page;
596
597   //  try to find our entry
598
599   bt_spinwritelock(bt->table[hashidx].latch);
600
601   if( slot = bt->table[hashidx].slot ) do
602   {
603         latch = bt->latchsets + slot;
604         if( page_no == latch->page_no )
605                 break;
606   } while( slot = latch->next );
607
608   //  found our entry
609   //  increment clock
610
611   if( slot ) {
612         latch = bt->latchsets + slot;
613         lvl = (latch->pin & LVL_mask) >> LVL_shift;
614         lvl *= CLOCK_unit * 2;
615         lvl |= CLOCK_unit;
616 #ifdef unix
617         __sync_fetch_and_add(&latch->pin, 1);
618         __sync_fetch_and_or(&latch->pin, lvl);
619 #else
620         _InterlockedIncrement16 (&latch->pin);
621         _InterlockedOr16 (&latch->pin, lvl);
622 #endif
623         bt_spinreleasewrite(bt->table[hashidx].latch);
624         return latch;
625   }
626
627         //  see if there are any unused pool entries
628 #ifdef unix
629         slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
630 #else
631         slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
632 #endif
633
634         if( slot < bt->latchmgr->latchtotal ) {
635                 latch = bt->latchsets + slot;
636                 if( bt_latchlink (bt, hashidx, slot, page_no) )
637                         return NULL;
638                 bt_spinreleasewrite (bt->table[hashidx].latch);
639                 return latch;
640         }
641
642 #ifdef unix
643         __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
644 #else
645         _InterlockedDecrement (&bt->latchmgr->latchdeployed);
646 #endif
647   //  find and reuse previous entry on victim
648
649   while( 1 ) {
650 #ifdef unix
651         slot = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
652 #else
653         slot = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
654 #endif
655         // try to get write lock on hash chain
656         //      skip entry if not obtained
657         //      or has outstanding pins
658
659         slot %= bt->latchmgr->latchtotal;
660
661         //      on slot wraparound, check census
662         //      count and increment safe level
663
664         cnt = bt->latchmgr->cache[bt->latchmgr->safelevel];
665
666         if( !slot ) {
667           if( cnt < bt->latchmgr->latchtotal / 10 )
668 #ifdef unix
669                 __sync_fetch_and_add(&bt->latchmgr->safelevel, 1);
670 #else
671                 _InterlockedIncrement (&bt->latchmgr->safelevel);
672 #endif
673 fprintf(stderr, "X");
674                 continue;
675         }
676
677         latch = bt->latchsets + slot;
678         idx = latch->page_no % bt->latchmgr->latchhash;
679         lvl = (latch->pin & LVL_mask) >> LVL_shift;
680
681         //      see if we are evicting this level yet
682
683         if( lvl > bt->latchmgr->safelevel )
684                 continue;
685
686         if( !bt_spinwritetry (bt->table[idx].latch) )
687                 continue;
688
689         if( latch->pin & ~LVL_mask ) {
690           if( latch->pin & CLOCK_mask )
691 #ifdef unix
692                 __sync_fetch_and_add(&latch->pin, -CLOCK_unit);
693 #else
694                 _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_unit);
695 #endif
696           bt_spinreleasewrite (bt->table[idx].latch);
697           continue;
698         }
699
700         //  update permanent page area in btree
701
702         page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
703 #ifdef unix
704         posix_fadvise (bt->idx, page_no << bt->page_bits, bt->page_size, POSIX_FADV_WILLNEED);
705 if(page->lvl > 0 )
706 fprintf(stderr, "%d", page->lvl);
707         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], -1);
708 #else
709         _InterlockedAdd(&bt->latchmgr->cache[page->lvl], -1);
710 #endif
711         if( page->dirty )
712           if( bt_writepage (bt, page, latch->page_no) )
713                 return NULL;
714
715         //  unlink our available slot from its hash chain
716
717         if( latch->prev )
718                 bt->latchsets[latch->prev].next = latch->next;
719         else
720                 bt->table[idx].slot = latch->next;
721
722         if( latch->next )
723                 bt->latchsets[latch->next].prev = latch->prev;
724
725         bt_spinreleasewrite (bt->table[idx].latch);
726
727         if( bt_latchlink (bt, hashidx, slot, page_no) )
728                 return NULL;
729
730         bt_spinreleasewrite (bt->table[hashidx].latch);
731         return latch;
732   }
733 }
734
735 //      close and release memory
736
737 void bt_close (BtDb *bt)
738 {
739 #ifdef unix
740         munmap (bt->table, bt->latchmgr->nlatchpage * bt->page_size);
741         munmap (bt->latchmgr, bt->page_size);
742 #else
743         FlushViewOfFile(bt->latchmgr, 0);
744         UnmapViewOfFile(bt->latchmgr);
745         CloseHandle(bt->halloc);
746 #endif
747 #ifdef unix
748         if( bt->mem )
749                 free (bt->mem);
750         close (bt->idx);
751         free (bt);
752 #else
753         if( bt->mem)
754                 VirtualFree (bt->mem, 0, MEM_RELEASE);
755         FlushFileBuffers(bt->idx);
756         CloseHandle(bt->idx);
757         GlobalFree (bt);
758 #endif
759 }
760 //  open/create new btree
761
762 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
763 //              size of mapped page pool (e.g. 8192)
764
765 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
766 {
767 uint lvl, attr, last, slot, idx;
768 uint nlatchpage, latchhash;
769 BtLatchMgr *latchmgr;
770 off64_t size, off;
771 uint amt[1];
772 BtKey key;
773 BtDb* bt;
774 int flag;
775
776 #ifndef unix
777 OVERLAPPED ovl[1];
778 #else
779 struct flock lock[1];
780 #endif
781
782         // determine sanity of page size and buffer pool
783
784         if( bits > BT_maxbits )
785                 bits = BT_maxbits;
786         else if( bits < BT_minbits )
787                 bits = BT_minbits;
788
789         if( mode == BT_ro ) {
790                 fprintf(stderr, "ReadOnly mode not supported: %s\n", name);
791                 return NULL;
792         }
793 #ifdef unix
794         bt = calloc (1, sizeof(BtDb));
795
796         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
797         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_RANDOM);
798  
799         if( bt->idx == -1 ) {
800                 fprintf(stderr, "unable to open %s\n", name);
801                 return free(bt), NULL;
802         }
803 #else
804         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
805         attr = FILE_ATTRIBUTE_NORMAL;
806         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
807
808         if( bt->idx == INVALID_HANDLE_VALUE ) {
809                 fprintf(stderr, "unable to open %s\n", name);
810                 return GlobalFree(bt), NULL;
811         }
812 #endif
813 #ifdef unix
814         memset (lock, 0, sizeof(lock));
815         lock->l_len = sizeof(struct BtPage_);
816         lock->l_type = F_WRLCK;
817
818         if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) {
819                 fprintf(stderr, "unable to lock record zero %s\n", name);
820                 return bt_close (bt), NULL;
821         }
822 #else
823         memset (ovl, 0, sizeof(ovl));
824
825         //      use large offsets to
826         //      simulate advisory locking
827
828         ovl->OffsetHigh |= 0x80000000;
829
830         if( !LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) ) {
831                 fprintf(stderr, "unable to lock record zero %s, GetLastError = %d\n", name, GetLastError());
832                 return bt_close (bt), NULL;
833         }
834 #endif 
835
836 #ifdef unix
837         latchmgr = valloc (BT_maxpage);
838         *amt = 0;
839
840         // read minimum page size to get root info
841
842         if( size = lseek (bt->idx, 0L, 2) ) {
843                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
844                         bits = latchmgr->alloc->bits;
845                 else {
846                         fprintf(stderr, "Unable to read page zero\n");
847                         return free(bt), free(latchmgr), NULL;
848                 }
849         }
850 #else
851         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
852         size = GetFileSize(bt->idx, amt);
853
854         if( size || *amt ) {
855                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) {
856                         fprintf(stderr, "Unable to read page zero\n");
857                         return bt_close (bt), NULL;
858                 } else
859                         bits = latchmgr->alloc->bits;
860         }
861 #endif
862
863         bt->page_size = 1 << bits;
864         bt->page_bits = bits;
865
866         bt->mode = mode;
867
868         if( size || *amt ) {
869                 nlatchpage = latchmgr->nlatchpage;
870                 goto btlatch;
871         }
872
873         if( nodemax < 16 ) {
874                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
875                 return bt_close(bt), NULL;
876         }
877
878         // initialize an empty b-tree with latch page, root page, page of leaves
879         // and page(s) of latches and page pool cache
880
881         memset (latchmgr, 0, 1 << bits);
882         latchmgr->alloc->bits = bt->page_bits;
883
884         //  calculate number of latch hash table entries
885
886         nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
887         latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
888
889         nlatchpage += nodemax;          // size of the buffer pool in pages
890         nlatchpage += (sizeof(BtLatchSet) * nodemax + bt->page_size - 1)/bt->page_size;
891
892         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
893         latchmgr->nlatchpage = nlatchpage;
894         latchmgr->latchtotal = nodemax;
895         latchmgr->latchhash = latchhash;
896
897         if( bt_writepage (bt, latchmgr->alloc, 0) ) {
898                 fprintf (stderr, "Unable to create btree page zero\n");
899                 return bt_close (bt), NULL;
900         }
901
902         memset (latchmgr, 0, 1 << bits);
903         latchmgr->alloc->bits = bt->page_bits;
904
905         for( lvl=MIN_lvl; lvl--; ) {
906                 last = MIN_lvl - lvl;   // page number
907                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
908                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? last + 1 : 0);
909                 key = keyptr(latchmgr->alloc, 1);
910                 key->len = 2;           // create stopper key
911                 key->key[0] = 0xff;
912                 key->key[1] = 0xff;
913
914                 latchmgr->alloc->min = bt->page_size - 3;
915                 latchmgr->alloc->lvl = lvl;
916                 latchmgr->alloc->cnt = 1;
917                 latchmgr->alloc->act = 1;
918
919                 if( bt_writepage (bt, latchmgr->alloc, last) ) {
920                         fprintf (stderr, "Unable to create btree page %.8x\n", last);
921                         return bt_close (bt), NULL;
922                 }
923         }
924
925         // clear out buffer pool pages
926
927         memset(latchmgr, 0, bt->page_size);
928         last = MIN_lvl + nlatchpage;
929
930         if( bt_writepage (bt, latchmgr->alloc, last) ) {
931                 fprintf (stderr, "Unable to write buffer pool page %.8x\n", last);
932                 return bt_close (bt), NULL;
933         }
934                 
935 #ifdef unix
936         free (latchmgr);
937 #else
938         VirtualFree (latchmgr, 0, MEM_RELEASE);
939 #endif
940
941 btlatch:
942 #ifdef unix
943         lock->l_type = F_UNLCK;
944         if( fcntl (bt->idx, F_SETLK, lock) < 0 ) {
945                 fprintf (stderr, "Unable to unlock page zero\n");
946                 return bt_close (bt), NULL;
947         }
948 #else
949         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) {
950                 fprintf (stderr, "Unable to unlock page zero, GetLastError = %d\n", GetLastError());
951                 return bt_close (bt), NULL;
952         }
953 #endif
954 #ifdef unix
955         flag = PROT_READ | PROT_WRITE;
956         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
957         if( bt->latchmgr == MAP_FAILED ) {
958                 fprintf (stderr, "Unable to mmap page zero, errno = %d", errno);
959                 return bt_close (bt), NULL;
960         }
961         bt->table = (void *)mmap (0, (uid)nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
962         if( bt->table == MAP_FAILED ) {
963                 fprintf (stderr, "Unable to mmap buffer pool, errno = %d", errno);
964                 return bt_close (bt), NULL;
965         }
966         madvise (bt->table, (uid)nlatchpage << bt->page_bits, MADV_RANDOM | MADV_WILLNEED);
967 #else
968         flag = PAGE_READWRITE;
969         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size, NULL);
970         if( !bt->halloc ) {
971                 fprintf (stderr, "Unable to create file mapping for buffer pool mgr, GetLastError = %d\n", GetLastError());
972                 return bt_close (bt), NULL;
973         }
974
975         flag = FILE_MAP_WRITE;
976         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size);
977         if( !bt->latchmgr ) {
978                 fprintf (stderr, "Unable to map buffer pool, GetLastError = %d\n", GetLastError());
979                 return bt_close (bt), NULL;
980         }
981
982         bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
983 #endif
984         bt->pagepool = (unsigned char *)bt->table + (uid)(nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
985         bt->latchsets = (BtLatchSet *)(bt->pagepool - (uid)bt->latchmgr->latchtotal * sizeof(BtLatchSet));
986
987 #ifdef unix
988         bt->mem = valloc (2 * bt->page_size);
989 #else
990         bt->mem = VirtualAlloc(NULL, 2 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
991 #endif
992         bt->frame = (BtPage)bt->mem;
993         bt->cursor = (BtPage)(bt->mem + bt->page_size);
994         return bt;
995 }
996
997 // place write, read, or parent lock on requested page_no.
998
999 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1000 {
1001         switch( mode ) {
1002         case BtLockRead:
1003                 bt_spinreadlock (latch->readwr);
1004                 break;
1005         case BtLockWrite:
1006                 bt_spinwritelock (latch->readwr);
1007                 break;
1008         case BtLockAccess:
1009                 bt_spinreadlock (latch->access);
1010                 break;
1011         case BtLockDelete:
1012                 bt_spinwritelock (latch->access);
1013                 break;
1014         case BtLockParent:
1015                 bt_spinwritelock (latch->parent);
1016                 break;
1017         }
1018 }
1019
1020 // remove write, read, or parent lock on requested page
1021
1022 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1023 {
1024         switch( mode ) {
1025         case BtLockRead:
1026                 bt_spinreleaseread (latch->readwr);
1027                 break;
1028         case BtLockWrite:
1029                 bt_spinreleasewrite (latch->readwr);
1030                 break;
1031         case BtLockAccess:
1032                 bt_spinreleaseread (latch->access);
1033                 break;
1034         case BtLockDelete:
1035                 bt_spinreleasewrite (latch->access);
1036                 break;
1037         case BtLockParent:
1038                 bt_spinreleasewrite (latch->parent);
1039                 break;
1040         }
1041 }
1042
1043 //      allocate a new page and write page into it
1044
1045 uid bt_newpage(BtDb *bt, BtPage page)
1046 {
1047 BtLatchSet *latch;
1048 uid new_page;
1049 BtPage temp;
1050
1051         //      lock allocation page
1052
1053         bt_spinwritelock(bt->latchmgr->lock);
1054
1055         // use empty chain first
1056         // else allocate empty page
1057
1058         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
1059           if( latch = bt_pinlatch (bt, new_page) )
1060                 temp = bt_mappage (bt, latch);
1061           else
1062                 return 0;
1063
1064           bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
1065           bt_spinreleasewrite(bt->latchmgr->lock);
1066           memcpy (temp, page, bt->page_size);
1067
1068           bt_update (bt, temp);
1069           bt_unpinlatch (latch);
1070           return new_page;
1071         } else {
1072           new_page = bt_getid(bt->latchmgr->alloc->right);
1073           bt_putid(bt->latchmgr->alloc->right, new_page+1);
1074           bt_spinreleasewrite(bt->latchmgr->lock);
1075
1076           if( bt_writepage (bt, page, new_page) )
1077                 return 0;
1078         }
1079
1080         bt_update (bt, bt->latchmgr->alloc);
1081         return new_page;
1082 }
1083
1084 //  compare two keys, returning > 0, = 0, or < 0
1085 //  as the comparison value
1086
1087 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1088 {
1089 uint len1 = key1->len;
1090 int ans;
1091
1092         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1093                 return ans;
1094
1095         if( len1 > len2 )
1096                 return 1;
1097         if( len1 < len2 )
1098                 return -1;
1099
1100         return 0;
1101 }
1102
1103 //  Update current page of btree by
1104 //      flushing mapped area to disk backing of cache pool.
1105 //      mark page as dirty for rewrite to permanent location
1106
1107 void bt_update (BtDb *bt, BtPage page)
1108 {
1109 #ifdef unix
1110         msync (page, bt->page_size, MS_ASYNC);
1111 #else
1112 //      FlushViewOfFile (page, bt->page_size);
1113 #endif
1114         page->dirty = 1;
1115 }
1116
1117 //  map the btree cached page onto current page
1118
1119 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1120 {
1121         return (BtPage)((uid)(latch - bt->latchsets) * bt->page_size + bt->pagepool);
1122 }
1123
1124 //      deallocate a deleted page 
1125 //      place on free chain out of allocator page
1126 //      call with page latched for Writing and Deleting
1127
1128 BTERR bt_freepage(BtDb *bt, uid page_no, BtLatchSet *latch)
1129 {
1130 BtPage page = bt_mappage (bt, latch);
1131
1132         //      lock allocation page
1133
1134         bt_spinwritelock (bt->latchmgr->lock);
1135
1136         //      store chain in second right
1137         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1138         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1139
1140         page->free = 1;
1141         bt_update(bt, page);
1142
1143         // unlock released page
1144
1145         bt_unlockpage (BtLockDelete, latch);
1146         bt_unlockpage (BtLockWrite, latch);
1147         bt_unpinlatch (latch);
1148
1149         // unlock allocation page
1150
1151         bt_spinreleasewrite (bt->latchmgr->lock);
1152         bt_update (bt, bt->latchmgr->alloc);
1153         return 0;
1154 }
1155
1156 //  find slot in page for given key at a given level
1157
1158 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1159 {
1160 uint diff, higher = bt->page->cnt, low = 1, slot;
1161 uint good = 0;
1162
1163         //      make stopper key an infinite fence value
1164
1165         if( bt_getid (bt->page->right) )
1166                 higher++;
1167         else
1168                 good++;
1169
1170         //      low is the lowest candidate, higher is already
1171         //      tested as .ge. the given key, loop ends when they meet
1172
1173         while( diff = higher - low ) {
1174                 slot = low + ( diff >> 1 );
1175                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1176                         low = slot + 1;
1177                 else
1178                         higher = slot, good++;
1179         }
1180
1181         //      return zero if key is on right link page
1182
1183         return good ? higher : 0;
1184 }
1185
1186 //  find and load page at given level for given key
1187 //      leave page rd or wr locked as requested
1188
1189 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1190 {
1191 uid page_no = ROOT_page, prevpage = 0;
1192 uint drill = 0xff, slot;
1193 BtLatchSet *prevlatch;
1194 uint mode, prevmode;
1195
1196   //  start at root of btree and drill down
1197
1198   do {
1199         // determine lock mode of drill level
1200         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1201
1202         if( bt->latch = bt_pinlatch(bt, page_no) )
1203                 bt->page_no = page_no;
1204         else
1205                 return 0;
1206
1207         // obtain access lock using lock chaining
1208
1209         if( page_no > ROOT_page )
1210                 bt_lockpage(BtLockAccess, bt->latch);
1211
1212         if( prevpage ) {
1213                 bt_unlockpage(prevmode, prevlatch);
1214                 bt_unpinlatch(prevlatch);
1215                 prevpage = 0;
1216         }
1217
1218         // obtain read lock using lock chaining
1219
1220         bt_lockpage(mode, bt->latch);
1221
1222         if( page_no > ROOT_page )
1223                 bt_unlockpage(BtLockAccess, bt->latch);
1224
1225         //      map/obtain page contents
1226
1227         bt->page = bt_mappage (bt, bt->latch);
1228
1229         // re-read and re-lock root after determining actual level of root
1230
1231         if( bt->page->lvl != drill) {
1232                 if( bt->page_no != ROOT_page )
1233                         return bt->err = BTERR_struct, 0;
1234                         
1235                 drill = bt->page->lvl;
1236
1237                 if( lock != BtLockRead && drill == lvl ) {
1238                         bt_unlockpage(mode, bt->latch);
1239                         bt_unpinlatch(bt->latch);
1240                         continue;
1241                 }
1242         }
1243
1244         prevpage = bt->page_no;
1245         prevlatch = bt->latch;
1246         prevmode = mode;
1247
1248         //  find key on page at this level
1249         //  and descend to requested level
1250
1251         if( !bt->page->kill )
1252          if( slot = bt_findslot (bt, key, len) ) {
1253           if( drill == lvl )
1254                 return slot;
1255
1256           while( slotptr(bt->page, slot)->dead )
1257                 if( slot++ < bt->page->cnt )
1258                         continue;
1259                 else
1260                         goto slideright;
1261
1262           page_no = bt_getid(slotptr(bt->page, slot)->id);
1263           drill--;
1264           continue;
1265          }
1266
1267         //  or slide right into next page
1268
1269 slideright:
1270         page_no = bt_getid(bt->page->right);
1271
1272   } while( page_no );
1273
1274   // return error on end of right chain
1275
1276   bt->err = BTERR_eof;
1277   return 0;     // return error
1278 }
1279
1280 //      a fence key was deleted from a page
1281 //      push new fence value upwards
1282
1283 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1284 {
1285 unsigned char leftkey[256], rightkey[256];
1286 BtLatchSet *latch = bt->latch;
1287 BtKey ptr;
1288
1289         // remove deleted key, the old fence value
1290
1291         ptr = keyptr(bt->page, bt->page->cnt);
1292         memcpy(rightkey, ptr, ptr->len + 1);
1293
1294         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1295         bt->page->clean = 1;
1296
1297         ptr = keyptr(bt->page, bt->page->cnt);
1298         memcpy(leftkey, ptr, ptr->len + 1);
1299
1300         bt_update (bt, bt->page);
1301         bt_lockpage (BtLockParent, latch);
1302         bt_unlockpage (BtLockWrite, latch);
1303
1304         //  insert new (now smaller) fence key
1305
1306         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1307                 return bt->err;
1308
1309         //  remove old (larger) fence key
1310
1311         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1312                 return bt->err;
1313
1314         bt_unlockpage (BtLockParent, latch);
1315         bt_unpinlatch (latch);
1316         return 0;
1317 }
1318
1319 //      root has a single child
1320 //      collapse a level from the btree
1321 //      call with root locked in bt->page
1322
1323 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1324 {
1325 BtLatchSet *latch;
1326 BtPage temp;
1327 uid child;
1328 uint idx;
1329
1330         // find the child entry
1331         //      and promote to new root
1332
1333   do {
1334         for( idx = 0; idx++ < root->cnt; )
1335           if( !slotptr(root, idx)->dead )
1336                 break;
1337
1338         child = bt_getid (slotptr(root, idx)->id);
1339         if( latch = bt_pinlatch (bt, child) )
1340                 temp = bt_mappage (bt, latch);
1341         else
1342                 return bt->err;
1343
1344         bt_lockpage (BtLockDelete, latch);
1345         bt_lockpage (BtLockWrite, latch);
1346         memcpy (root, temp, bt->page_size);
1347
1348         bt_update (bt, root);
1349
1350         if( bt_freepage (bt, child, latch) )
1351                 return bt->err;
1352
1353   } while( root->lvl > 1 && root->act == 1 );
1354
1355   bt_unlockpage (BtLockWrite, bt->latch);
1356   bt_unpinlatch (bt->latch);
1357   return 0;
1358 }
1359
1360 //  find and delete key on page by marking delete flag bit
1361 //  when page becomes empty, delete it
1362
1363 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1364 {
1365 unsigned char lowerkey[256], higherkey[256];
1366 uint slot, dirty = 0, idx, fence, found;
1367 BtLatchSet *latch, *rlatch;
1368 uid page_no, right;
1369 BtPage temp;
1370 BtKey ptr;
1371
1372         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1373                 ptr = keyptr(bt->page, slot);
1374         else
1375                 return bt->err;
1376
1377         // are we deleting a fence slot?
1378
1379         fence = slot == bt->page->cnt;
1380
1381         // if key is found delete it, otherwise ignore request
1382
1383         if( found = !keycmp (ptr, key, len) )
1384           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1385                 dirty = slotptr(bt->page,slot)->dead = 1;
1386                 bt->page->clean = 1;
1387                 bt->page->act--;
1388
1389                 // collapse empty slots
1390
1391                 while( idx = bt->page->cnt - 1 )
1392                   if( slotptr(bt->page, idx)->dead ) {
1393                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1394                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1395                   } else
1396                         break;
1397           }
1398
1399         right = bt_getid(bt->page->right);
1400         page_no = bt->page_no;
1401         latch = bt->latch;
1402
1403         if( !dirty ) {
1404           if( lvl )
1405                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1406           bt_unlockpage(BtLockWrite, latch);
1407           bt_unpinlatch (latch);
1408           return bt->found = found, 0;
1409         }
1410
1411         //      did we delete a fence key in an upper level?
1412
1413         if( lvl && bt->page->act && fence )
1414           if( bt_fixfence (bt, page_no, lvl) )
1415                 return bt->err;
1416           else
1417                 return bt->found = found, 0;
1418
1419         //      is this a collapsed root?
1420
1421         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1422           if( bt_collapseroot (bt, bt->page) )
1423                 return bt->err;
1424           else
1425                 return bt->found = found, 0;
1426
1427         // return if page is not empty
1428
1429         if( bt->page->act ) {
1430           bt_update(bt, bt->page);
1431           bt_unlockpage(BtLockWrite, latch);
1432           bt_unpinlatch (latch);
1433           return bt->found = found, 0;
1434         }
1435
1436         // cache copy of fence key
1437         //      in order to find parent
1438
1439         ptr = keyptr(bt->page, bt->page->cnt);
1440         memcpy(lowerkey, ptr, ptr->len + 1);
1441
1442         // obtain lock on right page
1443
1444         if( rlatch = bt_pinlatch (bt, right) )
1445                 temp = bt_mappage (bt, rlatch);
1446         else
1447                 return bt->err;
1448
1449         bt_lockpage(BtLockWrite, rlatch);
1450
1451         if( temp->kill ) {
1452                 bt_abort(bt, temp, right, 0);
1453                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1454         }
1455
1456         // pull contents of next page into current empty page 
1457
1458         memcpy (bt->page, temp, bt->page_size);
1459
1460         //      cache copy of key to update
1461
1462         ptr = keyptr(temp, temp->cnt);
1463         memcpy(higherkey, ptr, ptr->len + 1);
1464
1465         //  Mark right page as deleted and point it to left page
1466         //      until we can post updates at higher level.
1467
1468         bt_putid(temp->right, page_no);
1469         temp->kill = 1;
1470
1471         bt_update(bt, bt->page);
1472         bt_update(bt, temp);
1473
1474         bt_lockpage(BtLockParent, latch);
1475         bt_unlockpage(BtLockWrite, latch);
1476
1477         bt_lockpage(BtLockParent, rlatch);
1478         bt_unlockpage(BtLockWrite, rlatch);
1479
1480         //  redirect higher key directly to consolidated node
1481
1482         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1483                 return bt->err;
1484
1485         //  delete old lower key to consolidated node
1486
1487         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1488                 return bt->err;
1489
1490         //  obtain write & delete lock on deleted node
1491         //      add right block to free chain
1492
1493         bt_lockpage(BtLockDelete, rlatch);
1494         bt_lockpage(BtLockWrite, rlatch);
1495         bt_unlockpage(BtLockParent, rlatch);
1496
1497         if( bt_freepage (bt, right, rlatch) )
1498                 return bt->err;
1499
1500         bt_unlockpage(BtLockParent, latch);
1501         bt_unpinlatch(latch);
1502         return 0;
1503 }
1504
1505 //      find key in leaf level and return row-id
1506
1507 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1508 {
1509 uint  slot;
1510 BtKey ptr;
1511 uid id;
1512
1513         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1514                 ptr = keyptr(bt->page, slot);
1515         else
1516                 return 0;
1517
1518         // if key exists, return row-id
1519         //      otherwise return 0
1520
1521         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1522                 id = bt_getid(slotptr(bt->page,slot)->id);
1523         else
1524                 id = 0;
1525
1526         bt_unlockpage (BtLockRead, bt->latch);
1527         bt_unpinlatch (bt->latch);
1528         return id;
1529 }
1530
1531 //      check page for space available,
1532 //      clean if necessary and return
1533 //      0 - page needs splitting
1534 //      >0 - go ahead with new slot
1535  
1536 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1537 {
1538 uint nxt = bt->page_size;
1539 BtPage page = bt->page;
1540 uint cnt = 0, idx = 0;
1541 uint max = page->cnt;
1542 uint newslot = slot;
1543 BtKey key;
1544 int ret;
1545
1546         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1547                 return slot;
1548
1549         //      skip cleanup if nothing to reclaim
1550
1551         if( !page->clean )
1552                 return 0;
1553
1554         memcpy (bt->frame, page, bt->page_size);
1555
1556         // skip page info and set rest of page to zero
1557
1558         memset (page+1, 0, bt->page_size - sizeof(*page));
1559         page->act = 0;
1560
1561         while( cnt++ < max ) {
1562                 if( cnt == slot )
1563                         newslot = idx + 1;
1564                 // always leave fence key in list
1565                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1566                         continue;
1567
1568                 // copy key
1569                 key = keyptr(bt->frame, cnt);
1570                 nxt -= key->len + 1;
1571                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1572
1573                 // copy slot
1574                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1575                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1576                         page->act++;
1577 #ifdef USETOD
1578                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1579 #endif
1580                 slotptr(page, idx)->off = nxt;
1581         }
1582
1583         page->min = nxt;
1584         page->cnt = idx;
1585
1586         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1587                 return newslot;
1588
1589         return 0;
1590 }
1591
1592 // split the root and raise the height of the btree
1593
1594 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1595 {
1596 uint nxt = bt->page_size;
1597 BtPage root = bt->page;
1598 uid right;
1599
1600         //  Obtain an empty page to use, and copy the current
1601         //  root contents into it
1602
1603         if( !(right = bt_newpage(bt, root)) )
1604                 return bt->err;
1605
1606         // preserve the page info at the bottom
1607         // and set rest to zero
1608
1609         memset(root+1, 0, bt->page_size - sizeof(*root));
1610
1611         // insert first key on newroot page
1612
1613         nxt -= *leftkey + 1;
1614         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1615         bt_putid(slotptr(root, 1)->id, right);
1616         slotptr(root, 1)->off = nxt;
1617         
1618         // insert second key on newroot page
1619         // and increase the root height
1620
1621         nxt -= 3;
1622         ((unsigned char *)root)[nxt] = 2;
1623         ((unsigned char *)root)[nxt+1] = 0xff;
1624         ((unsigned char *)root)[nxt+2] = 0xff;
1625         bt_putid(slotptr(root, 2)->id, page_no2);
1626         slotptr(root, 2)->off = nxt;
1627
1628         bt_putid(root->right, 0);
1629         root->min = nxt;                // reset lowest used offset and key count
1630         root->cnt = 2;
1631         root->act = 2;
1632         root->lvl++;
1633
1634         // update and release root (bt->page)
1635
1636         bt_update(bt, root);
1637
1638         bt_unlockpage(BtLockWrite, bt->latch);
1639         bt_unpinlatch(bt->latch);
1640         return 0;
1641 }
1642
1643 //  split already locked full node
1644 //      return unlocked.
1645
1646 BTERR bt_splitpage (BtDb *bt)
1647 {
1648 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1649 unsigned char fencekey[256], rightkey[256];
1650 uid page_no = bt->page_no, right;
1651 BtLatchSet *latch, *rlatch;
1652 BtPage page = bt->page;
1653 uint lvl = page->lvl;
1654 BtKey key;
1655
1656         latch = bt->latch;
1657
1658         //  split higher half of keys to bt->frame
1659         //      the last key (fence key) might be dead
1660
1661         memset (bt->frame, 0, bt->page_size);
1662         max = page->cnt;
1663         cnt = max / 2;
1664         idx = 0;
1665
1666         while( cnt++ < max ) {
1667                 key = keyptr(page, cnt);
1668                 nxt -= key->len + 1;
1669                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1670                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1671                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1672                         bt->frame->act++;
1673 #ifdef USETOD
1674                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1675 #endif
1676                 slotptr(bt->frame, idx)->off = nxt;
1677         }
1678
1679         //      remember fence key for new right page
1680
1681         memcpy (rightkey, key, key->len + 1);
1682
1683         bt->frame->bits = bt->page_bits;
1684         bt->frame->min = nxt;
1685         bt->frame->cnt = idx;
1686         bt->frame->lvl = lvl;
1687
1688         // link right node
1689
1690         if( page_no > ROOT_page )
1691                 memcpy (bt->frame->right, page->right, BtId);
1692
1693         //      get new free page and write frame to it.
1694
1695         if( !(right = bt_newpage(bt, bt->frame)) )
1696                 return bt->err;
1697
1698         //      update lower keys to continue in old page
1699
1700         memcpy (bt->frame, page, bt->page_size);
1701         memset (page+1, 0, bt->page_size - sizeof(*page));
1702         nxt = bt->page_size;
1703         page->clean = 0;
1704         page->act = 0;
1705         cnt = 0;
1706         idx = 0;
1707
1708         //  assemble page of smaller keys
1709         //      (they're all active keys)
1710
1711         while( cnt++ < max / 2 ) {
1712                 key = keyptr(bt->frame, cnt);
1713                 nxt -= key->len + 1;
1714                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1715                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1716 #ifdef USETOD
1717                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1718 #endif
1719                 slotptr(page, idx)->off = nxt;
1720                 page->act++;
1721         }
1722
1723         // remember fence key for smaller page
1724
1725         memcpy (fencekey, key, key->len + 1);
1726
1727         bt_putid(page->right, right);
1728         page->min = nxt;
1729         page->cnt = idx;
1730
1731         // if current page is the root page, split it
1732
1733         if( page_no == ROOT_page )
1734                 return bt_splitroot (bt, fencekey, right);
1735
1736         //      lock right page
1737
1738         if( rlatch = bt_pinlatch (bt, right) )
1739                 bt_lockpage (BtLockParent, rlatch);
1740         else
1741                 return bt->err;
1742
1743         // update left (containing) node
1744
1745         bt_update(bt, page);
1746
1747         bt_lockpage (BtLockParent, latch);
1748         bt_unlockpage (BtLockWrite, latch);
1749
1750         // insert new fence for reformulated left block
1751
1752         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1753                 return bt->err;
1754
1755         //      switch fence for right block of larger keys to new right page
1756
1757         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1758                 return bt->err;
1759
1760         bt_unlockpage (BtLockParent, latch);
1761         bt_unlockpage (BtLockParent, rlatch);
1762
1763         bt_unpinlatch (rlatch);
1764         bt_unpinlatch (latch);
1765         return 0;
1766 }
1767
1768 //  Insert new key into the btree at requested level.
1769 //  Pages are unlocked at exit.
1770
1771 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1772 {
1773 uint slot, idx;
1774 BtPage page;
1775 BtKey ptr;
1776
1777   while( 1 ) {
1778         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1779                 ptr = keyptr(bt->page, slot);
1780         else
1781         {
1782                 if( !bt->err )
1783                         bt->err = BTERR_ovflw;
1784                 return bt->err;
1785         }
1786
1787         // if key already exists, update id and return
1788
1789         page = bt->page;
1790
1791         if( !keycmp (ptr, key, len) ) {
1792           if( slotptr(page, slot)->dead )
1793                 page->act++;
1794           slotptr(page, slot)->dead = 0;
1795 #ifdef USETOD
1796           slotptr(page, slot)->tod = tod;
1797 #endif
1798           bt_putid(slotptr(page,slot)->id, id);
1799           bt_update(bt, bt->page);
1800           bt_unlockpage(BtLockWrite, bt->latch);
1801           bt_unpinlatch (bt->latch);
1802           return 0;
1803         }
1804
1805         // check if page has enough space
1806
1807         if( slot = bt_cleanpage (bt, len, slot) )
1808                 break;
1809
1810         if( bt_splitpage (bt) )
1811                 return bt->err;
1812   }
1813
1814   // calculate next available slot and copy key into page
1815
1816   page->min -= len + 1; // reset lowest used offset
1817   ((unsigned char *)page)[page->min] = len;
1818   memcpy ((unsigned char *)page + page->min +1, key, len );
1819
1820   for( idx = slot; idx < page->cnt; idx++ )
1821         if( slotptr(page, idx)->dead )
1822                 break;
1823
1824   // now insert key into array before slot
1825   // preserving the fence slot
1826
1827   if( idx == page->cnt )
1828         idx++, page->cnt++;
1829
1830   page->act++;
1831
1832   while( idx > slot )
1833         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1834
1835   bt_putid(slotptr(page,slot)->id, id);
1836   slotptr(page, slot)->off = page->min;
1837 #ifdef USETOD
1838   slotptr(page, slot)->tod = tod;
1839 #endif
1840   slotptr(page, slot)->dead = 0;
1841
1842   bt_update(bt, bt->page);
1843
1844   bt_unlockpage(BtLockWrite, bt->latch);
1845   bt_unpinlatch(bt->latch);
1846   return 0;
1847 }
1848
1849 //  cache page of keys into cursor and return starting slot for given key
1850
1851 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1852 {
1853 uint slot;
1854
1855         // cache page for retrieval
1856
1857         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1858           memcpy (bt->cursor, bt->page, bt->page_size);
1859         else
1860           return 0;
1861
1862         bt_unlockpage(BtLockRead, bt->latch);
1863         bt->cursor_page = bt->page_no;
1864         bt_unpinlatch (bt->latch);
1865         return slot;
1866 }
1867
1868 //  return next slot for cursor page
1869 //  or slide cursor right into next page
1870
1871 uint bt_nextkey (BtDb *bt, uint slot)
1872 {
1873 BtLatchSet *latch;
1874 off64_t right;
1875
1876   do {
1877         right = bt_getid(bt->cursor->right);
1878
1879         while( slot++ < bt->cursor->cnt )
1880           if( slotptr(bt->cursor,slot)->dead )
1881                 continue;
1882           else if( right || (slot < bt->cursor->cnt))
1883                 return slot;
1884           else
1885                 break;
1886
1887         if( !right )
1888                 break;
1889
1890         bt->cursor_page = right;
1891
1892         if( latch = bt_pinlatch (bt, right) )
1893         bt_lockpage(BtLockRead, latch);
1894         else
1895                 return 0;
1896
1897         bt->page = bt_mappage (bt, latch);
1898         memcpy (bt->cursor, bt->page, bt->page_size);
1899         bt_unlockpage(BtLockRead, latch);
1900         bt_unpinlatch (latch);
1901         slot = 0;
1902   } while( 1 );
1903
1904   return bt->err = 0;
1905 }
1906
1907 BtKey bt_key(BtDb *bt, uint slot)
1908 {
1909         return keyptr(bt->cursor, slot);
1910 }
1911
1912 uid bt_uid(BtDb *bt, uint slot)
1913 {
1914         return bt_getid(slotptr(bt->cursor,slot)->id);
1915 }
1916
1917 #ifdef USETOD
1918 uint bt_tod(BtDb *bt, uint slot)
1919 {
1920         return slotptr(bt->cursor,slot)->tod;
1921 }
1922 #endif
1923
1924 #ifdef STANDALONE
1925
1926 uint bt_audit (BtDb *bt)
1927 {
1928 uint idx, hashidx;
1929 uid next, page_no;
1930 BtLatchSet *latch;
1931 uint blks[64];
1932 uint cnt = 0;
1933 BtPage page;
1934 uint amt[1];
1935 BtKey ptr;
1936
1937 #ifdef unix
1938         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
1939 #endif
1940         if( *(ushort *)(bt->latchmgr->lock) )
1941                 fprintf(stderr, "Alloc page locked\n");
1942         *(ushort *)(bt->latchmgr->lock) = 0;
1943
1944         memset (blks, 0, sizeof(blks));
1945
1946         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
1947                 latch = bt->latchsets + idx;
1948                 if( *(ushort *)latch->readwr )
1949                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
1950                 *(ushort *)latch->readwr = 0;
1951
1952                 if( *(ushort *)latch->access )
1953                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
1954                 *(ushort *)latch->access = 0;
1955
1956                 if( *(ushort *)latch->parent )
1957                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
1958                 *(ushort *)latch->parent = 0;
1959
1960                 if( latch->pin & PIN_mask ) {
1961                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
1962                         latch->pin = 0;
1963                 }
1964                 page = (BtPage)((uid)idx * bt->page_size + bt->pagepool);
1965                 blks[page->lvl]++;
1966
1967             if( page->dirty )
1968                  if( bt_writepage (bt, page, latch->page_no) )
1969                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1970         }
1971
1972         for( idx = 0; blks[idx]; idx++ )
1973                 fprintf(stderr, "cache: %d lvl %d blocks\n", blks[idx], idx);
1974
1975         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
1976           if( *(ushort *)(bt->table[hashidx].latch) )
1977                         fprintf(stderr, "hash entry %d locked\n", hashidx);
1978
1979           *(ushort *)(bt->table[hashidx].latch) = 0;
1980         }
1981
1982         memset (blks, 0, sizeof(blks));
1983
1984         next = bt->latchmgr->nlatchpage + LATCH_page;
1985         page_no = LEAF_page;
1986
1987         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
1988                 if( bt_readpage (bt, bt->frame, page_no) )
1989                   fprintf(stderr, "page %.8x unreadable\n", page_no);
1990                 if( !bt->frame->free ) {
1991                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
1992                   ptr = keyptr(bt->frame, idx+1);
1993                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
1994                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
1995                  }
1996                  if( !bt->frame->lvl )
1997                         cnt += bt->frame->act;
1998                  blks[bt->frame->lvl]++;
1999                 }
2000
2001                 if( page_no > LEAF_page )
2002                         next = page_no + 1;
2003                 page_no = next;
2004         }
2005
2006         for( idx = 0; blks[idx]; idx++ )
2007                 fprintf(stderr, "btree: %d lvl %d blocks\n", blks[idx], idx);
2008
2009         return cnt - 1;
2010 }
2011
2012 #ifndef unix
2013 double getCpuTime(int type)
2014 {
2015 FILETIME crtime[1];
2016 FILETIME xittime[1];
2017 FILETIME systime[1];
2018 FILETIME usrtime[1];
2019 SYSTEMTIME timeconv[1];
2020 double ans = 0;
2021
2022         memset (timeconv, 0, sizeof(SYSTEMTIME));
2023
2024         switch( type ) {
2025         case 0:
2026                 GetSystemTimeAsFileTime (xittime);
2027                 FileTimeToSystemTime (xittime, timeconv);
2028                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2029                 break;
2030         case 1:
2031                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2032                 FileTimeToSystemTime (usrtime, timeconv);
2033                 break;
2034         case 2:
2035                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2036                 FileTimeToSystemTime (systime, timeconv);
2037                 break;
2038         }
2039
2040         ans += (double)timeconv->wHour * 3600;
2041         ans += (double)timeconv->wMinute * 60;
2042         ans += (double)timeconv->wSecond;
2043         ans += (double)timeconv->wMilliseconds / 1000;
2044         return ans;
2045 }
2046 #else
2047 #include <time.h>
2048 #include <sys/resource.h>
2049
2050 double getCpuTime(int type)
2051 {
2052 struct rusage used[1];
2053 struct timeval tv[1];
2054
2055         switch( type ) {
2056         case 0:
2057                 gettimeofday(tv, NULL);
2058                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2059
2060         case 1:
2061                 getrusage(RUSAGE_SELF, used);
2062                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2063
2064         case 2:
2065                 getrusage(RUSAGE_SELF, used);
2066                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2067         }
2068
2069         return 0;
2070 }
2071 #endif
2072
2073 //  standalone program to index file of keys
2074 //  then list them onto std-out
2075
2076 int main (int argc, char **argv)
2077 {
2078 uint slot, line = 0, off = 0, found = 0;
2079 int ch, cnt = 0, bits = 12, idx;
2080 unsigned char key[256];
2081 double done, start;
2082 uid next, page_no;
2083 BtLatchSet *latch;
2084 float elapsed;
2085 time_t tod[1];
2086 uint scan = 0;
2087 uint len = 0;
2088 uint map = 0;
2089 BtPage page;
2090 BtKey ptr;
2091 BtDb *bt;
2092 FILE *in;
2093
2094 #ifdef WIN32
2095         _setmode (1, _O_BINARY);
2096 #endif
2097         if( argc < 4 ) {
2098                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]);
2099                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2100                 fprintf (stderr, "  mapped_pool_pages: number of pages in buffer pool\n");
2101                 exit(0);
2102         }
2103
2104         start = getCpuTime(0);
2105         time(tod);
2106
2107         if( argc > 4 )
2108                 bits = atoi(argv[4]);
2109
2110         if( argc > 5 )
2111                 map = atoi(argv[5]);
2112
2113         if( argc > 6 )
2114                 off = atoi(argv[6]);
2115
2116         bt = bt_open ((argv[1]), BT_rw, bits, map);
2117
2118         if( !bt ) {
2119                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2120                 exit (1);
2121         }
2122
2123         switch(argv[3][0]| 0x20)
2124         {
2125         case 'p':       // display page
2126                 if( latch = bt_pinlatch (bt, off) )
2127                         page = bt_mappage (bt, latch);
2128                 else
2129                         fprintf(stderr, "unable to read page %.8x\n", off);
2130
2131                 write (1, page, bt->page_size);
2132                 break;
2133
2134         case 'a':       // buffer pool audit
2135                 fprintf(stderr, "started audit for %s\n", argv[1]);
2136                 cnt = bt_audit (bt);
2137                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[1], cnt);
2138                 break;
2139
2140         case 'w':       // write keys
2141                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2142                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2143 #ifdef unix
2144                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2145 #endif
2146                   while( ch = getc(in), ch != EOF )
2147                         if( ch == '\n' )
2148                         {
2149                           if( off )
2150                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2151
2152                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2153                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2154                           len = 0;
2155                         }
2156                         else if( len < 245 )
2157                                 key[len++] = ch;
2158                   }
2159                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2160                 break;
2161
2162         case 'd':       // delete keys
2163                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2164                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2165 #ifdef unix
2166                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2167 #endif
2168                   while( ch = getc(in), ch != EOF )
2169                         if( ch == '\n' )
2170                         {
2171                           if( off )
2172                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2173                           line++;
2174                           if( bt_deletekey (bt, key, len, 0) )
2175                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2176                           len = 0;
2177                         }
2178                         else if( len < 245 )
2179                                 key[len++] = ch;
2180                   }
2181                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2182                 break;
2183
2184         case 'f':       // find keys
2185                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2186                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2187 #ifdef unix
2188                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2189 #endif
2190                   while( ch = getc(in), ch != EOF )
2191                         if( ch == '\n' )
2192                         {
2193                           if( off )
2194                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2195                           line++;
2196                           if( bt_findkey (bt, key, len) )
2197                                 found++;
2198                           else if( bt->err )
2199                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2200                           len = 0;
2201                         }
2202                         else if( len < 245 )
2203                                 key[len++] = ch;
2204                   }
2205                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2206                 break;
2207
2208         case 's':       // scan and print keys
2209                 fprintf(stderr, "started scaning\n");
2210                 cnt = len = key[0] = 0;
2211
2212                 if( slot = bt_startkey (bt, key, len) )
2213                   slot--;
2214                 else
2215                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2216
2217                 while( slot = bt_nextkey (bt, slot) ) {
2218                         ptr = bt_key(bt, slot);
2219                         fwrite (ptr->key, ptr->len, 1, stdout);
2220                         fputc ('\n', stdout);
2221                         cnt++;
2222                 }
2223
2224                 fprintf(stderr, " Total keys read %d\n", cnt - 1);
2225                 break;
2226
2227         case 'c':       // count keys
2228           fprintf(stderr, "started counting\n");
2229           cnt = 0;
2230
2231           next = bt->latchmgr->nlatchpage + LATCH_page;
2232           page_no = LEAF_page;
2233
2234           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2235                 if( latch = bt_pinlatch (bt, page_no) )
2236                         page = bt_mappage (bt, latch);
2237                 if( !page->free && !page->lvl )
2238                         cnt += page->act;
2239                 if( page_no > LEAF_page )
2240                         next = page_no + 1;
2241                 if( scan )
2242                  for( idx = 0; idx++ < page->cnt; ) {
2243                   if( slotptr(page, idx)->dead )
2244                         continue;
2245                   ptr = keyptr(page, idx);
2246                   if( idx != page->cnt && bt_getid (page->right) ) {
2247                         fwrite (ptr->key, ptr->len, 1, stdout);
2248                         fputc ('\n', stdout);
2249                   }
2250                  }
2251                 bt_unpinlatch (latch);
2252                 page_no = next;
2253           }
2254                 
2255           cnt--;        // remove stopper key
2256           fprintf(stderr, " Total keys read %d\n", cnt);
2257           break;
2258         }
2259
2260         done = getCpuTime(0);
2261         elapsed = (float)(done - start);
2262         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2263         elapsed = getCpuTime(1);
2264         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2265         elapsed = getCpuTime(2);
2266         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2267         return 0;
2268 }
2269
2270 #endif  //STANDALONE