]> pd.if.org Git - btree/blob - btree2t.c
include newest lite weight latch code.
[btree] / btree2t.c
1 // btree version 2t  sched_yield version of spinlocks
2 //      with reworked bt_deletekey code
3 // 12 MAR 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #endif
30
31 #ifdef unix
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #endif
47
48 #include <memory.h>
49 #include <string.h>
50
51 typedef unsigned long long      uid;
52
53 #ifndef unix
54 typedef unsigned long long      off64_t;
55 typedef unsigned short          ushort;
56 typedef unsigned int            uint;
57 #endif
58
59 #define BT_latchtable   128                                     // number of latch manager slots
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63 #define BT_fl 0x6c66    // fl
64
65 #define BT_maxbits              24                                      // maximum page size in bits
66 #define BT_minbits              9                                       // minimum page size in bits
67 #define BT_minpage              (1 << BT_minbits)       // minimum page size
68 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
69
70 /*
71 There are five lock types for each node in three independent sets: 
72 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
73 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
74 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
75 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
76 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
77 */
78
79 typedef enum{
80         BtLockAccess,
81         BtLockDelete,
82         BtLockRead,
83         BtLockWrite,
84         BtLockParent
85 }BtLock;
86
87 //      definition for latch implementation
88
89 // exclusive is set for write access
90 // share is count of read accessors
91 // grant write lock when share == 0
92
93 volatile typedef struct {
94         ushort exclusive:1;
95         ushort pending:1;
96         ushort share:14;
97 } BtSpinLatch;
98
99 #define XCL 1
100 #define PEND 2
101 #define BOTH 3
102 #define SHARE 4
103
104 //  hash table entries
105
106 typedef struct {
107         BtSpinLatch latch[1];
108         volatile ushort slot;           // Latch table entry at head of chain
109 } BtHashEntry;
110
111 //      latch manager table structure
112
113 typedef struct {
114         BtSpinLatch readwr[1];          // read/write page lock
115         BtSpinLatch access[1];          // Access Intent/Page delete
116         BtSpinLatch parent[1];          // Posting of fence key in parent
117         BtSpinLatch busy[1];            // slot is being moved between chains
118         volatile ushort next;           // next entry in hash table chain
119         volatile ushort prev;           // prev entry in hash table chain
120         volatile ushort pin;            // number of outstanding locks
121         volatile ushort hash;           // hash slot entry is under
122         volatile uid page_no;           // latch set page number
123 } BtLatchSet;
124
125 //      Define the length of the page and key pointers
126
127 #define BtId 6
128
129 //      Page key slot definition.
130
131 //      If BT_maxbits is 15 or less, you can save 2 bytes
132 //      for each key stored by making the first two uints
133 //      into ushorts.  You can also save 4 bytes by removing
134 //      the tod field from the key.
135
136 //      Keys are marked dead, but remain on the page until
137 //      cleanup is called. The fence key (highest key) for
138 //      the page is always present, even if dead.
139
140 typedef struct {
141         uint off:BT_maxbits;            // page offset for key start
142         uint dead:1;                            // set for deleted key
143         uint tod;                                       // time-stamp for key
144         unsigned char id[BtId];         // id associated with key
145 } BtSlot;
146
147 //      The key structure occupies space at the upper end of
148 //      each page.  It's a length byte followed by the value
149 //      bytes.
150
151 typedef struct {
152         unsigned char len;
153         unsigned char key[0];
154 } *BtKey;
155
156 //      The first part of an index page.
157 //      It is immediately followed
158 //      by the BtSlot array of keys.
159
160 typedef struct BtPage_ {
161         uint cnt;                                       // count of keys in page
162         uint act;                                       // count of active keys
163         uint min;                                       // next key offset
164         unsigned char bits:7;           // page size in bits
165         unsigned char free:1;           // page is on free list
166         unsigned char lvl:6;            // level of page
167         unsigned char kill:1;           // page is being deleted
168         unsigned char dirty:1;          // page is dirty
169         unsigned char right[BtId];      // page number to right
170 } *BtPage;
171
172 //      The memory mapping hash table entry
173
174 typedef struct {
175         BtPage page;            // mapped page pointer
176         uid  page_no;           // mapped page number
177         void *lruprev;          // least recently used previous cache block
178         void *lrunext;          // lru next cache block
179         void *hashprev;         // previous cache block for the same hash idx
180         void *hashnext;         // next cache block for the same hash idx
181 #ifndef unix
182         HANDLE hmap;
183 #endif
184 }BtHash;
185
186 typedef struct {
187         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
188         BtSpinLatch lock[1];            // allocation area lite latch
189         ushort latchdeployed;           // highest number of latch entries deployed
190         ushort nlatchpage;                      // number of latch pages at BT_latch
191         ushort latchtotal;                      // number of page latch entries
192         ushort latchhash;                       // number of latch hash table slots
193         ushort latchvictim;                     // next latch entry to examine
194         BtHashEntry table[0];           // the hash table
195 } BtLatchMgr;
196
197 //      The object structure for Btree access
198
199 typedef struct _BtDb {
200         uint page_size;         // each page size       
201         uint page_bits;         // each page size in bits       
202         uint seg_bits;          // segment size in pages in bits
203         uid page_no;            // current page number  
204         uid cursor_page;        // current cursor page number   
205         int  err;
206         uint mode;                      // read-write mode
207         uint mapped_io;         // use memory mapping
208         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
209         BtPage alloc;           // frame buffer for alloc page ( page 0 )
210         BtPage cursor;          // cached frame for start/next (never mapped)
211         BtPage frame;           // spare frame for the page split (never mapped)
212         BtPage zero;            // zeroes frame buffer (never mapped)
213         BtPage page;            // current page
214         BtLatchSet *latch;              // current page latch
215         BtLatchMgr *latchmgr;   // mapped latch page from allocation page
216         BtLatchSet *latchsets;  // mapped latch set from latch pages
217 #ifdef unix
218         int idx;
219 #else
220         HANDLE idx;
221         HANDLE halloc;          // allocation and latch table handle
222 #endif
223         unsigned char *mem;     // frame, cursor, page memory buffer
224         int nodecnt;            // highest page cache segment in use
225         int nodemax;            // highest page cache segment allocated
226         int hashmask;           // number of pages in segments - 1
227         int hashsize;           // size of hash table
228         int found;                      // last deletekey found key
229         BtHash *lrufirst;       // lru list head
230         BtHash *lrulast;        // lru list tail
231         ushort *cache;          // hash table for cached segments
232         BtHash *nodes;          // segment cache
233 } BtDb;
234
235 typedef enum {
236 BTERR_ok = 0,
237 BTERR_notfound,
238 BTERR_struct,
239 BTERR_ovflw,
240 BTERR_lock,
241 BTERR_hash,
242 BTERR_kill,
243 BTERR_map,
244 BTERR_wrt,
245 BTERR_eof
246 } BTERR;
247
248 // B-Tree functions
249 extern void bt_close (BtDb *bt);
250 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk, uint hashsize);
251 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
252 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
253 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
254 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
255 extern uint bt_nextkey   (BtDb *bt, uint slot);
256
257 //      internal functions
258 BTERR bt_update (BtDb *bt, BtPage page, uid page_no);
259 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no);
260 //  Helper functions to return slot values
261
262 extern BtKey bt_key (BtDb *bt, uint slot);
263 extern uid bt_uid (BtDb *bt, uint slot);
264 extern uint bt_tod (BtDb *bt, uint slot);
265
266 //  BTree page number constants
267 #define ALLOC_page              0
268 #define ROOT_page               1
269 #define LEAF_page               2
270 #define LATCH_page              3
271
272 //      Number of levels to create in a new BTree
273
274 #define MIN_lvl                 2
275
276 //  The page is allocated from low and hi ends.
277 //  The key offsets and row-id's are allocated
278 //  from the bottom, while the text of the key
279 //  is allocated from the top.  When the two
280 //  areas meet, the page is split into two.
281
282 //  A key consists of a length byte, two bytes of
283 //  index number (0 - 65534), and up to 253 bytes
284 //  of key value.  Duplicate keys are discarded.
285 //  Associated with each key is a 48 bit row-id.
286
287 //  The b-tree root is always located at page 1.
288 //      The first leaf page of level zero is always
289 //      located on page 2.
290
291 //      The b-tree pages are linked with right
292 //      pointers to facilitate enumerators,
293 //      and provide for concurrency.
294
295 //      When to root page fills, it is split in two and
296 //      the tree height is raised by a new root at page
297 //      one with two keys.
298
299 //      Deleted keys are marked with a dead bit until
300 //      page cleanup The fence key for a node is always
301 //      present, even after deletion and cleanup.
302
303 //  Deleted leaf pages are reclaimed  on a free list.
304 //      The upper levels of the btree are fixed on creation.
305
306 //  Groups of pages from the btree are optionally
307 //  cached with memory mapping. A hash table is used to keep
308 //  track of the cached pages.  This behaviour is controlled
309 //  by the number of cache blocks parameter and pages per block
310 //      given to bt_open.
311
312 //  To achieve maximum concurrency one page is locked at a time
313 //  as the tree is traversed to find leaf key in question. The right
314 //  page numbers are used in cases where the page is being split,
315 //      or consolidated.
316
317 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
318 //      and chains empty leaf pages together for reuse.
319
320 //      Parent locks are obtained to prevent resplitting or deleting a node
321 //      before its fence is posted into its upper level.
322
323 //      A special open mode of BT_fl is provided to safely access files on
324 //      WIN32 networks. WIN32 network operations should not use memory mapping.
325 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
326 //      to prevent local caching of network file contents.
327
328 //      Access macros to address slot and key values from the page.
329 //      Page slots use 1 based indexing.
330
331 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
332 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
333
334 void bt_putid(unsigned char *dest, uid id)
335 {
336 int i = BtId;
337
338         while( i-- )
339                 dest[i] = (unsigned char)id, id >>= 8;
340 }
341
342 uid bt_getid(unsigned char *src)
343 {
344 uid id = 0;
345 int i;
346
347         for( i = 0; i < BtId; i++ )
348                 id <<= 8, id |= *src++; 
349
350         return id;
351 }
352
353 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
354 {
355 BtKey ptr;
356
357         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
358         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
359         ptr = keyptr(page, page->cnt);
360         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
361         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
362         return bt->err = err;
363 }
364
365 //      Spin Latch Manager
366
367 //      wait until write lock mode is clear
368 //      and add 1 to the share count
369
370 void bt_spinreadlock(BtSpinLatch *latch)
371 {
372 ushort prev;
373
374   do {
375 #ifdef unix
376         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
377 #else
378         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
379 #endif
380         //  see if exclusive request is granted or pending
381
382         if( !(prev & BOTH) )
383                 return;
384 #ifdef unix
385         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
386 #else
387         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
388 #endif
389 #ifdef  unix
390   } while( sched_yield(), 1 );
391 #else
392   } while( SwitchToThread(), 1 );
393 #endif
394 }
395
396 //      wait for other read and write latches to relinquish
397
398 void bt_spinwritelock(BtSpinLatch *latch)
399 {
400 ushort prev;
401
402   do {
403 #ifdef  unix
404         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
405 #else
406         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
407 #endif
408         if( !(prev & XCL) )
409           if( !(prev & ~BOTH) )
410                 return;
411           else
412 #ifdef unix
413                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
414 #else
415                 _InterlockedAnd16((ushort *)latch, ~XCL);
416 #endif
417 #ifdef  unix
418   } while( sched_yield(), 1 );
419 #else
420   } while( SwitchToThread(), 1 );
421 #endif
422 }
423
424 //      try to obtain write lock
425
426 //      return 1 if obtained,
427 //              0 otherwise
428
429 int bt_spinwritetry(BtSpinLatch *latch)
430 {
431 ushort prev;
432
433 #ifdef  unix
434         prev = __sync_fetch_and_or((ushort *)latch, XCL);
435 #else
436         prev = _InterlockedOr16((ushort *)latch, XCL);
437 #endif
438         //      take write access if all bits are clear
439
440         if( !(prev & XCL) )
441           if( !(prev & ~BOTH) )
442                 return 1;
443           else
444 #ifdef unix
445                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
446 #else
447                 _InterlockedAnd16((ushort *)latch, ~XCL);
448 #endif
449         return 0;
450 }
451
452 //      clear write mode
453
454 void bt_spinreleasewrite(BtSpinLatch *latch)
455 {
456 #ifdef unix
457         __sync_fetch_and_and((ushort *)latch, ~BOTH);
458 #else
459         _InterlockedAnd16((ushort *)latch, ~BOTH);
460 #endif
461 }
462
463 //      decrement reader count
464
465 void bt_spinreleaseread(BtSpinLatch *latch)
466 {
467 #ifdef unix
468         __sync_fetch_and_add((ushort *)latch, -SHARE);
469 #else
470         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
471 #endif
472 }
473
474 //      link latch table entry into latch hash table
475
476 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
477 {
478 BtLatchSet *latch = bt->latchsets + victim;
479
480         if( latch->next = bt->latchmgr->table[hashidx].slot )
481                 bt->latchsets[latch->next].prev = victim;
482
483         bt->latchmgr->table[hashidx].slot = victim;
484         latch->page_no = page_no;
485         latch->hash = hashidx;
486         latch->prev = 0;
487 }
488
489 //      release latch pin
490
491 void bt_unpinlatch (BtLatchSet *latch)
492 {
493 #ifdef unix
494         __sync_fetch_and_add(&latch->pin, -1);
495 #else
496         _InterlockedDecrement16 (&latch->pin);
497 #endif
498 }
499
500 //      find existing latchset or inspire new one
501 //      return with latchset pinned
502
503 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
504 {
505 ushort hashidx = page_no % bt->latchmgr->latchhash;
506 ushort slot, avail = 0, victim, idx;
507 BtLatchSet *latch;
508
509         //  obtain read lock on hash table entry
510
511         bt_spinreadlock(bt->latchmgr->table[hashidx].latch);
512
513         if( slot = bt->latchmgr->table[hashidx].slot ) do
514         {
515                 latch = bt->latchsets + slot;
516                 if( page_no == latch->page_no )
517                         break;
518         } while( slot = latch->next );
519
520         if( slot ) {
521 #ifdef unix
522                 __sync_fetch_and_add(&latch->pin, 1);
523 #else
524                 _InterlockedIncrement16 (&latch->pin);
525 #endif
526         }
527
528     bt_spinreleaseread (bt->latchmgr->table[hashidx].latch);
529
530         if( slot )
531                 return latch;
532
533   //  try again, this time with write lock
534
535   bt_spinwritelock(bt->latchmgr->table[hashidx].latch);
536
537   if( slot = bt->latchmgr->table[hashidx].slot ) do
538   {
539         latch = bt->latchsets + slot;
540         if( page_no == latch->page_no )
541                 break;
542         if( !latch->pin && !avail )
543                 avail = slot;
544   } while( slot = latch->next );
545
546   //  found our entry, or take over an unpinned one
547
548   if( slot || (slot = avail) ) {
549         latch = bt->latchsets + slot;
550 #ifdef unix
551         __sync_fetch_and_add(&latch->pin, 1);
552 #else
553         _InterlockedIncrement16 (&latch->pin);
554 #endif
555         latch->page_no = page_no;
556         bt_spinreleasewrite(bt->latchmgr->table[hashidx].latch);
557         return latch;
558   }
559
560         //  see if there are any unused entries
561 #ifdef unix
562         victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
563 #else
564         victim = _InterlockedIncrement16 (&bt->latchmgr->latchdeployed);
565 #endif
566
567         if( victim < bt->latchmgr->latchtotal ) {
568                 latch = bt->latchsets + victim;
569 #ifdef unix
570                 __sync_fetch_and_add(&latch->pin, 1);
571 #else
572                 _InterlockedIncrement16 (&latch->pin);
573 #endif
574                 bt_latchlink (bt, hashidx, victim, page_no);
575                 bt_spinreleasewrite (bt->latchmgr->table[hashidx].latch);
576                 return latch;
577         }
578
579 #ifdef unix
580         victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
581 #else
582         victim = _InterlockedDecrement16 (&bt->latchmgr->latchdeployed);
583 #endif
584   //  find and reuse previous lock entry
585
586   while( 1 ) {
587 #ifdef unix
588         victim = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
589 #else
590         victim = _InterlockedIncrement16 (&bt->latchmgr->latchvictim) - 1;
591 #endif
592         //      we don't use slot zero
593
594         if( victim %= bt->latchmgr->latchtotal )
595                 latch = bt->latchsets + victim;
596         else
597                 continue;
598
599         //      take control of our slot
600         //      from other threads
601
602         if( latch->pin || !bt_spinwritetry (latch->busy) )
603                 continue;
604
605         idx = latch->hash;
606
607         // try to get write lock on hash chain
608         //      skip entry if not obtained
609         //      or has outstanding locks
610
611         if( !bt_spinwritetry (bt->latchmgr->table[idx].latch) ) {
612                 bt_spinreleasewrite (latch->busy);
613                 continue;
614         }
615
616         if( latch->pin ) {
617                 bt_spinreleasewrite (latch->busy);
618                 bt_spinreleasewrite (bt->latchmgr->table[idx].latch);
619                 continue;
620         }
621
622         //  unlink our available victim from its hash chain
623
624         if( latch->prev )
625                 bt->latchsets[latch->prev].next = latch->next;
626         else
627                 bt->latchmgr->table[idx].slot = latch->next;
628
629         if( latch->next )
630                 bt->latchsets[latch->next].prev = latch->prev;
631
632         bt_spinreleasewrite (bt->latchmgr->table[idx].latch);
633 #ifdef unix
634         __sync_fetch_and_add(&latch->pin, 1);
635 #else
636         _InterlockedIncrement16 (&latch->pin);
637 #endif
638         bt_latchlink (bt, hashidx, victim, page_no);
639         bt_spinreleasewrite (bt->latchmgr->table[hashidx].latch);
640         bt_spinreleasewrite (latch->busy);
641         return latch;
642   }
643 }
644
645 //      close and release memory
646
647 void bt_close (BtDb *bt)
648 {
649 BtHash *hash;
650 #ifdef unix
651         munmap (bt->latchsets, bt->latchmgr->nlatchpage * bt->page_size);
652         munmap (bt->latchmgr, bt->page_size);
653 #else
654         FlushViewOfFile(bt->latchmgr, 0);
655         UnmapViewOfFile(bt->latchmgr);
656         CloseHandle(bt->halloc);
657 #endif
658 #ifdef unix
659         // release mapped pages
660
661         if( hash = bt->lrufirst )
662                 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
663                 while(hash = hash->lrunext);
664
665         if( bt->mem )
666                 free (bt->mem);
667         close (bt->idx);
668         free (bt->cache);
669         free (bt);
670 #else
671         if( hash = bt->lrufirst )
672           do
673           {
674                 FlushViewOfFile(hash->page, 0);
675                 UnmapViewOfFile(hash->page);
676                 CloseHandle(hash->hmap);
677           } while(hash = hash->lrunext);
678
679         if( bt->mem)
680                 VirtualFree (bt->mem, 0, MEM_RELEASE);
681         FlushFileBuffers(bt->idx);
682         CloseHandle(bt->idx);
683         GlobalFree (bt->cache);
684         GlobalFree (bt);
685 #endif
686 }
687 //  open/create new btree
688
689 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
690 //              size of mapped page pool (e.g. 8192)
691
692 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint segsize, uint hashsize)
693 {
694 uint lvl, attr, cacheblk, last, slot, idx;
695 uint nlatchpage, latchhash;
696 BtLatchMgr *latchmgr;
697 off64_t size;
698 uint amt[1];
699 BtKey key;
700 BtDb* bt;
701 int flag;
702
703 #ifndef unix
704 SYSTEM_INFO sysinfo[1];
705 OVERLAPPED ovl[1];
706 uint len, flags;
707 #else
708 struct flock lock[1];
709 #endif
710
711         // determine sanity of page size and buffer pool
712
713         if( bits > BT_maxbits )
714                 bits = BT_maxbits;
715         else if( bits < BT_minbits )
716                 bits = BT_minbits;
717
718 #ifdef unix
719         bt = calloc (1, sizeof(BtDb));
720
721         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
722
723         if( bt->idx == -1 )
724                 return free(bt), NULL;
725         
726         cacheblk = 4096;        // minimum mmap segment size for unix
727
728 #else
729         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
730         attr = FILE_ATTRIBUTE_NORMAL;
731         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
732
733         if( bt->idx == INVALID_HANDLE_VALUE )
734                 return GlobalFree(bt), NULL;
735
736         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
737         GetSystemInfo(sysinfo);
738         cacheblk = sysinfo->dwAllocationGranularity;
739 #endif
740
741 #ifdef unix
742         memset (lock, 0, sizeof(lock));
743
744         lock->l_type = F_WRLCK;
745         lock->l_len = sizeof(struct BtPage_);
746         lock->l_whence = 0;
747
748         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
749                 return bt_close (bt), NULL;
750 #else
751         memset (ovl, 0, sizeof(ovl));
752         len = sizeof(struct BtPage_);
753
754         //      use large offsets to
755         //      simulate advisory locking
756
757         ovl->OffsetHigh |= 0x80000000;
758
759         if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent )
760                 flags |= LOCKFILE_EXCLUSIVE_LOCK;
761
762         if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
763                 return bt_close (bt), NULL;
764 #endif 
765 #ifdef unix
766         latchmgr = malloc (BT_maxpage);
767         *amt = 0;
768
769         // read minimum page size to get root info
770
771         if( size = lseek (bt->idx, 0L, 2) ) {
772                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
773                         bits = latchmgr->alloc->bits;
774                 else
775                         return free(bt), free(latchmgr), NULL;
776         } else if( mode == BT_ro )
777                 return free(latchmgr), bt_close (bt), NULL;
778 #else
779         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
780         size = GetFileSize(bt->idx, amt);
781
782         if( size || *amt ) {
783                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
784                         return bt_close (bt), NULL;
785                 bits = latchmgr->alloc->bits;
786         } else if( mode == BT_ro )
787                 return bt_close (bt), NULL;
788 #endif
789
790         bt->page_size = 1 << bits;
791         bt->page_bits = bits;
792
793         bt->mode = mode;
794
795         if( cacheblk < bt->page_size )
796                 cacheblk = bt->page_size;
797
798         //  mask for partial memmaps
799
800         bt->hashmask = (cacheblk >> bits) - 1;
801
802         //      see if requested size of pages per memmap is greater
803
804         if( (1 << segsize) > bt->hashmask )
805                 bt->hashmask = (1 << segsize) - 1;
806
807         bt->seg_bits = 0;
808
809         while( (1 << bt->seg_bits) <= bt->hashmask )
810                 bt->seg_bits++;
811
812         bt->hashsize = hashsize;
813
814         if( bt->nodemax = nodemax ) {
815 #ifdef unix
816           bt->nodes = calloc (nodemax, sizeof(BtHash));
817           bt->cache = calloc (hashsize, sizeof(ushort));
818 #else
819           bt->nodes = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, nodemax * sizeof(BtHash));
820           bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
821 #endif
822           bt->mapped_io = 1;
823         }
824
825         if( size || *amt ) {
826                 goto btlatch;
827         }
828
829         // initialize an empty b-tree with latch page, root page, page of leaves
830         // and page(s) of latches
831
832         memset (latchmgr, 0, 1 << bits);
833         nlatchpage = BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1; 
834         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
835         latchmgr->alloc->bits = bt->page_bits;
836
837         latchmgr->nlatchpage = nlatchpage;
838         latchmgr->latchtotal = nlatchpage * (bt->page_size / sizeof(BtLatchSet));
839
840         //  initialize latch manager
841
842         latchhash = (bt->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
843
844         //      size of hash table = total number of latchsets
845
846         if( latchhash > latchmgr->latchtotal )
847                 latchhash = latchmgr->latchtotal;
848
849         latchmgr->latchhash = latchhash;
850
851 #ifdef unix
852         if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
853                 return bt_close (bt), NULL;
854 #else
855         if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
856                 return bt_close (bt), NULL;
857
858         if( *amt < bt->page_size )
859                 return bt_close (bt), NULL;
860 #endif
861
862         memset (latchmgr, 0, 1 << bits);
863         latchmgr->alloc->bits = bt->page_bits;
864
865         for( lvl=MIN_lvl; lvl--; ) {
866                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
867                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
868                 key = keyptr(latchmgr->alloc, 1);
869                 key->len = 2;           // create stopper key
870                 key->key[0] = 0xff;
871                 key->key[1] = 0xff;
872                 latchmgr->alloc->min = bt->page_size - 3;
873                 latchmgr->alloc->lvl = lvl;
874                 latchmgr->alloc->cnt = 1;
875                 latchmgr->alloc->act = 1;
876 #ifdef unix
877                 if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
878                         return bt_close (bt), NULL;
879 #else
880                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
881                         return bt_close (bt), NULL;
882
883                 if( *amt < bt->page_size )
884                         return bt_close (bt), NULL;
885 #endif
886         }
887
888         // clear out latch manager locks
889         //      and rest of pages to round out segment
890
891         memset(latchmgr, 0, bt->page_size);
892         last = MIN_lvl + 1;
893
894         while( last <= ((MIN_lvl + 1 + nlatchpage) | bt->hashmask) ) {
895 #ifdef unix
896                 pwrite(bt->idx, latchmgr, bt->page_size, last << bt->page_bits);
897 #else
898                 SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN);
899                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
900                         return bt_close (bt), NULL;
901                 if( *amt < bt->page_size )
902                         return bt_close (bt), NULL;
903 #endif
904                 last++;
905         }
906
907 btlatch:
908 #ifdef unix
909         lock->l_type = F_UNLCK;
910         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
911                 return bt_close (bt), NULL;
912 #else
913         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) )
914                 return bt_close (bt), NULL;
915 #endif
916 #ifdef unix
917         flag = PROT_READ | PROT_WRITE;
918         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
919         if( bt->latchmgr == MAP_FAILED )
920                 return bt_close (bt), NULL;
921         bt->latchsets = (BtLatchSet *)mmap (0, bt->latchmgr->nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
922         if( bt->latchsets == MAP_FAILED )
923                 return bt_close (bt), NULL;
924 #else
925         flag = PAGE_READWRITE;
926         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, (BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * bt->page_size, NULL);
927         if( !bt->halloc )
928                 return bt_close (bt), NULL;
929
930         flag = FILE_MAP_WRITE;
931         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, (BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * bt->page_size);
932         if( !bt->latchmgr )
933                 return GetLastError(), bt_close (bt), NULL;
934
935         bt->latchsets = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
936 #endif
937
938 #ifdef unix
939         free (latchmgr);
940 #else
941         VirtualFree (latchmgr, 0, MEM_RELEASE);
942 #endif
943
944 #ifdef unix
945         bt->mem = malloc (6 * bt->page_size);
946 #else
947         bt->mem = VirtualAlloc(NULL, 6 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
948 #endif
949         bt->frame = (BtPage)bt->mem;
950         bt->cursor = (BtPage)(bt->mem + bt->page_size);
951         bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
952         bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
953         bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
954         bt->zero = (BtPage)(bt->mem + 5 * bt->page_size);
955
956         memset (bt->zero, 0, bt->page_size);
957         return bt;
958 }
959
960 // place write, read, or parent lock on requested page_no.
961
962 void bt_lockpage(BtLock mode, BtLatchSet *latch)
963 {
964         switch( mode ) {
965         case BtLockRead:
966                 bt_spinreadlock (latch->readwr);
967                 break;
968         case BtLockWrite:
969                 bt_spinwritelock (latch->readwr);
970                 break;
971         case BtLockAccess:
972                 bt_spinreadlock (latch->access);
973                 break;
974         case BtLockDelete:
975                 bt_spinwritelock (latch->access);
976                 break;
977         case BtLockParent:
978                 bt_spinwritelock (latch->parent);
979                 break;
980         }
981 }
982
983 // remove write, read, or parent lock on requested page
984
985 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
986 {
987         switch( mode ) {
988         case BtLockRead:
989                 bt_spinreleaseread (latch->readwr);
990                 break;
991         case BtLockWrite:
992                 bt_spinreleasewrite (latch->readwr);
993                 break;
994         case BtLockAccess:
995                 bt_spinreleaseread (latch->access);
996                 break;
997         case BtLockDelete:
998                 bt_spinreleasewrite (latch->access);
999                 break;
1000         case BtLockParent:
1001                 bt_spinreleasewrite (latch->parent);
1002                 break;
1003         }
1004 }
1005
1006 //      allocate a new page and write page into it
1007
1008 uid bt_newpage(BtDb *bt, BtPage page)
1009 {
1010 uid new_page;
1011 int reuse;
1012
1013         //      lock allocation page
1014
1015         bt_spinwritelock(bt->latchmgr->lock);
1016
1017         // use empty chain first
1018         // else allocate empty page
1019
1020         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
1021                 if( bt_mappage (bt, &bt->temp, new_page) )
1022                         return 0;
1023                 bt_putid(bt->latchmgr->alloc[1].right, bt_getid(bt->temp->right));
1024                 reuse = 1;
1025         } else {
1026                 new_page = bt_getid(bt->latchmgr->alloc->right);
1027                 bt_putid(bt->latchmgr->alloc->right, new_page+1);
1028                 reuse = 0;
1029         }
1030
1031         bt_spinreleasewrite(bt->latchmgr->lock);
1032
1033         if( !bt->mapped_io )
1034           if( bt_update(bt, page, new_page) )
1035                 return 0;       //don't unlock on error
1036           else
1037                 return new_page;
1038
1039 #ifdef unix
1040         if( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
1041                 return bt->err = BTERR_wrt, 0;
1042
1043         // if writing first page of pool block, zero last page in the block
1044
1045         if( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
1046         {
1047                 // use zero buffer to write zeros
1048                 if( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
1049                         return bt->err = BTERR_wrt, 0;
1050         }
1051 #else
1052         //      bring new page into pool and copy page.
1053         //      this will extend the file into the new pages.
1054
1055         if( bt_mappage (bt, &bt->temp, new_page) )
1056                 return 0;
1057
1058         memcpy(bt->temp, page, bt->page_size);
1059 #endif
1060         return new_page;
1061 }
1062
1063 //  compare two keys, returning > 0, = 0, or < 0
1064 //  as the comparison value
1065
1066 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1067 {
1068 uint len1 = key1->len;
1069 int ans;
1070
1071         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1072                 return ans;
1073
1074         if( len1 > len2 )
1075                 return 1;
1076         if( len1 < len2 )
1077                 return -1;
1078
1079         return 0;
1080 }
1081
1082 //  Update current page of btree by writing file contents
1083 //      or flushing mapped area to disk.
1084
1085 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
1086 {
1087 off64_t off = page_no << bt->page_bits;
1088
1089 #ifdef unix
1090     if( !bt->mapped_io )
1091          if( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
1092                  return bt->err = BTERR_wrt;
1093 #else
1094 uint amt[1];
1095         if( !bt->mapped_io )
1096         {
1097                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
1098                 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
1099                         return GetLastError(), bt->err = BTERR_wrt;
1100
1101                 if( *amt < bt->page_size )
1102                         return GetLastError(), bt->err = BTERR_wrt;
1103         } 
1104         else if( bt->mode == BT_fl ) {
1105                         FlushViewOfFile(page, bt->page_size);
1106                         FlushFileBuffers(bt->idx);
1107         }
1108 #endif
1109         return 0;
1110 }
1111
1112 // find page in cache 
1113
1114 BtHash *bt_findhash(BtDb *bt, uid page_no)
1115 {
1116 BtHash *hash;
1117 uint idx;
1118
1119         // compute cache block first page and hash idx 
1120
1121         page_no &= ~bt->hashmask;
1122         idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
1123
1124         if( bt->cache[idx] ) 
1125                 hash = bt->nodes + bt->cache[idx];
1126         else
1127                 return NULL;
1128
1129         do if( hash->page_no == page_no )
1130                  break;
1131         while(hash = hash->hashnext );
1132
1133         return hash;
1134 }
1135
1136 // add page cache entry to hash index
1137
1138 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
1139 {
1140 uint idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
1141 BtHash *hash;
1142
1143         if( bt->cache[idx] ) {
1144                 node->hashnext = hash = bt->nodes + bt->cache[idx];
1145                 hash->hashprev = node;
1146         }
1147
1148         node->hashprev = NULL;
1149         bt->cache[idx] = (ushort)(node - bt->nodes);
1150 }
1151
1152 // remove cache entry from hash table
1153
1154 void bt_unlinkhash(BtDb *bt, BtHash *node)
1155 {
1156 uint idx = (uint)(node->page_no >> bt->seg_bits) % bt->hashsize;
1157 BtHash *hash;
1158
1159         // unlink node
1160         if( hash = node->hashprev )
1161                 hash->hashnext = node->hashnext;
1162         else if( hash = node->hashnext )
1163                 bt->cache[idx] = (ushort)(hash - bt->nodes);
1164         else
1165                 bt->cache[idx] = 0;
1166
1167         if( hash = node->hashnext )
1168                 hash->hashprev = node->hashprev;
1169 }
1170
1171 // add cache page to lru chain and map pages
1172
1173 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
1174 {
1175 int flag;
1176 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
1177 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
1178 BtHash *node;
1179
1180         memset(hash, 0, sizeof(BtHash));
1181         hash->page_no = (page_no & ~bt->hashmask);
1182         bt_linkhash(bt, hash, page_no);
1183
1184         if( node = hash->lrunext = bt->lrufirst )
1185                 node->lruprev = hash;
1186         else
1187                 bt->lrulast = hash;
1188
1189         bt->lrufirst = hash;
1190
1191 #ifdef unix
1192         flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
1193         hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
1194         if( hash->page == MAP_FAILED )
1195                 return bt->err = BTERR_map, (BtPage)NULL;
1196
1197 #else
1198         flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1199         hash->hmap = CreateFileMapping(bt->idx, NULL, flag,     (DWORD)(limit >> 32), (DWORD)limit, NULL);
1200         if( !hash->hmap )
1201                 return bt->err = BTERR_map, NULL;
1202
1203         flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1204         hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
1205         if( !hash->page )
1206                 return bt->err = BTERR_map, NULL;
1207 #endif
1208
1209         return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
1210 }
1211
1212 //      find or place requested page in page-cache
1213 //      return memory address where page is located.
1214
1215 BtPage bt_hashpage(BtDb *bt, uid page_no)
1216 {
1217 BtHash *hash, *node, *next;
1218 BtPage page;
1219
1220         // find page in cache and move to top of lru list  
1221
1222         if( hash = bt_findhash(bt, page_no) ) {
1223                 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
1224                 // swap node in lru list
1225                 if( node = hash->lruprev ) {
1226                         if( next = node->lrunext = hash->lrunext )
1227                                 next->lruprev = node;
1228                         else
1229                                 bt->lrulast = node;
1230
1231                         if( next = hash->lrunext = bt->lrufirst )
1232                                 next->lruprev = hash;
1233                         else
1234                                 return bt->err = BTERR_hash, (BtPage)NULL;
1235
1236                         hash->lruprev = NULL;
1237                         bt->lrufirst = hash;
1238                 }
1239                 return page;
1240         }
1241
1242         // map pages and add to cache entry
1243
1244         if( bt->nodecnt < bt->nodemax ) {
1245                 hash = bt->nodes + ++bt->nodecnt;
1246                 return bt_linklru(bt, hash, page_no);
1247         }
1248
1249         // hash table is already full, replace last lru entry from the cache
1250
1251         if( hash = bt->lrulast ) {
1252                 // unlink from lru list
1253                 if( node = bt->lrulast = hash->lruprev )
1254                         node->lrunext = NULL;
1255                 else
1256                         return bt->err = BTERR_hash, (BtPage)NULL;
1257
1258 #ifdef unix
1259                 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
1260 #else
1261                 FlushViewOfFile(hash->page, 0);
1262                 UnmapViewOfFile(hash->page);
1263                 CloseHandle(hash->hmap);
1264 #endif
1265                 // unlink from hash table
1266
1267                 bt_unlinkhash(bt, hash);
1268
1269                 // map and add to cache
1270
1271                 return bt_linklru(bt, hash, page_no);
1272         }
1273
1274         return bt->err = BTERR_hash, (BtPage)NULL;
1275 }
1276
1277 //  map a btree page onto current page
1278
1279 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
1280 {
1281 off64_t off = page_no << bt->page_bits;
1282 #ifndef unix
1283 int amt[1];
1284 #endif
1285         
1286         if( bt->mapped_io ) {
1287                 bt->err = 0;
1288                 *page = bt_hashpage(bt, page_no);
1289                 return bt->err;
1290         }
1291 #ifdef unix
1292         if( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
1293                 return bt->err = BTERR_map;
1294 #else
1295         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
1296
1297         if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
1298                 return bt->err = BTERR_map;
1299
1300         if( *amt <  bt->page_size )
1301                 return bt->err = BTERR_map;
1302 #endif
1303         return 0;
1304 }
1305
1306 //      deallocate a deleted page 
1307 //      place on free chain out of allocator page
1308 //      call with page latched for Writing and Deleting
1309
1310 BTERR bt_freepage(BtDb *bt, uid page_no, BtPage page, BtLatchSet *latch)
1311 {
1312         if( bt_mappage (bt, &page, page_no) )
1313                 return bt->err;
1314
1315         //      lock allocation page
1316
1317         bt_spinwritelock (bt->latchmgr->lock);
1318
1319         //      store chain in second right
1320         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1321         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1322         page->free = 1;
1323
1324         if( bt_update(bt, page, page_no) )
1325                 return bt->err;
1326
1327         // unlock released page
1328
1329         bt_unlockpage (BtLockDelete, latch);
1330         bt_unlockpage (BtLockWrite, latch);
1331         bt_unpinlatch (latch);
1332
1333         // unlock allocation page
1334
1335         bt_spinreleasewrite (bt->latchmgr->lock);
1336         return 0;
1337 }
1338
1339 //  find slot in page for given key at a given level
1340
1341 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1342 {
1343 uint diff, higher = bt->page->cnt, low = 1, slot;
1344 uint good = 0;
1345
1346         //      make stopper key an infinite fence value
1347
1348         if( bt_getid (bt->page->right) )
1349                 higher++;
1350         else
1351                 good++;
1352
1353         //      low is the lowest candidate, higher is already
1354         //      tested as .ge. the given key, loop ends when they meet
1355
1356         while( diff = higher - low ) {
1357                 slot = low + ( diff >> 1 );
1358                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1359                         low = slot + 1;
1360                 else
1361                         higher = slot, good++;
1362         }
1363
1364         //      return zero if key is on right link page
1365
1366         return good ? higher : 0;
1367 }
1368
1369 //  find and load page at given level for given key
1370 //      leave page rd or wr locked as requested
1371
1372 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1373 {
1374 uid page_no = ROOT_page, prevpage = 0;
1375 uint drill = 0xff, slot;
1376 BtLatchSet *prevlatch;
1377 uint mode, prevmode;
1378
1379   //  start at root of btree and drill down
1380
1381   do {
1382         // determine lock mode of drill level
1383         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1384
1385         bt->latch = bt_pinlatch(bt, page_no);
1386         bt->page_no = page_no;
1387
1388         // obtain access lock using lock chaining
1389
1390         if( page_no > ROOT_page )
1391                 bt_lockpage(BtLockAccess, bt->latch);
1392
1393         if( prevpage ) {
1394                 bt_unlockpage(prevmode, prevlatch);
1395                 bt_unpinlatch(prevlatch);
1396                 prevpage = 0;
1397         }
1398
1399         // obtain read lock using lock chaining
1400
1401         bt_lockpage(mode, bt->latch);
1402
1403         if( page_no > ROOT_page )
1404                 bt_unlockpage(BtLockAccess, bt->latch);
1405
1406         //      map/obtain page contents
1407
1408         if( bt_mappage (bt, &bt->page, page_no) )
1409                 return 0;
1410
1411         // re-read and re-lock root after determining actual level of root
1412
1413         if( bt->page->lvl != drill) {
1414                 if( bt->page_no != ROOT_page )
1415                         return bt->err = BTERR_struct, 0;
1416                         
1417                 drill = bt->page->lvl;
1418
1419                 if( lock != BtLockRead && drill == lvl ) {
1420                         bt_unlockpage(mode, bt->latch);
1421                         bt_unpinlatch(bt->latch);
1422                         continue;
1423                 }
1424         }
1425
1426         prevpage = bt->page_no;
1427         prevlatch = bt->latch;
1428         prevmode = mode;
1429
1430         //  find key on page at this level
1431         //  and descend to requested level
1432
1433         if( !bt->page->kill )
1434          if( slot = bt_findslot (bt, key, len) ) {
1435           if( drill == lvl )
1436                 return slot;
1437
1438           while( slotptr(bt->page, slot)->dead )
1439                 if( slot++ < bt->page->cnt )
1440                         continue;
1441                 else
1442                         goto slideright;
1443
1444           page_no = bt_getid(slotptr(bt->page, slot)->id);
1445           drill--;
1446           continue;
1447          }
1448
1449         //  or slide right into next page
1450
1451 slideright:
1452         page_no = bt_getid(bt->page->right);
1453
1454   } while( page_no );
1455
1456   // return error on end of right chain
1457
1458   bt->err = BTERR_eof;
1459   return 0;     // return error
1460 }
1461
1462 //      a fence key was deleted from a page
1463 //      push new fence value upwards
1464
1465 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1466 {
1467 unsigned char leftkey[256], rightkey[256];
1468 BtLatchSet *latch = bt->latch;
1469 BtKey ptr;
1470
1471         // remove deleted key, the old fence value
1472
1473         ptr = keyptr(bt->page, bt->page->cnt);
1474         memcpy(rightkey, ptr, ptr->len + 1);
1475
1476         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1477         bt->page->dirty = 1;
1478
1479         ptr = keyptr(bt->page, bt->page->cnt);
1480         memcpy(leftkey, ptr, ptr->len + 1);
1481
1482         if( bt_update (bt, bt->page, page_no) )
1483                 return bt->err;
1484
1485         bt_lockpage (BtLockParent, latch);
1486         bt_unlockpage (BtLockWrite, latch);
1487
1488         //  insert new (now smaller) fence key
1489
1490         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1491                 return bt->err;
1492
1493         //  remove old (larger) fence key
1494
1495         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1496                 return bt->err;
1497
1498         bt_unlockpage (BtLockParent, latch);
1499         bt_unpinlatch (latch);
1500         return 0;
1501 }
1502
1503 //      root has a single child
1504 //      collapse a level from the btree
1505 //      call with root locked in bt->page
1506
1507 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1508 {
1509 BtLatchSet *latch;
1510 uid child;
1511 uint idx;
1512
1513         // find the child entry
1514         //      and promote to new root
1515
1516   do {
1517         for( idx = 0; idx++ < root->cnt; )
1518           if( !slotptr(root, idx)->dead )
1519                 break;
1520
1521         child = bt_getid (slotptr(root, idx)->id);
1522         latch = bt_pinlatch (bt, child);
1523
1524         bt_lockpage (BtLockDelete, latch);
1525         bt_lockpage (BtLockWrite, latch);
1526
1527         if( bt_mappage (bt, &bt->temp, child) )
1528                 return bt->err;
1529
1530         memcpy (root, bt->temp, bt->page_size);
1531
1532         if( bt_update (bt, root, ROOT_page) )
1533                 return bt->err;
1534
1535         if( bt_freepage (bt, child, bt->temp, latch) )
1536                 return bt->err;
1537
1538   } while( root->lvl > 1 && root->act == 1 );
1539
1540   bt_unlockpage (BtLockWrite, bt->latch);
1541   bt_unpinlatch (bt->latch);
1542   return 0;
1543 }
1544
1545 //  find and delete key on page by marking delete flag bit
1546 //  when page becomes empty, delete it
1547
1548 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1549 {
1550 unsigned char lowerkey[256], higherkey[256];
1551 uint slot, dirty = 0, idx, fence, found;
1552 BtLatchSet *latch, *rlatch;
1553 uid page_no, right;
1554 BtKey ptr;
1555
1556         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1557                 ptr = keyptr(bt->page, slot);
1558         else
1559                 return bt->err;
1560
1561         // are we deleting a fence slot?
1562
1563         fence = slot == bt->page->cnt;
1564
1565         // if key is found delete it, otherwise ignore request
1566
1567         if( found = !keycmp (ptr, key, len) )
1568           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1569                 dirty = slotptr(bt->page,slot)->dead = 1;
1570                 bt->page->dirty = 1;
1571                 bt->page->act--;
1572
1573                 // collapse empty slots
1574
1575                 while( idx = bt->page->cnt - 1 )
1576                   if( slotptr(bt->page, idx)->dead ) {
1577                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1578                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1579                   } else
1580                         break;
1581           }
1582
1583         right = bt_getid(bt->page->right);
1584         page_no = bt->page_no;
1585         latch = bt->latch;
1586
1587         if( !dirty ) {
1588           if( lvl )
1589                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1590           bt_unlockpage(BtLockWrite, latch);
1591           bt_unpinlatch (latch);
1592           return bt->found = found, 0;
1593         }
1594
1595         //      did we delete a fence key in an upper level?
1596
1597         if( lvl && bt->page->act && fence )
1598           if( bt_fixfence (bt, page_no, lvl) )
1599                 return bt->err;
1600           else
1601                 return bt->found = found, 0;
1602
1603         //      is this a collapsed root?
1604
1605         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1606           if( bt_collapseroot (bt, bt->page) )
1607                 return bt->err;
1608           else
1609                 return bt->found = found, 0;
1610
1611         // return if page is not empty
1612
1613         if( bt->page->act ) {
1614           if( bt_update(bt, bt->page, page_no) )
1615                 return bt->err;
1616           bt_unlockpage(BtLockWrite, latch);
1617           bt_unpinlatch (latch);
1618           return bt->found = found, 0;
1619         }
1620
1621         // cache copy of fence key
1622         //      in order to find parent
1623
1624         ptr = keyptr(bt->page, bt->page->cnt);
1625         memcpy(lowerkey, ptr, ptr->len + 1);
1626
1627         // obtain lock on right page
1628
1629         rlatch = bt_pinlatch (bt, right);
1630         bt_lockpage(BtLockWrite, rlatch);
1631
1632         if( bt_mappage (bt, &bt->temp, right) )
1633                 return bt->err;
1634
1635         if( bt->temp->kill ) {
1636                 bt_abort(bt, bt->temp, right, 0);
1637                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1638         }
1639
1640         // pull contents of next page into current empty page 
1641
1642         memcpy (bt->page, bt->temp, bt->page_size);
1643
1644         //      cache copy of key to update
1645
1646         ptr = keyptr(bt->temp, bt->temp->cnt);
1647         memcpy(higherkey, ptr, ptr->len + 1);
1648
1649         //  Mark right page as deleted and point it to left page
1650         //      until we can post updates at higher level.
1651
1652         bt_putid(bt->temp->right, page_no);
1653         bt->temp->kill = 1;
1654
1655         if( bt_update(bt, bt->page, page_no) )
1656                 return bt->err;
1657
1658         if( bt_update(bt, bt->temp, right) )
1659                 return bt->err;
1660
1661         bt_lockpage(BtLockParent, latch);
1662         bt_unlockpage(BtLockWrite, latch);
1663
1664         bt_lockpage(BtLockParent, rlatch);
1665         bt_unlockpage(BtLockWrite, rlatch);
1666
1667         //  redirect higher key directly to consolidated node
1668
1669         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1670                 return bt->err;
1671
1672         //  delete old lower key to consolidated node
1673
1674         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1675                 return bt->err;
1676
1677         //  obtain write & delete lock on deleted node
1678         //      add right block to free chain
1679
1680         bt_lockpage(BtLockDelete, rlatch);
1681         bt_lockpage(BtLockWrite, rlatch);
1682         bt_unlockpage(BtLockParent, rlatch);
1683
1684         if( bt_freepage (bt, right, bt->temp, rlatch) )
1685                 return bt->err;
1686
1687         bt_unlockpage(BtLockParent, latch);
1688         bt_unpinlatch(latch);
1689         return 0;
1690 }
1691
1692 //      find key in leaf level and return row-id
1693
1694 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1695 {
1696 uint  slot;
1697 BtKey ptr;
1698 uid id;
1699
1700         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1701                 ptr = keyptr(bt->page, slot);
1702         else
1703                 return 0;
1704
1705         // if key exists, return row-id
1706         //      otherwise return 0
1707
1708         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1709                 id = bt_getid(slotptr(bt->page,slot)->id);
1710         else
1711                 id = 0;
1712
1713         bt_unlockpage (BtLockRead, bt->latch);
1714         bt_unpinlatch (bt->latch);
1715         return id;
1716 }
1717
1718 //      check page for space available,
1719 //      clean if necessary and return
1720 //      0 - page needs splitting
1721 //      >0 - go ahead with new slot
1722  
1723 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1724 {
1725 uint nxt = bt->page_size;
1726 BtPage page = bt->page;
1727 uint cnt = 0, idx = 0;
1728 uint max = page->cnt;
1729 uint newslot = slot;
1730 BtKey key;
1731 int ret;
1732
1733         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1734                 return slot;
1735
1736         //      skip cleanup if nothing to reclaim
1737
1738         if( !page->dirty )
1739                 return 0;
1740
1741         memcpy (bt->frame, page, bt->page_size);
1742
1743         // skip page info and set rest of page to zero
1744
1745         memset (page+1, 0, bt->page_size - sizeof(*page));
1746         page->act = 0;
1747
1748         while( cnt++ < max ) {
1749                 if( cnt == slot )
1750                         newslot = idx + 1;
1751                 // always leave fence key in list
1752                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1753                         continue;
1754
1755                 // copy key
1756                 key = keyptr(bt->frame, cnt);
1757                 nxt -= key->len + 1;
1758                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1759
1760                 // copy slot
1761                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1762                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1763                         page->act++;
1764                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1765                 slotptr(page, idx)->off = nxt;
1766         }
1767
1768         page->min = nxt;
1769         page->cnt = idx;
1770
1771         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1772                 return newslot;
1773
1774         return 0;
1775 }
1776
1777 // split the root and raise the height of the btree
1778
1779 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1780 {
1781 uint nxt = bt->page_size;
1782 BtPage root = bt->page;
1783 uid right;
1784
1785         //  Obtain an empty page to use, and copy the current
1786         //  root contents into it
1787
1788         if( !(right = bt_newpage(bt, root)) )
1789                 return bt->err;
1790
1791         // preserve the page info at the bottom
1792         // and set rest to zero
1793
1794         memset(root+1, 0, bt->page_size - sizeof(*root));
1795
1796         // insert first key on newroot page
1797
1798         nxt -= *leftkey + 1;
1799         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1800         bt_putid(slotptr(root, 1)->id, right);
1801         slotptr(root, 1)->off = nxt;
1802         
1803         // insert second key on newroot page
1804         // and increase the root height
1805
1806         nxt -= 3;
1807         ((unsigned char *)root)[nxt] = 2;
1808         ((unsigned char *)root)[nxt+1] = 0xff;
1809         ((unsigned char *)root)[nxt+2] = 0xff;
1810         bt_putid(slotptr(root, 2)->id, page_no2);
1811         slotptr(root, 2)->off = nxt;
1812
1813         bt_putid(root->right, 0);
1814         root->min = nxt;                // reset lowest used offset and key count
1815         root->cnt = 2;
1816         root->act = 2;
1817         root->lvl++;
1818
1819         // update and release root (bt->page)
1820
1821         if( bt_update(bt, root, bt->page_no) )
1822                 return bt->err;
1823
1824         bt_unlockpage(BtLockWrite, bt->latch);
1825         bt_unpinlatch(bt->latch);
1826         return 0;
1827 }
1828
1829 //  split already locked full node
1830 //      return unlocked.
1831
1832 BTERR bt_splitpage (BtDb *bt)
1833 {
1834 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1835 unsigned char fencekey[256], rightkey[256];
1836 uid page_no = bt->page_no, right;
1837 BtLatchSet *latch, *rlatch;
1838 BtPage page = bt->page;
1839 uint lvl = page->lvl;
1840 BtKey key;
1841
1842         latch = bt->latch;
1843
1844         //  split higher half of keys to bt->frame
1845         //      the last key (fence key) might be dead
1846
1847         memset (bt->frame, 0, bt->page_size);
1848         max = page->cnt;
1849         cnt = max / 2;
1850         idx = 0;
1851
1852         while( cnt++ < max ) {
1853                 key = keyptr(page, cnt);
1854                 nxt -= key->len + 1;
1855                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1856                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1857                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1858                         bt->frame->act++;
1859                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1860                 slotptr(bt->frame, idx)->off = nxt;
1861         }
1862
1863         //      remember fence key for new right page
1864
1865         memcpy (rightkey, key, key->len + 1);
1866
1867         bt->frame->bits = bt->page_bits;
1868         bt->frame->min = nxt;
1869         bt->frame->cnt = idx;
1870         bt->frame->lvl = lvl;
1871
1872         // link right node
1873
1874         if( page_no > ROOT_page )
1875                 memcpy (bt->frame->right, page->right, BtId);
1876
1877         //      get new free page and write frame to it.
1878
1879         if( !(right = bt_newpage(bt, bt->frame)) )
1880                 return bt->err;
1881
1882         //      update lower keys to continue in old page
1883
1884         memcpy (bt->frame, page, bt->page_size);
1885         memset (page+1, 0, bt->page_size - sizeof(*page));
1886         nxt = bt->page_size;
1887         page->dirty = 0;
1888         page->act = 0;
1889         cnt = 0;
1890         idx = 0;
1891
1892         //  assemble page of smaller keys
1893         //      (they're all active keys)
1894
1895         while( cnt++ < max / 2 ) {
1896                 key = keyptr(bt->frame, cnt);
1897                 nxt -= key->len + 1;
1898                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1899                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1900                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1901                 slotptr(page, idx)->off = nxt;
1902                 page->act++;
1903         }
1904
1905         // remember fence key for smaller page
1906
1907         memcpy (fencekey, key, key->len + 1);
1908
1909         bt_putid(page->right, right);
1910         page->min = nxt;
1911         page->cnt = idx;
1912
1913         // if current page is the root page, split it
1914
1915         if( page_no == ROOT_page )
1916                 return bt_splitroot (bt, fencekey, right);
1917
1918         //      lock right page
1919
1920         rlatch = bt_pinlatch (bt, right);
1921         bt_lockpage (BtLockParent, rlatch);
1922
1923         // update left (containing) node
1924
1925         if( bt_update(bt, page, page_no) )
1926                 return bt->err;
1927
1928         bt_lockpage (BtLockParent, latch);
1929         bt_unlockpage (BtLockWrite, latch);
1930
1931         // insert new fence for reformulated left block
1932
1933         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1934                 return bt->err;
1935
1936         //      switch fence for right block of larger keys to new right page
1937
1938         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1939                 return bt->err;
1940
1941         bt_unlockpage (BtLockParent, latch);
1942         bt_unlockpage (BtLockParent, rlatch);
1943
1944         bt_unpinlatch (rlatch);
1945         bt_unpinlatch (latch);
1946         return 0;
1947 }
1948
1949 //  Insert new key into the btree at requested level.
1950 //  Pages are unlocked at exit.
1951
1952 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1953 {
1954 uint slot, idx;
1955 BtPage page;
1956 BtKey ptr;
1957
1958   while( 1 ) {
1959         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1960                 ptr = keyptr(bt->page, slot);
1961         else
1962         {
1963                 if( !bt->err )
1964                         bt->err = BTERR_ovflw;
1965                 return bt->err;
1966         }
1967
1968         // if key already exists, update id and return
1969
1970         page = bt->page;
1971
1972         if( !keycmp (ptr, key, len) ) {
1973           if( slotptr(page, slot)->dead )
1974                 page->act++;
1975           slotptr(page, slot)->dead = 0;
1976           slotptr(page, slot)->tod = tod;
1977           bt_putid(slotptr(page,slot)->id, id);
1978           if( bt_update(bt, bt->page, bt->page_no) )
1979                 return bt->err;
1980           bt_unlockpage(BtLockWrite, bt->latch);
1981           bt_unpinlatch (bt->latch);
1982           return 0;
1983         }
1984
1985         // check if page has enough space
1986
1987         if( slot = bt_cleanpage (bt, len, slot) )
1988                 break;
1989
1990         if( bt_splitpage (bt) )
1991                 return bt->err;
1992   }
1993
1994   // calculate next available slot and copy key into page
1995
1996   page->min -= len + 1; // reset lowest used offset
1997   ((unsigned char *)page)[page->min] = len;
1998   memcpy ((unsigned char *)page + page->min +1, key, len );
1999
2000   for( idx = slot; idx < page->cnt; idx++ )
2001         if( slotptr(page, idx)->dead )
2002                 break;
2003
2004   // now insert key into array before slot
2005   // preserving the fence slot
2006
2007   if( idx == page->cnt )
2008         idx++, page->cnt++;
2009
2010   page->act++;
2011
2012   while( idx > slot )
2013         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
2014
2015   bt_putid(slotptr(page,slot)->id, id);
2016   slotptr(page, slot)->off = page->min;
2017   slotptr(page, slot)->tod = tod;
2018   slotptr(page, slot)->dead = 0;
2019
2020   if( bt_update(bt, bt->page, bt->page_no) )
2021           return bt->err;
2022
2023   bt_unlockpage(BtLockWrite, bt->latch);
2024   bt_unpinlatch(bt->latch);
2025   return 0;
2026 }
2027
2028 //  cache page of keys into cursor and return starting slot for given key
2029
2030 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2031 {
2032 uint slot;
2033
2034         // cache page for retrieval
2035
2036         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2037           memcpy (bt->cursor, bt->page, bt->page_size);
2038         else
2039           return 0;
2040
2041         bt_unlockpage(BtLockRead, bt->latch);
2042         bt->cursor_page = bt->page_no;
2043         bt_unpinlatch (bt->latch);
2044         return slot;
2045 }
2046
2047 //  return next slot for cursor page
2048 //  or slide cursor right into next page
2049
2050 uint bt_nextkey (BtDb *bt, uint slot)
2051 {
2052 BtLatchSet *latch;
2053 off64_t right;
2054
2055   do {
2056         right = bt_getid(bt->cursor->right);
2057
2058         while( slot++ < bt->cursor->cnt )
2059           if( slotptr(bt->cursor,slot)->dead )
2060                 continue;
2061           else if( right || (slot < bt->cursor->cnt))
2062                 return slot;
2063           else
2064                 break;
2065
2066         if( !right )
2067                 break;
2068
2069         bt->cursor_page = right;
2070         latch = bt_pinlatch (bt, right);
2071     bt_lockpage(BtLockRead, latch);
2072
2073         if( bt_mappage (bt, &bt->page, right) )
2074                 return 0;
2075
2076         memcpy (bt->cursor, bt->page, bt->page_size);
2077         bt_unlockpage(BtLockRead, latch);
2078         bt_unpinlatch (latch);
2079         slot = 0;
2080   } while( 1 );
2081
2082   return bt->err = 0;
2083 }
2084
2085 BtKey bt_key(BtDb *bt, uint slot)
2086 {
2087         return keyptr(bt->cursor, slot);
2088 }
2089
2090 uid bt_uid(BtDb *bt, uint slot)
2091 {
2092         return bt_getid(slotptr(bt->cursor,slot)->id);
2093 }
2094
2095 uint bt_tod(BtDb *bt, uint slot)
2096 {
2097         return slotptr(bt->cursor,slot)->tod;
2098 }
2099
2100
2101 #ifdef STANDALONE
2102
2103 uint bt_audit (BtDb *bt)
2104 {
2105 ushort idx, hashidx;
2106 uid next, page_no;
2107 BtLatchSet *latch;
2108 uint cnt = 0;
2109 BtKey ptr;
2110
2111 #ifdef unix
2112         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2113 #endif
2114         if( *(ushort *)(bt->latchmgr->lock) )
2115                 fprintf(stderr, "Alloc page locked\n");
2116         *(ushort *)(bt->latchmgr->lock) = 0;
2117
2118         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
2119                 latch = bt->latchsets + idx;
2120                 if( *(ushort *)latch->readwr )
2121                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2122                 *(ushort *)latch->readwr = 0;
2123
2124                 if( *(ushort *)latch->access )
2125                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2126                 *(ushort *)latch->access = 0;
2127
2128                 if( *(ushort *)latch->parent )
2129                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2130                 *(ushort *)latch->parent = 0;
2131
2132                 if( latch->pin ) {
2133                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2134                         latch->pin = 0;
2135                 }
2136         }
2137
2138         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
2139           if( *(ushort *)(bt->latchmgr->table[hashidx].latch) )
2140                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2141
2142           *(ushort *)(bt->latchmgr->table[hashidx].latch) = 0;
2143
2144           if( idx = bt->latchmgr->table[hashidx].slot ) do {
2145                 latch = bt->latchsets + idx;
2146                 if( *(ushort *)latch->busy )
2147                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2148                 *(ushort *)latch->busy = 0;
2149                 if( latch->hash != hashidx )
2150                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
2151                 if( latch->pin )
2152                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2153           } while( idx = latch->next );
2154         }
2155
2156         next = bt->latchmgr->nlatchpage + LATCH_page;
2157         page_no = LEAF_page;
2158
2159         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2160         off64_t off = page_no << bt->page_bits;
2161 #ifdef unix
2162                 pread (bt->idx, bt->frame, bt->page_size, off);
2163 #else
2164                 DWORD amt[1];
2165
2166                   SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2167
2168                   if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
2169                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2170
2171                   if( *amt <  bt->page_size )
2172                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2173 #endif
2174                 if( !bt->frame->free ) {
2175                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2176                   ptr = keyptr(bt->frame, idx+1);
2177                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2178                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2179                  }
2180                  if( !bt->frame->lvl )
2181                         cnt += bt->frame->act;
2182                 }
2183
2184                 if( page_no > LEAF_page )
2185                         next = page_no + 1;
2186                 page_no = next;
2187         }
2188         return cnt - 1;
2189 }
2190
2191 #ifndef unix
2192 double getCpuTime(int type)
2193 {
2194 FILETIME crtime[1];
2195 FILETIME xittime[1];
2196 FILETIME systime[1];
2197 FILETIME usrtime[1];
2198 SYSTEMTIME timeconv[1];
2199 double ans = 0;
2200
2201         memset (timeconv, 0, sizeof(SYSTEMTIME));
2202
2203         switch( type ) {
2204         case 0:
2205                 GetSystemTimeAsFileTime (xittime);
2206                 FileTimeToSystemTime (xittime, timeconv);
2207                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2208                 break;
2209         case 1:
2210                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2211                 FileTimeToSystemTime (usrtime, timeconv);
2212                 break;
2213         case 2:
2214                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2215                 FileTimeToSystemTime (systime, timeconv);
2216                 break;
2217         }
2218
2219         ans += (double)timeconv->wHour * 3600;
2220         ans += (double)timeconv->wMinute * 60;
2221         ans += (double)timeconv->wSecond;
2222         ans += (double)timeconv->wMilliseconds / 1000;
2223         return ans;
2224 }
2225 #else
2226 #include <time.h>
2227 #include <sys/resource.h>
2228
2229 double getCpuTime(int type)
2230 {
2231 struct rusage used[1];
2232 struct timeval tv[1];
2233
2234         switch( type ) {
2235         case 0:
2236                 gettimeofday(tv, NULL);
2237                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2238
2239         case 1:
2240                 getrusage(RUSAGE_SELF, used);
2241                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2242
2243         case 2:
2244                 getrusage(RUSAGE_SELF, used);
2245                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2246         }
2247
2248         return 0;
2249 }
2250 #endif
2251
2252 //  standalone program to index file of keys
2253 //  then list them onto std-out
2254
2255 int main (int argc, char **argv)
2256 {
2257 uint slot, line = 0, off = 0, found = 0;
2258 int ch, cnt = 0, bits = 12;
2259 unsigned char key[256];
2260 double done, start;
2261 uid next, page_no;
2262 uint pgblk = 0;
2263 float elapsed;
2264 time_t tod[1];
2265 uint scan = 0;
2266 uint len = 0;
2267 uint map = 0;
2268 BtKey ptr;
2269 BtDb *bt;
2270 FILE *in;
2271
2272         if( argc < 4 ) {
2273                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_segments pages_per_segment start_line_number]\n", argv[0]);
2274                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2275                 fprintf (stderr, "  mapped_pool_segments: size of buffer pool in segments\n");
2276                 fprintf (stderr, "  pages_per_segment: size of buffer pool segment in pages in bits\n");
2277                 exit(0);
2278         }
2279
2280         start = getCpuTime(0);
2281         time(tod);
2282
2283         if( argc > 4 )
2284                 bits = atoi(argv[4]);
2285
2286         if( argc > 5 )
2287                 map = atoi(argv[5]);
2288
2289         if( map > 65536 )
2290                 fprintf (stderr, "Warning: buffer_pool > 65536 segments\n");
2291
2292         if( map && map < 8 )
2293                 fprintf (stderr, "Buffer_pool too small\n");
2294
2295         if( argc > 6 )
2296                 pgblk = atoi(argv[6]);
2297
2298         if( bits + pgblk > 30 )
2299                 fprintf (stderr, "Warning: very large buffer pool segment size\n");
2300
2301         if( argc > 7 )
2302                 off = atoi(argv[7]);
2303
2304         bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk, map /  8);
2305
2306         if( !bt ) {
2307                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2308                 exit (1);
2309         }
2310
2311         switch(argv[3][0]| 0x20)
2312         {
2313         case 'a':
2314                 fprintf(stderr, "started audit for %s\n", argv[2]);
2315                 cnt = bt_audit (bt);
2316                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[2], cnt);
2317                 break;
2318
2319         case 'w':
2320                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2321                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2322                   while( ch = getc(in), ch != EOF )
2323                         if( ch == '\n' )
2324                         {
2325                           if( off )
2326                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2327
2328                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2329                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2330                           len = 0;
2331                         }
2332                         else if( len < 245 )
2333                                 key[len++] = ch;
2334                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2335                 break;
2336
2337         case 'd':
2338                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2339                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2340                   while( ch = getc(in), ch != EOF )
2341                         if( ch == '\n' )
2342                         {
2343                           if( off )
2344                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2345                           line++;
2346                           if( bt_deletekey (bt, key, len, 0) )
2347                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2348                           len = 0;
2349                         }
2350                         else if( len < 245 )
2351                                 key[len++] = ch;
2352                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2353                 break;
2354
2355         case 'f':
2356                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2357                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2358                   while( ch = getc(in), ch != EOF )
2359                         if( ch == '\n' )
2360                         {
2361                           if( off )
2362                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2363                           line++;
2364                           if( bt_findkey (bt, key, len) )
2365                                 found++;
2366                           else if( bt->err )
2367                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2368                           len = 0;
2369                         }
2370                         else if( len < 245 )
2371                                 key[len++] = ch;
2372                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2373                 break;
2374
2375         case 's':
2376                 fprintf(stderr, "started scaning\n");
2377                 cnt = len = key[0] = 0;
2378
2379                 if( slot = bt_startkey (bt, key, len) )
2380                   slot--;
2381                 else
2382                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2383
2384                 while( slot = bt_nextkey (bt, slot) ) {
2385                         ptr = bt_key(bt, slot);
2386                         fwrite (ptr->key, ptr->len, 1, stdout);
2387                         fputc ('\n', stdout);
2388                         cnt++;
2389                 }
2390
2391                 fprintf(stderr, " Total keys read %d\n", cnt - 1);
2392                 break;
2393
2394         case 'c':
2395           fprintf(stderr, "started counting\n");
2396
2397           next = bt->latchmgr->nlatchpage + LATCH_page;
2398           page_no = LEAF_page;
2399           cnt = 0;
2400
2401           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2402           uid off = page_no << bt->page_bits;
2403 #ifdef unix
2404                 pread (bt->idx, bt->frame, bt->page_size, off);
2405 #else
2406                 DWORD amt[1];
2407
2408                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2409
2410                 if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
2411                         fprintf (stderr, "unable to read page %.8x", page_no);
2412
2413                 if( *amt <  bt->page_size )
2414                         fprintf (stderr, "unable to read page %.8x", page_no);
2415 #endif
2416                 if( !bt->frame->free && !bt->frame->lvl )
2417                         cnt += bt->frame->act;
2418                 if( page_no > LEAF_page )
2419                         next = page_no + 1;
2420                 page_no = next;
2421           }
2422                 
2423           cnt--;        // remove stopper key
2424           fprintf(stderr, " Total keys read %d\n", cnt);
2425           break;
2426         }
2427
2428         done = getCpuTime(0);
2429         elapsed = (float)(done - start);
2430         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2431         elapsed = getCpuTime(1);
2432         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2433         elapsed = getCpuTime(2);
2434         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2435         return 0;
2436 }
2437
2438 #endif  //STANDALONE