]> pd.if.org Git - btree/blob - btree2u.c
Implement CLOCK buffer pool page replacement method
[btree] / btree2u.c
1 // btree version 2u
2 //      with combined latch & pool manager
3 // 26 FEB 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #endif
30
31 #ifdef unix
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <io.h>
47 #endif
48
49 #include <memory.h>
50 #include <string.h>
51
52 typedef unsigned long long      uid;
53
54 #ifndef unix
55 typedef unsigned long long      off64_t;
56 typedef unsigned short          ushort;
57 typedef unsigned int            uint;
58 #endif
59
60 #define BT_ro 0x6f72    // ro
61 #define BT_rw 0x7772    // rw
62 #define BT_fl 0x6c66    // fl
63
64 #define BT_maxbits              15                                      // maximum page size in bits
65 #define BT_minbits              12                                      // minimum page size in bits
66 #define BT_minpage              (1 << BT_minbits)       // minimum page size
67 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
68
69 /*
70 There are five lock types for each node in three independent sets: 
71 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
72 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
73 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
74 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
75 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
76 */
77
78 typedef enum{
79         BtLockAccess,
80         BtLockDelete,
81         BtLockRead,
82         BtLockWrite,
83         BtLockParent
84 }BtLock;
85
86 //      definition for latch implementation
87
88 // exclusive is set for write access
89 // share is count of read accessors
90 // grant write lock when share == 0
91
92 volatile typedef struct {
93         unsigned char mutex[1];
94         unsigned char exclusive:1;
95         unsigned char pending:1;
96         ushort share;
97 } BtSpinLatch;
98
99 //      Define the length of the page and key pointers
100
101 #define BtId 6
102
103 //      Page key slot definition.
104
105 //      If BT_maxbits is 15 or less, you can save 2 bytes
106 //      for each key stored by making the first two uints
107 //      into ushorts.  You can also save 4 bytes by removing
108 //      the tod field from the key.
109
110 //      Keys are marked dead, but remain on the page until
111 //      cleanup is called. The fence key (highest key) for
112 //      the page is always present, even if dead.
113
114 typedef struct {
115 #ifdef USETOD
116         uint tod;                                       // time-stamp for key
117 #endif
118         ushort off:BT_maxbits;          // page offset for key start
119         ushort dead:1;                          // set for deleted key
120         unsigned char id[BtId];         // id associated with key
121 } BtSlot;
122
123 //      The key structure occupies space at the upper end of
124 //      each page.  It's a length byte followed by the value
125 //      bytes.
126
127 typedef struct {
128         unsigned char len;
129         unsigned char key[0];
130 } *BtKey;
131
132 //      The first part of an index page.
133 //      It is immediately followed
134 //      by the BtSlot array of keys.
135
136 typedef struct BtPage_ {
137         uint cnt;                                       // count of keys in page
138         uint act;                                       // count of active keys
139         uint min;                                       // next key offset
140         unsigned char bits:6;           // page size in bits
141         unsigned char free:1;           // page is on free list
142         unsigned char dirty:1;          // page is dirty in cache
143         unsigned char lvl:6;            // level of page
144         unsigned char kill:1;           // page is being deleted
145         unsigned char clean:1;          // page needs cleaning
146         unsigned char right[BtId];      // page number to right
147 } *BtPage;
148
149 typedef struct {
150         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
151         BtSpinLatch lock[1];            // allocation area lite latch
152         uint latchdeployed;                     // highest number of latch entries deployed
153         uint nlatchpage;                        // number of latch pages at BT_latch
154         uint latchtotal;                        // number of page latch entries
155         uint latchhash;                         // number of latch hash table slots
156         uint latchvictim;                       // next latch hash entry to examine
157 } BtLatchMgr;
158
159 //  latch hash table entries
160
161 typedef struct {
162         volatile uint slot;             // Latch table entry at head of collision chain
163         BtSpinLatch latch[1];   // lock for the collision chain
164 } BtHashEntry;
165
166 //      latch manager table structure
167
168 typedef struct {
169         volatile uid page_no;           // latch set page number on disk
170         BtSpinLatch readwr[1];          // read/write page lock
171         BtSpinLatch access[1];          // Access Intent/Page delete
172         BtSpinLatch parent[1];          // Posting of fence key in parent
173         volatile ushort clock;          // accessed since last clock pass
174         volatile uint next;                     // next entry in hash table chain
175         volatile uint prev;                     // prev entry in hash table chain
176         volatile uint pin;                      // number of outstanding pins
177 } BtLatchSet;
178
179 //      The object structure for Btree access
180
181 typedef struct _BtDb {
182         uint page_size;         // each page size       
183         uint page_bits;         // each page size in bits       
184         uid page_no;            // current page number  
185         uid cursor_page;        // current cursor page number   
186         int  err;
187         uint mode;                      // read-write mode
188         BtPage cursor;          // cached frame for start/next (never mapped)
189         BtPage frame;           // spare frame for the page split (never mapped)
190         BtPage page;            // current mapped page in buffer pool
191         BtLatchSet *latch;                      // current page latch
192         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
193         BtLatchSet *latchsets;          // mapped latch set from latch pages
194         unsigned char *pagepool;        // cached page pool set
195         BtHashEntry *table;     // the hash table
196 #ifdef unix
197         int idx;
198 #else
199         HANDLE idx;
200         HANDLE halloc;          // allocation and latch table handle
201 #endif
202         unsigned char *mem;     // frame, cursor, memory buffers
203         uint found;                     // last deletekey found key
204 } BtDb;
205
206 typedef enum {
207 BTERR_ok = 0,
208 BTERR_notfound,
209 BTERR_struct,
210 BTERR_ovflw,
211 BTERR_read,
212 BTERR_lock,
213 BTERR_hash,
214 BTERR_kill,
215 BTERR_map,
216 BTERR_wrt,
217 BTERR_eof
218 } BTERR;
219
220 // B-Tree functions
221 extern void bt_close (BtDb *bt);
222 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
223 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
224 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
225 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
226 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
227 extern uint bt_nextkey   (BtDb *bt, uint slot);
228
229 //      internal functions
230 void bt_update (BtDb *bt, BtPage page);
231 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
232 //  Helper functions to return slot values
233
234 extern BtKey bt_key (BtDb *bt, uint slot);
235 extern uid bt_uid (BtDb *bt, uint slot);
236 #ifdef USETOD
237 extern uint bt_tod (BtDb *bt, uint slot);
238 #endif
239
240 //  BTree page number constants
241 #define ALLOC_page              0
242 #define ROOT_page               1
243 #define LEAF_page               2
244 #define LATCH_page              3
245
246 //      Number of levels to create in a new BTree
247
248 #define MIN_lvl                 2
249
250 //  The page is allocated from low and hi ends.
251 //  The key offsets and row-id's are allocated
252 //  from the bottom, while the text of the key
253 //  is allocated from the top.  When the two
254 //  areas meet, the page is split into two.
255
256 //  A key consists of a length byte, two bytes of
257 //  index number (0 - 65534), and up to 253 bytes
258 //  of key value.  Duplicate keys are discarded.
259 //  Associated with each key is a 48 bit row-id.
260
261 //  The b-tree root is always located at page 1.
262 //      The first leaf page of level zero is always
263 //      located on page 2.
264
265 //      The b-tree pages are linked with right
266 //      pointers to facilitate enumerators,
267 //      and provide for concurrency.
268
269 //      When to root page fills, it is split in two and
270 //      the tree height is raised by a new root at page
271 //      one with two keys.
272
273 //      Deleted keys are marked with a dead bit until
274 //      page cleanup The fence key for a node is always
275 //      present, even after deletion and cleanup.
276
277 //  Deleted leaf pages are reclaimed  on a free list.
278 //      The upper levels of the btree are fixed on creation.
279
280 //  To achieve maximum concurrency one page is locked at a time
281 //  as the tree is traversed to find leaf key in question. The right
282 //  page numbers are used in cases where the page is being split,
283 //      or consolidated.
284
285 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
286 //      and chains empty leaf pages together for reuse.
287
288 //      Parent locks are obtained to prevent resplitting or deleting a node
289 //      before its fence is posted into its upper level.
290
291 //      A special open mode of BT_fl is provided to safely access files on
292 //      WIN32 networks. WIN32 network operations should not use memory mapping.
293 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
294 //      to prevent local caching of network file contents.
295
296 //      Access macros to address slot and key values from the page.
297 //      Page slots use 1 based indexing.
298
299 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
300 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
301
302 void bt_putid(unsigned char *dest, uid id)
303 {
304 int i = BtId;
305
306         while( i-- )
307                 dest[i] = (unsigned char)id, id >>= 8;
308 }
309
310 uid bt_getid(unsigned char *src)
311 {
312 uid id = 0;
313 int i;
314
315         for( i = 0; i < BtId; i++ )
316                 id <<= 8, id |= *src++; 
317
318         return id;
319 }
320
321 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
322 {
323 BtKey ptr;
324
325         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
326         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
327         ptr = keyptr(page, page->cnt);
328         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
329         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
330         return bt->err = err;
331 }
332
333 //      Latch Manager
334
335 //      wait until write lock mode is clear
336 //      and add 1 to the share count
337
338 void bt_spinreadlock(BtSpinLatch *latch)
339 {
340 ushort prev;
341
342   do {
343         //      obtain latch mutex
344 #ifdef unix
345         if( __sync_lock_test_and_set(latch->mutex, 1) )
346                 continue;
347 #else
348         if( _InterlockedExchange8(latch->mutex, 1) )
349                 continue;
350 #endif
351         //  see if exclusive request is granted or pending
352
353         if( prev = !(latch->exclusive | latch->pending) )
354                 latch->share++;
355
356 #ifdef unix
357         *latch->mutex = 0;
358 #else
359         _InterlockedExchange8(latch->mutex, 0);
360 #endif
361
362         if( prev )
363                 return;
364
365 #ifdef  unix
366   } while( sched_yield(), 1 );
367 #else
368   } while( SwitchToThread(), 1 );
369 #endif
370 }
371
372 //      wait for other read and write latches to relinquish
373
374 void bt_spinwritelock(BtSpinLatch *latch)
375 {
376 uint prev;
377
378   do {
379 #ifdef  unix
380         if( __sync_lock_test_and_set(latch->mutex, 1) )
381                 continue;
382 #else
383         if( _InterlockedExchange8(latch->mutex, 1) )
384                 continue;
385 #endif
386         if( prev = !(latch->share | latch->exclusive) )
387                 latch->exclusive = 1, latch->pending = 0;
388         else
389                 latch->pending = 1;
390 #ifdef unix
391         *latch->mutex = 0;
392 #else
393         _InterlockedExchange8(latch->mutex, 0);
394 #endif
395         if( prev )
396                 return;
397 #ifdef  unix
398   } while( sched_yield(), 1 );
399 #else
400   } while( SwitchToThread(), 1 );
401 #endif
402 }
403
404 //      try to obtain write lock
405
406 //      return 1 if obtained,
407 //              0 otherwise
408
409 int bt_spinwritetry(BtSpinLatch *latch)
410 {
411 uint prev;
412
413 #ifdef unix
414         if( __sync_lock_test_and_set(latch->mutex, 1) )
415                 return 0;
416 #else
417         if( _InterlockedExchange8(latch->mutex, 1) )
418                 return 0;
419 #endif
420         //      take write access if all bits are clear
421
422         if( prev = !(latch->exclusive | latch->share) )
423                 latch->exclusive = 1;
424
425 #ifdef unix
426         *latch->mutex = 0;
427 #else
428         _InterlockedExchange8(latch->mutex, 0);
429 #endif
430         return prev;
431 }
432
433 //      clear write mode
434
435 void bt_spinreleasewrite(BtSpinLatch *latch)
436 {
437 #ifdef unix
438         while( __sync_lock_test_and_set(latch->mutex, 1) )
439                 sched_yield();
440 #else
441         while( _InterlockedExchange8(latch->mutex, 1) )
442                 SwitchToThread();
443 #endif
444         latch->exclusive = 0;
445 #ifdef unix
446         *latch->mutex = 0;
447 #else
448         _InterlockedExchange8(latch->mutex, 0);
449 #endif
450 }
451
452 //      decrement reader count
453
454 void bt_spinreleaseread(BtSpinLatch *latch)
455 {
456 #ifdef unix
457         while( __sync_lock_test_and_set(latch->mutex, 1) )
458                 sched_yield();
459 #else
460         while( _InterlockedExchange8(latch->mutex, 1) )
461                 SwitchToThread();
462 #endif
463         latch->share--;
464 #ifdef unix
465         *latch->mutex = 0;
466 #else
467         _InterlockedExchange8(latch->mutex, 0);
468 #endif
469 }
470
471 //      read page from permanent location in Btree file
472
473 BTERR bt_readpage (BtDb *bt, BtPage page, uid page_no)
474 {
475 off64_t off = page_no << bt->page_bits;
476
477 #ifdef unix
478         if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size ) {
479                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
480                 return bt->err = BTERR_read;
481         }
482 #else
483 OVERLAPPED ovl[1];
484 uint amt[1];
485
486         memset (ovl, 0, sizeof(OVERLAPPED));
487         ovl->Offset = off;
488         ovl->OffsetHigh = off >> 32;
489
490         if( !ReadFile(bt->idx, page, bt->page_size, amt, ovl)) {
491                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
492                 return bt->err = BTERR_read;
493         }
494         if( *amt <  bt->page_size ) {
495                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
496                 return bt->err = BTERR_read;
497         }
498 #endif
499         return 0;
500 }
501
502 //      write page to permanent location in Btree file
503 //      clear the dirty bit
504
505 BTERR bt_writepage (BtDb *bt, BtPage page, uid page_no)
506 {
507 off64_t off = page_no << bt->page_bits;
508
509 #ifdef unix
510         page->dirty = 0;
511
512         if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
513                 return bt->err = BTERR_wrt;
514 #else
515 OVERLAPPED ovl[1];
516 uint amt[1];
517
518         memset (ovl, 0, sizeof(OVERLAPPED));
519         ovl->Offset = off;
520         ovl->OffsetHigh = off >> 32;
521         page->dirty = 0;
522
523         if( !WriteFile(bt->idx, page, bt->page_size, amt, ovl) )
524                 return bt->err = BTERR_wrt;
525
526         if( *amt <  bt->page_size )
527                 return bt->err = BTERR_wrt;
528 #endif
529         return 0;
530 }
531
532 //      link latch table entry into head of latch hash table
533
534 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
535 {
536 BtPage page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
537 BtLatchSet *latch = bt->latchsets + slot;
538
539         if( latch->next = bt->table[hashidx].slot )
540                 bt->latchsets[latch->next].prev = slot;
541
542         bt->table[hashidx].slot = slot;
543         latch->page_no = page_no;
544         latch->clock = 1;
545         latch->prev = 0;
546         latch->pin = 1;
547
548         return bt_readpage (bt, page, page_no);
549 }
550
551 //      release latch pin
552
553 void bt_unpinlatch (BtLatchSet *latch)
554 {
555 #ifdef unix
556         __sync_fetch_and_add(&latch->pin, -1);
557 #else
558         _InterlockedDecrement (&latch->pin);
559 #endif
560 }
561
562 //      find existing latchset or inspire new one
563 //      return with latchset pinned
564
565 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
566 {
567 uint hashidx = page_no % bt->latchmgr->latchhash;
568 BtLatchSet *latch;
569 uint slot, idx;
570 off64_t off;
571 uint amt[1];
572 BtPage page;
573
574   //  try to find our entry
575
576   bt_spinwritelock(bt->table[hashidx].latch);
577
578   if( slot = bt->table[hashidx].slot ) do
579   {
580         latch = bt->latchsets + slot;
581         if( page_no == latch->page_no )
582                 break;
583   } while( slot = latch->next );
584
585   //  found our entry
586
587   if( slot ) {
588         latch = bt->latchsets + slot;
589         latch->clock = 1;
590         latch->pin++;
591         bt_spinreleasewrite(bt->table[hashidx].latch);
592         return latch;
593   }
594
595         //  see if there are any unused pool entries
596 #ifdef unix
597         slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
598 #else
599         slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
600 #endif
601
602         if( slot < bt->latchmgr->latchtotal ) {
603                 latch = bt->latchsets + slot;
604                 if( bt_latchlink (bt, hashidx, slot, page_no) )
605                         return NULL;
606                 bt_spinreleasewrite (bt->table[hashidx].latch);
607                 return latch;
608         }
609
610 #ifdef unix
611         __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
612 #else
613         _InterlockedDecrement (&bt->latchmgr->latchdeployed);
614 #endif
615   //  find and reuse previous entry on victim
616
617   while( 1 ) {
618 #ifdef unix
619         slot = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
620 #else
621         slot = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
622 #endif
623         // try to get write lock on hash chain
624         //      skip entry if not obtained
625         //      or has outstanding pins
626
627         slot %= bt->latchmgr->latchtotal;
628         latch = bt->latchsets + slot;
629
630         idx = latch->page_no % bt->latchmgr->latchhash;
631
632         if( !slot )
633                 continue;
634
635         if( !bt_spinwritetry (bt->table[idx].latch) )
636                 continue;
637
638         if( latch->clock ) {
639                 latch->clock = 0;
640                 bt_spinreleasewrite (bt->table[idx].latch);
641                 continue;
642         }
643
644         //  update permanent page area in btree
645
646         page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
647 #ifdef unix
648         posix_fadvise (bt->idx, page_no << bt->page_bits, bt->page_size, POSIX_FADV_WILLNEED);
649 #endif
650         if( page->dirty )
651           if( bt_writepage (bt, page, latch->page_no) )
652                 return NULL;
653
654         //  unlink our available slot from its hash chain
655
656         if( latch->prev )
657                 bt->latchsets[latch->prev].next = latch->next;
658         else
659                 bt->table[idx].slot = latch->next;
660
661         if( latch->next )
662                 bt->latchsets[latch->next].prev = latch->prev;
663
664         bt_spinreleasewrite (bt->table[idx].latch);
665
666         if( bt_latchlink (bt, hashidx, slot, page_no) )
667                 return NULL;
668
669         bt_spinreleasewrite (bt->table[hashidx].latch);
670         return latch;
671   }
672 }
673
674 //      close and release memory
675
676 void bt_close (BtDb *bt)
677 {
678 #ifdef unix
679         munmap (bt->table, bt->latchmgr->nlatchpage * bt->page_size);
680         munmap (bt->latchmgr, bt->page_size);
681 #else
682         FlushViewOfFile(bt->latchmgr, 0);
683         UnmapViewOfFile(bt->latchmgr);
684         CloseHandle(bt->halloc);
685 #endif
686 #ifdef unix
687         if( bt->mem )
688                 free (bt->mem);
689         close (bt->idx);
690         free (bt);
691 #else
692         if( bt->mem)
693                 VirtualFree (bt->mem, 0, MEM_RELEASE);
694         FlushFileBuffers(bt->idx);
695         CloseHandle(bt->idx);
696         GlobalFree (bt);
697 #endif
698 }
699 //  open/create new btree
700
701 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
702 //              size of mapped page pool (e.g. 8192)
703
704 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
705 {
706 uint lvl, attr, last, slot, idx;
707 uint nlatchpage, latchhash;
708 BtLatchMgr *latchmgr;
709 off64_t size, off;
710 uint amt[1];
711 BtKey key;
712 BtDb* bt;
713 int flag;
714
715 #ifndef unix
716 OVERLAPPED ovl[1];
717 #else
718 struct flock lock[1];
719 #endif
720
721         // determine sanity of page size and buffer pool
722
723         if( bits > BT_maxbits )
724                 bits = BT_maxbits;
725         else if( bits < BT_minbits )
726                 bits = BT_minbits;
727
728         if( mode == BT_ro ) {
729                 fprintf(stderr, "ReadOnly mode not supported: %s\n", name);
730                 return NULL;
731         }
732 #ifdef unix
733         bt = calloc (1, sizeof(BtDb));
734
735         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
736         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_RANDOM);
737  
738         if( bt->idx == -1 ) {
739                 fprintf(stderr, "unable to open %s\n", name);
740                 return free(bt), NULL;
741         }
742 #else
743         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
744         attr = FILE_ATTRIBUTE_NORMAL;
745         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
746
747         if( bt->idx == INVALID_HANDLE_VALUE ) {
748                 fprintf(stderr, "unable to open %s\n", name);
749                 return GlobalFree(bt), NULL;
750         }
751 #endif
752 #ifdef unix
753         memset (lock, 0, sizeof(lock));
754         lock->l_len = sizeof(struct BtPage_);
755         lock->l_type = F_WRLCK;
756
757         if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) {
758                 fprintf(stderr, "unable to lock record zero %s\n", name);
759                 return bt_close (bt), NULL;
760         }
761 #else
762         memset (ovl, 0, sizeof(ovl));
763
764         //      use large offsets to
765         //      simulate advisory locking
766
767         ovl->OffsetHigh |= 0x80000000;
768
769         if( !LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) ) {
770                 fprintf(stderr, "unable to lock record zero %s, GetLastError = %d\n", name, GetLastError());
771                 return bt_close (bt), NULL;
772         }
773 #endif 
774
775 #ifdef unix
776         latchmgr = valloc (BT_maxpage);
777         *amt = 0;
778
779         // read minimum page size to get root info
780
781         if( size = lseek (bt->idx, 0L, 2) ) {
782                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
783                         bits = latchmgr->alloc->bits;
784                 else {
785                         fprintf(stderr, "Unable to read page zero\n");
786                         return free(bt), free(latchmgr), NULL;
787                 }
788         }
789 #else
790         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
791         size = GetFileSize(bt->idx, amt);
792
793         if( size || *amt ) {
794                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) {
795                         fprintf(stderr, "Unable to read page zero\n");
796                         return bt_close (bt), NULL;
797                 } else
798                         bits = latchmgr->alloc->bits;
799         }
800 #endif
801
802         bt->page_size = 1 << bits;
803         bt->page_bits = bits;
804
805         bt->mode = mode;
806
807         if( size || *amt ) {
808                 nlatchpage = latchmgr->nlatchpage;
809                 goto btlatch;
810         }
811
812         if( nodemax < 16 ) {
813                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
814                 return bt_close(bt), NULL;
815         }
816
817         // initialize an empty b-tree with latch page, root page, page of leaves
818         // and page(s) of latches and page pool cache
819
820         memset (latchmgr, 0, 1 << bits);
821         latchmgr->alloc->bits = bt->page_bits;
822
823         //  calculate number of latch hash table entries
824
825         nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
826         latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
827
828         nlatchpage += nodemax;          // size of the buffer pool in pages
829         nlatchpage += (sizeof(BtLatchSet) * nodemax + bt->page_size - 1)/bt->page_size;
830
831         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
832         latchmgr->nlatchpage = nlatchpage;
833         latchmgr->latchtotal = nodemax;
834         latchmgr->latchhash = latchhash;
835
836         if( bt_writepage (bt, latchmgr->alloc, 0) ) {
837                 fprintf (stderr, "Unable to create btree page zero\n");
838                 return bt_close (bt), NULL;
839         }
840
841         memset (latchmgr, 0, 1 << bits);
842         latchmgr->alloc->bits = bt->page_bits;
843
844         for( lvl=MIN_lvl; lvl--; ) {
845                 last = MIN_lvl - lvl;   // page number
846                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
847                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? last + 1 : 0);
848                 key = keyptr(latchmgr->alloc, 1);
849                 key->len = 2;           // create stopper key
850                 key->key[0] = 0xff;
851                 key->key[1] = 0xff;
852
853                 latchmgr->alloc->min = bt->page_size - 3;
854                 latchmgr->alloc->lvl = lvl;
855                 latchmgr->alloc->cnt = 1;
856                 latchmgr->alloc->act = 1;
857
858                 if( bt_writepage (bt, latchmgr->alloc, last) ) {
859                         fprintf (stderr, "Unable to create btree page %.8x\n", last);
860                         return bt_close (bt), NULL;
861                 }
862         }
863
864         // clear out buffer pool pages
865
866         memset(latchmgr, 0, bt->page_size);
867         last = MIN_lvl + nlatchpage;
868
869         if( bt_writepage (bt, latchmgr->alloc, last) ) {
870                 fprintf (stderr, "Unable to write buffer pool page %.8x\n", last);
871                 return bt_close (bt), NULL;
872         }
873                 
874 #ifdef unix
875         free (latchmgr);
876 #else
877         VirtualFree (latchmgr, 0, MEM_RELEASE);
878 #endif
879
880 btlatch:
881 #ifdef unix
882         lock->l_type = F_UNLCK;
883         if( fcntl (bt->idx, F_SETLK, lock) < 0 ) {
884                 fprintf (stderr, "Unable to unlock page zero\n");
885                 return bt_close (bt), NULL;
886         }
887 #else
888         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) {
889                 fprintf (stderr, "Unable to unlock page zero, GetLastError = %d\n", GetLastError());
890                 return bt_close (bt), NULL;
891         }
892 #endif
893 #ifdef unix
894         flag = PROT_READ | PROT_WRITE;
895         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
896         if( bt->latchmgr == MAP_FAILED ) {
897                 fprintf (stderr, "Unable to mmap page zero, errno = %d", errno);
898                 return bt_close (bt), NULL;
899         }
900         bt->table = (void *)mmap (0, (uid)nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
901         if( bt->table == MAP_FAILED ) {
902                 fprintf (stderr, "Unable to mmap buffer pool, errno = %d", errno);
903                 return bt_close (bt), NULL;
904         }
905         madvise (bt->table, (uid)nlatchpage << bt->page_bits, MADV_RANDOM | MADV_WILLNEED);
906 #else
907         flag = PAGE_READWRITE;
908         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size, NULL);
909         if( !bt->halloc ) {
910                 fprintf (stderr, "Unable to create file mapping for buffer pool mgr, GetLastError = %d\n", GetLastError());
911                 return bt_close (bt), NULL;
912         }
913
914         flag = FILE_MAP_WRITE;
915         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size);
916         if( !bt->latchmgr ) {
917                 fprintf (stderr, "Unable to map buffer pool, GetLastError = %d\n", GetLastError());
918                 return bt_close (bt), NULL;
919         }
920
921         bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
922 #endif
923         bt->pagepool = (unsigned char *)bt->table + (uid)(nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
924         bt->latchsets = (BtLatchSet *)(bt->pagepool - (uid)bt->latchmgr->latchtotal * sizeof(BtLatchSet));
925
926 #ifdef unix
927         bt->mem = valloc (2 * bt->page_size);
928 #else
929         bt->mem = VirtualAlloc(NULL, 2 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
930 #endif
931         bt->frame = (BtPage)bt->mem;
932         bt->cursor = (BtPage)(bt->mem + bt->page_size);
933         return bt;
934 }
935
936 // place write, read, or parent lock on requested page_no.
937
938 void bt_lockpage(BtLock mode, BtLatchSet *latch)
939 {
940         switch( mode ) {
941         case BtLockRead:
942                 bt_spinreadlock (latch->readwr);
943                 break;
944         case BtLockWrite:
945                 bt_spinwritelock (latch->readwr);
946                 break;
947         case BtLockAccess:
948                 bt_spinreadlock (latch->access);
949                 break;
950         case BtLockDelete:
951                 bt_spinwritelock (latch->access);
952                 break;
953         case BtLockParent:
954                 bt_spinwritelock (latch->parent);
955                 break;
956         }
957 }
958
959 // remove write, read, or parent lock on requested page
960
961 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
962 {
963         switch( mode ) {
964         case BtLockRead:
965                 bt_spinreleaseread (latch->readwr);
966                 break;
967         case BtLockWrite:
968                 bt_spinreleasewrite (latch->readwr);
969                 break;
970         case BtLockAccess:
971                 bt_spinreleaseread (latch->access);
972                 break;
973         case BtLockDelete:
974                 bt_spinreleasewrite (latch->access);
975                 break;
976         case BtLockParent:
977                 bt_spinreleasewrite (latch->parent);
978                 break;
979         }
980 }
981
982 //      allocate a new page and write page into it
983
984 uid bt_newpage(BtDb *bt, BtPage page)
985 {
986 BtLatchSet *latch;
987 uid new_page;
988 BtPage temp;
989
990         //      lock allocation page
991
992         bt_spinwritelock(bt->latchmgr->lock);
993
994         // use empty chain first
995         // else allocate empty page
996
997         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
998           if( latch = bt_pinlatch (bt, new_page) )
999                 temp = bt_mappage (bt, latch);
1000           else
1001                 return 0;
1002
1003           bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
1004           bt_spinreleasewrite(bt->latchmgr->lock);
1005           memcpy (temp, page, bt->page_size);
1006
1007           bt_update (bt, temp);
1008           bt_unpinlatch (latch);
1009           return new_page;
1010         } else {
1011           new_page = bt_getid(bt->latchmgr->alloc->right);
1012           bt_putid(bt->latchmgr->alloc->right, new_page+1);
1013           bt_spinreleasewrite(bt->latchmgr->lock);
1014
1015           if( bt_writepage (bt, page, new_page) )
1016                 return 0;
1017         }
1018
1019         bt_update (bt, bt->latchmgr->alloc);
1020         return new_page;
1021 }
1022
1023 //  compare two keys, returning > 0, = 0, or < 0
1024 //  as the comparison value
1025
1026 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1027 {
1028 uint len1 = key1->len;
1029 int ans;
1030
1031         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1032                 return ans;
1033
1034         if( len1 > len2 )
1035                 return 1;
1036         if( len1 < len2 )
1037                 return -1;
1038
1039         return 0;
1040 }
1041
1042 //  Update current page of btree by
1043 //      flushing mapped area to disk backing of cache pool.
1044 //      mark page as dirty for rewrite to permanent location
1045
1046 void bt_update (BtDb *bt, BtPage page)
1047 {
1048 #ifdef unix
1049 //      msync (page, bt->page_size, MS_ASYNC);
1050 #else
1051 //      FlushViewOfFile (page, bt->page_size);
1052 #endif
1053         page->dirty = 1;
1054 }
1055
1056 //  map the btree cached page onto current page
1057
1058 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1059 {
1060         return (BtPage)((uid)(latch - bt->latchsets) * bt->page_size + bt->pagepool);
1061 }
1062
1063 //      deallocate a deleted page 
1064 //      place on free chain out of allocator page
1065 //      call with page latched for Writing and Deleting
1066
1067 BTERR bt_freepage(BtDb *bt, uid page_no, BtLatchSet *latch)
1068 {
1069 BtPage page = bt_mappage (bt, latch);
1070
1071         //      lock allocation page
1072
1073         bt_spinwritelock (bt->latchmgr->lock);
1074
1075         //      store chain in second right
1076         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1077         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1078
1079         page->free = 1;
1080         bt_update(bt, page);
1081
1082         // unlock released page
1083
1084         bt_unlockpage (BtLockDelete, latch);
1085         bt_unlockpage (BtLockWrite, latch);
1086         bt_unpinlatch (latch);
1087
1088         // unlock allocation page
1089
1090         bt_spinreleasewrite (bt->latchmgr->lock);
1091         bt_update (bt, bt->latchmgr->alloc);
1092         return 0;
1093 }
1094
1095 //  find slot in page for given key at a given level
1096
1097 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1098 {
1099 uint diff, higher = bt->page->cnt, low = 1, slot;
1100 uint good = 0;
1101
1102         //      make stopper key an infinite fence value
1103
1104         if( bt_getid (bt->page->right) )
1105                 higher++;
1106         else
1107                 good++;
1108
1109         //      low is the lowest candidate, higher is already
1110         //      tested as .ge. the given key, loop ends when they meet
1111
1112         while( diff = higher - low ) {
1113                 slot = low + ( diff >> 1 );
1114                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1115                         low = slot + 1;
1116                 else
1117                         higher = slot, good++;
1118         }
1119
1120         //      return zero if key is on right link page
1121
1122         return good ? higher : 0;
1123 }
1124
1125 //  find and load page at given level for given key
1126 //      leave page rd or wr locked as requested
1127
1128 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1129 {
1130 uid page_no = ROOT_page, prevpage = 0;
1131 uint drill = 0xff, slot;
1132 BtLatchSet *prevlatch;
1133 uint mode, prevmode;
1134
1135   //  start at root of btree and drill down
1136
1137   do {
1138         // determine lock mode of drill level
1139         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1140
1141         if( bt->latch = bt_pinlatch(bt, page_no) )
1142                 bt->page_no = page_no;
1143         else
1144                 return 0;
1145
1146         // obtain access lock using lock chaining
1147
1148         if( page_no > ROOT_page )
1149                 bt_lockpage(BtLockAccess, bt->latch);
1150
1151         if( prevpage ) {
1152                 bt_unlockpage(prevmode, prevlatch);
1153                 bt_unpinlatch(prevlatch);
1154                 prevpage = 0;
1155         }
1156
1157         // obtain read lock using lock chaining
1158
1159         bt_lockpage(mode, bt->latch);
1160
1161         if( page_no > ROOT_page )
1162                 bt_unlockpage(BtLockAccess, bt->latch);
1163
1164         //      map/obtain page contents
1165
1166         bt->page = bt_mappage (bt, bt->latch);
1167
1168         // re-read and re-lock root after determining actual level of root
1169
1170         if( bt->page->lvl != drill) {
1171                 if( bt->page_no != ROOT_page )
1172                         return bt->err = BTERR_struct, 0;
1173                         
1174                 drill = bt->page->lvl;
1175
1176                 if( lock != BtLockRead && drill == lvl ) {
1177                         bt_unlockpage(mode, bt->latch);
1178                         bt_unpinlatch(bt->latch);
1179                         continue;
1180                 }
1181         }
1182
1183         prevpage = bt->page_no;
1184         prevlatch = bt->latch;
1185         prevmode = mode;
1186
1187         //  find key on page at this level
1188         //  and descend to requested level
1189
1190         if( !bt->page->kill )
1191          if( slot = bt_findslot (bt, key, len) ) {
1192           if( drill == lvl )
1193                 return slot;
1194
1195           while( slotptr(bt->page, slot)->dead )
1196                 if( slot++ < bt->page->cnt )
1197                         continue;
1198                 else
1199                         goto slideright;
1200
1201           page_no = bt_getid(slotptr(bt->page, slot)->id);
1202           drill--;
1203           continue;
1204          }
1205
1206         //  or slide right into next page
1207
1208 slideright:
1209         page_no = bt_getid(bt->page->right);
1210
1211   } while( page_no );
1212
1213   // return error on end of right chain
1214
1215   bt->err = BTERR_eof;
1216   return 0;     // return error
1217 }
1218
1219 //      a fence key was deleted from a page
1220 //      push new fence value upwards
1221
1222 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1223 {
1224 unsigned char leftkey[256], rightkey[256];
1225 BtLatchSet *latch = bt->latch;
1226 BtKey ptr;
1227
1228         // remove deleted key, the old fence value
1229
1230         ptr = keyptr(bt->page, bt->page->cnt);
1231         memcpy(rightkey, ptr, ptr->len + 1);
1232
1233         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1234         bt->page->clean = 1;
1235
1236         ptr = keyptr(bt->page, bt->page->cnt);
1237         memcpy(leftkey, ptr, ptr->len + 1);
1238
1239         bt_update (bt, bt->page);
1240         bt_lockpage (BtLockParent, latch);
1241         bt_unlockpage (BtLockWrite, latch);
1242
1243         //  insert new (now smaller) fence key
1244
1245         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1246                 return bt->err;
1247
1248         //  remove old (larger) fence key
1249
1250         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1251                 return bt->err;
1252
1253         bt_unlockpage (BtLockParent, latch);
1254         bt_unpinlatch (latch);
1255         return 0;
1256 }
1257
1258 //      root has a single child
1259 //      collapse a level from the btree
1260 //      call with root locked in bt->page
1261
1262 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1263 {
1264 BtLatchSet *latch;
1265 BtPage temp;
1266 uid child;
1267 uint idx;
1268
1269         // find the child entry
1270         //      and promote to new root
1271
1272   do {
1273         for( idx = 0; idx++ < root->cnt; )
1274           if( !slotptr(root, idx)->dead )
1275                 break;
1276
1277         child = bt_getid (slotptr(root, idx)->id);
1278         if( latch = bt_pinlatch (bt, child) )
1279                 temp = bt_mappage (bt, latch);
1280         else
1281                 return bt->err;
1282
1283         bt_lockpage (BtLockDelete, latch);
1284         bt_lockpage (BtLockWrite, latch);
1285         memcpy (root, temp, bt->page_size);
1286
1287         bt_update (bt, root);
1288
1289         if( bt_freepage (bt, child, latch) )
1290                 return bt->err;
1291
1292   } while( root->lvl > 1 && root->act == 1 );
1293
1294   bt_unlockpage (BtLockWrite, bt->latch);
1295   bt_unpinlatch (bt->latch);
1296   return 0;
1297 }
1298
1299 //  find and delete key on page by marking delete flag bit
1300 //  when page becomes empty, delete it
1301
1302 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1303 {
1304 unsigned char lowerkey[256], higherkey[256];
1305 uint slot, dirty = 0, idx, fence, found;
1306 BtLatchSet *latch, *rlatch;
1307 uid page_no, right;
1308 BtPage temp;
1309 BtKey ptr;
1310
1311         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1312                 ptr = keyptr(bt->page, slot);
1313         else
1314                 return bt->err;
1315
1316         // are we deleting a fence slot?
1317
1318         fence = slot == bt->page->cnt;
1319
1320         // if key is found delete it, otherwise ignore request
1321
1322         if( found = !keycmp (ptr, key, len) )
1323           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1324                 dirty = slotptr(bt->page,slot)->dead = 1;
1325                 bt->page->clean = 1;
1326                 bt->page->act--;
1327
1328                 // collapse empty slots
1329
1330                 while( idx = bt->page->cnt - 1 )
1331                   if( slotptr(bt->page, idx)->dead ) {
1332                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1333                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1334                   } else
1335                         break;
1336           }
1337
1338         right = bt_getid(bt->page->right);
1339         page_no = bt->page_no;
1340         latch = bt->latch;
1341
1342         if( !dirty ) {
1343           if( lvl )
1344                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1345           bt_unlockpage(BtLockWrite, latch);
1346           bt_unpinlatch (latch);
1347           return bt->found = found, 0;
1348         }
1349
1350         //      did we delete a fence key in an upper level?
1351
1352         if( lvl && bt->page->act && fence )
1353           if( bt_fixfence (bt, page_no, lvl) )
1354                 return bt->err;
1355           else
1356                 return bt->found = found, 0;
1357
1358         //      is this a collapsed root?
1359
1360         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1361           if( bt_collapseroot (bt, bt->page) )
1362                 return bt->err;
1363           else
1364                 return bt->found = found, 0;
1365
1366         // return if page is not empty
1367
1368         if( bt->page->act ) {
1369           bt_update(bt, bt->page);
1370           bt_unlockpage(BtLockWrite, latch);
1371           bt_unpinlatch (latch);
1372           return bt->found = found, 0;
1373         }
1374
1375         // cache copy of fence key
1376         //      in order to find parent
1377
1378         ptr = keyptr(bt->page, bt->page->cnt);
1379         memcpy(lowerkey, ptr, ptr->len + 1);
1380
1381         // obtain lock on right page
1382
1383         if( rlatch = bt_pinlatch (bt, right) )
1384                 temp = bt_mappage (bt, rlatch);
1385         else
1386                 return bt->err;
1387
1388         bt_lockpage(BtLockWrite, rlatch);
1389
1390         if( temp->kill ) {
1391                 bt_abort(bt, temp, right, 0);
1392                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1393         }
1394
1395         // pull contents of next page into current empty page 
1396
1397         memcpy (bt->page, temp, bt->page_size);
1398
1399         //      cache copy of key to update
1400
1401         ptr = keyptr(temp, temp->cnt);
1402         memcpy(higherkey, ptr, ptr->len + 1);
1403
1404         //  Mark right page as deleted and point it to left page
1405         //      until we can post updates at higher level.
1406
1407         bt_putid(temp->right, page_no);
1408         temp->kill = 1;
1409
1410         bt_update(bt, bt->page);
1411         bt_update(bt, temp);
1412
1413         bt_lockpage(BtLockParent, latch);
1414         bt_unlockpage(BtLockWrite, latch);
1415
1416         bt_lockpage(BtLockParent, rlatch);
1417         bt_unlockpage(BtLockWrite, rlatch);
1418
1419         //  redirect higher key directly to consolidated node
1420
1421         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1422                 return bt->err;
1423
1424         //  delete old lower key to consolidated node
1425
1426         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1427                 return bt->err;
1428
1429         //  obtain write & delete lock on deleted node
1430         //      add right block to free chain
1431
1432         bt_lockpage(BtLockDelete, rlatch);
1433         bt_lockpage(BtLockWrite, rlatch);
1434         bt_unlockpage(BtLockParent, rlatch);
1435
1436         if( bt_freepage (bt, right, rlatch) )
1437                 return bt->err;
1438
1439         bt_unlockpage(BtLockParent, latch);
1440         bt_unpinlatch(latch);
1441         return 0;
1442 }
1443
1444 //      find key in leaf level and return row-id
1445
1446 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1447 {
1448 uint  slot;
1449 BtKey ptr;
1450 uid id;
1451
1452         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1453                 ptr = keyptr(bt->page, slot);
1454         else
1455                 return 0;
1456
1457         // if key exists, return row-id
1458         //      otherwise return 0
1459
1460         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1461                 id = bt_getid(slotptr(bt->page,slot)->id);
1462         else
1463                 id = 0;
1464
1465         bt_unlockpage (BtLockRead, bt->latch);
1466         bt_unpinlatch (bt->latch);
1467         return id;
1468 }
1469
1470 //      check page for space available,
1471 //      clean if necessary and return
1472 //      0 - page needs splitting
1473 //      >0 - go ahead with new slot
1474  
1475 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1476 {
1477 uint nxt = bt->page_size;
1478 BtPage page = bt->page;
1479 uint cnt = 0, idx = 0;
1480 uint max = page->cnt;
1481 uint newslot = slot;
1482 BtKey key;
1483 int ret;
1484
1485         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1486                 return slot;
1487
1488         //      skip cleanup if nothing to reclaim
1489
1490         if( !page->clean )
1491                 return 0;
1492
1493         memcpy (bt->frame, page, bt->page_size);
1494
1495         // skip page info and set rest of page to zero
1496
1497         memset (page+1, 0, bt->page_size - sizeof(*page));
1498         page->act = 0;
1499
1500         while( cnt++ < max ) {
1501                 if( cnt == slot )
1502                         newslot = idx + 1;
1503                 // always leave fence key in list
1504                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1505                         continue;
1506
1507                 // copy key
1508                 key = keyptr(bt->frame, cnt);
1509                 nxt -= key->len + 1;
1510                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1511
1512                 // copy slot
1513                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1514                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1515                         page->act++;
1516 #ifdef USETOD
1517                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1518 #endif
1519                 slotptr(page, idx)->off = nxt;
1520         }
1521
1522         page->min = nxt;
1523         page->cnt = idx;
1524
1525         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1526                 return newslot;
1527
1528         return 0;
1529 }
1530
1531 // split the root and raise the height of the btree
1532
1533 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1534 {
1535 uint nxt = bt->page_size;
1536 BtPage root = bt->page;
1537 uid right;
1538
1539         //  Obtain an empty page to use, and copy the current
1540         //  root contents into it
1541
1542         if( !(right = bt_newpage(bt, root)) )
1543                 return bt->err;
1544
1545         // preserve the page info at the bottom
1546         // and set rest to zero
1547
1548         memset(root+1, 0, bt->page_size - sizeof(*root));
1549
1550         // insert first key on newroot page
1551
1552         nxt -= *leftkey + 1;
1553         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1554         bt_putid(slotptr(root, 1)->id, right);
1555         slotptr(root, 1)->off = nxt;
1556         
1557         // insert second key on newroot page
1558         // and increase the root height
1559
1560         nxt -= 3;
1561         ((unsigned char *)root)[nxt] = 2;
1562         ((unsigned char *)root)[nxt+1] = 0xff;
1563         ((unsigned char *)root)[nxt+2] = 0xff;
1564         bt_putid(slotptr(root, 2)->id, page_no2);
1565         slotptr(root, 2)->off = nxt;
1566
1567         bt_putid(root->right, 0);
1568         root->min = nxt;                // reset lowest used offset and key count
1569         root->cnt = 2;
1570         root->act = 2;
1571         root->lvl++;
1572
1573         // update and release root (bt->page)
1574
1575         bt_update(bt, root);
1576
1577         bt_unlockpage(BtLockWrite, bt->latch);
1578         bt_unpinlatch(bt->latch);
1579         return 0;
1580 }
1581
1582 //  split already locked full node
1583 //      return unlocked.
1584
1585 BTERR bt_splitpage (BtDb *bt)
1586 {
1587 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1588 unsigned char fencekey[256], rightkey[256];
1589 uid page_no = bt->page_no, right;
1590 BtLatchSet *latch, *rlatch;
1591 BtPage page = bt->page;
1592 uint lvl = page->lvl;
1593 BtKey key;
1594
1595         latch = bt->latch;
1596
1597         //  split higher half of keys to bt->frame
1598         //      the last key (fence key) might be dead
1599
1600         memset (bt->frame, 0, bt->page_size);
1601         max = page->cnt;
1602         cnt = max / 2;
1603         idx = 0;
1604
1605         while( cnt++ < max ) {
1606                 key = keyptr(page, cnt);
1607                 nxt -= key->len + 1;
1608                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1609                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1610                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1611                         bt->frame->act++;
1612 #ifdef USETOD
1613                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1614 #endif
1615                 slotptr(bt->frame, idx)->off = nxt;
1616         }
1617
1618         //      remember fence key for new right page
1619
1620         memcpy (rightkey, key, key->len + 1);
1621
1622         bt->frame->bits = bt->page_bits;
1623         bt->frame->min = nxt;
1624         bt->frame->cnt = idx;
1625         bt->frame->lvl = lvl;
1626
1627         // link right node
1628
1629         if( page_no > ROOT_page )
1630                 memcpy (bt->frame->right, page->right, BtId);
1631
1632         //      get new free page and write frame to it.
1633
1634         if( !(right = bt_newpage(bt, bt->frame)) )
1635                 return bt->err;
1636
1637         //      update lower keys to continue in old page
1638
1639         memcpy (bt->frame, page, bt->page_size);
1640         memset (page+1, 0, bt->page_size - sizeof(*page));
1641         nxt = bt->page_size;
1642         page->clean = 0;
1643         page->act = 0;
1644         cnt = 0;
1645         idx = 0;
1646
1647         //  assemble page of smaller keys
1648         //      (they're all active keys)
1649
1650         while( cnt++ < max / 2 ) {
1651                 key = keyptr(bt->frame, cnt);
1652                 nxt -= key->len + 1;
1653                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1654                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1655 #ifdef USETOD
1656                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1657 #endif
1658                 slotptr(page, idx)->off = nxt;
1659                 page->act++;
1660         }
1661
1662         // remember fence key for smaller page
1663
1664         memcpy (fencekey, key, key->len + 1);
1665
1666         bt_putid(page->right, right);
1667         page->min = nxt;
1668         page->cnt = idx;
1669
1670         // if current page is the root page, split it
1671
1672         if( page_no == ROOT_page )
1673                 return bt_splitroot (bt, fencekey, right);
1674
1675         //      lock right page
1676
1677         if( rlatch = bt_pinlatch (bt, right) )
1678                 bt_lockpage (BtLockParent, rlatch);
1679         else
1680                 return bt->err;
1681
1682         // update left (containing) node
1683
1684         bt_update(bt, page);
1685
1686         bt_lockpage (BtLockParent, latch);
1687         bt_unlockpage (BtLockWrite, latch);
1688
1689         // insert new fence for reformulated left block
1690
1691         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1692                 return bt->err;
1693
1694         //      switch fence for right block of larger keys to new right page
1695
1696         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1697                 return bt->err;
1698
1699         bt_unlockpage (BtLockParent, latch);
1700         bt_unlockpage (BtLockParent, rlatch);
1701
1702         bt_unpinlatch (rlatch);
1703         bt_unpinlatch (latch);
1704         return 0;
1705 }
1706
1707 //  Insert new key into the btree at requested level.
1708 //  Pages are unlocked at exit.
1709
1710 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1711 {
1712 uint slot, idx;
1713 BtPage page;
1714 BtKey ptr;
1715
1716   while( 1 ) {
1717         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1718                 ptr = keyptr(bt->page, slot);
1719         else
1720         {
1721                 if( !bt->err )
1722                         bt->err = BTERR_ovflw;
1723                 return bt->err;
1724         }
1725
1726         // if key already exists, update id and return
1727
1728         page = bt->page;
1729
1730         if( !keycmp (ptr, key, len) ) {
1731           if( slotptr(page, slot)->dead )
1732                 page->act++;
1733           slotptr(page, slot)->dead = 0;
1734 #ifdef USETOD
1735           slotptr(page, slot)->tod = tod;
1736 #endif
1737           bt_putid(slotptr(page,slot)->id, id);
1738           bt_update(bt, bt->page);
1739           bt_unlockpage(BtLockWrite, bt->latch);
1740           bt_unpinlatch (bt->latch);
1741           return 0;
1742         }
1743
1744         // check if page has enough space
1745
1746         if( slot = bt_cleanpage (bt, len, slot) )
1747                 break;
1748
1749         if( bt_splitpage (bt) )
1750                 return bt->err;
1751   }
1752
1753   // calculate next available slot and copy key into page
1754
1755   page->min -= len + 1; // reset lowest used offset
1756   ((unsigned char *)page)[page->min] = len;
1757   memcpy ((unsigned char *)page + page->min +1, key, len );
1758
1759   for( idx = slot; idx < page->cnt; idx++ )
1760         if( slotptr(page, idx)->dead )
1761                 break;
1762
1763   // now insert key into array before slot
1764   // preserving the fence slot
1765
1766   if( idx == page->cnt )
1767         idx++, page->cnt++;
1768
1769   page->act++;
1770
1771   while( idx > slot )
1772         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1773
1774   bt_putid(slotptr(page,slot)->id, id);
1775   slotptr(page, slot)->off = page->min;
1776 #ifdef USETOD
1777   slotptr(page, slot)->tod = tod;
1778 #endif
1779   slotptr(page, slot)->dead = 0;
1780
1781   bt_update(bt, bt->page);
1782
1783   bt_unlockpage(BtLockWrite, bt->latch);
1784   bt_unpinlatch(bt->latch);
1785   return 0;
1786 }
1787
1788 //  cache page of keys into cursor and return starting slot for given key
1789
1790 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1791 {
1792 uint slot;
1793
1794         // cache page for retrieval
1795
1796         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1797           memcpy (bt->cursor, bt->page, bt->page_size);
1798         else
1799           return 0;
1800
1801         bt_unlockpage(BtLockRead, bt->latch);
1802         bt->cursor_page = bt->page_no;
1803         bt_unpinlatch (bt->latch);
1804         return slot;
1805 }
1806
1807 //  return next slot for cursor page
1808 //  or slide cursor right into next page
1809
1810 uint bt_nextkey (BtDb *bt, uint slot)
1811 {
1812 BtLatchSet *latch;
1813 off64_t right;
1814
1815   do {
1816         right = bt_getid(bt->cursor->right);
1817
1818         while( slot++ < bt->cursor->cnt )
1819           if( slotptr(bt->cursor,slot)->dead )
1820                 continue;
1821           else if( right || (slot < bt->cursor->cnt))
1822                 return slot;
1823           else
1824                 break;
1825
1826         if( !right )
1827                 break;
1828
1829         bt->cursor_page = right;
1830
1831         if( latch = bt_pinlatch (bt, right) )
1832         bt_lockpage(BtLockRead, latch);
1833         else
1834                 return 0;
1835
1836         bt->page = bt_mappage (bt, latch);
1837         memcpy (bt->cursor, bt->page, bt->page_size);
1838         bt_unlockpage(BtLockRead, latch);
1839         bt_unpinlatch (latch);
1840         slot = 0;
1841   } while( 1 );
1842
1843   return bt->err = 0;
1844 }
1845
1846 BtKey bt_key(BtDb *bt, uint slot)
1847 {
1848         return keyptr(bt->cursor, slot);
1849 }
1850
1851 uid bt_uid(BtDb *bt, uint slot)
1852 {
1853         return bt_getid(slotptr(bt->cursor,slot)->id);
1854 }
1855
1856 #ifdef USETOD
1857 uint bt_tod(BtDb *bt, uint slot)
1858 {
1859         return slotptr(bt->cursor,slot)->tod;
1860 }
1861 #endif
1862
1863 #ifdef STANDALONE
1864
1865 uint bt_audit (BtDb *bt)
1866 {
1867 uint idx, hashidx;
1868 uid next, page_no;
1869 BtLatchSet *latch;
1870 uint blks[64];
1871 uint cnt = 0;
1872 BtPage page;
1873 uint amt[1];
1874 BtKey ptr;
1875
1876         memset (blks, 0, sizeof(blks));
1877
1878         if( *(ushort *)(bt->latchmgr->lock) )
1879                 fprintf(stderr, "Alloc page locked\n");
1880         *(ushort *)(bt->latchmgr->lock) = 0;
1881
1882         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
1883                 latch = bt->latchsets + idx;
1884                 if( *(ushort *)latch->readwr )
1885                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
1886                 *(ushort *)latch->readwr = 0;
1887
1888                 if( *(ushort *)latch->access )
1889                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
1890                 *(ushort *)latch->access = 0;
1891
1892                 if( *(ushort *)latch->parent )
1893                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
1894                 *(ushort *)latch->parent = 0;
1895
1896                 if( latch->pin ) {
1897                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
1898                         latch->pin = 0;
1899                 }
1900                 page = (BtPage)((uid)idx * bt->page_size + bt->pagepool);
1901
1902             if( page->dirty )
1903                  if( bt_writepage (bt, page, latch->page_no) )
1904                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1905         }
1906
1907         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
1908           if( *(ushort *)(bt->table[hashidx].latch) )
1909                         fprintf(stderr, "hash entry %d locked\n", hashidx);
1910
1911           *(ushort *)(bt->table[hashidx].latch) = 0;
1912
1913           if( idx = bt->table[hashidx].slot ) do {
1914                 latch = bt->latchsets + idx;
1915                 if( latch->pin )
1916                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
1917           } while( idx = latch->next );
1918         }
1919
1920         next = bt->latchmgr->nlatchpage + LATCH_page;
1921         page_no = LEAF_page;
1922
1923         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
1924                 if( bt_readpage (bt, bt->frame, page_no) )
1925                   fprintf(stderr, "page %.8x unreadable\n", page_no);
1926                 if( !bt->frame->free ) {
1927                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
1928                   ptr = keyptr(bt->frame, idx+1);
1929                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
1930                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
1931                  }
1932                  if( !bt->frame->lvl )
1933                         cnt += bt->frame->act;
1934                  blks[bt->frame->lvl]++;
1935                 }
1936
1937                 if( page_no > LEAF_page )
1938                         next = page_no + 1;
1939                 page_no = next;
1940         }
1941         for( idx = 0; blks[idx]; idx++ )
1942                 fprintf(stderr, "%d lvl %d blocks\n", blks[idx], idx);
1943         return cnt - 1;
1944 }
1945
1946 #ifndef unix
1947 double getCpuTime(int type)
1948 {
1949 FILETIME crtime[1];
1950 FILETIME xittime[1];
1951 FILETIME systime[1];
1952 FILETIME usrtime[1];
1953 SYSTEMTIME timeconv[1];
1954 double ans = 0;
1955
1956         memset (timeconv, 0, sizeof(SYSTEMTIME));
1957
1958         switch( type ) {
1959         case 0:
1960                 GetSystemTimeAsFileTime (xittime);
1961                 FileTimeToSystemTime (xittime, timeconv);
1962                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
1963                 break;
1964         case 1:
1965                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
1966                 FileTimeToSystemTime (usrtime, timeconv);
1967                 break;
1968         case 2:
1969                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
1970                 FileTimeToSystemTime (systime, timeconv);
1971                 break;
1972         }
1973
1974         ans += (double)timeconv->wHour * 3600;
1975         ans += (double)timeconv->wMinute * 60;
1976         ans += (double)timeconv->wSecond;
1977         ans += (double)timeconv->wMilliseconds / 1000;
1978         return ans;
1979 }
1980 #else
1981 #include <time.h>
1982 #include <sys/resource.h>
1983
1984 double getCpuTime(int type)
1985 {
1986 struct rusage used[1];
1987 struct timeval tv[1];
1988
1989         switch( type ) {
1990         case 0:
1991                 gettimeofday(tv, NULL);
1992                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
1993
1994         case 1:
1995                 getrusage(RUSAGE_SELF, used);
1996                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
1997
1998         case 2:
1999                 getrusage(RUSAGE_SELF, used);
2000                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2001         }
2002
2003         return 0;
2004 }
2005 #endif
2006
2007 //  standalone program to index file of keys
2008 //  then list them onto std-out
2009
2010 int main (int argc, char **argv)
2011 {
2012 uint slot, line = 0, off = 0, found = 0;
2013 int ch, cnt = 0, bits = 12, idx;
2014 unsigned char key[256];
2015 double done, start;
2016 uid next, page_no;
2017 BtLatchSet *latch;
2018 float elapsed;
2019 time_t tod[1];
2020 uint scan = 0;
2021 uint len = 0;
2022 uint map = 0;
2023 BtPage page;
2024 BtKey ptr;
2025 BtDb *bt;
2026 FILE *in;
2027
2028 #ifdef WIN32
2029         _setmode (1, _O_BINARY);
2030 #endif
2031         if( argc < 4 ) {
2032                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]);
2033                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2034                 fprintf (stderr, "  mapped_pool_pages: number of pages in buffer pool\n");
2035                 exit(0);
2036         }
2037
2038         start = getCpuTime(0);
2039         time(tod);
2040
2041         if( argc > 4 )
2042                 bits = atoi(argv[4]);
2043
2044         if( argc > 5 )
2045                 map = atoi(argv[5]);
2046
2047         if( argc > 6 )
2048                 off = atoi(argv[6]);
2049
2050         bt = bt_open ((argv[1]), BT_rw, bits, map);
2051
2052         if( !bt ) {
2053                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2054                 exit (1);
2055         }
2056
2057         switch(argv[3][0]| 0x20)
2058         {
2059         case 'p':       // display page
2060                 if( latch = bt_pinlatch (bt, off) )
2061                         page = bt_mappage (bt, latch);
2062                 else
2063                         fprintf(stderr, "unable to read page %.8x\n", off);
2064
2065                 write (1, page, bt->page_size);
2066                 break;
2067
2068         case 'a':       // buffer pool audit
2069                 fprintf(stderr, "started audit for %s\n", argv[1]);
2070                 cnt = bt_audit (bt);
2071                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[1], cnt);
2072                 break;
2073
2074         case 'w':       // write keys
2075                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2076                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2077 #ifdef unix
2078                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2079 #endif
2080                   while( ch = getc(in), ch != EOF )
2081                         if( ch == '\n' )
2082                         {
2083                           if( off )
2084                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2085
2086                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2087                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2088                           len = 0;
2089                         }
2090                         else if( len < 245 )
2091                                 key[len++] = ch;
2092                   }
2093                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2094                 break;
2095
2096         case 'd':       // delete keys
2097                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2098                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2099 #ifdef unix
2100                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2101 #endif
2102                   while( ch = getc(in), ch != EOF )
2103                         if( ch == '\n' )
2104                         {
2105                           if( off )
2106                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2107                           line++;
2108                           if( bt_deletekey (bt, key, len, 0) )
2109                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2110                           len = 0;
2111                         }
2112                         else if( len < 245 )
2113                                 key[len++] = ch;
2114                   }
2115                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2116                 break;
2117
2118         case 'f':       // find keys
2119                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2120                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2121 #ifdef unix
2122                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2123 #endif
2124                   while( ch = getc(in), ch != EOF )
2125                         if( ch == '\n' )
2126                         {
2127                           if( off )
2128                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2129                           line++;
2130                           if( bt_findkey (bt, key, len) )
2131                                 found++;
2132                           else if( bt->err )
2133                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2134                           len = 0;
2135                         }
2136                         else if( len < 245 )
2137                                 key[len++] = ch;
2138                   }
2139                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2140                 break;
2141
2142         case 's':       // scan and print keys
2143                 fprintf(stderr, "started scaning\n");
2144                 cnt = len = key[0] = 0;
2145
2146                 if( slot = bt_startkey (bt, key, len) )
2147                   slot--;
2148                 else
2149                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2150
2151                 while( slot = bt_nextkey (bt, slot) ) {
2152                         ptr = bt_key(bt, slot);
2153                         fwrite (ptr->key, ptr->len, 1, stdout);
2154                         fputc ('\n', stdout);
2155                         cnt++;
2156                 }
2157
2158                 fprintf(stderr, " Total keys read %d\n", cnt - 1);
2159                 break;
2160
2161         case 'c':       // count keys
2162           fprintf(stderr, "started counting\n");
2163           cnt = 0;
2164
2165           next = bt->latchmgr->nlatchpage + LATCH_page;
2166           page_no = LEAF_page;
2167
2168           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2169                 if( latch = bt_pinlatch (bt, page_no) )
2170                         page = bt_mappage (bt, latch);
2171                 if( !page->free && !page->lvl )
2172                         cnt += page->act;
2173                 if( page_no > LEAF_page )
2174                         next = page_no + 1;
2175                 if( scan )
2176                  for( idx = 0; idx++ < page->cnt; ) {
2177                   if( slotptr(page, idx)->dead )
2178                         continue;
2179                   ptr = keyptr(page, idx);
2180                   if( idx != page->cnt && bt_getid (page->right) ) {
2181                         fwrite (ptr->key, ptr->len, 1, stdout);
2182                         fputc ('\n', stdout);
2183                   }
2184                  }
2185                 bt_unpinlatch (latch);
2186                 page_no = next;
2187           }
2188                 
2189           cnt--;        // remove stopper key
2190           fprintf(stderr, " Total keys read %d\n", cnt);
2191           break;
2192         }
2193
2194         done = getCpuTime(0);
2195         elapsed = (float)(done - start);
2196         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2197         elapsed = getCpuTime(1);
2198         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2199         elapsed = getCpuTime(2);
2200         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2201         return 0;
2202 }
2203
2204 #endif  //STANDALONE