]> pd.if.org Git - btree/blob - btree2u.c
Separate scanning from counting. Utilize file locking during btree creation.
[btree] / btree2u.c
1 // btree version 2u
2 //      with combined latch & pool manager
3 // 26 FEB 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #endif
30
31 #ifdef unix
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #endif
47
48 #include <memory.h>
49 #include <string.h>
50
51 typedef unsigned long long      uid;
52
53 #ifndef unix
54 typedef unsigned long long      off64_t;
55 typedef unsigned short          ushort;
56 typedef unsigned int            uint;
57 #endif
58
59 #define BT_ro 0x6f72    // ro
60 #define BT_rw 0x7772    // rw
61 #define BT_fl 0x6c66    // fl
62
63 #define BT_maxbits              15                                      // maximum page size in bits
64 #define BT_minbits              12                                      // minimum page size in bits
65 #define BT_minpage              (1 << BT_minbits)       // minimum page size
66 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
67
68 /*
69 There are five lock types for each node in three independent sets: 
70 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
71 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
72 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
73 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
74 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
75 */
76
77 typedef enum{
78         BtLockAccess,
79         BtLockDelete,
80         BtLockRead,
81         BtLockWrite,
82         BtLockParent
83 }BtLock;
84
85 //      definition for latch implementation
86
87 // exclusive is set for write access
88 // share is count of read accessors
89 // grant write lock when share == 0
90
91 volatile typedef struct {
92         unsigned char mutex[1];
93         unsigned char exclusive:1;
94         unsigned char pending:1;
95         ushort share;
96 } BtSpinLatch;
97
98 //      Define the length of the page and key pointers
99
100 #define BtId 6
101
102 //      Page key slot definition.
103
104 //      If BT_maxbits is 15 or less, you can save 2 bytes
105 //      for each key stored by making the first two uints
106 //      into ushorts.  You can also save 4 bytes by removing
107 //      the tod field from the key.
108
109 //      Keys are marked dead, but remain on the page until
110 //      cleanup is called. The fence key (highest key) for
111 //      the page is always present, even if dead.
112
113 typedef struct {
114 #ifdef USETOD
115         uint tod;                                       // time-stamp for key
116 #endif
117         ushort off:BT_maxbits;          // page offset for key start
118         ushort dead:1;                          // set for deleted key
119         unsigned char id[BtId];         // id associated with key
120 } BtSlot;
121
122 //      The key structure occupies space at the upper end of
123 //      each page.  It's a length byte followed by the value
124 //      bytes.
125
126 typedef struct {
127         unsigned char len;
128         unsigned char key[0];
129 } *BtKey;
130
131 //      The first part of an index page.
132 //      It is immediately followed
133 //      by the BtSlot array of keys.
134
135 typedef struct BtPage_ {
136         uint cnt;                                       // count of keys in page
137         uint act;                                       // count of active keys
138         uint min;                                       // next key offset
139         unsigned char bits:7;           // page size in bits
140         unsigned char free:1;           // page is on free list
141         unsigned char lvl:6;            // level of page
142         unsigned char kill:1;           // page is being deleted
143         unsigned char dirty:1;          // page is dirty
144         unsigned char right[BtId];      // page number to right
145 } *BtPage;
146
147 typedef struct {
148         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
149         BtSpinLatch lock[1];            // allocation area lite latch
150         uint latchdeployed;                     // highest number of latch entries deployed
151         uint nlatchpage;                        // number of latch pages at BT_latch
152         uint latchtotal;                        // number of page latch entries
153         uint latchhash;                         // number of latch hash table slots
154         uint latchvictim;                       // next latch hash entry to examine
155 } BtLatchMgr;
156
157 //  latch hash table entries
158
159 typedef struct {
160         volatile uint slot;             // Latch table entry at head of collision chain
161         BtSpinLatch latch[1];   // lock for the collision chain
162 } BtHashEntry;
163
164 //      latch manager table structure
165
166 typedef struct {
167         volatile uid page_no;           // latch set page number on disk
168         BtSpinLatch readwr[1];          // read/write page lock
169         BtSpinLatch access[1];          // Access Intent/Page delete
170         BtSpinLatch parent[1];          // Posting of fence key in parent
171         volatile ushort dirty;          // page is dirty in cache
172         volatile uint next;                     // next entry in hash table chain
173         volatile uint prev;                     // prev entry in hash table chain
174         volatile uint pin;                      // number of outstanding pins
175 } BtLatchSet;
176
177 //      The object structure for Btree access
178
179 typedef struct _BtDb {
180         uint page_size;         // each page size       
181         uint page_bits;         // each page size in bits       
182         uid page_no;            // current page number  
183         uid cursor_page;        // current cursor page number   
184         int  err;
185         uint mode;                      // read-write mode
186         BtPage alloc;           // frame buffer for alloc page ( page 0 )
187         BtPage cursor;          // cached frame for start/next (never mapped)
188         BtPage frame;           // spare frame for the page split (never mapped)
189         BtPage zero;            // zeroes frame buffer (never mapped)
190         BtPage page;            // current page
191         BtLatchSet *latch;                      // current page latch
192         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
193         BtLatchSet *latchsets;          // mapped latch set from latch pages
194         unsigned char *latchpool;       // cached page pool set
195         BtHashEntry *table;     // the hash table
196 #ifdef unix
197         int idx;
198 #else
199         HANDLE idx;
200         HANDLE halloc;          // allocation and latch table handle
201 #endif
202         unsigned char *mem;     // frame, cursor, page memory buffer
203         uint found;                     // last deletekey found key
204 } BtDb;
205
206 typedef enum {
207 BTERR_ok = 0,
208 BTERR_notfound,
209 BTERR_struct,
210 BTERR_ovflw,
211 BTERR_read,
212 BTERR_lock,
213 BTERR_hash,
214 BTERR_kill,
215 BTERR_map,
216 BTERR_wrt,
217 BTERR_eof
218 } BTERR;
219
220 // B-Tree functions
221 extern void bt_close (BtDb *bt);
222 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
223 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
224 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
225 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
226 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
227 extern uint bt_nextkey   (BtDb *bt, uint slot);
228
229 //      internal functions
230 BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch);
231 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
232 //  Helper functions to return slot values
233
234 extern BtKey bt_key (BtDb *bt, uint slot);
235 extern uid bt_uid (BtDb *bt, uint slot);
236 #ifdef USETOD
237 extern uint bt_tod (BtDb *bt, uint slot);
238 #endif
239
240 //  BTree page number constants
241 #define ALLOC_page              0
242 #define ROOT_page               1
243 #define LEAF_page               2
244 #define LATCH_page              3
245
246 //      Number of levels to create in a new BTree
247
248 #define MIN_lvl                 2
249
250 //  The page is allocated from low and hi ends.
251 //  The key offsets and row-id's are allocated
252 //  from the bottom, while the text of the key
253 //  is allocated from the top.  When the two
254 //  areas meet, the page is split into two.
255
256 //  A key consists of a length byte, two bytes of
257 //  index number (0 - 65534), and up to 253 bytes
258 //  of key value.  Duplicate keys are discarded.
259 //  Associated with each key is a 48 bit row-id.
260
261 //  The b-tree root is always located at page 1.
262 //      The first leaf page of level zero is always
263 //      located on page 2.
264
265 //      The b-tree pages are linked with right
266 //      pointers to facilitate enumerators,
267 //      and provide for concurrency.
268
269 //      When to root page fills, it is split in two and
270 //      the tree height is raised by a new root at page
271 //      one with two keys.
272
273 //      Deleted keys are marked with a dead bit until
274 //      page cleanup The fence key for a node is always
275 //      present, even after deletion and cleanup.
276
277 //  Deleted leaf pages are reclaimed  on a free list.
278 //      The upper levels of the btree are fixed on creation.
279
280 //  To achieve maximum concurrency one page is locked at a time
281 //  as the tree is traversed to find leaf key in question. The right
282 //  page numbers are used in cases where the page is being split,
283 //      or consolidated.
284
285 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
286 //      and chains empty leaf pages together for reuse.
287
288 //      Parent locks are obtained to prevent resplitting or deleting a node
289 //      before its fence is posted into its upper level.
290
291 //      A special open mode of BT_fl is provided to safely access files on
292 //      WIN32 networks. WIN32 network operations should not use memory mapping.
293 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
294 //      to prevent local caching of network file contents.
295
296 //      Access macros to address slot and key values from the page.
297 //      Page slots use 1 based indexing.
298
299 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
300 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
301
302 void bt_putid(unsigned char *dest, uid id)
303 {
304 int i = BtId;
305
306         while( i-- )
307                 dest[i] = (unsigned char)id, id >>= 8;
308 }
309
310 uid bt_getid(unsigned char *src)
311 {
312 uid id = 0;
313 int i;
314
315         for( i = 0; i < BtId; i++ )
316                 id <<= 8, id |= *src++; 
317
318         return id;
319 }
320
321 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
322 {
323 BtKey ptr;
324
325         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
326         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
327         ptr = keyptr(page, page->cnt);
328         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
329         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
330         return bt->err = err;
331 }
332
333 //      Latch Manager
334
335 //      wait until write lock mode is clear
336 //      and add 1 to the share count
337
338 void bt_spinreadlock(BtSpinLatch *latch)
339 {
340 ushort prev;
341
342   do {
343         //      obtain latch mutex
344 #ifdef unix
345         if( __sync_lock_test_and_set(latch->mutex, 1) )
346                 continue;
347 #else
348         if( _InterlockedExchange8(latch->mutex, 1) )
349                 continue;
350 #endif
351         //  see if exclusive request is granted or pending
352
353         if( prev = !(latch->exclusive | latch->pending) )
354                 latch->share++;
355
356 #ifdef unix
357         *latch->mutex = 0;
358 #else
359         _InterlockedExchange8(latch->mutex, 0);
360 #endif
361
362         if( prev )
363                 return;
364
365 #ifdef  unix
366   } while( sched_yield(), 1 );
367 #else
368   } while( SwitchToThread(), 1 );
369 #endif
370 }
371
372 //      wait for other read and write latches to relinquish
373
374 void bt_spinwritelock(BtSpinLatch *latch)
375 {
376 uint prev;
377
378   do {
379 #ifdef  unix
380         if( __sync_lock_test_and_set(latch->mutex, 1) )
381                 continue;
382 #else
383         if( _InterlockedExchange8(latch->mutex, 1) )
384                 continue;
385 #endif
386         if( prev = !(latch->share | latch->exclusive) )
387                 latch->exclusive = 1, latch->pending = 0;
388         else
389                 latch->pending = 1;
390 #ifdef unix
391         *latch->mutex = 0;
392 #else
393         _InterlockedExchange8(latch->mutex, 0);
394 #endif
395         if( prev )
396                 return;
397 #ifdef  unix
398   } while( sched_yield(), 1 );
399 #else
400   } while( SwitchToThread(), 1 );
401 #endif
402 }
403
404 //      try to obtain write lock
405
406 //      return 1 if obtained,
407 //              0 otherwise
408
409 int bt_spinwritetry(BtSpinLatch *latch)
410 {
411 uint prev;
412
413 #ifdef unix
414         if( __sync_lock_test_and_set(latch->mutex, 1) )
415                 return 0;
416 #else
417         if( _InterlockedExchange8(latch->mutex, 1) )
418                 return 0;
419 #endif
420         //      take write access if all bits are clear
421
422         if( prev = !(latch->exclusive | latch->share) )
423                 latch->exclusive = 1;
424
425 #ifdef unix
426         *latch->mutex = 0;
427 #else
428         _InterlockedExchange8(latch->mutex, 0);
429 #endif
430         return prev;
431 }
432
433 //      clear write mode
434
435 void bt_spinreleasewrite(BtSpinLatch *latch)
436 {
437 #ifdef unix
438         while( __sync_lock_test_and_set(latch->mutex, 1) )
439                 sched_yield();
440 #else
441         while( _InterlockedExchange8(latch->mutex, 1) )
442                 SwitchToThread();
443 #endif
444         latch->exclusive = 0;
445 #ifdef unix
446         *latch->mutex = 0;
447 #else
448         _InterlockedExchange8(latch->mutex, 0);
449 #endif
450 }
451
452 //      decrement reader count
453
454 void bt_spinreleaseread(BtSpinLatch *latch)
455 {
456 #ifdef unix
457         while( __sync_lock_test_and_set(latch->mutex, 1) )
458                 sched_yield();
459 #else
460         while( _InterlockedExchange8(latch->mutex, 1) )
461                 SwitchToThread();
462 #endif
463         latch->share--;
464 #ifdef unix
465         *latch->mutex = 0;
466 #else
467         _InterlockedExchange8(latch->mutex, 0);
468 #endif
469 }
470
471 //      link latch table entry into head of latch hash table
472
473 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
474 {
475 BtPage page = (BtPage)(slot * bt->page_size + bt->latchpool);
476 BtLatchSet *latch = bt->latchsets + slot;
477 off64_t off = page_no << bt->page_bits;
478 uint amt[1];
479
480         if( latch->next = bt->table[hashidx].slot )
481                 bt->latchsets[latch->next].prev = slot;
482
483         bt->table[hashidx].slot = slot;
484         latch->page_no = page_no;
485         latch->dirty = 0;
486         latch->prev = 0;
487         latch->pin = 1;
488 #ifdef unix
489         if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size )
490                 return bt->err = BTERR_read;
491 #else
492         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
493         if( !ReadFile(bt->idx, page, bt->page_size, amt, NULL))
494                 return bt->err = BTERR_read;
495         if( *amt <  bt->page_size )
496                 return bt->err = BTERR_read;
497 #endif
498         return 0;
499 }
500
501 //      release latch pin
502
503 void bt_unpinlatch (BtLatchSet *latch)
504 {
505 #ifdef unix
506         __sync_fetch_and_add(&latch->pin, -1);
507 #else
508         _InterlockedDecrement (&latch->pin);
509 #endif
510 }
511
512 //      find existing latchset or inspire new one
513 //      return with latchset pinned
514
515 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
516 {
517 uint hashidx = page_no % bt->latchmgr->latchhash;
518 BtLatchSet *latch;
519 uint slot, idx;
520 off64_t off;
521 uint amt[1];
522 BtPage page;
523
524   //  try to find unpinned entry
525
526   bt_spinwritelock(bt->table[hashidx].latch);
527
528   if( slot = bt->table[hashidx].slot ) do
529   {
530         latch = bt->latchsets + slot;
531         if( page_no == latch->page_no )
532                 break;
533   } while( slot = latch->next );
534
535   //  found our entry, bring to front of hash chain
536
537   if( slot ) {
538         latch = bt->latchsets + slot;
539 #ifdef unix
540         __sync_fetch_and_add(&latch->pin, 1);
541 #else
542         _InterlockedIncrement (&latch->pin);
543 #endif
544         //  unlink our entry from its hash chain position
545
546         if( latch->prev )
547                 bt->latchsets[latch->prev].next = latch->next;
548         else
549                 bt->table[hashidx].slot = latch->next;
550
551         if( latch->next )
552                 bt->latchsets[latch->next].prev = latch->prev;
553
554         //  now link into head of the hash chain
555
556         if( latch->next = bt->table[hashidx].slot )
557                 bt->latchsets[latch->next].prev = slot;
558
559         bt->table[hashidx].slot = slot;
560         latch->prev = 0;
561
562         bt_spinreleasewrite(bt->table[hashidx].latch);
563         return latch;
564   }
565
566         //  see if there are any unused pool entries
567 #ifdef unix
568         slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
569 #else
570         slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
571 #endif
572
573         if( slot < bt->latchmgr->latchtotal ) {
574                 latch = bt->latchsets + slot;
575                 if( bt_latchlink (bt, hashidx, slot, page_no) )
576                         return NULL;
577                 bt_spinreleasewrite (bt->table[hashidx].latch);
578                 return latch;
579         }
580
581 #ifdef unix
582         __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
583 #else
584         _InterlockedDecrement (&bt->latchmgr->latchdeployed);
585 #endif
586   //  find and reuse previous lru lock entry on victim hash chain
587
588   while( 1 ) {
589 #ifdef unix
590         idx = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
591 #else
592         idx = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
593 #endif
594         // try to get write lock on hash chain
595         //      skip entry if not obtained
596         //      or has outstanding locks
597
598         idx %= bt->latchmgr->latchhash;
599
600         if( !bt_spinwritetry (bt->table[idx].latch) )
601                 continue;
602
603         if( slot = bt->table[idx].slot )
604           while( 1 ) {
605                 latch = bt->latchsets + slot;
606                 if( !latch->next )
607                         break;
608                 slot = latch->next;
609           }
610
611         if( !slot || latch->pin ) {
612                 bt_spinreleasewrite (bt->table[idx].latch);
613                 continue;
614         }
615
616         //  update permanent page area in btree
617
618         page = (BtPage)(slot * bt->page_size + bt->latchpool);
619         off = latch->page_no << bt->page_bits;
620 #ifdef unix
621         if( latch->dirty )
622           if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
623                 return bt->err = BTERR_wrt, NULL;
624 #else
625         if( latch->dirty ) {
626           SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
627
628           if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
629                 return bt->err = BTERR_wrt, NULL;
630
631           if( *amt <  bt->page_size )
632                 return bt->err = BTERR_wrt, NULL;
633         }
634 #endif
635         //  unlink our available slot from its hash chain
636
637         if( latch->prev )
638                 bt->latchsets[latch->prev].next = latch->next;
639         else
640                 bt->table[idx].slot = latch->next;
641
642         if( latch->next )
643                 bt->latchsets[latch->next].prev = latch->prev;
644
645         bt_spinreleasewrite (bt->table[idx].latch);
646         if( bt_latchlink (bt, hashidx, slot, page_no) )
647                 return NULL;
648         bt_spinreleasewrite (bt->table[hashidx].latch);
649         return latch;
650   }
651 }
652
653 //      close and release memory
654
655 void bt_close (BtDb *bt)
656 {
657 #ifdef unix
658         munmap (bt->table, bt->latchmgr->nlatchpage * bt->page_size);
659         munmap (bt->latchmgr, bt->page_size);
660 #else
661         FlushViewOfFile(bt->latchmgr, 0);
662         UnmapViewOfFile(bt->latchmgr);
663         CloseHandle(bt->halloc);
664 #endif
665 #ifdef unix
666         if( bt->mem )
667                 free (bt->mem);
668         close (bt->idx);
669         free (bt);
670 #else
671         if( bt->mem)
672                 VirtualFree (bt->mem, 0, MEM_RELEASE);
673         FlushFileBuffers(bt->idx);
674         CloseHandle(bt->idx);
675         GlobalFree (bt);
676 #endif
677 }
678 //  open/create new btree
679
680 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
681 //              size of mapped page pool (e.g. 8192)
682
683 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
684 {
685 uint lvl, attr, last, slot, idx;
686 uint nlatchpage, latchhash;
687 BtLatchMgr *latchmgr;
688 off64_t size, off;
689 uint amt[1];
690 BtKey key;
691 BtDb* bt;
692 int flag;
693
694 #ifndef unix
695 OVERLAPPED ovl[1];
696 #else
697 struct flock lock[1];
698 #endif
699
700         // determine sanity of page size and buffer pool
701
702         if( bits > BT_maxbits )
703                 bits = BT_maxbits;
704         else if( bits < BT_minbits )
705                 bits = BT_minbits;
706
707 #ifdef unix
708         bt = calloc (1, sizeof(BtDb));
709
710         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
711
712         if( bt->idx == -1 )
713                 return free(bt), NULL;
714 #else
715         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
716         attr = FILE_ATTRIBUTE_NORMAL;
717         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
718
719         if( bt->idx == INVALID_HANDLE_VALUE )
720                 return GlobalFree(bt), NULL;
721 #endif
722 #ifdef unix
723         memset (lock, 0, sizeof(lock));
724         lock->l_len = sizeof(struct BtPage_);
725         lock->l_type = F_WRLCK;
726
727         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
728                 return bt_close (bt), NULL;
729 #else
730         memset (ovl, 0, sizeof(ovl));
731         len = sizeof(struct BtPage_);
732
733         //      use large offsets to
734         //      simulate advisory locking
735
736         ovl->OffsetHigh |= 0x80000000;
737
738         if( LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) )
739                 return bt_close (bt), NULL;
740 #endif 
741
742 #ifdef unix
743         latchmgr = malloc (BT_maxpage);
744         *amt = 0;
745
746         // read minimum page size to get root info
747
748         if( size = lseek (bt->idx, 0L, 2) ) {
749                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
750                         bits = latchmgr->alloc->bits;
751                 else
752                         return free(bt), free(latchmgr), NULL;
753         } else if( mode == BT_ro )
754                 return free(latchmgr), bt_close (bt), NULL;
755 #else
756         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
757         size = GetFileSize(bt->idx, amt);
758
759         if( size || *amt ) {
760                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
761                         return bt_close (bt), NULL;
762                 bits = latchmgr->alloc->bits;
763         } else if( mode == BT_ro )
764                 return bt_close (bt), NULL;
765 #endif
766
767         bt->page_size = 1 << bits;
768         bt->page_bits = bits;
769
770         bt->mode = mode;
771
772         if( size || *amt )
773                 goto btlatch;
774
775         // initialize an empty b-tree with latch page, root page, page of leaves
776         // and page(s) of latches and page pool cache
777
778         memset (latchmgr, 0, 1 << bits);
779         latchmgr->alloc->bits = bt->page_bits;
780
781         //  calculate number of latch hash table entries
782
783         nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
784         latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
785
786         nlatchpage += nodemax;          // size of the buffer pool in pages
787         nlatchpage += (sizeof(BtLatchSet) * nodemax + bt->page_size - 1)/bt->page_size;
788
789         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
790         latchmgr->nlatchpage = nlatchpage;
791         latchmgr->latchtotal = nodemax;
792         latchmgr->latchhash = latchhash;
793 #ifdef unix
794         if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
795                 return bt_close (bt), NULL;
796 #else
797         if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
798                 return bt_close (bt), NULL;
799
800         if( *amt < bt->page_size )
801                 return bt_close (bt), NULL;
802 #endif
803         memset (latchmgr, 0, 1 << bits);
804         latchmgr->alloc->bits = bt->page_bits;
805
806         for( lvl=MIN_lvl; lvl--; ) {
807                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
808                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
809                 key = keyptr(latchmgr->alloc, 1);
810                 key->len = 2;           // create stopper key
811                 key->key[0] = 0xff;
812                 key->key[1] = 0xff;
813                 latchmgr->alloc->min = bt->page_size - 3;
814                 latchmgr->alloc->lvl = lvl;
815                 latchmgr->alloc->cnt = 1;
816                 latchmgr->alloc->act = 1;
817 #ifdef unix
818                 if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
819                         return bt_close (bt), NULL;
820 #else
821                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
822                         return bt_close (bt), NULL;
823
824                 if( *amt < bt->page_size )
825                         return bt_close (bt), NULL;
826 #endif
827         }
828
829         // clear out latch manager pages
830
831         memset(latchmgr, 0, bt->page_size);
832         last = MIN_lvl + 1;
833
834         while( last < ((MIN_lvl + 1 + nlatchpage) ) ) {
835                 off = (uid)last << bt->page_bits;
836 #ifdef unix
837                 pwrite(bt->idx, latchmgr, bt->page_size, off);
838 #else
839                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
840                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
841                         return bt_close (bt), NULL;
842                 if( *amt < bt->page_size )
843                         return bt_close (bt), NULL;
844 #endif
845                 last++;
846         }
847
848 btlatch:
849 #ifdef unix
850         lock->l_type = F_UNLCK;
851         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
852                 return bt_close (bt), NULL;
853 #else
854         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) )
855                 return bt_close (bt), NULL;
856 #endif
857 #ifdef unix
858         flag = PROT_READ | PROT_WRITE;
859         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
860         if( bt->latchmgr == MAP_FAILED )
861                 return bt_close (bt), NULL;
862         bt->table = (void *)mmap (0, bt->latchmgr->nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
863         if( bt->table == MAP_FAILED )
864                 return bt_close (bt), NULL;
865 #else
866         flag = PAGE_READWRITE;
867         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size, NULL);
868         if( !bt->halloc )
869                 return bt_close (bt), NULL;
870
871         flag = FILE_MAP_WRITE;
872         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size);
873         if( !bt->latchmgr )
874                 return GetLastError(), bt_close (bt), NULL;
875
876         bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
877 #endif
878         bt->latchpool = (unsigned char *)bt->table + (bt->latchmgr->nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
879         bt->latchsets = (BtLatchSet *)(bt->latchpool - bt->latchmgr->latchtotal * sizeof(BtLatchSet));
880
881 #ifdef unix
882         free (latchmgr);
883 #else
884         VirtualFree (latchmgr, 0, MEM_RELEASE);
885 #endif
886
887 #ifdef unix
888         bt->mem = malloc (3 * bt->page_size);
889 #else
890         bt->mem = VirtualAlloc(NULL, 3 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
891 #endif
892         bt->frame = (BtPage)bt->mem;
893         bt->cursor = (BtPage)(bt->mem + bt->page_size);
894         bt->zero = (BtPage)(bt->mem + 2 * bt->page_size);
895
896         memset (bt->zero, 0, bt->page_size);
897         return bt;
898 }
899
900 // place write, read, or parent lock on requested page_no.
901
902 void bt_lockpage(BtLock mode, BtLatchSet *latch)
903 {
904         switch( mode ) {
905         case BtLockRead:
906                 bt_spinreadlock (latch->readwr);
907                 break;
908         case BtLockWrite:
909                 bt_spinwritelock (latch->readwr);
910                 break;
911         case BtLockAccess:
912                 bt_spinreadlock (latch->access);
913                 break;
914         case BtLockDelete:
915                 bt_spinwritelock (latch->access);
916                 break;
917         case BtLockParent:
918                 bt_spinwritelock (latch->parent);
919                 break;
920         }
921 }
922
923 // remove write, read, or parent lock on requested page
924
925 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
926 {
927         switch( mode ) {
928         case BtLockRead:
929                 bt_spinreleaseread (latch->readwr);
930                 break;
931         case BtLockWrite:
932                 bt_spinreleasewrite (latch->readwr);
933                 break;
934         case BtLockAccess:
935                 bt_spinreleaseread (latch->access);
936                 break;
937         case BtLockDelete:
938                 bt_spinreleasewrite (latch->access);
939                 break;
940         case BtLockParent:
941                 bt_spinreleasewrite (latch->parent);
942                 break;
943         }
944 }
945
946 //      allocate a new page and write page into it
947
948 uid bt_newpage(BtDb *bt, BtPage page)
949 {
950 BtLatchSet *latch;
951 uid new_page;
952 BtPage temp;
953 off64_t off;
954 uint amt[1];
955 int reuse;
956
957         //      lock allocation page
958
959         bt_spinwritelock(bt->latchmgr->lock);
960
961         // use empty chain first
962         // else allocate empty page
963
964         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
965           if( latch = bt_pinlatch (bt, new_page) )
966                 temp = bt_mappage (bt, latch);
967           else
968                 return 0;
969
970           bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
971           bt_spinreleasewrite(bt->latchmgr->lock);
972           memcpy (temp, page, bt->page_size);
973
974           if( bt_update (bt, temp, latch) )
975                 return 0;
976
977           bt_unpinlatch (latch);
978         } else {
979           new_page = bt_getid(bt->latchmgr->alloc->right);
980           bt_putid(bt->latchmgr->alloc->right, new_page+1);
981           bt_spinreleasewrite(bt->latchmgr->lock);
982           off = new_page << bt->page_bits;
983 #ifdef unix
984           if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
985                 return bt->err = BTERR_wrt, 0;
986 #else
987           SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
988
989           if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
990                 return bt->err = BTERR_wrt, 0;
991
992           if( *amt <  bt->page_size )
993                 return bt->err = BTERR_wrt, 0;
994 #endif
995         }
996
997         return new_page;
998 }
999
1000 //  compare two keys, returning > 0, = 0, or < 0
1001 //  as the comparison value
1002
1003 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1004 {
1005 uint len1 = key1->len;
1006 int ans;
1007
1008         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1009                 return ans;
1010
1011         if( len1 > len2 )
1012                 return 1;
1013         if( len1 < len2 )
1014                 return -1;
1015
1016         return 0;
1017 }
1018
1019 //  Update current page of btree by
1020 //      flushing mapped area to disk backing of cache pool.
1021
1022 BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch)
1023 {
1024 #ifdef unix
1025         msync (page, bt->page_size, MS_ASYNC);
1026 #else
1027         FlushViewOfFile (page, bt->page_size);
1028 #endif
1029         latch->dirty = 1;
1030         return 0;
1031 }
1032
1033 //  map the btree cached page onto current page
1034
1035 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1036 {
1037         return (BtPage)((latch - bt->latchsets) * bt->page_size + bt->latchpool);
1038 }
1039
1040 //      deallocate a deleted page 
1041 //      place on free chain out of allocator page
1042 //      call with page latched for Writing and Deleting
1043
1044 BTERR bt_freepage(BtDb *bt, uid page_no, BtLatchSet *latch)
1045 {
1046 BtPage page = bt_mappage (bt, latch);
1047
1048         //      lock allocation page
1049
1050         bt_spinwritelock (bt->latchmgr->lock);
1051
1052         //      store chain in second right
1053         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1054         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1055         page->free = 1;
1056
1057         if( bt_update(bt, page, latch) )
1058                 return bt->err;
1059
1060         // unlock released page
1061
1062         bt_unlockpage (BtLockDelete, latch);
1063         bt_unlockpage (BtLockWrite, latch);
1064         bt_unpinlatch (latch);
1065
1066         // unlock allocation page
1067
1068         bt_spinreleasewrite (bt->latchmgr->lock);
1069         return 0;
1070 }
1071
1072 //  find slot in page for given key at a given level
1073
1074 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1075 {
1076 uint diff, higher = bt->page->cnt, low = 1, slot;
1077 uint good = 0;
1078
1079         //      make stopper key an infinite fence value
1080
1081         if( bt_getid (bt->page->right) )
1082                 higher++;
1083         else
1084                 good++;
1085
1086         //      low is the lowest candidate, higher is already
1087         //      tested as .ge. the given key, loop ends when they meet
1088
1089         while( diff = higher - low ) {
1090                 slot = low + ( diff >> 1 );
1091                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1092                         low = slot + 1;
1093                 else
1094                         higher = slot, good++;
1095         }
1096
1097         //      return zero if key is on right link page
1098
1099         return good ? higher : 0;
1100 }
1101
1102 //  find and load page at given level for given key
1103 //      leave page rd or wr locked as requested
1104
1105 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1106 {
1107 uid page_no = ROOT_page, prevpage = 0;
1108 uint drill = 0xff, slot;
1109 BtLatchSet *prevlatch;
1110 uint mode, prevmode;
1111
1112   //  start at root of btree and drill down
1113
1114   do {
1115         // determine lock mode of drill level
1116         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1117
1118         if( bt->latch = bt_pinlatch(bt, page_no) )
1119                 bt->page_no = page_no;
1120         else
1121                 return 0;
1122
1123         // obtain access lock using lock chaining
1124
1125         if( page_no > ROOT_page )
1126                 bt_lockpage(BtLockAccess, bt->latch);
1127
1128         if( prevpage ) {
1129                 bt_unlockpage(prevmode, prevlatch);
1130                 bt_unpinlatch(prevlatch);
1131                 prevpage = 0;
1132         }
1133
1134         // obtain read lock using lock chaining
1135
1136         bt_lockpage(mode, bt->latch);
1137
1138         if( page_no > ROOT_page )
1139                 bt_unlockpage(BtLockAccess, bt->latch);
1140
1141         //      map/obtain page contents
1142
1143         bt->page = bt_mappage (bt, bt->latch);
1144
1145         // re-read and re-lock root after determining actual level of root
1146
1147         if( bt->page->lvl != drill) {
1148                 if( bt->page_no != ROOT_page )
1149                         return bt->err = BTERR_struct, 0;
1150                         
1151                 drill = bt->page->lvl;
1152
1153                 if( lock != BtLockRead && drill == lvl ) {
1154                         bt_unlockpage(mode, bt->latch);
1155                         bt_unpinlatch(bt->latch);
1156                         continue;
1157                 }
1158         }
1159
1160         prevpage = bt->page_no;
1161         prevlatch = bt->latch;
1162         prevmode = mode;
1163
1164         //  find key on page at this level
1165         //  and descend to requested level
1166
1167         if( !bt->page->kill )
1168          if( slot = bt_findslot (bt, key, len) ) {
1169           if( drill == lvl )
1170                 return slot;
1171
1172           while( slotptr(bt->page, slot)->dead )
1173                 if( slot++ < bt->page->cnt )
1174                         continue;
1175                 else
1176                         goto slideright;
1177
1178           page_no = bt_getid(slotptr(bt->page, slot)->id);
1179           drill--;
1180           continue;
1181          }
1182
1183         //  or slide right into next page
1184
1185 slideright:
1186         page_no = bt_getid(bt->page->right);
1187
1188   } while( page_no );
1189
1190   // return error on end of right chain
1191
1192   bt->err = BTERR_eof;
1193   return 0;     // return error
1194 }
1195
1196 //      a fence key was deleted from a page
1197 //      push new fence value upwards
1198
1199 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1200 {
1201 unsigned char leftkey[256], rightkey[256];
1202 BtLatchSet *latch = bt->latch;
1203 BtKey ptr;
1204
1205         // remove deleted key, the old fence value
1206
1207         ptr = keyptr(bt->page, bt->page->cnt);
1208         memcpy(rightkey, ptr, ptr->len + 1);
1209
1210         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1211         bt->page->dirty = 1;
1212
1213         ptr = keyptr(bt->page, bt->page->cnt);
1214         memcpy(leftkey, ptr, ptr->len + 1);
1215
1216         if( bt_update (bt, bt->page, latch) )
1217                 return bt->err;
1218
1219         bt_lockpage (BtLockParent, latch);
1220         bt_unlockpage (BtLockWrite, latch);
1221
1222         //  insert new (now smaller) fence key
1223
1224         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1225                 return bt->err;
1226
1227         //  remove old (larger) fence key
1228
1229         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1230                 return bt->err;
1231
1232         bt_unlockpage (BtLockParent, latch);
1233         bt_unpinlatch (latch);
1234         return 0;
1235 }
1236
1237 //      root has a single child
1238 //      collapse a level from the btree
1239 //      call with root locked in bt->page
1240
1241 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1242 {
1243 BtLatchSet *latch;
1244 BtPage temp;
1245 uid child;
1246 uint idx;
1247
1248         // find the child entry
1249         //      and promote to new root
1250
1251   do {
1252         for( idx = 0; idx++ < root->cnt; )
1253           if( !slotptr(root, idx)->dead )
1254                 break;
1255
1256         child = bt_getid (slotptr(root, idx)->id);
1257         if( latch = bt_pinlatch (bt, child) )
1258                 temp = bt_mappage (bt, latch);
1259         else
1260                 return bt->err;
1261
1262         bt_lockpage (BtLockDelete, latch);
1263         bt_lockpage (BtLockWrite, latch);
1264         memcpy (root, temp, bt->page_size);
1265
1266         if( bt_update (bt, root, bt->latch) )
1267                 return bt->err;
1268
1269         if( bt_freepage (bt, child, latch) )
1270                 return bt->err;
1271
1272   } while( root->lvl > 1 && root->act == 1 );
1273
1274   bt_unlockpage (BtLockWrite, bt->latch);
1275   bt_unpinlatch (bt->latch);
1276   return 0;
1277 }
1278
1279 //  find and delete key on page by marking delete flag bit
1280 //  when page becomes empty, delete it
1281
1282 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1283 {
1284 unsigned char lowerkey[256], higherkey[256];
1285 uint slot, dirty = 0, idx, fence, found;
1286 BtLatchSet *latch, *rlatch;
1287 uid page_no, right;
1288 BtPage temp;
1289 BtKey ptr;
1290
1291         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1292                 ptr = keyptr(bt->page, slot);
1293         else
1294                 return bt->err;
1295
1296         // are we deleting a fence slot?
1297
1298         fence = slot == bt->page->cnt;
1299
1300         // if key is found delete it, otherwise ignore request
1301
1302         if( found = !keycmp (ptr, key, len) )
1303           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1304                 dirty = slotptr(bt->page,slot)->dead = 1;
1305                 bt->page->dirty = 1;
1306                 bt->page->act--;
1307
1308                 // collapse empty slots
1309
1310                 while( idx = bt->page->cnt - 1 )
1311                   if( slotptr(bt->page, idx)->dead ) {
1312                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1313                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1314                   } else
1315                         break;
1316           }
1317
1318         right = bt_getid(bt->page->right);
1319         page_no = bt->page_no;
1320         latch = bt->latch;
1321
1322         if( !dirty ) {
1323           if( lvl )
1324                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1325           bt_unlockpage(BtLockWrite, latch);
1326           bt_unpinlatch (latch);
1327           return bt->found = found, 0;
1328         }
1329
1330         //      did we delete a fence key in an upper level?
1331
1332         if( lvl && bt->page->act && fence )
1333           if( bt_fixfence (bt, page_no, lvl) )
1334                 return bt->err;
1335           else
1336                 return bt->found = found, 0;
1337
1338         //      is this a collapsed root?
1339
1340         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1341           if( bt_collapseroot (bt, bt->page) )
1342                 return bt->err;
1343           else
1344                 return bt->found = found, 0;
1345
1346         // return if page is not empty
1347
1348         if( bt->page->act ) {
1349           if( bt_update(bt, bt->page, latch) )
1350                 return bt->err;
1351           bt_unlockpage(BtLockWrite, latch);
1352           bt_unpinlatch (latch);
1353           return bt->found = found, 0;
1354         }
1355
1356         // cache copy of fence key
1357         //      in order to find parent
1358
1359         ptr = keyptr(bt->page, bt->page->cnt);
1360         memcpy(lowerkey, ptr, ptr->len + 1);
1361
1362         // obtain lock on right page
1363
1364         if( rlatch = bt_pinlatch (bt, right) )
1365                 temp = bt_mappage (bt, rlatch);
1366         else
1367                 return bt->err;
1368
1369         bt_lockpage(BtLockWrite, rlatch);
1370
1371         if( temp->kill ) {
1372                 bt_abort(bt, temp, right, 0);
1373                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1374         }
1375
1376         // pull contents of next page into current empty page 
1377
1378         memcpy (bt->page, temp, bt->page_size);
1379
1380         //      cache copy of key to update
1381
1382         ptr = keyptr(temp, temp->cnt);
1383         memcpy(higherkey, ptr, ptr->len + 1);
1384
1385         //  Mark right page as deleted and point it to left page
1386         //      until we can post updates at higher level.
1387
1388         bt_putid(temp->right, page_no);
1389         temp->kill = 1;
1390
1391         if( bt_update(bt, bt->page, latch) )
1392                 return bt->err;
1393
1394         if( bt_update(bt, temp, rlatch) )
1395                 return bt->err;
1396
1397         bt_lockpage(BtLockParent, latch);
1398         bt_unlockpage(BtLockWrite, latch);
1399
1400         bt_lockpage(BtLockParent, rlatch);
1401         bt_unlockpage(BtLockWrite, rlatch);
1402
1403         //  redirect higher key directly to consolidated node
1404
1405         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1406                 return bt->err;
1407
1408         //  delete old lower key to consolidated node
1409
1410         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1411                 return bt->err;
1412
1413         //  obtain write & delete lock on deleted node
1414         //      add right block to free chain
1415
1416         bt_lockpage(BtLockDelete, rlatch);
1417         bt_lockpage(BtLockWrite, rlatch);
1418         bt_unlockpage(BtLockParent, rlatch);
1419
1420         if( bt_freepage (bt, right, rlatch) )
1421                 return bt->err;
1422
1423         bt_unlockpage(BtLockParent, latch);
1424         bt_unpinlatch(latch);
1425         return 0;
1426 }
1427
1428 //      find key in leaf level and return row-id
1429
1430 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1431 {
1432 uint  slot;
1433 BtKey ptr;
1434 uid id;
1435
1436         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1437                 ptr = keyptr(bt->page, slot);
1438         else
1439                 return 0;
1440
1441         // if key exists, return row-id
1442         //      otherwise return 0
1443
1444         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1445                 id = bt_getid(slotptr(bt->page,slot)->id);
1446         else
1447                 id = 0;
1448
1449         bt_unlockpage (BtLockRead, bt->latch);
1450         bt_unpinlatch (bt->latch);
1451         return id;
1452 }
1453
1454 //      check page for space available,
1455 //      clean if necessary and return
1456 //      0 - page needs splitting
1457 //      >0 - go ahead with new slot
1458  
1459 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1460 {
1461 uint nxt = bt->page_size;
1462 BtPage page = bt->page;
1463 uint cnt = 0, idx = 0;
1464 uint max = page->cnt;
1465 uint newslot = slot;
1466 BtKey key;
1467 int ret;
1468
1469         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1470                 return slot;
1471
1472         //      skip cleanup if nothing to reclaim
1473
1474         if( !page->dirty )
1475                 return 0;
1476
1477         memcpy (bt->frame, page, bt->page_size);
1478
1479         // skip page info and set rest of page to zero
1480
1481         memset (page+1, 0, bt->page_size - sizeof(*page));
1482         page->act = 0;
1483
1484         while( cnt++ < max ) {
1485                 if( cnt == slot )
1486                         newslot = idx + 1;
1487                 // always leave fence key in list
1488                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1489                         continue;
1490
1491                 // copy key
1492                 key = keyptr(bt->frame, cnt);
1493                 nxt -= key->len + 1;
1494                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1495
1496                 // copy slot
1497                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1498                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1499                         page->act++;
1500 #ifdef USETOD
1501                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1502 #endif
1503                 slotptr(page, idx)->off = nxt;
1504         }
1505
1506         page->min = nxt;
1507         page->cnt = idx;
1508
1509         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1510                 return newslot;
1511
1512         return 0;
1513 }
1514
1515 // split the root and raise the height of the btree
1516
1517 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1518 {
1519 uint nxt = bt->page_size;
1520 BtPage root = bt->page;
1521 uid right;
1522
1523         //  Obtain an empty page to use, and copy the current
1524         //  root contents into it
1525
1526         if( !(right = bt_newpage(bt, root)) )
1527                 return bt->err;
1528
1529         // preserve the page info at the bottom
1530         // and set rest to zero
1531
1532         memset(root+1, 0, bt->page_size - sizeof(*root));
1533
1534         // insert first key on newroot page
1535
1536         nxt -= *leftkey + 1;
1537         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1538         bt_putid(slotptr(root, 1)->id, right);
1539         slotptr(root, 1)->off = nxt;
1540         
1541         // insert second key on newroot page
1542         // and increase the root height
1543
1544         nxt -= 3;
1545         ((unsigned char *)root)[nxt] = 2;
1546         ((unsigned char *)root)[nxt+1] = 0xff;
1547         ((unsigned char *)root)[nxt+2] = 0xff;
1548         bt_putid(slotptr(root, 2)->id, page_no2);
1549         slotptr(root, 2)->off = nxt;
1550
1551         bt_putid(root->right, 0);
1552         root->min = nxt;                // reset lowest used offset and key count
1553         root->cnt = 2;
1554         root->act = 2;
1555         root->lvl++;
1556
1557         // update and release root (bt->page)
1558
1559         if( bt_update(bt, root, bt->latch) )
1560                 return bt->err;
1561
1562         bt_unlockpage(BtLockWrite, bt->latch);
1563         bt_unpinlatch(bt->latch);
1564         return 0;
1565 }
1566
1567 //  split already locked full node
1568 //      return unlocked.
1569
1570 BTERR bt_splitpage (BtDb *bt)
1571 {
1572 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1573 unsigned char fencekey[256], rightkey[256];
1574 uid page_no = bt->page_no, right;
1575 BtLatchSet *latch, *rlatch;
1576 BtPage page = bt->page;
1577 uint lvl = page->lvl;
1578 BtKey key;
1579
1580         latch = bt->latch;
1581
1582         //  split higher half of keys to bt->frame
1583         //      the last key (fence key) might be dead
1584
1585         memset (bt->frame, 0, bt->page_size);
1586         max = page->cnt;
1587         cnt = max / 2;
1588         idx = 0;
1589
1590         while( cnt++ < max ) {
1591                 key = keyptr(page, cnt);
1592                 nxt -= key->len + 1;
1593                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1594                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1595                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1596                         bt->frame->act++;
1597 #ifdef USETOD
1598                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1599 #endif
1600                 slotptr(bt->frame, idx)->off = nxt;
1601         }
1602
1603         //      remember fence key for new right page
1604
1605         memcpy (rightkey, key, key->len + 1);
1606
1607         bt->frame->bits = bt->page_bits;
1608         bt->frame->min = nxt;
1609         bt->frame->cnt = idx;
1610         bt->frame->lvl = lvl;
1611
1612         // link right node
1613
1614         if( page_no > ROOT_page )
1615                 memcpy (bt->frame->right, page->right, BtId);
1616
1617         //      get new free page and write frame to it.
1618
1619         if( !(right = bt_newpage(bt, bt->frame)) )
1620                 return bt->err;
1621
1622         //      update lower keys to continue in old page
1623
1624         memcpy (bt->frame, page, bt->page_size);
1625         memset (page+1, 0, bt->page_size - sizeof(*page));
1626         nxt = bt->page_size;
1627         page->dirty = 0;
1628         page->act = 0;
1629         cnt = 0;
1630         idx = 0;
1631
1632         //  assemble page of smaller keys
1633         //      (they're all active keys)
1634
1635         while( cnt++ < max / 2 ) {
1636                 key = keyptr(bt->frame, cnt);
1637                 nxt -= key->len + 1;
1638                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1639                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1640 #ifdef USETOD
1641                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1642 #endif
1643                 slotptr(page, idx)->off = nxt;
1644                 page->act++;
1645         }
1646
1647         // remember fence key for smaller page
1648
1649         memcpy (fencekey, key, key->len + 1);
1650
1651         bt_putid(page->right, right);
1652         page->min = nxt;
1653         page->cnt = idx;
1654
1655         // if current page is the root page, split it
1656
1657         if( page_no == ROOT_page )
1658                 return bt_splitroot (bt, fencekey, right);
1659
1660         //      lock right page
1661
1662         if( rlatch = bt_pinlatch (bt, right) )
1663                 bt_lockpage (BtLockParent, rlatch);
1664         else
1665                 return bt->err;
1666
1667         // update left (containing) node
1668
1669         if( bt_update(bt, page, latch) )
1670                 return bt->err;
1671
1672         bt_lockpage (BtLockParent, latch);
1673         bt_unlockpage (BtLockWrite, latch);
1674
1675         // insert new fence for reformulated left block
1676
1677         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1678                 return bt->err;
1679
1680         //      switch fence for right block of larger keys to new right page
1681
1682         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1683                 return bt->err;
1684
1685         bt_unlockpage (BtLockParent, latch);
1686         bt_unlockpage (BtLockParent, rlatch);
1687
1688         bt_unpinlatch (rlatch);
1689         bt_unpinlatch (latch);
1690         return 0;
1691 }
1692
1693 //  Insert new key into the btree at requested level.
1694 //  Pages are unlocked at exit.
1695
1696 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1697 {
1698 uint slot, idx;
1699 BtPage page;
1700 BtKey ptr;
1701
1702   while( 1 ) {
1703         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1704                 ptr = keyptr(bt->page, slot);
1705         else
1706         {
1707                 if( !bt->err )
1708                         bt->err = BTERR_ovflw;
1709                 return bt->err;
1710         }
1711
1712         // if key already exists, update id and return
1713
1714         page = bt->page;
1715
1716         if( !keycmp (ptr, key, len) ) {
1717           if( slotptr(page, slot)->dead )
1718                 page->act++;
1719           slotptr(page, slot)->dead = 0;
1720 #ifdef USETOD
1721           slotptr(page, slot)->tod = tod;
1722 #endif
1723           bt_putid(slotptr(page,slot)->id, id);
1724           if( bt_update(bt, bt->page, bt->latch) )
1725                 return bt->err;
1726           bt_unlockpage(BtLockWrite, bt->latch);
1727           bt_unpinlatch (bt->latch);
1728           return 0;
1729         }
1730
1731         // check if page has enough space
1732
1733         if( slot = bt_cleanpage (bt, len, slot) )
1734                 break;
1735
1736         if( bt_splitpage (bt) )
1737                 return bt->err;
1738   }
1739
1740   // calculate next available slot and copy key into page
1741
1742   page->min -= len + 1; // reset lowest used offset
1743   ((unsigned char *)page)[page->min] = len;
1744   memcpy ((unsigned char *)page + page->min +1, key, len );
1745
1746   for( idx = slot; idx < page->cnt; idx++ )
1747         if( slotptr(page, idx)->dead )
1748                 break;
1749
1750   // now insert key into array before slot
1751   // preserving the fence slot
1752
1753   if( idx == page->cnt )
1754         idx++, page->cnt++;
1755
1756   page->act++;
1757
1758   while( idx > slot )
1759         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1760
1761   bt_putid(slotptr(page,slot)->id, id);
1762   slotptr(page, slot)->off = page->min;
1763 #ifdef USETOD
1764   slotptr(page, slot)->tod = tod;
1765 #endif
1766   slotptr(page, slot)->dead = 0;
1767
1768   if( bt_update(bt, bt->page, bt->latch) )
1769           return bt->err;
1770
1771   bt_unlockpage(BtLockWrite, bt->latch);
1772   bt_unpinlatch(bt->latch);
1773   return 0;
1774 }
1775
1776 //  cache page of keys into cursor and return starting slot for given key
1777
1778 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1779 {
1780 uint slot;
1781
1782         // cache page for retrieval
1783
1784         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1785           memcpy (bt->cursor, bt->page, bt->page_size);
1786         else
1787           return 0;
1788
1789         bt_unlockpage(BtLockRead, bt->latch);
1790         bt->cursor_page = bt->page_no;
1791         bt_unpinlatch (bt->latch);
1792         return slot;
1793 }
1794
1795 //  return next slot for cursor page
1796 //  or slide cursor right into next page
1797
1798 uint bt_nextkey (BtDb *bt, uint slot)
1799 {
1800 BtLatchSet *latch;
1801 off64_t right;
1802
1803   do {
1804         right = bt_getid(bt->cursor->right);
1805
1806         while( slot++ < bt->cursor->cnt )
1807           if( slotptr(bt->cursor,slot)->dead )
1808                 continue;
1809           else if( right || (slot < bt->cursor->cnt))
1810                 return slot;
1811           else
1812                 break;
1813
1814         if( !right )
1815                 break;
1816
1817         bt->cursor_page = right;
1818
1819         if( latch = bt_pinlatch (bt, right) )
1820         bt_lockpage(BtLockRead, latch);
1821         else
1822                 return 0;
1823
1824         bt->page = bt_mappage (bt, latch);
1825         memcpy (bt->cursor, bt->page, bt->page_size);
1826         bt_unlockpage(BtLockRead, latch);
1827         bt_unpinlatch (latch);
1828         slot = 0;
1829   } while( 1 );
1830
1831   return bt->err = 0;
1832 }
1833
1834 BtKey bt_key(BtDb *bt, uint slot)
1835 {
1836         return keyptr(bt->cursor, slot);
1837 }
1838
1839 uid bt_uid(BtDb *bt, uint slot)
1840 {
1841         return bt_getid(slotptr(bt->cursor,slot)->id);
1842 }
1843
1844 #ifdef USETOD
1845 uint bt_tod(BtDb *bt, uint slot)
1846 {
1847         return slotptr(bt->cursor,slot)->tod;
1848 }
1849 #endif
1850
1851 #ifdef STANDALONE
1852
1853 uint bt_audit (BtDb *bt)
1854 {
1855 uint idx, hashidx;
1856 uid next, page_no;
1857 BtLatchSet *latch;
1858 uint cnt = 0;
1859 BtPage page;
1860 off64_t off;
1861 uint amt[1];
1862 BtKey ptr;
1863
1864 #ifdef unix
1865         if( *(ushort *)(bt->latchmgr->lock) )
1866                 fprintf(stderr, "Alloc page locked\n");
1867         *(ushort *)(bt->latchmgr->lock) = 0;
1868
1869         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
1870                 latch = bt->latchsets + idx;
1871                 if( *(ushort *)latch->readwr )
1872                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
1873                 *(ushort *)latch->readwr = 0;
1874
1875                 if( *(ushort *)latch->access )
1876                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
1877                 *(ushort *)latch->access = 0;
1878
1879                 if( *(ushort *)latch->parent )
1880                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
1881                 *(ushort *)latch->parent = 0;
1882
1883                 if( latch->pin ) {
1884                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
1885                         latch->pin = 0;
1886                 }
1887                 page = (BtPage)(idx * bt->page_size + bt->latchpool);
1888                 off = latch->page_no << bt->page_bits;
1889 #ifdef unix
1890             if( latch->dirty )
1891                  if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
1892                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1893 #else
1894             if( latch->dirty ) {
1895                   SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
1896
1897                   if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
1898                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1899
1900                   if( *amt <  bt->page_size )
1901                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1902             }
1903 #endif
1904             latch->dirty = 0;
1905         }
1906
1907         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
1908           if( *(ushort *)(bt->table[hashidx].latch) )
1909                         fprintf(stderr, "hash entry %d locked\n", hashidx);
1910
1911           *(ushort *)(bt->table[hashidx].latch) = 0;
1912
1913           if( idx = bt->table[hashidx].slot ) do {
1914                 latch = bt->latchsets + idx;
1915                 if( latch->pin )
1916                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
1917           } while( idx = latch->next );
1918         }
1919
1920         next = bt->latchmgr->nlatchpage + LATCH_page;
1921         page_no = LEAF_page;
1922
1923         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
1924                 pread (bt->idx, bt->frame, bt->page_size, page_no << bt->page_bits);
1925                 if( !bt->frame->free ) {
1926                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
1927                   ptr = keyptr(bt->frame, idx+1);
1928                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
1929                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
1930                  }
1931                  if( !bt->frame->lvl )
1932                         cnt += bt->frame->act;
1933                 }
1934
1935                 if( page_no > LEAF_page )
1936                         next = page_no + 1;
1937                 page_no = next;
1938         }
1939         return cnt - 1;
1940 #else
1941         return 0;
1942 #endif
1943 }
1944
1945 #ifndef unix
1946 double getCpuTime(int type)
1947 {
1948 FILETIME crtime[1];
1949 FILETIME xittime[1];
1950 FILETIME systime[1];
1951 FILETIME usrtime[1];
1952 SYSTEMTIME timeconv[1];
1953 double ans = 0;
1954
1955         memset (timeconv, 0, sizeof(SYSTEMTIME));
1956
1957         switch( type ) {
1958         case 0:
1959                 GetSystemTimeAsFileTime (xittime);
1960                 FileTimeToSystemTime (xittime, timeconv);
1961                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
1962                 break;
1963         case 1:
1964                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
1965                 FileTimeToSystemTime (usrtime, timeconv);
1966                 break;
1967         case 2:
1968                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
1969                 FileTimeToSystemTime (systime, timeconv);
1970                 break;
1971         }
1972
1973         ans += (double)timeconv->wHour * 3600;
1974         ans += (double)timeconv->wMinute * 60;
1975         ans += (double)timeconv->wSecond;
1976         ans += (double)timeconv->wMilliseconds / 1000;
1977         return ans;
1978 }
1979 #else
1980 #include <time.h>
1981 #include <sys/resource.h>
1982
1983 double getCpuTime(int type)
1984 {
1985 struct rusage used[1];
1986 struct timeval tv[1];
1987
1988         switch( type ) {
1989         case 0:
1990                 gettimeofday(tv, NULL);
1991                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
1992
1993         case 1:
1994                 getrusage(RUSAGE_SELF, used);
1995                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
1996
1997         case 2:
1998                 getrusage(RUSAGE_SELF, used);
1999                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2000         }
2001
2002         return 0;
2003 }
2004 #endif
2005
2006 //  standalone program to index file of keys
2007 //  then list them onto std-out
2008
2009 int main (int argc, char **argv)
2010 {
2011 uint slot, line = 0, off = 0, found = 0;
2012 int ch, cnt = 0, bits = 12, idx;
2013 unsigned char key[256];
2014 double done, start;
2015 uid next, page_no;
2016 float elapsed;
2017 time_t tod[1];
2018 uint scan = 0;
2019 uint len = 0;
2020 uint map = 0;
2021 BtKey ptr;
2022 BtDb *bt;
2023 FILE *in;
2024
2025         if( argc < 4 ) {
2026                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]);
2027                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2028                 fprintf (stderr, "  mapped_pool_pages: number of pages in buffer pool\n");
2029                 exit(0);
2030         }
2031
2032         start = getCpuTime(0);
2033         time(tod);
2034
2035         if( argc > 4 )
2036                 bits = atoi(argv[4]);
2037
2038         if( argc > 5 )
2039                 map = atoi(argv[5]);
2040
2041         if( argc > 6 )
2042                 off = atoi(argv[6]);
2043
2044         bt = bt_open ((argv[1]), BT_rw, bits, map);
2045
2046         if( !bt ) {
2047                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2048                 exit (1);
2049         }
2050
2051         switch(argv[3][0]| 0x20)
2052         {
2053         case 'a':
2054                 fprintf(stderr, "started audit for %s\n", argv[2]);
2055                 cnt = bt_audit (bt);
2056                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[2], cnt);
2057                 break;
2058
2059         case 'w':
2060                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2061                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2062                   while( ch = getc(in), ch != EOF )
2063                         if( ch == '\n' )
2064                         {
2065                           if( off )
2066                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2067
2068                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2069                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2070                           len = 0;
2071                         }
2072                         else if( len < 245 )
2073                                 key[len++] = ch;
2074                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2075                 break;
2076
2077         case 'd':
2078                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2079                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2080                   while( ch = getc(in), ch != EOF )
2081                         if( ch == '\n' )
2082                         {
2083                           if( off )
2084                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2085                           line++;
2086                           if( bt_deletekey (bt, key, len, 0) )
2087                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2088                           len = 0;
2089                         }
2090                         else if( len < 245 )
2091                                 key[len++] = ch;
2092                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2093                 break;
2094
2095         case 'f':
2096                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2097                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2098                   while( ch = getc(in), ch != EOF )
2099                         if( ch == '\n' )
2100                         {
2101                           if( off )
2102                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2103                           line++;
2104                           if( bt_findkey (bt, key, len) )
2105                                 found++;
2106                           else if( bt->err )
2107                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2108                           len = 0;
2109                         }
2110                         else if( len < 245 )
2111                                 key[len++] = ch;
2112                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2113                 break;
2114
2115         case 's':
2116                 fprintf(stderr, "started scaning\n");
2117                 cnt = len = key[0] = 0;
2118
2119                 if( slot = bt_startkey (bt, key, len) )
2120                   slot--;
2121                 else
2122                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2123
2124                 while( slot = bt_nextkey (bt, slot) ) {
2125                         ptr = bt_key(bt, slot);
2126                         fwrite (ptr->key, ptr->len, 1, stdout);
2127                         fputc ('\n', stdout);
2128                         cnt++;
2129                 }
2130
2131                 fprintf(stderr, " Total keys read %d\n", cnt - 1);
2132                 break;
2133
2134         case 'c':
2135           fprintf(stderr, "started counting\n");
2136           cnt = 0;
2137
2138           next = bt->latchmgr->nlatchpage + LATCH_page;
2139           page_no = LEAF_page;
2140
2141           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2142           BtLatchSet *latch;
2143           BtPage page;
2144                 if( latch = bt_pinlatch (bt, page_no) )
2145                         page = bt_mappage (bt, latch);
2146                 if( !page->free && !page->lvl )
2147                         cnt += page->act;
2148                 if( page_no > LEAF_page )
2149                         next = page_no + 1;
2150                 if( scan )
2151                  for( idx = 0; idx++ < page->cnt; ) {
2152                   if( slotptr(page, idx)->dead )
2153                         continue;
2154                   ptr = keyptr(page, idx);
2155                   if( idx != page->cnt && bt_getid (page->right) ) {
2156                         fwrite (ptr->key, ptr->len, 1, stdout);
2157                         fputc ('\n', stdout);
2158                   }
2159                  }
2160                 bt_unpinlatch (latch);
2161                 page_no = next;
2162           }
2163                 
2164           cnt--;        // remove stopper key
2165           fprintf(stderr, " Total keys read %d\n", cnt);
2166           break;
2167         }
2168
2169         done = getCpuTime(0);
2170         elapsed = (float)(done - start);
2171         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2172         elapsed = getCpuTime(1);
2173         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2174         elapsed = getCpuTime(2);
2175         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2176         return 0;
2177 }
2178
2179 #endif  //STANDALONE