]> pd.if.org Git - btree/blob - btree2u.c
Rework buffer pool manager along traditional file I/O lines
[btree] / btree2u.c
1 // btree version 2u
2 //      with combined latch & pool manager
3 // 26 FEB 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #endif
30
31 #ifdef unix
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #endif
47
48 #include <memory.h>
49 #include <string.h>
50
51 typedef unsigned long long      uid;
52
53 #ifndef unix
54 typedef unsigned long long      off64_t;
55 typedef unsigned short          ushort;
56 typedef unsigned int            uint;
57 #endif
58
59 #define BT_ro 0x6f72    // ro
60 #define BT_rw 0x7772    // rw
61 #define BT_fl 0x6c66    // fl
62
63 #define BT_maxbits              15                                      // maximum page size in bits
64 #define BT_minbits              12                                      // minimum page size in bits
65 #define BT_minpage              (1 << BT_minbits)       // minimum page size
66 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
67
68 /*
69 There are five lock types for each node in three independent sets: 
70 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
71 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
72 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
73 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
74 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
75 */
76
77 typedef enum{
78         BtLockAccess,
79         BtLockDelete,
80         BtLockRead,
81         BtLockWrite,
82         BtLockParent
83 }BtLock;
84
85 //      definition for latch implementation
86
87 // exclusive is set for write access
88 // share is count of read accessors
89 // grant write lock when share == 0
90
91 volatile typedef struct {
92         unsigned char mutex[1];
93         unsigned char exclusive:1;
94         unsigned char pending:1;
95         ushort share;
96 } BtSpinLatch;
97
98 //      Define the length of the page and key pointers
99
100 #define BtId 6
101
102 //      Page key slot definition.
103
104 //      If BT_maxbits is 15 or less, you can save 2 bytes
105 //      for each key stored by making the first two uints
106 //      into ushorts.  You can also save 4 bytes by removing
107 //      the tod field from the key.
108
109 //      Keys are marked dead, but remain on the page until
110 //      cleanup is called. The fence key (highest key) for
111 //      the page is always present, even if dead.
112
113 typedef struct {
114 #ifdef USETOD
115         uint tod;                                       // time-stamp for key
116 #endif
117         ushort off:BT_maxbits;          // page offset for key start
118         ushort dead:1;                          // set for deleted key
119         unsigned char id[BtId];         // id associated with key
120 } BtSlot;
121
122 //      The key structure occupies space at the upper end of
123 //      each page.  It's a length byte followed by the value
124 //      bytes.
125
126 typedef struct {
127         unsigned char len;
128         unsigned char key[0];
129 } *BtKey;
130
131 //      The first part of an index page.
132 //      It is immediately followed
133 //      by the BtSlot array of keys.
134
135 typedef struct BtPage_ {
136         uint cnt;                                       // count of keys in page
137         uint act;                                       // count of active keys
138         uint min;                                       // next key offset
139         unsigned char bits:7;           // page size in bits
140         unsigned char free:1;           // page is on free list
141         unsigned char lvl:6;            // level of page
142         unsigned char kill:1;           // page is being deleted
143         unsigned char dirty:1;          // page is dirty
144         unsigned char right[BtId];      // page number to right
145 } *BtPage;
146
147 typedef struct {
148         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
149         BtSpinLatch lock[1];            // allocation area lite latch
150         uint latchdeployed;                     // highest number of latch entries deployed
151         uint nlatchpage;                        // number of latch pages at BT_latch
152         uint latchtotal;                        // number of page latch entries
153         uint latchhash;                         // number of latch hash table slots
154         uint latchvictim;                       // next latch entry to examine
155 } BtLatchMgr;
156
157 //  latch hash table entries
158
159 typedef struct {
160         volatile uint slot;             // Latch table entry at head of collision chain
161         BtSpinLatch latch[1];   // lock for the collision chain
162 } BtHashEntry;
163
164 //      latch manager table structure
165
166 typedef struct {
167         BtSpinLatch readwr[1];          // read/write page lock
168         BtSpinLatch access[1];          // Access Intent/Page delete
169         BtSpinLatch parent[1];          // Posting of fence key in parent
170         BtSpinLatch busy[1];            // slot is being moved between chains
171         volatile uint next;                     // next entry in hash table chain
172         volatile uint prev;                     // prev entry in hash table chain
173         volatile uint hash;                     // hash slot entry is under
174         volatile ushort dirty;          // page is dirty in cache
175         volatile ushort pin;            // number of outstanding pins
176         volatile uid page_no;           // latch set page number on disk
177 } BtLatchSet;
178
179 //      The object structure for Btree access
180
181 typedef struct _BtDb {
182         uint page_size;         // each page size       
183         uint page_bits;         // each page size in bits       
184         uid page_no;            // current page number  
185         uid cursor_page;        // current cursor page number   
186         int  err;
187         uint mode;                      // read-write mode
188         BtPage alloc;           // frame buffer for alloc page ( page 0 )
189         BtPage cursor;          // cached frame for start/next (never mapped)
190         BtPage frame;           // spare frame for the page split (never mapped)
191         BtPage zero;            // zeroes frame buffer (never mapped)
192         BtPage page;            // current page
193         BtLatchSet *latch;                      // current page latch
194         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
195         BtLatchSet *latchsets;          // mapped latch set from latch pages
196         unsigned char *latchpool;       // cached page pool set
197         BtHashEntry *table;     // the hash table
198 #ifdef unix
199         int idx;
200 #else
201         HANDLE idx;
202         HANDLE halloc;          // allocation and latch table handle
203 #endif
204         unsigned char *mem;     // frame, cursor, page memory buffer
205         uint found;                     // last deletekey found key
206 } BtDb;
207
208 typedef enum {
209 BTERR_ok = 0,
210 BTERR_notfound,
211 BTERR_struct,
212 BTERR_ovflw,
213 BTERR_read,
214 BTERR_lock,
215 BTERR_hash,
216 BTERR_kill,
217 BTERR_map,
218 BTERR_wrt,
219 BTERR_eof
220 } BTERR;
221
222 // B-Tree functions
223 extern void bt_close (BtDb *bt);
224 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
225 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
226 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
227 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
228 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
229 extern uint bt_nextkey   (BtDb *bt, uint slot);
230
231 //      internal functions
232 BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch);
233 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
234 //  Helper functions to return slot values
235
236 extern BtKey bt_key (BtDb *bt, uint slot);
237 extern uid bt_uid (BtDb *bt, uint slot);
238 #ifdef USETOD
239 extern uint bt_tod (BtDb *bt, uint slot);
240 #endif
241
242 //  BTree page number constants
243 #define ALLOC_page              0
244 #define ROOT_page               1
245 #define LEAF_page               2
246 #define LATCH_page              3
247
248 //      Number of levels to create in a new BTree
249
250 #define MIN_lvl                 2
251
252 //  The page is allocated from low and hi ends.
253 //  The key offsets and row-id's are allocated
254 //  from the bottom, while the text of the key
255 //  is allocated from the top.  When the two
256 //  areas meet, the page is split into two.
257
258 //  A key consists of a length byte, two bytes of
259 //  index number (0 - 65534), and up to 253 bytes
260 //  of key value.  Duplicate keys are discarded.
261 //  Associated with each key is a 48 bit row-id.
262
263 //  The b-tree root is always located at page 1.
264 //      The first leaf page of level zero is always
265 //      located on page 2.
266
267 //      The b-tree pages are linked with right
268 //      pointers to facilitate enumerators,
269 //      and provide for concurrency.
270
271 //      When to root page fills, it is split in two and
272 //      the tree height is raised by a new root at page
273 //      one with two keys.
274
275 //      Deleted keys are marked with a dead bit until
276 //      page cleanup The fence key for a node is always
277 //      present, even after deletion and cleanup.
278
279 //  Deleted leaf pages are reclaimed  on a free list.
280 //      The upper levels of the btree are fixed on creation.
281
282 //  To achieve maximum concurrency one page is locked at a time
283 //  as the tree is traversed to find leaf key in question. The right
284 //  page numbers are used in cases where the page is being split,
285 //      or consolidated.
286
287 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
288 //      and chains empty leaf pages together for reuse.
289
290 //      Parent locks are obtained to prevent resplitting or deleting a node
291 //      before its fence is posted into its upper level.
292
293 //      A special open mode of BT_fl is provided to safely access files on
294 //      WIN32 networks. WIN32 network operations should not use memory mapping.
295 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
296 //      to prevent local caching of network file contents.
297
298 //      Access macros to address slot and key values from the page.
299 //      Page slots use 1 based indexing.
300
301 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
302 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
303
304 void bt_putid(unsigned char *dest, uid id)
305 {
306 int i = BtId;
307
308         while( i-- )
309                 dest[i] = (unsigned char)id, id >>= 8;
310 }
311
312 uid bt_getid(unsigned char *src)
313 {
314 uid id = 0;
315 int i;
316
317         for( i = 0; i < BtId; i++ )
318                 id <<= 8, id |= *src++; 
319
320         return id;
321 }
322
323 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
324 {
325 BtKey ptr;
326
327         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
328         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
329         ptr = keyptr(page, page->cnt);
330         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
331         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
332         return bt->err = err;
333 }
334
335 //      Latch Manager
336
337 //      wait until write lock mode is clear
338 //      and add 1 to the share count
339
340 void bt_spinreadlock(BtSpinLatch *latch)
341 {
342 ushort prev;
343
344   do {
345         //      obtain latch mutex
346 #ifdef unix
347         if( __sync_lock_test_and_set(latch->mutex, 1) )
348                 continue;
349 #else
350         if( _InterlockedExchange8(latch->mutex, 1) )
351                 continue;
352 #endif
353         //  see if exclusive request is granted or pending
354
355         if( prev = !(latch->exclusive | latch->pending) )
356                 latch->share++;
357
358 #ifdef unix
359         *latch->mutex = 0;
360 #else
361         _InterlockedExchange8(latch->mutex, 0);
362 #endif
363
364         if( prev )
365                 return;
366
367 #ifdef  unix
368   } while( sched_yield(), 1 );
369 #else
370   } while( SwitchToThread(), 1 );
371 #endif
372 }
373
374 //      wait for other read and write latches to relinquish
375
376 void bt_spinwritelock(BtSpinLatch *latch)
377 {
378 uint prev;
379
380   do {
381 #ifdef  unix
382         if( __sync_lock_test_and_set(latch->mutex, 1) )
383                 continue;
384 #else
385         if( _InterlockedExchange8(latch->mutex, 1) )
386                 continue;
387 #endif
388         if( prev = !(latch->share | latch->exclusive) )
389                 latch->exclusive = 1, latch->pending = 0;
390         else
391                 latch->pending = 1;
392 #ifdef unix
393         *latch->mutex = 0;
394 #else
395         _InterlockedExchange8(latch->mutex, 0);
396 #endif
397         if( prev )
398                 return;
399 #ifdef  unix
400   } while( sched_yield(), 1 );
401 #else
402   } while( SwitchToThread(), 1 );
403 #endif
404 }
405
406 //      try to obtain write lock
407
408 //      return 1 if obtained,
409 //              0 otherwise
410
411 int bt_spinwritetry(BtSpinLatch *latch)
412 {
413 uint prev;
414
415 #ifdef unix
416         if( __sync_lock_test_and_set(latch->mutex, 1) )
417                 return 0;
418 #else
419         if( _InterlockedExchange8(latch->mutex, 1) )
420                 return 0;
421 #endif
422         //      take write access if all bits are clear
423
424         if( prev = !(latch->exclusive | latch->share) )
425                 latch->exclusive = 1;
426
427 #ifdef unix
428         *latch->mutex = 0;
429 #else
430         _InterlockedExchange8(latch->mutex, 0);
431 #endif
432         return prev;
433 }
434
435 //      clear write mode
436
437 void bt_spinreleasewrite(BtSpinLatch *latch)
438 {
439 #ifdef unix
440         while( __sync_lock_test_and_set(latch->mutex, 1) )
441                 sched_yield();
442 #else
443         while( _InterlockedExchange8(latch->mutex, 1) )
444                 SwitchToThread();
445 #endif
446         latch->exclusive = 0;
447 #ifdef unix
448         *latch->mutex = 0;
449 #else
450         _InterlockedExchange8(latch->mutex, 0);
451 #endif
452 }
453
454 //      decrement reader count
455
456 void bt_spinreleaseread(BtSpinLatch *latch)
457 {
458 #ifdef unix
459         while( __sync_lock_test_and_set(latch->mutex, 1) )
460                 sched_yield();
461 #else
462         while( _InterlockedExchange8(latch->mutex, 1) )
463                 SwitchToThread();
464 #endif
465         latch->share--;
466 #ifdef unix
467         *latch->mutex = 0;
468 #else
469         _InterlockedExchange8(latch->mutex, 0);
470 #endif
471 }
472
473 //      link latch table entry into head of latch hash table
474
475 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint victim, uid page_no)
476 {
477 BtPage page = (BtPage)(victim * bt->page_size + bt->latchpool);
478 BtLatchSet *latch = bt->latchsets + victim;
479 off64_t off = page_no << bt->page_bits;
480 uint amt[1];
481
482         if( latch->next = bt->table[hashidx].slot )
483                 bt->latchsets[latch->next].prev = victim;
484
485         bt->table[hashidx].slot = victim;
486         latch->page_no = page_no;
487         latch->hash = hashidx;
488         latch->dirty = 0;
489         latch->prev = 0;
490 #ifdef unix
491         if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) )
492                 return bt->err = BTERR_read;
493 #else
494         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
495         if( !ReadFile(bt->idx, page, bt->page_size, amt, NULL))
496                 return bt->err = BTERR_read;
497         if( *amt <  bt->page_size )
498                 return bt->err = BTERR_read;
499 #endif
500         return 0;
501 }
502
503 //      release latch pin
504
505 void bt_unpinlatch (BtLatchSet *latch)
506 {
507 #ifdef unix
508         __sync_fetch_and_add(&latch->pin, -1);
509 #else
510         _InterlockedDecrement16 (&latch->pin);
511 #endif
512 }
513
514 //      find existing latchset or inspire new one
515 //      return with latchset pinned
516
517 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
518 {
519 uint hashidx = page_no % bt->latchmgr->latchhash;
520 uint slot, victim, idx;
521 BtLatchSet *latch;
522 off64_t off;
523 uint amt[1];
524 BtPage page;
525
526   //  try to find unpinned entry
527
528   bt_spinwritelock(bt->table[hashidx].latch);
529
530   if( slot = bt->table[hashidx].slot ) do
531   {
532         latch = bt->latchsets + slot;
533         if( page_no == latch->page_no )
534                 break;
535   } while( slot = latch->next );
536
537   //  found our entry
538
539   if( slot ) {
540         latch = bt->latchsets + slot;
541 #ifdef unix
542         __sync_fetch_and_add(&latch->pin, 1);
543 #else
544         _InterlockedIncrement16 (&latch->pin);
545 #endif
546         bt_spinreleasewrite(bt->table[hashidx].latch);
547         return latch;
548   }
549
550         //  see if there are any unused entries
551 #ifdef unix
552         victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
553 #else
554         victim = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
555 #endif
556
557         if( victim < bt->latchmgr->latchtotal ) {
558                 latch = bt->latchsets + victim;
559 #ifdef unix
560                 __sync_fetch_and_add(&latch->pin, 1);
561 #else
562                 _InterlockedIncrement16 (&latch->pin);
563 #endif
564                 bt_latchlink (bt, hashidx, victim, page_no);
565                 bt_spinreleasewrite (bt->table[hashidx].latch);
566                 return latch;
567         }
568
569 #ifdef unix
570         victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
571 #else
572         victim = _InterlockedDecrement (&bt->latchmgr->latchdeployed);
573 #endif
574   //  find and reuse previous lock entry
575
576   while( 1 ) {
577 #ifdef unix
578         victim = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
579 #else
580         victim = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
581 #endif
582         //      we don't use slot zero
583
584         if( victim %= bt->latchmgr->latchtotal )
585                 latch = bt->latchsets + victim;
586         else
587                 continue;
588
589         //      take control of our slot
590         //      from other threads
591
592         if( latch->pin || !bt_spinwritetry (latch->busy) )
593                 continue;
594
595         idx = latch->hash;
596
597         // try to get write lock on hash chain
598         //      skip entry if not obtained
599         //      or has outstanding locks
600
601         if( !bt_spinwritetry (bt->table[idx].latch) ) {
602                 bt_spinreleasewrite (latch->busy);
603                 continue;
604         }
605
606         if( latch->pin ) {
607                 bt_spinreleasewrite (latch->busy);
608                 bt_spinreleasewrite (bt->table[idx].latch);
609                 continue;
610         }
611
612         //  update permanent page area in btree
613
614         page = (BtPage)(victim * bt->page_size + bt->latchpool);
615         off = latch->page_no << bt->page_bits;
616 #ifdef unix
617         if( latch->dirty )
618           if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
619                 return bt->err = BTERR_wrt, NULL;
620 #else
621         if( latch->dirty ) {
622           SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
623
624           if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
625                 return bt->err = BTERR_wrt, NULL;
626
627           if( *amt <  bt->page_size )
628                 return bt->err = BTERR_wrt, NULL;
629         }
630 #endif
631         //  unlink our available victim from its hash chain
632
633         if( latch->prev )
634                 bt->latchsets[latch->prev].next = latch->next;
635         else
636                 bt->table[idx].slot = latch->next;
637
638         if( latch->next )
639                 bt->latchsets[latch->next].prev = latch->prev;
640
641         bt_spinreleasewrite (bt->table[idx].latch);
642 #ifdef unix
643         __sync_fetch_and_add(&latch->pin, 1);
644 #else
645         _InterlockedIncrement16 (&latch->pin);
646 #endif
647         bt_latchlink (bt, hashidx, victim, page_no);
648         bt_spinreleasewrite (bt->table[hashidx].latch);
649         bt_spinreleasewrite (latch->busy);
650         return latch;
651   }
652 }
653
654 //      close and release memory
655
656 void bt_close (BtDb *bt)
657 {
658 #ifdef unix
659         munmap (bt->table, bt->latchmgr->nlatchpage * bt->page_size);
660         munmap (bt->latchmgr, bt->page_size);
661 #else
662         FlushViewOfFile(bt->latchmgr, 0);
663         UnmapViewOfFile(bt->latchmgr);
664         CloseHandle(bt->halloc);
665 #endif
666 #ifdef unix
667         if( bt->mem )
668                 free (bt->mem);
669         close (bt->idx);
670         free (bt);
671 #else
672         if( bt->mem)
673                 VirtualFree (bt->mem, 0, MEM_RELEASE);
674         FlushFileBuffers(bt->idx);
675         CloseHandle(bt->idx);
676         GlobalFree (bt);
677 #endif
678 }
679 //  open/create new btree
680
681 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
682 //              size of mapped page pool (e.g. 8192)
683
684 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
685 {
686 uint lvl, attr, last, slot, idx;
687 uint nlatchpage, latchhash;
688 BtLatchMgr *latchmgr;
689 off64_t size, off;
690 uint amt[1];
691 BtKey key;
692 BtDb* bt;
693 int flag;
694
695 #ifndef unix
696 OVERLAPPED ovl[1];
697 #else
698 struct flock lock[1];
699 #endif
700
701         // determine sanity of page size and buffer pool
702
703         if( bits > BT_maxbits )
704                 bits = BT_maxbits;
705         else if( bits < BT_minbits )
706                 bits = BT_minbits;
707
708 #ifdef unix
709         bt = calloc (1, sizeof(BtDb));
710
711         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
712
713         if( bt->idx == -1 )
714                 return free(bt), NULL;
715 #else
716         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
717         attr = FILE_ATTRIBUTE_NORMAL;
718         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
719
720         if( bt->idx == INVALID_HANDLE_VALUE )
721                 return GlobalFree(bt), NULL;
722 #endif
723 #ifdef unix
724         memset (lock, 0, sizeof(lock));
725
726         lock->l_type = F_WRLCK;
727         lock->l_len = sizeof(struct BtPage_);
728         lock->l_whence = 0;
729
730         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
731                 return bt_close (bt), NULL;
732 #else
733         memset (ovl, 0, sizeof(ovl));
734         len = sizeof(struct BtPage_);
735
736         //      use large offsets to
737         //      simulate advisory locking
738
739         ovl->OffsetHigh |= 0x80000000;
740
741         if( LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, len, 0L, ovl) )
742                 return bt_close (bt), NULL;
743 #endif 
744
745 #ifdef unix
746         latchmgr = malloc (BT_maxpage);
747         *amt = 0;
748
749         // read minimum page size to get root info
750
751         if( size = lseek (bt->idx, 0L, 2) ) {
752                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
753                         bits = latchmgr->alloc->bits;
754                 else
755                         return free(bt), free(latchmgr), NULL;
756         } else if( mode == BT_ro )
757                 return free(latchmgr), bt_close (bt), NULL;
758 #else
759         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
760         size = GetFileSize(bt->idx, amt);
761
762         if( size || *amt ) {
763                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
764                         return bt_close (bt), NULL;
765                 bits = latchmgr->alloc->bits;
766         } else if( mode == BT_ro )
767                 return bt_close (bt), NULL;
768 #endif
769
770         bt->page_size = 1 << bits;
771         bt->page_bits = bits;
772
773         bt->mode = mode;
774
775         if( size || *amt )
776                 goto btlatch;
777
778         // initialize an empty b-tree with latch page, root page, page of leaves
779         // and page(s) of latches and page pool cache
780
781         memset (latchmgr, 0, 1 << bits);
782         latchmgr->alloc->bits = bt->page_bits;
783
784         //  calculate number of latch hash table entries
785
786         nlatchpage = (nodemax/8 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
787         latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
788
789         nlatchpage += nodemax;          // size of the buffer pool in pages
790         nlatchpage += (sizeof(BtLatchSet) * nodemax + bt->page_size - 1)/bt->page_size;
791
792         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
793         latchmgr->nlatchpage = nlatchpage;
794         latchmgr->latchtotal = nodemax;
795         latchmgr->latchhash = latchhash;
796 #ifdef unix
797         if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
798                 return bt_close (bt), NULL;
799 #else
800         if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
801                 return bt_close (bt), NULL;
802
803         if( *amt < bt->page_size )
804                 return bt_close (bt), NULL;
805 #endif
806         memset (latchmgr, 0, 1 << bits);
807         latchmgr->alloc->bits = bt->page_bits;
808
809         for( lvl=MIN_lvl; lvl--; ) {
810                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
811                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
812                 key = keyptr(latchmgr->alloc, 1);
813                 key->len = 2;           // create stopper key
814                 key->key[0] = 0xff;
815                 key->key[1] = 0xff;
816                 latchmgr->alloc->min = bt->page_size - 3;
817                 latchmgr->alloc->lvl = lvl;
818                 latchmgr->alloc->cnt = 1;
819                 latchmgr->alloc->act = 1;
820 #ifdef unix
821                 if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
822                         return bt_close (bt), NULL;
823 #else
824                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
825                         return bt_close (bt), NULL;
826
827                 if( *amt < bt->page_size )
828                         return bt_close (bt), NULL;
829 #endif
830         }
831
832         // clear out latch manager pages
833
834         memset(latchmgr, 0, bt->page_size);
835         last = MIN_lvl + 1;
836
837         while( last < ((MIN_lvl + 1 + nlatchpage) ) ) {
838                 off = (uid)last << bt->page_bits;
839 #ifdef unix
840                 pwrite(bt->idx, latchmgr, bt->page_size, off);
841 #else
842                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
843                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
844                         return bt_close (bt), NULL;
845                 if( *amt < bt->page_size )
846                         return bt_close (bt), NULL;
847 #endif
848                 last++;
849         }
850
851 btlatch:
852 #ifdef unix
853         lock->l_type = F_UNLCK;
854         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
855                 return bt_close (bt), NULL;
856 #else
857         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) )
858                 return bt_close (bt), NULL;
859 #endif
860 #ifdef unix
861         flag = PROT_READ | PROT_WRITE;
862         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
863         if( bt->latchmgr == MAP_FAILED )
864                 return bt_close (bt), NULL;
865         bt->table = (void *)mmap (0, bt->latchmgr->nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
866         if( bt->table == MAP_FAILED )
867                 return bt_close (bt), NULL;
868 #else
869         flag = PAGE_READWRITE;
870         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size, NULL);
871         if( !bt->halloc )
872                 return bt_close (bt), NULL;
873
874         flag = FILE_MAP_WRITE;
875         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size);
876         if( !bt->latchmgr )
877                 return GetLastError(), bt_close (bt), NULL;
878
879         bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
880 #endif
881         bt->latchpool = (unsigned char *)bt->table + (bt->latchmgr->nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
882         bt->latchsets = (BtLatchSet *)(bt->latchpool - bt->latchmgr->latchtotal * sizeof(BtLatchSet));
883
884 #ifdef unix
885         free (latchmgr);
886 #else
887         VirtualFree (latchmgr, 0, MEM_RELEASE);
888 #endif
889
890 #ifdef unix
891         bt->mem = malloc (3 * bt->page_size);
892 #else
893         bt->mem = VirtualAlloc(NULL, 3 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
894 #endif
895         bt->frame = (BtPage)bt->mem;
896         bt->cursor = (BtPage)(bt->mem + bt->page_size);
897         bt->zero = (BtPage)(bt->mem + 2 * bt->page_size);
898
899         memset (bt->zero, 0, bt->page_size);
900         return bt;
901 }
902
903 // place write, read, or parent lock on requested page_no.
904
905 void bt_lockpage(BtLock mode, BtLatchSet *latch)
906 {
907         switch( mode ) {
908         case BtLockRead:
909                 bt_spinreadlock (latch->readwr);
910                 break;
911         case BtLockWrite:
912                 bt_spinwritelock (latch->readwr);
913                 break;
914         case BtLockAccess:
915                 bt_spinreadlock (latch->access);
916                 break;
917         case BtLockDelete:
918                 bt_spinwritelock (latch->access);
919                 break;
920         case BtLockParent:
921                 bt_spinwritelock (latch->parent);
922                 break;
923         }
924 }
925
926 // remove write, read, or parent lock on requested page
927
928 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
929 {
930         switch( mode ) {
931         case BtLockRead:
932                 bt_spinreleaseread (latch->readwr);
933                 break;
934         case BtLockWrite:
935                 bt_spinreleasewrite (latch->readwr);
936                 break;
937         case BtLockAccess:
938                 bt_spinreleaseread (latch->access);
939                 break;
940         case BtLockDelete:
941                 bt_spinreleasewrite (latch->access);
942                 break;
943         case BtLockParent:
944                 bt_spinreleasewrite (latch->parent);
945                 break;
946         }
947 }
948
949 //      allocate a new page and write page into it
950
951 uid bt_newpage(BtDb *bt, BtPage page)
952 {
953 BtLatchSet *latch;
954 uid new_page;
955 BtPage temp;
956 off64_t off;
957 uint amt[1];
958 int reuse;
959
960         //      lock allocation page
961
962         bt_spinwritelock(bt->latchmgr->lock);
963
964         // use empty chain first
965         // else allocate empty page
966
967         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
968           latch = bt_pinlatch (bt, new_page);
969           temp = bt_mappage (bt, latch);
970
971           bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
972           bt_spinreleasewrite(bt->latchmgr->lock);
973           memcpy (temp, page, bt->page_size);
974
975           if( bt_update (bt, temp, latch) )
976                 return 0;
977
978           bt_unpinlatch (latch);
979         } else {
980           new_page = bt_getid(bt->latchmgr->alloc->right);
981           bt_putid(bt->latchmgr->alloc->right, new_page+1);
982           bt_spinreleasewrite(bt->latchmgr->lock);
983           off = new_page << bt->page_bits;
984 #ifdef unix
985           if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
986                 return bt->err = BTERR_wrt, 0;
987 #else
988           SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
989
990           if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
991                 return bt->err = BTERR_wrt, 0;
992
993           if( *amt <  bt->page_size )
994                 return bt->err = BTERR_wrt, 0;
995 #endif
996         }
997
998         return new_page;
999 }
1000
1001 //  compare two keys, returning > 0, = 0, or < 0
1002 //  as the comparison value
1003
1004 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1005 {
1006 uint len1 = key1->len;
1007 int ans;
1008
1009         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1010                 return ans;
1011
1012         if( len1 > len2 )
1013                 return 1;
1014         if( len1 < len2 )
1015                 return -1;
1016
1017         return 0;
1018 }
1019
1020 //  Update current page of btree by
1021 //      flushing mapped area to disk backing of cache pool.
1022
1023 BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch)
1024 {
1025 #ifdef unix
1026         msync (page, bt->page_size, MS_ASYNC);
1027 #else
1028         FlushViewOfFile (page, bt->page_size);
1029 #endif
1030         latch->dirty = 1;
1031         return 0;
1032 }
1033
1034 //  map the btree cached page onto current page
1035
1036 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1037 {
1038         return (BtPage)((latch - bt->latchsets) * bt->page_size + bt->latchpool);
1039 }
1040
1041 //      deallocate a deleted page 
1042 //      place on free chain out of allocator page
1043 //      call with page latched for Writing and Deleting
1044
1045 BTERR bt_freepage(BtDb *bt, uid page_no, BtLatchSet *latch)
1046 {
1047 BtPage page = bt_mappage (bt, latch);
1048
1049         //      lock allocation page
1050
1051         bt_spinwritelock (bt->latchmgr->lock);
1052
1053         //      store chain in second right
1054         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1055         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1056         page->free = 1;
1057
1058         if( bt_update(bt, page, latch) )
1059                 return bt->err;
1060
1061         // unlock released page
1062
1063         bt_unlockpage (BtLockDelete, latch);
1064         bt_unlockpage (BtLockWrite, latch);
1065         bt_unpinlatch (latch);
1066
1067         // unlock allocation page
1068
1069         bt_spinreleasewrite (bt->latchmgr->lock);
1070         return 0;
1071 }
1072
1073 //  find slot in page for given key at a given level
1074
1075 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1076 {
1077 uint diff, higher = bt->page->cnt, low = 1, slot;
1078 uint good = 0;
1079
1080         //      make stopper key an infinite fence value
1081
1082         if( bt_getid (bt->page->right) )
1083                 higher++;
1084         else
1085                 good++;
1086
1087         //      low is the lowest candidate, higher is already
1088         //      tested as .ge. the given key, loop ends when they meet
1089
1090         while( diff = higher - low ) {
1091                 slot = low + ( diff >> 1 );
1092                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1093                         low = slot + 1;
1094                 else
1095                         higher = slot, good++;
1096         }
1097
1098         //      return zero if key is on right link page
1099
1100         return good ? higher : 0;
1101 }
1102
1103 //  find and load page at given level for given key
1104 //      leave page rd or wr locked as requested
1105
1106 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1107 {
1108 uid page_no = ROOT_page, prevpage = 0;
1109 uint drill = 0xff, slot;
1110 BtLatchSet *prevlatch;
1111 uint mode, prevmode;
1112
1113   //  start at root of btree and drill down
1114
1115   do {
1116         // determine lock mode of drill level
1117         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1118
1119         bt->latch = bt_pinlatch(bt, page_no);
1120         bt->page_no = page_no;
1121
1122         // obtain access lock using lock chaining
1123
1124         if( page_no > ROOT_page )
1125                 bt_lockpage(BtLockAccess, bt->latch);
1126
1127         if( prevpage ) {
1128                 bt_unlockpage(prevmode, prevlatch);
1129                 bt_unpinlatch(prevlatch);
1130                 prevpage = 0;
1131         }
1132
1133         // obtain read lock using lock chaining
1134
1135         bt_lockpage(mode, bt->latch);
1136
1137         if( page_no > ROOT_page )
1138                 bt_unlockpage(BtLockAccess, bt->latch);
1139
1140         //      map/obtain page contents
1141
1142         bt->page = bt_mappage (bt, bt->latch);
1143
1144         // re-read and re-lock root after determining actual level of root
1145
1146         if( bt->page->lvl != drill) {
1147                 if( bt->page_no != ROOT_page )
1148                         return bt->err = BTERR_struct, 0;
1149                         
1150                 drill = bt->page->lvl;
1151
1152                 if( lock != BtLockRead && drill == lvl ) {
1153                         bt_unlockpage(mode, bt->latch);
1154                         bt_unpinlatch(bt->latch);
1155                         continue;
1156                 }
1157         }
1158
1159         prevpage = bt->page_no;
1160         prevlatch = bt->latch;
1161         prevmode = mode;
1162
1163         //  find key on page at this level
1164         //  and descend to requested level
1165
1166         if( !bt->page->kill )
1167          if( slot = bt_findslot (bt, key, len) ) {
1168           if( drill == lvl )
1169                 return slot;
1170
1171           while( slotptr(bt->page, slot)->dead )
1172                 if( slot++ < bt->page->cnt )
1173                         continue;
1174                 else
1175                         goto slideright;
1176
1177           page_no = bt_getid(slotptr(bt->page, slot)->id);
1178           drill--;
1179           continue;
1180          }
1181
1182         //  or slide right into next page
1183
1184 slideright:
1185         page_no = bt_getid(bt->page->right);
1186
1187   } while( page_no );
1188
1189   // return error on end of right chain
1190
1191   bt->err = BTERR_eof;
1192   return 0;     // return error
1193 }
1194
1195 //      a fence key was deleted from a page
1196 //      push new fence value upwards
1197
1198 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1199 {
1200 unsigned char leftkey[256], rightkey[256];
1201 BtLatchSet *latch = bt->latch;
1202 BtKey ptr;
1203
1204         // remove deleted key, the old fence value
1205
1206         ptr = keyptr(bt->page, bt->page->cnt);
1207         memcpy(rightkey, ptr, ptr->len + 1);
1208
1209         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1210         bt->page->dirty = 1;
1211
1212         ptr = keyptr(bt->page, bt->page->cnt);
1213         memcpy(leftkey, ptr, ptr->len + 1);
1214
1215         if( bt_update (bt, bt->page, latch) )
1216                 return bt->err;
1217
1218         bt_lockpage (BtLockParent, latch);
1219         bt_unlockpage (BtLockWrite, latch);
1220
1221         //  insert new (now smaller) fence key
1222
1223         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1224                 return bt->err;
1225
1226         //  remove old (larger) fence key
1227
1228         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1229                 return bt->err;
1230
1231         bt_unlockpage (BtLockParent, latch);
1232         bt_unpinlatch (latch);
1233         return 0;
1234 }
1235
1236 //      root has a single child
1237 //      collapse a level from the btree
1238 //      call with root locked in bt->page
1239
1240 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1241 {
1242 BtLatchSet *latch;
1243 BtPage temp;
1244 uid child;
1245 uint idx;
1246
1247         // find the child entry
1248         //      and promote to new root
1249
1250   do {
1251         for( idx = 0; idx++ < root->cnt; )
1252           if( !slotptr(root, idx)->dead )
1253                 break;
1254
1255         child = bt_getid (slotptr(root, idx)->id);
1256         latch = bt_pinlatch (bt, child);
1257
1258         bt_lockpage (BtLockDelete, latch);
1259         bt_lockpage (BtLockWrite, latch);
1260
1261         temp = bt_mappage (bt, latch);
1262         memcpy (root, temp, bt->page_size);
1263
1264         if( bt_update (bt, root, bt->latch) )
1265                 return bt->err;
1266
1267         if( bt_freepage (bt, child, latch) )
1268                 return bt->err;
1269
1270   } while( root->lvl > 1 && root->act == 1 );
1271
1272   bt_unlockpage (BtLockWrite, bt->latch);
1273   bt_unpinlatch (bt->latch);
1274   return 0;
1275 }
1276
1277 //  find and delete key on page by marking delete flag bit
1278 //  when page becomes empty, delete it
1279
1280 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1281 {
1282 unsigned char lowerkey[256], higherkey[256];
1283 uint slot, dirty = 0, idx, fence, found;
1284 BtLatchSet *latch, *rlatch;
1285 uid page_no, right;
1286 BtPage temp;
1287 BtKey ptr;
1288
1289         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1290                 ptr = keyptr(bt->page, slot);
1291         else
1292                 return bt->err;
1293
1294         // are we deleting a fence slot?
1295
1296         fence = slot == bt->page->cnt;
1297
1298         // if key is found delete it, otherwise ignore request
1299
1300         if( found = !keycmp (ptr, key, len) )
1301           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1302                 dirty = slotptr(bt->page,slot)->dead = 1;
1303                 bt->page->dirty = 1;
1304                 bt->page->act--;
1305
1306                 // collapse empty slots
1307
1308                 while( idx = bt->page->cnt - 1 )
1309                   if( slotptr(bt->page, idx)->dead ) {
1310                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1311                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1312                   } else
1313                         break;
1314           }
1315
1316         right = bt_getid(bt->page->right);
1317         page_no = bt->page_no;
1318         latch = bt->latch;
1319
1320         if( !dirty ) {
1321           if( lvl )
1322                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1323           bt_unlockpage(BtLockWrite, latch);
1324           bt_unpinlatch (latch);
1325           return bt->found = found, 0;
1326         }
1327
1328         //      did we delete a fence key in an upper level?
1329
1330         if( lvl && bt->page->act && fence )
1331           if( bt_fixfence (bt, page_no, lvl) )
1332                 return bt->err;
1333           else
1334                 return bt->found = found, 0;
1335
1336         //      is this a collapsed root?
1337
1338         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1339           if( bt_collapseroot (bt, bt->page) )
1340                 return bt->err;
1341           else
1342                 return bt->found = found, 0;
1343
1344         // return if page is not empty
1345
1346         if( bt->page->act ) {
1347           if( bt_update(bt, bt->page, latch) )
1348                 return bt->err;
1349           bt_unlockpage(BtLockWrite, latch);
1350           bt_unpinlatch (latch);
1351           return bt->found = found, 0;
1352         }
1353
1354         // cache copy of fence key
1355         //      in order to find parent
1356
1357         ptr = keyptr(bt->page, bt->page->cnt);
1358         memcpy(lowerkey, ptr, ptr->len + 1);
1359
1360         // obtain lock on right page
1361
1362         rlatch = bt_pinlatch (bt, right);
1363         bt_lockpage(BtLockWrite, rlatch);
1364
1365         temp = bt_mappage (bt, rlatch);
1366
1367         if( temp->kill ) {
1368                 bt_abort(bt, temp, right, 0);
1369                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1370         }
1371
1372         // pull contents of next page into current empty page 
1373
1374         memcpy (bt->page, temp, bt->page_size);
1375
1376         //      cache copy of key to update
1377
1378         ptr = keyptr(temp, temp->cnt);
1379         memcpy(higherkey, ptr, ptr->len + 1);
1380
1381         //  Mark right page as deleted and point it to left page
1382         //      until we can post updates at higher level.
1383
1384         bt_putid(temp->right, page_no);
1385         temp->kill = 1;
1386
1387         if( bt_update(bt, bt->page, latch) )
1388                 return bt->err;
1389
1390         if( bt_update(bt, temp, rlatch) )
1391                 return bt->err;
1392
1393         bt_lockpage(BtLockParent, latch);
1394         bt_unlockpage(BtLockWrite, latch);
1395
1396         bt_lockpage(BtLockParent, rlatch);
1397         bt_unlockpage(BtLockWrite, rlatch);
1398
1399         //  redirect higher key directly to consolidated node
1400
1401         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1402                 return bt->err;
1403
1404         //  delete old lower key to consolidated node
1405
1406         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1407                 return bt->err;
1408
1409         //  obtain write & delete lock on deleted node
1410         //      add right block to free chain
1411
1412         bt_lockpage(BtLockDelete, rlatch);
1413         bt_lockpage(BtLockWrite, rlatch);
1414         bt_unlockpage(BtLockParent, rlatch);
1415
1416         if( bt_freepage (bt, right, rlatch) )
1417                 return bt->err;
1418
1419         bt_unlockpage(BtLockParent, latch);
1420         bt_unpinlatch(latch);
1421         return 0;
1422 }
1423
1424 //      find key in leaf level and return row-id
1425
1426 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1427 {
1428 uint  slot;
1429 BtKey ptr;
1430 uid id;
1431
1432         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1433                 ptr = keyptr(bt->page, slot);
1434         else
1435                 return 0;
1436
1437         // if key exists, return row-id
1438         //      otherwise return 0
1439
1440         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1441                 id = bt_getid(slotptr(bt->page,slot)->id);
1442         else
1443                 id = 0;
1444
1445         bt_unlockpage (BtLockRead, bt->latch);
1446         bt_unpinlatch (bt->latch);
1447         return id;
1448 }
1449
1450 //      check page for space available,
1451 //      clean if necessary and return
1452 //      0 - page needs splitting
1453 //      >0 - go ahead with new slot
1454  
1455 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1456 {
1457 uint nxt = bt->page_size;
1458 BtPage page = bt->page;
1459 uint cnt = 0, idx = 0;
1460 uint max = page->cnt;
1461 uint newslot = slot;
1462 BtKey key;
1463 int ret;
1464
1465         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1466                 return slot;
1467
1468         //      skip cleanup if nothing to reclaim
1469
1470         if( !page->dirty )
1471                 return 0;
1472
1473         memcpy (bt->frame, page, bt->page_size);
1474
1475         // skip page info and set rest of page to zero
1476
1477         memset (page+1, 0, bt->page_size - sizeof(*page));
1478         page->act = 0;
1479
1480         while( cnt++ < max ) {
1481                 if( cnt == slot )
1482                         newslot = idx + 1;
1483                 // always leave fence key in list
1484                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1485                         continue;
1486
1487                 // copy key
1488                 key = keyptr(bt->frame, cnt);
1489                 nxt -= key->len + 1;
1490                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1491
1492                 // copy slot
1493                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1494                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1495                         page->act++;
1496 #ifdef USETOD
1497                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1498 #endif
1499                 slotptr(page, idx)->off = nxt;
1500         }
1501
1502         page->min = nxt;
1503         page->cnt = idx;
1504
1505         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1506                 return newslot;
1507
1508         return 0;
1509 }
1510
1511 // split the root and raise the height of the btree
1512
1513 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1514 {
1515 uint nxt = bt->page_size;
1516 BtPage root = bt->page;
1517 uid right;
1518
1519         //  Obtain an empty page to use, and copy the current
1520         //  root contents into it
1521
1522         if( !(right = bt_newpage(bt, root)) )
1523                 return bt->err;
1524
1525         // preserve the page info at the bottom
1526         // and set rest to zero
1527
1528         memset(root+1, 0, bt->page_size - sizeof(*root));
1529
1530         // insert first key on newroot page
1531
1532         nxt -= *leftkey + 1;
1533         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1534         bt_putid(slotptr(root, 1)->id, right);
1535         slotptr(root, 1)->off = nxt;
1536         
1537         // insert second key on newroot page
1538         // and increase the root height
1539
1540         nxt -= 3;
1541         ((unsigned char *)root)[nxt] = 2;
1542         ((unsigned char *)root)[nxt+1] = 0xff;
1543         ((unsigned char *)root)[nxt+2] = 0xff;
1544         bt_putid(slotptr(root, 2)->id, page_no2);
1545         slotptr(root, 2)->off = nxt;
1546
1547         bt_putid(root->right, 0);
1548         root->min = nxt;                // reset lowest used offset and key count
1549         root->cnt = 2;
1550         root->act = 2;
1551         root->lvl++;
1552
1553         // update and release root (bt->page)
1554
1555         if( bt_update(bt, root, bt->latch) )
1556                 return bt->err;
1557
1558         bt_unlockpage(BtLockWrite, bt->latch);
1559         bt_unpinlatch(bt->latch);
1560         return 0;
1561 }
1562
1563 //  split already locked full node
1564 //      return unlocked.
1565
1566 BTERR bt_splitpage (BtDb *bt)
1567 {
1568 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1569 unsigned char fencekey[256], rightkey[256];
1570 uid page_no = bt->page_no, right;
1571 BtLatchSet *latch, *rlatch;
1572 BtPage page = bt->page;
1573 uint lvl = page->lvl;
1574 BtKey key;
1575
1576         latch = bt->latch;
1577
1578         //  split higher half of keys to bt->frame
1579         //      the last key (fence key) might be dead
1580
1581         memset (bt->frame, 0, bt->page_size);
1582         max = page->cnt;
1583         cnt = max / 2;
1584         idx = 0;
1585
1586         while( cnt++ < max ) {
1587                 key = keyptr(page, cnt);
1588                 nxt -= key->len + 1;
1589                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1590                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1591                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1592                         bt->frame->act++;
1593 #ifdef USETOD
1594                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1595 #endif
1596                 slotptr(bt->frame, idx)->off = nxt;
1597         }
1598
1599         //      remember fence key for new right page
1600
1601         memcpy (rightkey, key, key->len + 1);
1602
1603         bt->frame->bits = bt->page_bits;
1604         bt->frame->min = nxt;
1605         bt->frame->cnt = idx;
1606         bt->frame->lvl = lvl;
1607
1608         // link right node
1609
1610         if( page_no > ROOT_page )
1611                 memcpy (bt->frame->right, page->right, BtId);
1612
1613         //      get new free page and write frame to it.
1614
1615         if( !(right = bt_newpage(bt, bt->frame)) )
1616                 return bt->err;
1617
1618         //      update lower keys to continue in old page
1619
1620         memcpy (bt->frame, page, bt->page_size);
1621         memset (page+1, 0, bt->page_size - sizeof(*page));
1622         nxt = bt->page_size;
1623         page->dirty = 0;
1624         page->act = 0;
1625         cnt = 0;
1626         idx = 0;
1627
1628         //  assemble page of smaller keys
1629         //      (they're all active keys)
1630
1631         while( cnt++ < max / 2 ) {
1632                 key = keyptr(bt->frame, cnt);
1633                 nxt -= key->len + 1;
1634                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1635                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1636 #ifdef USETOD
1637                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1638 #endif
1639                 slotptr(page, idx)->off = nxt;
1640                 page->act++;
1641         }
1642
1643         // remember fence key for smaller page
1644
1645         memcpy (fencekey, key, key->len + 1);
1646
1647         bt_putid(page->right, right);
1648         page->min = nxt;
1649         page->cnt = idx;
1650
1651         // if current page is the root page, split it
1652
1653         if( page_no == ROOT_page )
1654                 return bt_splitroot (bt, fencekey, right);
1655
1656         //      lock right page
1657
1658         rlatch = bt_pinlatch (bt, right);
1659         bt_lockpage (BtLockParent, rlatch);
1660
1661         // update left (containing) node
1662
1663         if( bt_update(bt, page, latch) )
1664                 return bt->err;
1665
1666         bt_lockpage (BtLockParent, latch);
1667         bt_unlockpage (BtLockWrite, latch);
1668
1669         // insert new fence for reformulated left block
1670
1671         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1672                 return bt->err;
1673
1674         //      switch fence for right block of larger keys to new right page
1675
1676         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1677                 return bt->err;
1678
1679         bt_unlockpage (BtLockParent, latch);
1680         bt_unlockpage (BtLockParent, rlatch);
1681
1682         bt_unpinlatch (rlatch);
1683         bt_unpinlatch (latch);
1684         return 0;
1685 }
1686
1687 //  Insert new key into the btree at requested level.
1688 //  Pages are unlocked at exit.
1689
1690 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1691 {
1692 uint slot, idx;
1693 BtPage page;
1694 BtKey ptr;
1695
1696   while( 1 ) {
1697         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1698                 ptr = keyptr(bt->page, slot);
1699         else
1700         {
1701                 if( !bt->err )
1702                         bt->err = BTERR_ovflw;
1703                 return bt->err;
1704         }
1705
1706         // if key already exists, update id and return
1707
1708         page = bt->page;
1709
1710         if( !keycmp (ptr, key, len) ) {
1711           if( slotptr(page, slot)->dead )
1712                 page->act++;
1713           slotptr(page, slot)->dead = 0;
1714 #ifdef USETOD
1715           slotptr(page, slot)->tod = tod;
1716 #endif
1717           bt_putid(slotptr(page,slot)->id, id);
1718           if( bt_update(bt, bt->page, bt->latch) )
1719                 return bt->err;
1720           bt_unlockpage(BtLockWrite, bt->latch);
1721           bt_unpinlatch (bt->latch);
1722           return 0;
1723         }
1724
1725         // check if page has enough space
1726
1727         if( slot = bt_cleanpage (bt, len, slot) )
1728                 break;
1729
1730         if( bt_splitpage (bt) )
1731                 return bt->err;
1732   }
1733
1734   // calculate next available slot and copy key into page
1735
1736   page->min -= len + 1; // reset lowest used offset
1737   ((unsigned char *)page)[page->min] = len;
1738   memcpy ((unsigned char *)page + page->min +1, key, len );
1739
1740   for( idx = slot; idx < page->cnt; idx++ )
1741         if( slotptr(page, idx)->dead )
1742                 break;
1743
1744   // now insert key into array before slot
1745   // preserving the fence slot
1746
1747   if( idx == page->cnt )
1748         idx++, page->cnt++;
1749
1750   page->act++;
1751
1752   while( idx > slot )
1753         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1754
1755   bt_putid(slotptr(page,slot)->id, id);
1756   slotptr(page, slot)->off = page->min;
1757 #ifdef USETOD
1758   slotptr(page, slot)->tod = tod;
1759 #endif
1760   slotptr(page, slot)->dead = 0;
1761
1762   if( bt_update(bt, bt->page, bt->latch) )
1763           return bt->err;
1764
1765   bt_unlockpage(BtLockWrite, bt->latch);
1766   bt_unpinlatch(bt->latch);
1767   return 0;
1768 }
1769
1770 //  cache page of keys into cursor and return starting slot for given key
1771
1772 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1773 {
1774 uint slot;
1775
1776         // cache page for retrieval
1777
1778         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1779           memcpy (bt->cursor, bt->page, bt->page_size);
1780         else
1781           return 0;
1782
1783         bt_unlockpage(BtLockRead, bt->latch);
1784         bt->cursor_page = bt->page_no;
1785         bt_unpinlatch (bt->latch);
1786         return slot;
1787 }
1788
1789 //  return next slot for cursor page
1790 //  or slide cursor right into next page
1791
1792 uint bt_nextkey (BtDb *bt, uint slot)
1793 {
1794 BtLatchSet *latch;
1795 off64_t right;
1796
1797   do {
1798         right = bt_getid(bt->cursor->right);
1799
1800         while( slot++ < bt->cursor->cnt )
1801           if( slotptr(bt->cursor,slot)->dead )
1802                 continue;
1803           else if( right || (slot < bt->cursor->cnt))
1804                 return slot;
1805           else
1806                 break;
1807
1808         if( !right )
1809                 break;
1810
1811         bt->cursor_page = right;
1812         latch = bt_pinlatch (bt, right);
1813     bt_lockpage(BtLockRead, latch);
1814
1815         bt->page = bt_mappage (bt, latch);
1816         memcpy (bt->cursor, bt->page, bt->page_size);
1817         bt_unlockpage(BtLockRead, latch);
1818         bt_unpinlatch (latch);
1819         slot = 0;
1820   } while( 1 );
1821
1822   return bt->err = 0;
1823 }
1824
1825 BtKey bt_key(BtDb *bt, uint slot)
1826 {
1827         return keyptr(bt->cursor, slot);
1828 }
1829
1830 uid bt_uid(BtDb *bt, uint slot)
1831 {
1832         return bt_getid(slotptr(bt->cursor,slot)->id);
1833 }
1834
1835 #ifdef USETOD
1836 uint bt_tod(BtDb *bt, uint slot)
1837 {
1838         return slotptr(bt->cursor,slot)->tod;
1839 }
1840 #endif
1841
1842 #ifdef STANDALONE
1843
1844 uint bt_audit (BtDb *bt)
1845 {
1846 uint idx, hashidx;
1847 uid next, page_no;
1848 BtLatchSet *latch;
1849 uint cnt = 0;
1850 BtPage page;
1851 off64_t off;
1852 uint amt[1];
1853 BtKey ptr;
1854
1855 #ifdef unix
1856         if( *(ushort *)(bt->latchmgr->lock) )
1857                 fprintf(stderr, "Alloc page locked\n");
1858         *(ushort *)(bt->latchmgr->lock) = 0;
1859
1860         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
1861                 latch = bt->latchsets + idx;
1862                 if( *(ushort *)latch->readwr )
1863                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
1864                 *(ushort *)latch->readwr = 0;
1865
1866                 if( *(ushort *)latch->access )
1867                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
1868                 *(ushort *)latch->access = 0;
1869
1870                 if( *(ushort *)latch->parent )
1871                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
1872                 *(ushort *)latch->parent = 0;
1873
1874                 if( latch->pin ) {
1875                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
1876                         latch->pin = 0;
1877                 }
1878                 page = (BtPage)(idx * bt->page_size + bt->latchpool);
1879                 off = latch->page_no << bt->page_bits;
1880 #ifdef unix
1881             if( latch->dirty )
1882                  if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
1883                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1884 #else
1885             if( latch->dirty ) {
1886                   SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
1887
1888                   if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
1889                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1890
1891                   if( *amt <  bt->page_size )
1892                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1893             }
1894 #endif
1895             latch->dirty = 0;
1896         }
1897
1898         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
1899           if( *(ushort *)(bt->table[hashidx].latch) )
1900                         fprintf(stderr, "hash entry %d locked\n", hashidx);
1901
1902           *(ushort *)(bt->table[hashidx].latch) = 0;
1903
1904           if( idx = bt->table[hashidx].slot ) do {
1905                 latch = bt->latchsets + idx;
1906                 if( *(ushort *)latch->busy )
1907                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
1908                 *(ushort *)latch->busy = 0;
1909                 if( latch->hash != hashidx )
1910                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
1911                 if( latch->pin )
1912                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
1913           } while( idx = latch->next );
1914         }
1915
1916         next = bt->latchmgr->nlatchpage + LATCH_page;
1917         page_no = LEAF_page;
1918
1919         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
1920                 pread (bt->idx, bt->frame, bt->page_size, page_no << bt->page_bits);
1921                 if( !bt->frame->free ) {
1922                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
1923                   ptr = keyptr(bt->frame, idx+1);
1924                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
1925                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
1926                  }
1927                  if( !bt->frame->lvl )
1928                         cnt += bt->frame->act;
1929                 }
1930
1931                 if( page_no > LEAF_page )
1932                         next = page_no + 1;
1933                 page_no = next;
1934         }
1935         return cnt - 1;
1936 #else
1937         return 0;
1938 #endif
1939 }
1940
1941 #ifndef unix
1942 double getCpuTime(int type)
1943 {
1944 FILETIME crtime[1];
1945 FILETIME xittime[1];
1946 FILETIME systime[1];
1947 FILETIME usrtime[1];
1948 SYSTEMTIME timeconv[1];
1949 double ans = 0;
1950
1951         memset (timeconv, 0, sizeof(SYSTEMTIME));
1952
1953         switch( type ) {
1954         case 0:
1955                 GetSystemTimeAsFileTime (xittime);
1956                 FileTimeToSystemTime (xittime, timeconv);
1957                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
1958                 break;
1959         case 1:
1960                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
1961                 FileTimeToSystemTime (usrtime, timeconv);
1962                 break;
1963         case 2:
1964                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
1965                 FileTimeToSystemTime (systime, timeconv);
1966                 break;
1967         }
1968
1969         ans += (double)timeconv->wHour * 3600;
1970         ans += (double)timeconv->wMinute * 60;
1971         ans += (double)timeconv->wSecond;
1972         ans += (double)timeconv->wMilliseconds / 1000;
1973         return ans;
1974 }
1975 #else
1976 #include <time.h>
1977 #include <sys/resource.h>
1978
1979 double getCpuTime(int type)
1980 {
1981 struct rusage used[1];
1982 struct timeval tv[1];
1983
1984         switch( type ) {
1985         case 0:
1986                 gettimeofday(tv, NULL);
1987                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
1988
1989         case 1:
1990                 getrusage(RUSAGE_SELF, used);
1991                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
1992
1993         case 2:
1994                 getrusage(RUSAGE_SELF, used);
1995                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
1996         }
1997
1998         return 0;
1999 }
2000 #endif
2001
2002 //  standalone program to index file of keys
2003 //  then list them onto std-out
2004
2005 int main (int argc, char **argv)
2006 {
2007 uint slot, line = 0, off = 0, found = 0;
2008 int ch, cnt = 0, bits = 12, idx;
2009 unsigned char key[256];
2010 double done, start;
2011 uid next, page_no;
2012 float elapsed;
2013 time_t tod[1];
2014 uint scan = 0;
2015 uint len = 0;
2016 uint map = 0;
2017 BtKey ptr;
2018 BtDb *bt;
2019 FILE *in;
2020
2021         if( argc < 4 ) {
2022                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]);
2023                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2024                 fprintf (stderr, "  mapped_pool_pages: number of pages in buffer pool\n");
2025                 exit(0);
2026         }
2027
2028         start = getCpuTime(0);
2029         time(tod);
2030
2031         if( argc > 4 )
2032                 bits = atoi(argv[4]);
2033
2034         if( argc > 5 )
2035                 map = atoi(argv[5]);
2036
2037         if( argc > 6 )
2038                 off = atoi(argv[6]);
2039
2040         bt = bt_open ((argv[1]), BT_rw, bits, map);
2041
2042         if( !bt ) {
2043                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2044                 exit (1);
2045         }
2046
2047         switch(argv[3][0]| 0x20)
2048         {
2049         case 'a':
2050                 fprintf(stderr, "started audit for %s\n", argv[2]);
2051                 cnt = bt_audit (bt);
2052                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[2], cnt);
2053                 break;
2054
2055         case 'w':
2056                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2057                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2058                   while( ch = getc(in), ch != EOF )
2059                         if( ch == '\n' )
2060                         {
2061                           if( off )
2062                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2063
2064                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2065                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2066                           len = 0;
2067                         }
2068                         else if( len < 245 )
2069                                 key[len++] = ch;
2070                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2071                 break;
2072
2073         case 'd':
2074                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2075                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2076                   while( ch = getc(in), ch != EOF )
2077                         if( ch == '\n' )
2078                         {
2079                           if( off )
2080                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2081                           line++;
2082                           if( bt_deletekey (bt, key, len, 0) )
2083                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2084                           len = 0;
2085                         }
2086                         else if( len < 245 )
2087                                 key[len++] = ch;
2088                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2089                 break;
2090
2091         case 'f':
2092                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2093                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2094                   while( ch = getc(in), ch != EOF )
2095                         if( ch == '\n' )
2096                         {
2097                           if( off )
2098                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2099                           line++;
2100                           if( bt_findkey (bt, key, len) )
2101                                 found++;
2102                           else if( bt->err )
2103                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2104                           len = 0;
2105                         }
2106                         else if( len < 245 )
2107                                 key[len++] = ch;
2108                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2109                 break;
2110
2111         case 's':
2112                 scan++;
2113
2114         case 'c':
2115           fprintf(stderr, "started counting\n");
2116           cnt = 0;
2117
2118           next = bt->latchmgr->nlatchpage + LATCH_page;
2119           page_no = LEAF_page;
2120
2121           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2122           uid off = page_no << bt->page_bits;
2123 #ifdef unix
2124                 pread (bt->idx, bt->frame, bt->page_size, off);
2125 #else
2126                 DWORD amt[1];
2127
2128                 SetFilePointer (bt->idx, (long)off, NULL, FILE_BEGIN);
2129
2130                 if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
2131                         fprintf (stderr, "unable to read page %.8x", page_no);
2132
2133                 if( *amt <  bt->page_size )
2134                         fprintf (stderr, "unable to read page %.8x", page_no);
2135 #endif
2136                 if( !bt->frame->free && !bt->frame->lvl )
2137                         cnt += bt->frame->act;
2138                 if( page_no > LEAF_page )
2139                         next = page_no + 1;
2140                 page_no = next;
2141           }
2142                 
2143           cnt--;        // remove stopper key
2144           fprintf(stderr, " Total keys read %d\n", cnt);
2145           break;
2146         }
2147
2148         done = getCpuTime(0);
2149         elapsed = (float)(done - start);
2150         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2151         elapsed = getCpuTime(1);
2152         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2153         elapsed = getCpuTime(2);
2154         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2155         return 0;
2156 }
2157
2158 #endif  //STANDALONE