]> pd.if.org Git - btree/blob - btree2v.c
Add finalized threadskv10g LSM btree.
[btree] / btree2v.c
1 // btree version 2v  linux futex contention
2 //      with combined latch & pool manager
3 //      and phase-fair reader writer lock
4 // 12 MAR 2014
5
6 // author: karl malbrain, malbrain@cal.berkeley.edu
7
8 /*
9 This work, including the source code, documentation
10 and related data, is placed into the public domain.
11
12 The orginal author is Karl Malbrain.
13
14 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
15 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
16 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
17 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
18 RESULTING FROM THE USE, MODIFICATION, OR
19 REDISTRIBUTION OF THIS SOFTWARE.
20 */
21
22 // Please see the project home page for documentation
23 // code.google.com/p/high-concurrency-btree
24
25 #define _FILE_OFFSET_BITS 64
26 #define _LARGEFILE64_SOURCE
27
28 #ifdef linux
29 #define _GNU_SOURCE
30 #include <linux/futex.h>
31 #include <limits.h>
32 #define SYS_futex 202
33 #endif
34
35 #ifdef unix
36 #include <unistd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <time.h>
40 #include <fcntl.h>
41 #include <sys/mman.h>
42 #include <errno.h>
43 #else
44 #define WIN32_LEAN_AND_MEAN
45 #include <windows.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <time.h>
49 #include <fcntl.h>
50 #include <io.h>
51 #endif
52
53 #include <memory.h>
54 #include <string.h>
55
56 typedef unsigned long long      uid;
57
58 #ifndef unix
59 typedef unsigned long long      off64_t;
60 typedef unsigned short          ushort;
61 typedef unsigned int            uint;
62 #endif
63
64 #define BT_ro 0x6f72    // ro
65 #define BT_rw 0x7772    // rw
66 #define BT_fl 0x6c66    // fl
67
68 #define BT_maxbits              15                                      // maximum page size in bits
69 #define BT_minbits              12                                      // minimum page size in bits
70 #define BT_minpage              (1 << BT_minbits)       // minimum page size
71 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
72
73 //  BTree page number constants
74 #define ALLOC_page              0
75 #define ROOT_page               1
76 #define LEAF_page               2
77 #define LATCH_page              3
78
79 //      Number of levels to create in a new BTree
80
81 #define MIN_lvl                 2
82 #define MAX_lvl                 15
83
84 /*
85 There are five lock types for each node in three independent sets: 
86 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
87 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
88 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
89 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
90 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
91 */
92
93 typedef enum{
94         BtLockAccess,
95         BtLockDelete,
96         BtLockRead,
97         BtLockWrite,
98         BtLockParent
99 }BtLock;
100
101 enum {
102         QueRd = 1,              // reader queue
103         QueWr = 2               // writer queue
104 } RWQueue;
105
106 volatile typedef struct {
107         ushort rin[1];          // readers in count
108         ushort rout[1];         // readers out count
109         ushort ticket[1];       // writers in count
110         ushort serving[1];      // writers out count
111 } RWLock;
112
113 //      define bits at bottom of rin
114
115 #define PHID 0x1        // writer phase (0/1)
116 #define PRES 0x2        // writer present
117 #define MASK 0x3        // both write bits
118 #define RINC 0x4        // reader increment
119
120 typedef struct {
121   union {
122         struct {
123           ushort serving;
124           ushort next;
125         } bits[1];
126         uint value[1];
127   };
128 } BtMutex;
129
130 //      Define the length of the page and key pointers
131
132 #define BtId 6
133
134 //      Page key slot definition.
135
136 //      If BT_maxbits is 15 or less, you can save 2 bytes
137 //      for each key stored by making the first two uints
138 //      into ushorts.  You can also save 4 bytes by removing
139 //      the tod field from the key.
140
141 //      Keys are marked dead, but remain on the page until
142 //      cleanup is called. The fence key (highest key) for
143 //      the page is always present, even if dead.
144
145 typedef struct {
146 #ifdef USETOD
147         uint tod;                                       // time-stamp for key
148 #endif
149         ushort off:BT_maxbits;          // page offset for key start
150         ushort dead:1;                          // set for deleted key
151         unsigned char id[BtId];         // id associated with key
152 } BtSlot;
153
154 //      The key structure occupies space at the upper end of
155 //      each page.  It's a length byte followed by the value
156 //      bytes.
157
158 typedef struct {
159         unsigned char len;
160         unsigned char key[0];
161 } *BtKey;
162
163 //      The first part of an index page.
164 //      It is immediately followed
165 //      by the BtSlot array of keys.
166
167 typedef struct BtPage_ {
168         uint cnt;                                       // count of keys in page
169         uint act;                                       // count of active keys
170         uint min;                                       // next key offset
171         unsigned char bits:6;           // page size in bits
172         unsigned char free:1;           // page is on free list
173         unsigned char dirty:1;          // page is dirty in cache
174         unsigned char lvl:6;            // level of page
175         unsigned char kill:1;           // page is being deleted
176         unsigned char clean:1;          // page needs cleaning
177         unsigned char right[BtId];      // page number to right
178 } *BtPage;
179
180 typedef struct {
181         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
182         BtMutex lock[1];                        // allocation area lite latch
183         volatile uint latchdeployed;// highest number of latch entries deployed
184         volatile uint nlatchpage;       // number of latch pages at BT_latch
185         volatile uint latchtotal;       // number of page latch entries
186         volatile uint latchhash;        // number of latch hash table slots
187         volatile uint latchvictim;      // next latch hash entry to examine
188         volatile uint safelevel;        // safe page level in cache
189         volatile uint cache[MAX_lvl];// cache census counts by btree level
190 } BtLatchMgr;
191
192 //  latch hash table entries
193
194 typedef struct {
195         unsigned char busy[1];  // Latch table entry is busy being reallocated
196         uint slot:24;                   // Latch table entry at head of collision chain
197 } BtHashEntry;
198
199 //      latch manager table structure
200
201 typedef struct {
202         volatile uid page_no;   // latch set page number on disk
203         RWLock readwr[1];               // read/write page lock
204         RWLock access[1];               // Access Intent/Page delete
205         RWLock parent[1];               // Posting of fence key in parent
206         volatile ushort pin;    // number of pins/level/clock bits
207         volatile uint next;             // next entry in hash table chain
208         volatile uint prev;             // prev entry in hash table chain
209 } BtLatchSet;
210
211 #define CLOCK_mask 0xe000
212 #define CLOCK_unit 0x2000
213 #define PIN_mask 0x07ff
214 #define LVL_mask 0x1800
215 #define LVL_shift 11
216
217 //      The object structure for Btree access
218
219 typedef struct _BtDb {
220         uint page_size;         // each page size       
221         uint page_bits;         // each page size in bits       
222         uid page_no;            // current page number  
223         uid cursor_page;        // current cursor page number   
224         int  err;
225         uint mode;                      // read-write mode
226         BtPage cursor;          // cached frame for start/next (never mapped)
227         BtPage frame;           // spare frame for the page split (never mapped)
228         BtPage page;            // current mapped page in buffer pool
229         BtLatchSet *latch;                      // current page latch
230         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
231         BtLatchSet *latchsets;          // mapped latch set from latch pages
232         unsigned char *pagepool;        // cached page pool set
233         BtHashEntry *table;     // the hash table
234 #ifdef unix
235         int idx;
236 #else
237         HANDLE idx;
238         HANDLE halloc;          // allocation and latch table handle
239 #endif
240         unsigned char *mem;     // frame, cursor, memory buffers
241         uint found;                     // last deletekey found key
242 } BtDb;
243
244 typedef enum {
245 BTERR_ok = 0,
246 BTERR_notfound,
247 BTERR_struct,
248 BTERR_ovflw,
249 BTERR_read,
250 BTERR_lock,
251 BTERR_hash,
252 BTERR_kill,
253 BTERR_map,
254 BTERR_wrt,
255 BTERR_eof
256 } BTERR;
257
258 // B-Tree functions
259 extern void bt_close (BtDb *bt);
260 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
261 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
262 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
263 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
264 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
265 extern uint bt_nextkey   (BtDb *bt, uint slot);
266
267 //      internal functions
268 void bt_update (BtDb *bt, BtPage page);
269 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
270 //  Helper functions to return slot values
271
272 extern BtKey bt_key (BtDb *bt, uint slot);
273 extern uid bt_uid (BtDb *bt, uint slot);
274 #ifdef USETOD
275 extern uint bt_tod (BtDb *bt, uint slot);
276 #endif
277
278 //  The page is allocated from low and hi ends.
279 //  The key offsets and row-id's are allocated
280 //  from the bottom, while the text of the key
281 //  is allocated from the top.  When the two
282 //  areas meet, the page is split into two.
283
284 //  A key consists of a length byte, two bytes of
285 //  index number (0 - 65534), and up to 253 bytes
286 //  of key value.  Duplicate keys are discarded.
287 //  Associated with each key is a 48 bit row-id.
288
289 //  The b-tree root is always located at page 1.
290 //      The first leaf page of level zero is always
291 //      located on page 2.
292
293 //      The b-tree pages are linked with right
294 //      pointers to facilitate enumerators,
295 //      and provide for concurrency.
296
297 //      When to root page fills, it is split in two and
298 //      the tree height is raised by a new root at page
299 //      one with two keys.
300
301 //      Deleted keys are marked with a dead bit until
302 //      page cleanup The fence key for a node is always
303 //      present, even after deletion and cleanup.
304
305 //  Deleted leaf pages are reclaimed  on a free list.
306 //      The upper levels of the btree are fixed on creation.
307
308 //  To achieve maximum concurrency one page is locked at a time
309 //  as the tree is traversed to find leaf key in question. The right
310 //  page numbers are used in cases where the page is being split,
311 //      or consolidated.
312
313 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
314 //      and chains empty leaf pages together for reuse.
315
316 //      Parent locks are obtained to prevent resplitting or deleting a node
317 //      before its fence is posted into its upper level.
318
319 //      A special open mode of BT_fl is provided to safely access files on
320 //      WIN32 networks. WIN32 network operations should not use memory mapping.
321 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
322 //      to prevent local caching of network file contents.
323
324 //      Access macros to address slot and key values from the page.
325 //      Page slots use 1 based indexing.
326
327 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
328 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
329
330 void bt_putid(unsigned char *dest, uid id)
331 {
332 int i = BtId;
333
334         while( i-- )
335                 dest[i] = (unsigned char)id, id >>= 8;
336 }
337
338 uid bt_getid(unsigned char *src)
339 {
340 uid id = 0;
341 int i;
342
343         for( i = 0; i < BtId; i++ )
344                 id <<= 8, id |= *src++; 
345
346         return id;
347 }
348
349 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
350 {
351 BtKey ptr;
352
353         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
354         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
355         ptr = keyptr(page, page->cnt);
356         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
357         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
358         return bt->err = err;
359 }
360
361 //      Phase-Fair reader/writer lock implementation
362 //      with futex calls on contention
363
364 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
365 {
366         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
367 }
368
369 //  a phase fair reader/writer lock implementation
370
371 void WriteLock (RWLock *lock)
372 {
373 ushort w, r, tix;
374 uint prev;
375
376         tix = __sync_fetch_and_add (lock->ticket, 1);
377
378         // wait for our ticket to come up
379
380         while( 1 ) {
381                 prev = *lock->ticket | *lock->serving << 16;
382                 if( tix == prev >> 16 )
383                   break;
384                 sys_futex( (uint *)lock->ticket, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
385         }
386
387         w = PRES | (tix & PHID);
388         r = __sync_fetch_and_add (lock->rin, w);
389
390         while( 1 ) {
391                 prev = *lock->rin | *lock->rout << 16;
392                 if( r == prev >> 16 )
393                   break;
394                 sys_futex( (uint *)lock->rin, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
395         }
396 }
397
398 void WriteRelease (RWLock *lock)
399 {
400         __sync_fetch_and_and (lock->rin, ~MASK);
401         lock->serving[0]++;
402
403         if( (*lock->rin & ~MASK) != (*lock->rout & ~MASK) )
404           if( sys_futex( (uint *)lock->rin, FUTEX_WAKE_BITSET, INT_MAX, NULL, NULL, QueRd ) )
405                 return;
406
407         if( *lock->ticket != *lock->serving )
408           sys_futex( (uint *)lock->ticket, FUTEX_WAKE_BITSET, INT_MAX, NULL, NULL, QueWr );
409 }
410
411 void ReadLock (RWLock *lock)
412 {
413 uint prev;
414 ushort w;
415
416         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
417
418         if( w )
419           while( 1 ) {
420                 prev = *lock->rin | *lock->rout << 16;
421                 if( w != (prev & MASK) )
422                   break;
423                 sys_futex( (uint *)lock->rin, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueRd );
424           }
425 }
426
427 void ReadRelease (RWLock *lock)
428 {
429         __sync_fetch_and_add (lock->rout, RINC);
430
431         if( *lock->ticket == *lock->serving )
432                 return;
433
434         if( *lock->rin & PRES )
435           if( sys_futex( (uint *)lock->rin, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ) )
436                 return;
437
438         sys_futex( (uint *)lock->ticket, FUTEX_WAKE_BITSET, INT_MAX, NULL, NULL, QueWr );
439 }
440
441 //      lite weight FIFO mutex Manager
442
443 void bt_getmutex (BtMutex *mutex)
444 {
445 uint prev, ours;
446
447         ours = __sync_fetch_and_add (&mutex->bits->next, 1);
448
449         while( 1 ) {
450                 prev = mutex->value[0];
451                 if( ours == mutex->bits->serving )
452                   break;
453                 sys_futex( mutex->value, FUTEX_WAIT_BITSET, prev, NULL, NULL, 0 );
454         }
455 }
456
457 void bt_relmutex (BtMutex *mutex)
458 {
459 ushort serving = ++mutex->bits->serving;
460
461         if( mutex->bits->next != serving )
462           sys_futex( mutex->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, 0 );
463 }
464
465 //      read page from permanent location in Btree file
466
467 BTERR bt_readpage (BtDb *bt, BtPage page, uid page_no)
468 {
469 off64_t off = page_no << bt->page_bits;
470
471 #ifdef unix
472         if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size ) {
473                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
474                 return bt->err = BTERR_read;
475         }
476 #else
477 OVERLAPPED ovl[1];
478 uint amt[1];
479
480         memset (ovl, 0, sizeof(OVERLAPPED));
481         ovl->Offset = off;
482         ovl->OffsetHigh = off >> 32;
483
484         if( !ReadFile(bt->idx, page, bt->page_size, amt, ovl)) {
485                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
486                 return bt->err = BTERR_read;
487         }
488         if( *amt <  bt->page_size ) {
489                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
490                 return bt->err = BTERR_read;
491         }
492 #endif
493         return 0;
494 }
495
496 //      write page to permanent location in Btree file
497 //      clear the dirty bit
498
499 BTERR bt_writepage (BtDb *bt, BtPage page, uid page_no)
500 {
501 off64_t off = page_no << bt->page_bits;
502
503 #ifdef unix
504         page->dirty = 0;
505
506         if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
507                 return bt->err = BTERR_wrt;
508 #else
509 OVERLAPPED ovl[1];
510 uint amt[1];
511
512         memset (ovl, 0, sizeof(OVERLAPPED));
513         ovl->Offset = off;
514         ovl->OffsetHigh = off >> 32;
515         page->dirty = 0;
516
517         if( !WriteFile(bt->idx, page, bt->page_size, amt, ovl) )
518                 return bt->err = BTERR_wrt;
519
520         if( *amt <  bt->page_size )
521                 return bt->err = BTERR_wrt;
522 #endif
523         return 0;
524 }
525
526 //      link latch table entry into head of latch hash table
527
528 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
529 {
530 BtPage page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
531 BtLatchSet *latch = bt->latchsets + slot;
532 int lvl;
533
534         if( latch->next = bt->table[hashidx].slot )
535                 bt->latchsets[latch->next].prev = slot;
536
537         bt->table[hashidx].slot = slot;
538         latch->page_no = page_no;
539         latch->prev = 0;
540         latch->pin = 1;
541
542         if( bt_readpage (bt, page, page_no) )
543                 return bt->err;
544
545         lvl = page->lvl << LVL_shift;
546         if( lvl > LVL_mask )
547                 lvl = LVL_mask;
548         latch->pin |= lvl;              // store lvl
549         latch->pin |= lvl << 3; // initialize clock
550
551 #ifdef unix
552         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], 1);
553 #else
554         _InterlockedAdd(&bt->latchmgr->cache[page->lvl], 1);
555 #endif
556         return bt->err = 0;
557 }
558
559 //      release latch pin
560
561 void bt_unpinlatch (BtLatchSet *latch)
562 {
563 #ifdef unix
564         __sync_fetch_and_add(&latch->pin, -1);
565 #else
566         _InterlockedDecrement16 (&latch->pin);
567 #endif
568 }
569
570 //      find existing latchset or inspire new one
571 //      return with latchset pinned
572
573 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
574 {
575 uint hashidx = page_no % bt->latchmgr->latchhash;
576 BtLatchSet *latch;
577 uint slot, idx;
578 uint lvl, cnt;
579 off64_t off;
580 uint amt[1];
581 BtPage page;
582
583   //  try to find our entry
584
585   while( __sync_fetch_and_or (bt->table[hashidx].busy, 1) == 1 )
586         sched_yield();
587
588   if( slot = bt->table[hashidx].slot ) do
589   {
590         latch = bt->latchsets + slot;
591         if( page_no == latch->page_no )
592                 break;
593   } while( slot = latch->next );
594
595   //  found our entry
596   //  increment clock
597
598   if( slot ) {
599         latch = bt->latchsets + slot;
600         lvl = (latch->pin & LVL_mask) >> LVL_shift;
601         lvl *= CLOCK_unit * 2;
602         lvl |= CLOCK_unit;
603 #ifdef unix
604         __sync_fetch_and_add(&latch->pin, 1);
605         __sync_fetch_and_or(&latch->pin, lvl);
606 #else
607         _InterlockedIncrement16 (&latch->pin);
608         _InterlockedOr16 (&latch->pin, lvl);
609 #endif
610         bt->table[hashidx].busy[0] = 0;
611         return latch;
612   }
613
614         //  see if there are any unused pool entries
615 #ifdef unix
616         slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
617 #else
618         slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
619 #endif
620
621         if( slot < bt->latchmgr->latchtotal ) {
622                 latch = bt->latchsets + slot;
623                 if( bt_latchlink (bt, hashidx, slot, page_no) )
624                         return NULL;
625                 bt->table[hashidx].busy[0] = 0;
626                 return latch;
627         }
628
629 #ifdef unix
630         __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
631 #else
632         _InterlockedDecrement (&bt->latchmgr->latchdeployed);
633 #endif
634   //  find and reuse previous entry on victim
635
636   while( 1 ) {
637 #ifdef unix
638         slot = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
639 #else
640         slot = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
641 #endif
642         // try to get write lock on hash chain
643         //      skip entry if not obtained
644         //      or has outstanding pins
645
646         slot %= bt->latchmgr->latchtotal;
647
648         //      on slot wraparound, check census
649         //      count and increment safe level
650
651         cnt = bt->latchmgr->cache[bt->latchmgr->safelevel];
652
653         if( !slot ) {
654           if( cnt < bt->latchmgr->latchtotal / 10 )
655 #ifdef unix
656                 __sync_fetch_and_add(&bt->latchmgr->safelevel, 1);
657 #else
658                 _InterlockedIncrement (&bt->latchmgr->safelevel);
659 #endif
660           continue;
661         }
662
663         latch = bt->latchsets + slot;
664         idx = latch->page_no % bt->latchmgr->latchhash;
665         lvl = (latch->pin & LVL_mask) >> LVL_shift;
666
667         //      see if we are evicting this level yet
668         //      or if we are on same chain as hashidx
669
670         if( idx == hashidx || lvl > bt->latchmgr->safelevel )
671                 continue;
672
673         if( __sync_fetch_and_or (bt->table[idx].busy, 1) == 1 )
674                 continue;
675
676         if( latch->pin & ~LVL_mask ) {
677           if( latch->pin & CLOCK_mask )
678 #ifdef unix
679                 __sync_fetch_and_add(&latch->pin, -CLOCK_unit);
680 #else
681                 _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_unit);
682 #endif
683           bt->table[idx].busy[0] = 0;
684           continue;
685         }
686
687         //  update permanent page area in btree
688
689         page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
690 #ifdef unix
691         posix_fadvise (bt->idx, page_no << bt->page_bits, bt->page_size, POSIX_FADV_WILLNEED);
692         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], -1);
693 #else
694         _InterlockedAdd(&bt->latchmgr->cache[page->lvl], -1);
695 #endif
696         if( page->dirty )
697           if( bt_writepage (bt, page, latch->page_no) )
698                 return NULL;
699
700         //  unlink our available slot from its hash chain
701
702         if( latch->prev )
703                 bt->latchsets[latch->prev].next = latch->next;
704         else
705                 bt->table[idx].slot = latch->next;
706
707         if( latch->next )
708                 bt->latchsets[latch->next].prev = latch->prev;
709
710         bt->table[idx].busy[0] = 0;
711
712         if( bt_latchlink (bt, hashidx, slot, page_no) )
713                 return NULL;
714
715         bt->table[hashidx].busy[0] = 0;
716         return latch;
717   }
718 }
719
720 //      close and release memory
721
722 void bt_close (BtDb *bt)
723 {
724 #ifdef unix
725         munmap (bt->table, bt->latchmgr->nlatchpage * bt->page_size);
726         munmap (bt->latchmgr, bt->page_size);
727 #else
728         FlushViewOfFile(bt->latchmgr, 0);
729         UnmapViewOfFile(bt->latchmgr);
730         CloseHandle(bt->halloc);
731 #endif
732 #ifdef unix
733         if( bt->mem )
734                 free (bt->mem);
735         close (bt->idx);
736         free (bt);
737 #else
738         if( bt->mem)
739                 VirtualFree (bt->mem, 0, MEM_RELEASE);
740         FlushFileBuffers(bt->idx);
741         CloseHandle(bt->idx);
742         GlobalFree (bt);
743 #endif
744 }
745 //  open/create new btree
746
747 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
748 //              size of mapped page pool (e.g. 8192)
749
750 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
751 {
752 uint lvl, attr, last, slot, idx;
753 uint nlatchpage, latchhash;
754 BtLatchMgr *latchmgr;
755 off64_t size, off;
756 uint amt[1];
757 BtKey key;
758 BtDb* bt;
759 int flag;
760
761 #ifndef unix
762 OVERLAPPED ovl[1];
763 #else
764 struct flock lock[1];
765 #endif
766
767         // determine sanity of page size and buffer pool
768
769         if( bits > BT_maxbits )
770                 bits = BT_maxbits;
771         else if( bits < BT_minbits )
772                 bits = BT_minbits;
773
774         if( mode == BT_ro ) {
775                 fprintf(stderr, "ReadOnly mode not supported: %s\n", name);
776                 return NULL;
777         }
778 #ifdef unix
779         bt = calloc (1, sizeof(BtDb));
780
781         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
782         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_RANDOM);
783  
784         if( bt->idx == -1 ) {
785                 fprintf(stderr, "unable to open %s\n", name);
786                 return free(bt), NULL;
787         }
788 #else
789         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
790         attr = FILE_ATTRIBUTE_NORMAL;
791         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
792
793         if( bt->idx == INVALID_HANDLE_VALUE ) {
794                 fprintf(stderr, "unable to open %s\n", name);
795                 return GlobalFree(bt), NULL;
796         }
797 #endif
798 #ifdef unix
799         memset (lock, 0, sizeof(lock));
800         lock->l_len = sizeof(struct BtPage_);
801         lock->l_type = F_WRLCK;
802
803         if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) {
804                 fprintf(stderr, "unable to lock record zero %s\n", name);
805                 return bt_close (bt), NULL;
806         }
807 #else
808         memset (ovl, 0, sizeof(ovl));
809
810         //      use large offsets to
811         //      simulate advisory locking
812
813         ovl->OffsetHigh |= 0x80000000;
814
815         if( !LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) ) {
816                 fprintf(stderr, "unable to lock record zero %s, GetLastError = %d\n", name, GetLastError());
817                 return bt_close (bt), NULL;
818         }
819 #endif 
820
821 #ifdef unix
822         latchmgr = valloc (BT_maxpage);
823         *amt = 0;
824
825         // read minimum page size to get root info
826
827         if( size = lseek (bt->idx, 0L, 2) ) {
828                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
829                         bits = latchmgr->alloc->bits;
830                 else {
831                         fprintf(stderr, "Unable to read page zero\n");
832                         return free(bt), free(latchmgr), NULL;
833                 }
834         }
835 #else
836         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
837         size = GetFileSize(bt->idx, amt);
838
839         if( size || *amt ) {
840                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) {
841                         fprintf(stderr, "Unable to read page zero\n");
842                         return bt_close (bt), NULL;
843                 } else
844                         bits = latchmgr->alloc->bits;
845         }
846 #endif
847
848         bt->page_size = 1 << bits;
849         bt->page_bits = bits;
850
851         bt->mode = mode;
852
853         if( size || *amt ) {
854                 nlatchpage = latchmgr->nlatchpage;
855                 goto btlatch;
856         }
857
858         if( nodemax < 16 ) {
859                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
860                 return bt_close(bt), NULL;
861         }
862
863         // initialize an empty b-tree with latch page, root page, page of leaves
864         // and page(s) of latches and page pool cache
865
866         memset (latchmgr, 0, 1 << bits);
867         latchmgr->alloc->bits = bt->page_bits;
868
869         //  calculate number of latch hash table entries
870
871         nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
872         latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
873
874         nlatchpage += nodemax;          // size of the buffer pool in pages
875         nlatchpage += (sizeof(BtLatchSet) * nodemax + bt->page_size - 1)/bt->page_size;
876
877         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
878         latchmgr->nlatchpage = nlatchpage;
879         latchmgr->latchtotal = nodemax;
880         latchmgr->latchhash = latchhash;
881
882         if( bt_writepage (bt, latchmgr->alloc, 0) ) {
883                 fprintf (stderr, "Unable to create btree page zero\n");
884                 return bt_close (bt), NULL;
885         }
886
887         memset (latchmgr, 0, 1 << bits);
888         latchmgr->alloc->bits = bt->page_bits;
889
890         for( lvl=MIN_lvl; lvl--; ) {
891                 last = MIN_lvl - lvl;   // page number
892                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
893                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? last + 1 : 0);
894                 key = keyptr(latchmgr->alloc, 1);
895                 key->len = 2;           // create stopper key
896                 key->key[0] = 0xff;
897                 key->key[1] = 0xff;
898
899                 latchmgr->alloc->min = bt->page_size - 3;
900                 latchmgr->alloc->lvl = lvl;
901                 latchmgr->alloc->cnt = 1;
902                 latchmgr->alloc->act = 1;
903
904                 if( bt_writepage (bt, latchmgr->alloc, last) ) {
905                         fprintf (stderr, "Unable to create btree page %.8x\n", last);
906                         return bt_close (bt), NULL;
907                 }
908         }
909
910         // clear out buffer pool pages
911
912         memset(latchmgr, 0, bt->page_size);
913         last = MIN_lvl + nlatchpage;
914
915         if( bt_writepage (bt, latchmgr->alloc, last) ) {
916                 fprintf (stderr, "Unable to write buffer pool page %.8x\n", last);
917                 return bt_close (bt), NULL;
918         }
919                 
920 #ifdef unix
921         free (latchmgr);
922 #else
923         VirtualFree (latchmgr, 0, MEM_RELEASE);
924 #endif
925
926 btlatch:
927 #ifdef unix
928         lock->l_type = F_UNLCK;
929         if( fcntl (bt->idx, F_SETLK, lock) < 0 ) {
930                 fprintf (stderr, "Unable to unlock page zero\n");
931                 return bt_close (bt), NULL;
932         }
933 #else
934         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) {
935                 fprintf (stderr, "Unable to unlock page zero, GetLastError = %d\n", GetLastError());
936                 return bt_close (bt), NULL;
937         }
938 #endif
939 #ifdef unix
940         flag = PROT_READ | PROT_WRITE;
941         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
942         if( bt->latchmgr == MAP_FAILED ) {
943                 fprintf (stderr, "Unable to mmap page zero, errno = %d", errno);
944                 return bt_close (bt), NULL;
945         }
946         bt->table = (void *)mmap (0, (uid)nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
947         if( bt->table == MAP_FAILED ) {
948                 fprintf (stderr, "Unable to mmap buffer pool, errno = %d", errno);
949                 return bt_close (bt), NULL;
950         }
951         madvise (bt->table, (uid)nlatchpage << bt->page_bits, MADV_RANDOM | MADV_WILLNEED);
952 #else
953         flag = PAGE_READWRITE;
954         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size, NULL);
955         if( !bt->halloc ) {
956                 fprintf (stderr, "Unable to create file mapping for buffer pool mgr, GetLastError = %d\n", GetLastError());
957                 return bt_close (bt), NULL;
958         }
959
960         flag = FILE_MAP_WRITE;
961         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size);
962         if( !bt->latchmgr ) {
963                 fprintf (stderr, "Unable to map buffer pool, GetLastError = %d\n", GetLastError());
964                 return bt_close (bt), NULL;
965         }
966
967         bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
968 #endif
969         bt->pagepool = (unsigned char *)bt->table + (uid)(nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
970         bt->latchsets = (BtLatchSet *)(bt->pagepool - (uid)bt->latchmgr->latchtotal * sizeof(BtLatchSet));
971
972 #ifdef unix
973         bt->mem = valloc (2 * bt->page_size);
974 #else
975         bt->mem = VirtualAlloc(NULL, 2 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
976 #endif
977         bt->frame = (BtPage)bt->mem;
978         bt->cursor = (BtPage)(bt->mem + bt->page_size);
979         return bt;
980 }
981
982 // place write, read, or parent lock on requested page_no.
983
984 void bt_lockpage(BtLock mode, BtLatchSet *latch)
985 {
986         switch( mode ) {
987         case BtLockRead:
988                 ReadLock (latch->readwr);
989                 break;
990         case BtLockWrite:
991                 WriteLock (latch->readwr);
992                 break;
993         case BtLockAccess:
994                 ReadLock (latch->access);
995                 break;
996         case BtLockDelete:
997                 WriteLock (latch->access);
998                 break;
999         case BtLockParent:
1000                 WriteLock (latch->parent);
1001                 break;
1002         }
1003 }
1004
1005 // remove write, read, or parent lock on requested page
1006
1007 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1008 {
1009         switch( mode ) {
1010         case BtLockRead:
1011                 ReadRelease (latch->readwr);
1012                 break;
1013         case BtLockWrite:
1014                 WriteRelease (latch->readwr);
1015                 break;
1016         case BtLockAccess:
1017                 ReadRelease (latch->access);
1018                 break;
1019         case BtLockDelete:
1020                 WriteRelease (latch->access);
1021                 break;
1022         case BtLockParent:
1023                 WriteRelease (latch->parent);
1024                 break;
1025         }
1026 }
1027
1028 //      allocate a new page and write page into it
1029
1030 uid bt_newpage(BtDb *bt, BtPage page)
1031 {
1032 BtLatchSet *latch;
1033 uid new_page;
1034 BtPage temp;
1035
1036         //      lock allocation page
1037
1038         bt_getmutex(bt->latchmgr->lock);
1039
1040         // use empty chain first
1041         // else allocate empty page
1042
1043         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
1044           if( latch = bt_pinlatch (bt, new_page) )
1045                 temp = bt_mappage (bt, latch);
1046           else
1047                 return 0;
1048
1049           bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
1050           bt_relmutex(bt->latchmgr->lock);
1051           memcpy (temp, page, bt->page_size);
1052
1053           bt_update (bt, temp);
1054           bt_unpinlatch (latch);
1055           return new_page;
1056         } else {
1057           new_page = bt_getid(bt->latchmgr->alloc->right);
1058           bt_putid(bt->latchmgr->alloc->right, new_page+1);
1059           bt_relmutex(bt->latchmgr->lock);
1060
1061           if( bt_writepage (bt, page, new_page) )
1062                 return 0;
1063         }
1064
1065         bt_update (bt, bt->latchmgr->alloc);
1066         return new_page;
1067 }
1068
1069 //  compare two keys, returning > 0, = 0, or < 0
1070 //  as the comparison value
1071
1072 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1073 {
1074 uint len1 = key1->len;
1075 int ans;
1076
1077         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1078                 return ans;
1079
1080         if( len1 > len2 )
1081                 return 1;
1082         if( len1 < len2 )
1083                 return -1;
1084
1085         return 0;
1086 }
1087
1088 //  Update current page of btree by
1089 //      flushing mapped area to disk backing of cache pool.
1090 //      mark page as dirty for rewrite to permanent location
1091
1092 void bt_update (BtDb *bt, BtPage page)
1093 {
1094 #ifdef unix
1095         msync (page, bt->page_size, MS_ASYNC);
1096 #else
1097 //      FlushViewOfFile (page, bt->page_size);
1098 #endif
1099         page->dirty = 1;
1100 }
1101
1102 //  map the btree cached page onto current page
1103
1104 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1105 {
1106         return (BtPage)((uid)(latch - bt->latchsets) * bt->page_size + bt->pagepool);
1107 }
1108
1109 //      deallocate a deleted page 
1110 //      place on free chain out of allocator page
1111 //      call with page latched for Writing and Deleting
1112
1113 BTERR bt_freepage(BtDb *bt, uid page_no, BtLatchSet *latch)
1114 {
1115 BtPage page = bt_mappage (bt, latch);
1116
1117         //      lock allocation page
1118
1119         bt_getmutex (bt->latchmgr->lock);
1120
1121         //      store chain in second right
1122         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1123         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1124
1125         page->free = 1;
1126         bt_update(bt, page);
1127
1128         // unlock released page
1129
1130         bt_unlockpage (BtLockDelete, latch);
1131         bt_unlockpage (BtLockWrite, latch);
1132         bt_unpinlatch (latch);
1133
1134         // unlock allocation page
1135
1136         bt_relmutex (bt->latchmgr->lock);
1137         bt_update (bt, bt->latchmgr->alloc);
1138         return 0;
1139 }
1140
1141 //  find slot in page for given key at a given level
1142
1143 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1144 {
1145 uint diff, higher = bt->page->cnt, low = 1, slot;
1146 uint good = 0;
1147
1148         //      make stopper key an infinite fence value
1149
1150         if( bt_getid (bt->page->right) )
1151                 higher++;
1152         else
1153                 good++;
1154
1155         //      low is the lowest candidate, higher is already
1156         //      tested as .ge. the given key, loop ends when they meet
1157
1158         while( diff = higher - low ) {
1159                 slot = low + ( diff >> 1 );
1160                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1161                         low = slot + 1;
1162                 else
1163                         higher = slot, good++;
1164         }
1165
1166         //      return zero if key is on right link page
1167
1168         return good ? higher : 0;
1169 }
1170
1171 //  find and load page at given level for given key
1172 //      leave page rd or wr locked as requested
1173
1174 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1175 {
1176 uid page_no = ROOT_page, prevpage = 0;
1177 uint drill = 0xff, slot;
1178 BtLatchSet *prevlatch;
1179 uint mode, prevmode;
1180
1181   //  start at root of btree and drill down
1182
1183   do {
1184         // determine lock mode of drill level
1185         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1186
1187         if( bt->latch = bt_pinlatch(bt, page_no) )
1188                 bt->page_no = page_no;
1189         else
1190                 return 0;
1191
1192         // obtain access lock using lock chaining
1193
1194         if( page_no > ROOT_page )
1195                 bt_lockpage(BtLockAccess, bt->latch);
1196
1197         if( prevpage ) {
1198                 bt_unlockpage(prevmode, prevlatch);
1199                 bt_unpinlatch(prevlatch);
1200                 prevpage = 0;
1201         }
1202
1203         // obtain read lock using lock chaining
1204
1205         bt_lockpage(mode, bt->latch);
1206
1207         if( page_no > ROOT_page )
1208                 bt_unlockpage(BtLockAccess, bt->latch);
1209
1210         //      map/obtain page contents
1211
1212         bt->page = bt_mappage (bt, bt->latch);
1213
1214         // re-read and re-lock root after determining actual level of root
1215
1216         if( bt->page->lvl != drill) {
1217                 if( bt->page_no != ROOT_page )
1218                         return bt->err = BTERR_struct, 0;
1219                         
1220                 drill = bt->page->lvl;
1221
1222                 if( lock != BtLockRead && drill == lvl ) {
1223                         bt_unlockpage(mode, bt->latch);
1224                         bt_unpinlatch(bt->latch);
1225                         continue;
1226                 }
1227         }
1228
1229         prevpage = bt->page_no;
1230         prevlatch = bt->latch;
1231         prevmode = mode;
1232
1233         //  find key on page at this level
1234         //  and descend to requested level
1235
1236         if( !bt->page->kill )
1237          if( slot = bt_findslot (bt, key, len) ) {
1238           if( drill == lvl )
1239                 return slot;
1240
1241           while( slotptr(bt->page, slot)->dead )
1242                 if( slot++ < bt->page->cnt )
1243                         continue;
1244                 else
1245                         goto slideright;
1246
1247           page_no = bt_getid(slotptr(bt->page, slot)->id);
1248           drill--;
1249           continue;
1250          }
1251
1252         //  or slide right into next page
1253
1254 slideright:
1255         page_no = bt_getid(bt->page->right);
1256
1257   } while( page_no );
1258
1259   // return error on end of right chain
1260
1261   bt->err = BTERR_eof;
1262   return 0;     // return error
1263 }
1264
1265 //      a fence key was deleted from a page
1266 //      push new fence value upwards
1267
1268 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1269 {
1270 unsigned char leftkey[256], rightkey[256];
1271 BtLatchSet *latch = bt->latch;
1272 BtKey ptr;
1273
1274         // remove deleted key, the old fence value
1275
1276         ptr = keyptr(bt->page, bt->page->cnt);
1277         memcpy(rightkey, ptr, ptr->len + 1);
1278
1279         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1280         bt->page->clean = 1;
1281
1282         ptr = keyptr(bt->page, bt->page->cnt);
1283         memcpy(leftkey, ptr, ptr->len + 1);
1284
1285         bt_update (bt, bt->page);
1286         bt_lockpage (BtLockParent, latch);
1287         bt_unlockpage (BtLockWrite, latch);
1288
1289         //  insert new (now smaller) fence key
1290
1291         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1292                 return bt->err;
1293
1294         //  remove old (larger) fence key
1295
1296         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1297                 return bt->err;
1298
1299         bt_unlockpage (BtLockParent, latch);
1300         bt_unpinlatch (latch);
1301         return 0;
1302 }
1303
1304 //      root has a single child
1305 //      collapse a level from the btree
1306 //      call with root locked in bt->page
1307
1308 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1309 {
1310 BtLatchSet *latch;
1311 BtPage temp;
1312 uid child;
1313 uint idx;
1314
1315         // find the child entry
1316         //      and promote to new root
1317
1318   do {
1319         for( idx = 0; idx++ < root->cnt; )
1320           if( !slotptr(root, idx)->dead )
1321                 break;
1322
1323         child = bt_getid (slotptr(root, idx)->id);
1324         if( latch = bt_pinlatch (bt, child) )
1325                 temp = bt_mappage (bt, latch);
1326         else
1327                 return bt->err;
1328
1329         bt_lockpage (BtLockDelete, latch);
1330         bt_lockpage (BtLockWrite, latch);
1331         memcpy (root, temp, bt->page_size);
1332
1333         bt_update (bt, root);
1334
1335         if( bt_freepage (bt, child, latch) )
1336                 return bt->err;
1337
1338   } while( root->lvl > 1 && root->act == 1 );
1339
1340   bt_unlockpage (BtLockWrite, bt->latch);
1341   bt_unpinlatch (bt->latch);
1342   return 0;
1343 }
1344
1345 //  find and delete key on page by marking delete flag bit
1346 //  when page becomes empty, delete it
1347
1348 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1349 {
1350 unsigned char lowerkey[256], higherkey[256];
1351 uint slot, dirty = 0, idx, fence, found;
1352 BtLatchSet *latch, *rlatch;
1353 uid page_no, right;
1354 BtPage temp;
1355 BtKey ptr;
1356
1357         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1358                 ptr = keyptr(bt->page, slot);
1359         else
1360                 return bt->err;
1361
1362         // are we deleting a fence slot?
1363
1364         fence = slot == bt->page->cnt;
1365
1366         // if key is found delete it, otherwise ignore request
1367
1368         if( found = !keycmp (ptr, key, len) )
1369           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1370                 dirty = slotptr(bt->page,slot)->dead = 1;
1371                 bt->page->clean = 1;
1372                 bt->page->act--;
1373
1374                 // collapse empty slots
1375
1376                 while( idx = bt->page->cnt - 1 )
1377                   if( slotptr(bt->page, idx)->dead ) {
1378                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1379                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1380                   } else
1381                         break;
1382           }
1383
1384         right = bt_getid(bt->page->right);
1385         page_no = bt->page_no;
1386         latch = bt->latch;
1387
1388         if( !dirty ) {
1389           if( lvl )
1390                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1391           bt_unlockpage(BtLockWrite, latch);
1392           bt_unpinlatch (latch);
1393           return bt->found = found, 0;
1394         }
1395
1396         //      did we delete a fence key in an upper level?
1397
1398         if( lvl && bt->page->act && fence )
1399           if( bt_fixfence (bt, page_no, lvl) )
1400                 return bt->err;
1401           else
1402                 return bt->found = found, 0;
1403
1404         //      is this a collapsed root?
1405
1406         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1407           if( bt_collapseroot (bt, bt->page) )
1408                 return bt->err;
1409           else
1410                 return bt->found = found, 0;
1411
1412         // return if page is not empty
1413
1414         if( bt->page->act ) {
1415           bt_update(bt, bt->page);
1416           bt_unlockpage(BtLockWrite, latch);
1417           bt_unpinlatch (latch);
1418           return bt->found = found, 0;
1419         }
1420
1421         // cache copy of fence key
1422         //      in order to find parent
1423
1424         ptr = keyptr(bt->page, bt->page->cnt);
1425         memcpy(lowerkey, ptr, ptr->len + 1);
1426
1427         // obtain lock on right page
1428
1429         if( rlatch = bt_pinlatch (bt, right) )
1430                 temp = bt_mappage (bt, rlatch);
1431         else
1432                 return bt->err;
1433
1434         bt_lockpage(BtLockWrite, rlatch);
1435
1436         if( temp->kill ) {
1437                 bt_abort(bt, temp, right, 0);
1438                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1439         }
1440
1441         // pull contents of next page into current empty page 
1442
1443         memcpy (bt->page, temp, bt->page_size);
1444
1445         //      cache copy of key to update
1446
1447         ptr = keyptr(temp, temp->cnt);
1448         memcpy(higherkey, ptr, ptr->len + 1);
1449
1450         //  Mark right page as deleted and point it to left page
1451         //      until we can post updates at higher level.
1452
1453         bt_putid(temp->right, page_no);
1454         temp->kill = 1;
1455
1456         bt_update(bt, bt->page);
1457         bt_update(bt, temp);
1458
1459         bt_lockpage(BtLockParent, latch);
1460         bt_unlockpage(BtLockWrite, latch);
1461
1462         bt_lockpage(BtLockParent, rlatch);
1463         bt_unlockpage(BtLockWrite, rlatch);
1464
1465         //  redirect higher key directly to consolidated node
1466
1467         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1468                 return bt->err;
1469
1470         //  delete old lower key to consolidated node
1471
1472         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1473                 return bt->err;
1474
1475         //  obtain write & delete lock on deleted node
1476         //      add right block to free chain
1477
1478         bt_lockpage(BtLockDelete, rlatch);
1479         bt_lockpage(BtLockWrite, rlatch);
1480         bt_unlockpage(BtLockParent, rlatch);
1481
1482         if( bt_freepage (bt, right, rlatch) )
1483                 return bt->err;
1484
1485         bt_unlockpage(BtLockParent, latch);
1486         bt_unpinlatch(latch);
1487         return 0;
1488 }
1489
1490 //      find key in leaf level and return row-id
1491
1492 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1493 {
1494 uint  slot;
1495 BtKey ptr;
1496 uid id;
1497
1498         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1499                 ptr = keyptr(bt->page, slot);
1500         else
1501                 return 0;
1502
1503         // if key exists, return row-id
1504         //      otherwise return 0
1505
1506         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1507                 id = bt_getid(slotptr(bt->page,slot)->id);
1508         else
1509                 id = 0;
1510
1511         bt_unlockpage (BtLockRead, bt->latch);
1512         bt_unpinlatch (bt->latch);
1513         return id;
1514 }
1515
1516 //      check page for space available,
1517 //      clean if necessary and return
1518 //      0 - page needs splitting
1519 //      >0 - go ahead with new slot
1520  
1521 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1522 {
1523 uint nxt = bt->page_size;
1524 BtPage page = bt->page;
1525 uint cnt = 0, idx = 0;
1526 uint max = page->cnt;
1527 uint newslot = slot;
1528 BtKey key;
1529 int ret;
1530
1531         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1532                 return slot;
1533
1534         //      skip cleanup if nothing to reclaim
1535
1536         if( !page->clean )
1537                 return 0;
1538
1539         memcpy (bt->frame, page, bt->page_size);
1540
1541         // skip page info and set rest of page to zero
1542
1543         memset (page+1, 0, bt->page_size - sizeof(*page));
1544         page->act = 0;
1545
1546         while( cnt++ < max ) {
1547                 if( cnt == slot )
1548                         newslot = idx + 1;
1549                 // always leave fence key in list
1550                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1551                         continue;
1552
1553                 // copy key
1554                 key = keyptr(bt->frame, cnt);
1555                 nxt -= key->len + 1;
1556                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1557
1558                 // copy slot
1559                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1560                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1561                         page->act++;
1562 #ifdef USETOD
1563                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1564 #endif
1565                 slotptr(page, idx)->off = nxt;
1566         }
1567
1568         page->min = nxt;
1569         page->cnt = idx;
1570
1571         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1572                 return newslot;
1573
1574         return 0;
1575 }
1576
1577 // split the root and raise the height of the btree
1578
1579 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1580 {
1581 uint nxt = bt->page_size;
1582 BtPage root = bt->page;
1583 uid right;
1584
1585         //  Obtain an empty page to use, and copy the current
1586         //  root contents into it
1587
1588         if( !(right = bt_newpage(bt, root)) )
1589                 return bt->err;
1590
1591         // preserve the page info at the bottom
1592         // and set rest to zero
1593
1594         memset(root+1, 0, bt->page_size - sizeof(*root));
1595
1596         // insert first key on newroot page
1597
1598         nxt -= *leftkey + 1;
1599         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1600         bt_putid(slotptr(root, 1)->id, right);
1601         slotptr(root, 1)->off = nxt;
1602         
1603         // insert second key on newroot page
1604         // and increase the root height
1605
1606         nxt -= 3;
1607         ((unsigned char *)root)[nxt] = 2;
1608         ((unsigned char *)root)[nxt+1] = 0xff;
1609         ((unsigned char *)root)[nxt+2] = 0xff;
1610         bt_putid(slotptr(root, 2)->id, page_no2);
1611         slotptr(root, 2)->off = nxt;
1612
1613         bt_putid(root->right, 0);
1614         root->min = nxt;                // reset lowest used offset and key count
1615         root->cnt = 2;
1616         root->act = 2;
1617         root->lvl++;
1618
1619         // update and release root (bt->page)
1620
1621         bt_update(bt, root);
1622
1623         bt_unlockpage(BtLockWrite, bt->latch);
1624         bt_unpinlatch(bt->latch);
1625         return 0;
1626 }
1627
1628 //  split already locked full node
1629 //      return unlocked.
1630
1631 BTERR bt_splitpage (BtDb *bt)
1632 {
1633 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1634 unsigned char fencekey[256], rightkey[256];
1635 uid page_no = bt->page_no, right;
1636 BtLatchSet *latch, *rlatch;
1637 BtPage page = bt->page;
1638 uint lvl = page->lvl;
1639 BtKey key;
1640
1641         latch = bt->latch;
1642
1643         //  split higher half of keys to bt->frame
1644         //      the last key (fence key) might be dead
1645
1646         memset (bt->frame, 0, bt->page_size);
1647         max = page->cnt;
1648         cnt = max / 2;
1649         idx = 0;
1650
1651         while( cnt++ < max ) {
1652                 key = keyptr(page, cnt);
1653                 nxt -= key->len + 1;
1654                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1655                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1656                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1657                         bt->frame->act++;
1658 #ifdef USETOD
1659                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1660 #endif
1661                 slotptr(bt->frame, idx)->off = nxt;
1662         }
1663
1664         //      remember fence key for new right page
1665
1666         memcpy (rightkey, key, key->len + 1);
1667
1668         bt->frame->bits = bt->page_bits;
1669         bt->frame->min = nxt;
1670         bt->frame->cnt = idx;
1671         bt->frame->lvl = lvl;
1672
1673         // link right node
1674
1675         if( page_no > ROOT_page )
1676                 memcpy (bt->frame->right, page->right, BtId);
1677
1678         //      get new free page and write frame to it.
1679
1680         if( !(right = bt_newpage(bt, bt->frame)) )
1681                 return bt->err;
1682
1683         //      update lower keys to continue in old page
1684
1685         memcpy (bt->frame, page, bt->page_size);
1686         memset (page+1, 0, bt->page_size - sizeof(*page));
1687         nxt = bt->page_size;
1688         page->clean = 0;
1689         page->act = 0;
1690         cnt = 0;
1691         idx = 0;
1692
1693         //  assemble page of smaller keys
1694         //      (they're all active keys)
1695
1696         while( cnt++ < max / 2 ) {
1697                 key = keyptr(bt->frame, cnt);
1698                 nxt -= key->len + 1;
1699                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1700                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1701 #ifdef USETOD
1702                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1703 #endif
1704                 slotptr(page, idx)->off = nxt;
1705                 page->act++;
1706         }
1707
1708         // remember fence key for smaller page
1709
1710         memcpy (fencekey, key, key->len + 1);
1711
1712         bt_putid(page->right, right);
1713         page->min = nxt;
1714         page->cnt = idx;
1715
1716         // if current page is the root page, split it
1717
1718         if( page_no == ROOT_page )
1719                 return bt_splitroot (bt, fencekey, right);
1720
1721         //      lock right page
1722
1723         if( rlatch = bt_pinlatch (bt, right) )
1724                 bt_lockpage (BtLockParent, rlatch);
1725         else
1726                 return bt->err;
1727
1728         // update left (containing) node
1729
1730         bt_update(bt, page);
1731
1732         bt_lockpage (BtLockParent, latch);
1733         bt_unlockpage (BtLockWrite, latch);
1734
1735         // insert new fence for reformulated left block
1736
1737         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1738                 return bt->err;
1739
1740         //      switch fence for right block of larger keys to new right page
1741
1742         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1743                 return bt->err;
1744
1745         bt_unlockpage (BtLockParent, latch);
1746         bt_unlockpage (BtLockParent, rlatch);
1747
1748         bt_unpinlatch (rlatch);
1749         bt_unpinlatch (latch);
1750         return 0;
1751 }
1752
1753 //  Insert new key into the btree at requested level.
1754 //  Pages are unlocked at exit.
1755
1756 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1757 {
1758 uint slot, idx;
1759 BtPage page;
1760 BtKey ptr;
1761
1762   while( 1 ) {
1763         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1764                 ptr = keyptr(bt->page, slot);
1765         else
1766         {
1767                 if( !bt->err )
1768                         bt->err = BTERR_ovflw;
1769                 return bt->err;
1770         }
1771
1772         // if key already exists, update id and return
1773
1774         page = bt->page;
1775
1776         if( !keycmp (ptr, key, len) ) {
1777           if( slotptr(page, slot)->dead )
1778                 page->act++;
1779           slotptr(page, slot)->dead = 0;
1780 #ifdef USETOD
1781           slotptr(page, slot)->tod = tod;
1782 #endif
1783           bt_putid(slotptr(page,slot)->id, id);
1784           bt_update(bt, bt->page);
1785           bt_unlockpage(BtLockWrite, bt->latch);
1786           bt_unpinlatch (bt->latch);
1787           return 0;
1788         }
1789
1790         // check if page has enough space
1791
1792         if( slot = bt_cleanpage (bt, len, slot) )
1793                 break;
1794
1795         if( bt_splitpage (bt) )
1796                 return bt->err;
1797   }
1798
1799   // calculate next available slot and copy key into page
1800
1801   page->min -= len + 1; // reset lowest used offset
1802   ((unsigned char *)page)[page->min] = len;
1803   memcpy ((unsigned char *)page + page->min +1, key, len );
1804
1805   for( idx = slot; idx < page->cnt; idx++ )
1806         if( slotptr(page, idx)->dead )
1807                 break;
1808
1809   // now insert key into array before slot
1810   // preserving the fence slot
1811
1812   if( idx == page->cnt )
1813         idx++, page->cnt++;
1814
1815   page->act++;
1816
1817   while( idx > slot )
1818         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1819
1820   bt_putid(slotptr(page,slot)->id, id);
1821   slotptr(page, slot)->off = page->min;
1822 #ifdef USETOD
1823   slotptr(page, slot)->tod = tod;
1824 #endif
1825   slotptr(page, slot)->dead = 0;
1826
1827   bt_update(bt, bt->page);
1828
1829   bt_unlockpage(BtLockWrite, bt->latch);
1830   bt_unpinlatch(bt->latch);
1831   return 0;
1832 }
1833
1834 //  cache page of keys into cursor and return starting slot for given key
1835
1836 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1837 {
1838 uint slot;
1839
1840         // cache page for retrieval
1841
1842         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1843           memcpy (bt->cursor, bt->page, bt->page_size);
1844         else
1845           return 0;
1846
1847         bt_unlockpage(BtLockRead, bt->latch);
1848         bt->cursor_page = bt->page_no;
1849         bt_unpinlatch (bt->latch);
1850         return slot;
1851 }
1852
1853 //  return next slot for cursor page
1854 //  or slide cursor right into next page
1855
1856 uint bt_nextkey (BtDb *bt, uint slot)
1857 {
1858 BtLatchSet *latch;
1859 off64_t right;
1860
1861   do {
1862         right = bt_getid(bt->cursor->right);
1863
1864         while( slot++ < bt->cursor->cnt )
1865           if( slotptr(bt->cursor,slot)->dead )
1866                 continue;
1867           else if( right || (slot < bt->cursor->cnt))
1868                 return slot;
1869           else
1870                 break;
1871
1872         if( !right )
1873                 break;
1874
1875         bt->cursor_page = right;
1876
1877         if( latch = bt_pinlatch (bt, right) )
1878         bt_lockpage(BtLockRead, latch);
1879         else
1880                 return 0;
1881
1882         bt->page = bt_mappage (bt, latch);
1883         memcpy (bt->cursor, bt->page, bt->page_size);
1884         bt_unlockpage(BtLockRead, latch);
1885         bt_unpinlatch (latch);
1886         slot = 0;
1887   } while( 1 );
1888
1889   return bt->err = 0;
1890 }
1891
1892 BtKey bt_key(BtDb *bt, uint slot)
1893 {
1894         return keyptr(bt->cursor, slot);
1895 }
1896
1897 uid bt_uid(BtDb *bt, uint slot)
1898 {
1899         return bt_getid(slotptr(bt->cursor,slot)->id);
1900 }
1901
1902 #ifdef USETOD
1903 uint bt_tod(BtDb *bt, uint slot)
1904 {
1905         return slotptr(bt->cursor,slot)->tod;
1906 }
1907 #endif
1908
1909 #ifdef STANDALONE
1910
1911 uint bt_audit (BtDb *bt)
1912 {
1913 uint idx, hashidx;
1914 uid next, page_no;
1915 BtLatchSet *latch;
1916 uint blks[64];
1917 uint cnt = 0;
1918 BtPage page;
1919 uint amt[1];
1920 BtKey ptr;
1921
1922 #ifdef unix
1923         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
1924 #endif
1925         if( *(ushort *)(bt->latchmgr->lock) )
1926                 fprintf(stderr, "Alloc page locked\n");
1927         *(ushort *)(bt->latchmgr->lock) = 0;
1928
1929         memset (blks, 0, sizeof(blks));
1930
1931         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
1932                 latch = bt->latchsets + idx;
1933                 if( *(ushort *)latch->readwr )
1934                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
1935                 *(ushort *)latch->readwr = 0;
1936
1937                 if( *(ushort *)latch->access )
1938                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
1939                 *(ushort *)latch->access = 0;
1940
1941                 if( *(ushort *)latch->parent )
1942                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
1943                 *(ushort *)latch->parent = 0;
1944
1945                 if( latch->pin & PIN_mask ) {
1946                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
1947                         latch->pin = 0;
1948                 }
1949                 page = (BtPage)((uid)idx * bt->page_size + bt->pagepool);
1950                 blks[page->lvl]++;
1951
1952             if( page->dirty )
1953                  if( bt_writepage (bt, page, latch->page_no) )
1954                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
1955         }
1956
1957         for( idx = 0; blks[idx]; idx++ )
1958                 fprintf(stderr, "cache: %d lvl %d blocks\n", blks[idx], idx);
1959
1960         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
1961           if( bt->table[hashidx].busy[0] )
1962                         fprintf(stderr, "hash entry %d locked\n", hashidx);
1963
1964           bt->table[hashidx].busy[0] = 0;
1965         }
1966
1967         memset (blks, 0, sizeof(blks));
1968
1969         next = bt->latchmgr->nlatchpage + LATCH_page;
1970         page_no = LEAF_page;
1971
1972         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
1973                 if( bt_readpage (bt, bt->frame, page_no) )
1974                   fprintf(stderr, "page %.8x unreadable\n", page_no);
1975                 if( !bt->frame->free ) {
1976                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
1977                   ptr = keyptr(bt->frame, idx+1);
1978                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
1979                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
1980                  }
1981                  if( !bt->frame->lvl )
1982                         cnt += bt->frame->act;
1983                  blks[bt->frame->lvl]++;
1984                 }
1985
1986                 if( page_no > LEAF_page )
1987                         next = page_no + 1;
1988                 page_no = next;
1989         }
1990
1991         for( idx = 0; blks[idx]; idx++ )
1992                 fprintf(stderr, "btree: %d lvl %d blocks\n", blks[idx], idx);
1993
1994         return cnt - 1;
1995 }
1996
1997 #ifndef unix
1998 double getCpuTime(int type)
1999 {
2000 FILETIME crtime[1];
2001 FILETIME xittime[1];
2002 FILETIME systime[1];
2003 FILETIME usrtime[1];
2004 SYSTEMTIME timeconv[1];
2005 double ans = 0;
2006
2007         memset (timeconv, 0, sizeof(SYSTEMTIME));
2008
2009         switch( type ) {
2010         case 0:
2011                 GetSystemTimeAsFileTime (xittime);
2012                 FileTimeToSystemTime (xittime, timeconv);
2013                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2014                 break;
2015         case 1:
2016                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2017                 FileTimeToSystemTime (usrtime, timeconv);
2018                 break;
2019         case 2:
2020                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2021                 FileTimeToSystemTime (systime, timeconv);
2022                 break;
2023         }
2024
2025         ans += (double)timeconv->wHour * 3600;
2026         ans += (double)timeconv->wMinute * 60;
2027         ans += (double)timeconv->wSecond;
2028         ans += (double)timeconv->wMilliseconds / 1000;
2029         return ans;
2030 }
2031 #else
2032 #include <time.h>
2033 #include <sys/resource.h>
2034
2035 double getCpuTime(int type)
2036 {
2037 struct rusage used[1];
2038 struct timeval tv[1];
2039
2040         switch( type ) {
2041         case 0:
2042                 gettimeofday(tv, NULL);
2043                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2044
2045         case 1:
2046                 getrusage(RUSAGE_SELF, used);
2047                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2048
2049         case 2:
2050                 getrusage(RUSAGE_SELF, used);
2051                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2052         }
2053
2054         return 0;
2055 }
2056 #endif
2057
2058 //  standalone program to index file of keys
2059 //  then list them onto std-out
2060
2061 int main (int argc, char **argv)
2062 {
2063 uint slot, line = 0, off = 0, found = 0;
2064 int ch, cnt = 0, bits = 12, idx;
2065 unsigned char key[256];
2066 double done, start;
2067 uid next, page_no;
2068 BtLatchSet *latch;
2069 float elapsed;
2070 time_t tod[1];
2071 uint scan = 0;
2072 uint len = 0;
2073 uint map = 0;
2074 BtPage page;
2075 BtKey ptr;
2076 BtDb *bt;
2077 FILE *in;
2078
2079 #ifdef WIN32
2080         _setmode (1, _O_BINARY);
2081 #endif
2082         if( argc < 4 ) {
2083                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]);
2084                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2085                 fprintf (stderr, "  mapped_pool_pages: number of pages in buffer pool\n");
2086                 exit(0);
2087         }
2088
2089         start = getCpuTime(0);
2090         time(tod);
2091
2092         if( argc > 4 )
2093                 bits = atoi(argv[4]);
2094
2095         if( argc > 5 )
2096                 map = atoi(argv[5]);
2097
2098         if( argc > 6 )
2099                 off = atoi(argv[6]);
2100
2101         bt = bt_open ((argv[1]), BT_rw, bits, map);
2102
2103         if( !bt ) {
2104                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2105                 exit (1);
2106         }
2107
2108         switch(argv[3][0]| 0x20)
2109         {
2110         case 'p':       // display page
2111                 if( latch = bt_pinlatch (bt, off) )
2112                         page = bt_mappage (bt, latch);
2113                 else
2114                         fprintf(stderr, "unable to read page %.8x\n", off);
2115
2116                 write (1, page, bt->page_size);
2117                 break;
2118
2119         case 'a':       // buffer pool audit
2120                 fprintf(stderr, "started audit for %s\n", argv[1]);
2121                 cnt = bt_audit (bt);
2122                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[1], cnt);
2123                 break;
2124
2125         case 'w':       // write keys
2126                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2127                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2128 #ifdef unix
2129                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2130 #endif
2131                   while( ch = getc(in), ch != EOF )
2132                         if( ch == '\n' )
2133                         {
2134                           if( off )
2135                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2136
2137                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2138                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2139                           len = 0;
2140                         }
2141                         else if( len < 245 )
2142                                 key[len++] = ch;
2143                   }
2144                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2145                 break;
2146
2147         case 'd':       // delete keys
2148                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2149                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2150 #ifdef unix
2151                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2152 #endif
2153                   while( ch = getc(in), ch != EOF )
2154                         if( ch == '\n' )
2155                         {
2156                           if( off )
2157                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2158                           line++;
2159                           if( bt_deletekey (bt, key, len, 0) )
2160                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2161                           len = 0;
2162                         }
2163                         else if( len < 245 )
2164                                 key[len++] = ch;
2165                   }
2166                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2167                 break;
2168
2169         case 'f':       // find keys
2170                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2171                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2172 #ifdef unix
2173                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2174 #endif
2175                   while( ch = getc(in), ch != EOF )
2176                         if( ch == '\n' )
2177                         {
2178                           if( off )
2179                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2180                           line++;
2181                           if( bt_findkey (bt, key, len) )
2182                                 found++;
2183                           else if( bt->err )
2184                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2185                           len = 0;
2186                         }
2187                         else if( len < 245 )
2188                                 key[len++] = ch;
2189                   }
2190                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2191                 break;
2192
2193         case 's':       // scan and print keys
2194                 fprintf(stderr, "started scaning\n");
2195                 cnt = len = key[0] = 0;
2196
2197                 if( slot = bt_startkey (bt, key, len) )
2198                   slot--;
2199                 else
2200                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2201
2202                 while( slot = bt_nextkey (bt, slot) ) {
2203                         ptr = bt_key(bt, slot);
2204                         fwrite (ptr->key, ptr->len, 1, stdout);
2205                         fputc ('\n', stdout);
2206                         cnt++;
2207                 }
2208
2209                 fprintf(stderr, " Total keys read %d\n", cnt - 1);
2210                 break;
2211
2212         case 'c':       // count keys
2213           fprintf(stderr, "started counting\n");
2214           cnt = 0;
2215
2216           next = bt->latchmgr->nlatchpage + LATCH_page;
2217           page_no = LEAF_page;
2218
2219           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2220                 if( latch = bt_pinlatch (bt, page_no) )
2221                         page = bt_mappage (bt, latch);
2222                 if( !page->free && !page->lvl )
2223                         cnt += page->act;
2224                 if( page_no > LEAF_page )
2225                         next = page_no + 1;
2226                 if( scan )
2227                  for( idx = 0; idx++ < page->cnt; ) {
2228                   if( slotptr(page, idx)->dead )
2229                         continue;
2230                   ptr = keyptr(page, idx);
2231                   if( idx != page->cnt && bt_getid (page->right) ) {
2232                         fwrite (ptr->key, ptr->len, 1, stdout);
2233                         fputc ('\n', stdout);
2234                   }
2235                  }
2236                 bt_unpinlatch (latch);
2237                 page_no = next;
2238           }
2239                 
2240           cnt--;        // remove stopper key
2241           fprintf(stderr, " Total keys read %d\n", cnt);
2242           break;
2243         }
2244
2245         done = getCpuTime(0);
2246         elapsed = (float)(done - start);
2247         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2248         elapsed = getCpuTime(1);
2249         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2250         elapsed = getCpuTime(2);
2251         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2252         return 0;
2253 }
2254
2255 #endif  //STANDALONE