]> pd.if.org Git - btree/blob - btree2t.c
Fix a few bugs.
[btree] / btree2t.c
1 // btree version 2t  sched_yield version of spinlocks
2 //      with reworked bt_deletekey code
3 // 12 MAR 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #endif
30
31 #ifdef unix
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #endif
47
48 #include <memory.h>
49 #include <string.h>
50
51 typedef unsigned long long      uid;
52
53 #ifndef unix
54 typedef unsigned long long      off64_t;
55 typedef unsigned short          ushort;
56 typedef unsigned int            uint;
57 #endif
58
59 #define BT_latchtable   8192                                    // number of latch manager slots
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63 #define BT_fl 0x6c66    // fl
64
65 #define BT_maxbits              24                                      // maximum page size in bits
66 #define BT_minbits              9                                       // minimum page size in bits
67 #define BT_minpage              (1 << BT_minbits)       // minimum page size
68 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
69
70 /*
71 There are five lock types for each node in three independent sets: 
72 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
73 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
74 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
75 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
76 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
77 */
78
79 typedef enum{
80         BtLockAccess,
81         BtLockDelete,
82         BtLockRead,
83         BtLockWrite,
84         BtLockParent
85 }BtLock;
86
87 //      definition for latch implementation
88
89 // exclusive is set for write access
90 // share is count of read accessors
91 // grant write lock when share == 0
92
93 volatile typedef struct {
94         ushort exclusive:1;
95         ushort pending:1;
96         ushort share:14;
97 } BtSpinLatch;
98
99 #define XCL 1
100 #define PEND 2
101 #define BOTH 3
102 #define SHARE 4
103
104 //  hash table entries
105
106 typedef struct {
107         BtSpinLatch latch[1];
108         volatile ushort slot;           // Latch table entry at head of chain
109 } BtHashEntry;
110
111 //      latch manager table structure
112
113 typedef struct {
114         BtSpinLatch readwr[1];          // read/write page lock
115         BtSpinLatch access[1];          // Access Intent/Page delete
116         BtSpinLatch parent[1];          // Posting of fence key in parent
117         BtSpinLatch busy[1];            // slot is being moved between chains
118         volatile ushort next;           // next entry in hash table chain
119         volatile ushort prev;           // prev entry in hash table chain
120         volatile ushort pin;            // number of outstanding locks
121         volatile ushort hash;           // hash slot entry is under
122         volatile uid page_no;           // latch set page number
123 } BtLatchSet;
124
125 //      Define the length of the page and key pointers
126
127 #define BtId 6
128
129 //      Page key slot definition.
130
131 //      If BT_maxbits is 15 or less, you can save 2 bytes
132 //      for each key stored by making the first two uints
133 //      into ushorts.  You can also save 4 bytes by removing
134 //      the tod field from the key.
135
136 //      Keys are marked dead, but remain on the page until
137 //      cleanup is called. The fence key (highest key) for
138 //      the page is always present, even if dead.
139
140 typedef struct {
141         uint off:BT_maxbits;            // page offset for key start
142         uint dead:1;                            // set for deleted key
143         uint tod;                                       // time-stamp for key
144         unsigned char id[BtId];         // id associated with key
145 } BtSlot;
146
147 //      The key structure occupies space at the upper end of
148 //      each page.  It's a length byte followed by the value
149 //      bytes.
150
151 typedef struct {
152         unsigned char len;
153         unsigned char key[0];
154 } *BtKey;
155
156 //      The first part of an index page.
157 //      It is immediately followed
158 //      by the BtSlot array of keys.
159
160 typedef struct BtPage_ {
161         uint cnt;                                       // count of keys in page
162         uint act;                                       // count of active keys
163         uint min;                                       // next key offset
164         unsigned char bits:7;           // page size in bits
165         unsigned char free:1;           // page is on free list
166         unsigned char lvl:6;            // level of page
167         unsigned char kill:1;           // page is being deleted
168         unsigned char dirty:1;          // page is dirty
169         unsigned char right[BtId];      // page number to right
170 } *BtPage;
171
172 //      The memory mapping hash table entry
173
174 typedef struct {
175         BtPage page;            // mapped page pointer
176         uid  page_no;           // mapped page number
177         void *lruprev;          // least recently used previous cache block
178         void *lrunext;          // lru next cache block
179         void *hashprev;         // previous cache block for the same hash idx
180         void *hashnext;         // next cache block for the same hash idx
181 #ifndef unix
182         HANDLE hmap;
183 #endif
184 }BtHash;
185
186 typedef struct {
187         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
188         BtSpinLatch lock[1];            // allocation area lite latch
189         ushort latchdeployed;           // highest number of latch entries deployed
190         ushort nlatchpage;                      // number of latch pages at BT_latch
191         ushort latchtotal;                      // number of page latch entries
192         ushort latchhash;                       // number of latch hash table slots
193         ushort latchvictim;                     // next latch entry to examine
194         BtHashEntry table[0];           // the hash table
195 } BtLatchMgr;
196
197 //      The object structure for Btree access
198
199 typedef struct _BtDb {
200         uint page_size;         // each page size       
201         uint page_bits;         // each page size in bits       
202         uint seg_bits;          // segment size in pages in bits
203         uid page_no;            // current page number  
204         uid cursor_page;        // current cursor page number   
205         int  err;
206         uint mode;                      // read-write mode
207         uint mapped_io;         // use memory mapping
208         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
209         BtPage alloc;           // frame buffer for alloc page ( page 0 )
210         BtPage cursor;          // cached frame for start/next (never mapped)
211         BtPage frame;           // spare frame for the page split (never mapped)
212         BtPage zero;            // zeroes frame buffer (never mapped)
213         BtPage page;            // current page
214         BtLatchSet *latch;              // current page latch
215         BtLatchMgr *latchmgr;   // mapped latch page from allocation page
216         BtLatchSet *latchsets;  // mapped latch set from latch pages
217 #ifdef unix
218         int idx;
219 #else
220         HANDLE idx;
221         HANDLE halloc;          // allocation and latch table handle
222 #endif
223         unsigned char *mem;     // frame, cursor, page memory buffer
224         int nodecnt;            // highest page cache segment in use
225         int nodemax;            // highest page cache segment allocated
226         int hashmask;           // number of pages in segments - 1
227         int hashsize;           // size of hash table
228         int found;                      // last deletekey found key
229         BtHash *lrufirst;       // lru list head
230         BtHash *lrulast;        // lru list tail
231         ushort *cache;          // hash table for cached segments
232         BtHash *nodes;          // segment cache
233 } BtDb;
234
235 typedef enum {
236 BTERR_ok = 0,
237 BTERR_notfound,
238 BTERR_struct,
239 BTERR_ovflw,
240 BTERR_lock,
241 BTERR_hash,
242 BTERR_kill,
243 BTERR_map,
244 BTERR_wrt,
245 BTERR_eof
246 } BTERR;
247
248 // B-Tree functions
249 extern void bt_close (BtDb *bt);
250 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk, uint hashsize);
251 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
252 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
253 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
254 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
255 extern uint bt_nextkey   (BtDb *bt, uint slot);
256
257 //      internal functions
258 BTERR bt_update (BtDb *bt, BtPage page, uid page_no);
259 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no);
260 //  Helper functions to return slot values
261
262 extern BtKey bt_key (BtDb *bt, uint slot);
263 extern uid bt_uid (BtDb *bt, uint slot);
264 extern uint bt_tod (BtDb *bt, uint slot);
265
266 //  BTree page number constants
267 #define ALLOC_page              0
268 #define ROOT_page               1
269 #define LEAF_page               2
270 #define LATCH_page              3
271
272 //      Number of levels to create in a new BTree
273
274 #define MIN_lvl                 2
275
276 //  The page is allocated from low and hi ends.
277 //  The key offsets and row-id's are allocated
278 //  from the bottom, while the text of the key
279 //  is allocated from the top.  When the two
280 //  areas meet, the page is split into two.
281
282 //  A key consists of a length byte, two bytes of
283 //  index number (0 - 65534), and up to 253 bytes
284 //  of key value.  Duplicate keys are discarded.
285 //  Associated with each key is a 48 bit row-id.
286
287 //  The b-tree root is always located at page 1.
288 //      The first leaf page of level zero is always
289 //      located on page 2.
290
291 //      The b-tree pages are linked with right
292 //      pointers to facilitate enumerators,
293 //      and provide for concurrency.
294
295 //      When to root page fills, it is split in two and
296 //      the tree height is raised by a new root at page
297 //      one with two keys.
298
299 //      Deleted keys are marked with a dead bit until
300 //      page cleanup The fence key for a node is always
301 //      present, even after deletion and cleanup.
302
303 //  Deleted leaf pages are reclaimed  on a free list.
304 //      The upper levels of the btree are fixed on creation.
305
306 //  Groups of pages from the btree are optionally
307 //  cached with memory mapping. A hash table is used to keep
308 //  track of the cached pages.  This behaviour is controlled
309 //  by the number of cache blocks parameter and pages per block
310 //      given to bt_open.
311
312 //  To achieve maximum concurrency one page is locked at a time
313 //  as the tree is traversed to find leaf key in question. The right
314 //  page numbers are used in cases where the page is being split,
315 //      or consolidated.
316
317 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
318 //      and chains empty leaf pages together for reuse.
319
320 //      Parent locks are obtained to prevent resplitting or deleting a node
321 //      before its fence is posted into its upper level.
322
323 //      A special open mode of BT_fl is provided to safely access files on
324 //      WIN32 networks. WIN32 network operations should not use memory mapping.
325 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
326 //      to prevent local caching of network file contents.
327
328 //      Access macros to address slot and key values from the page.
329 //      Page slots use 1 based indexing.
330
331 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
332 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
333
334 void bt_putid(unsigned char *dest, uid id)
335 {
336 int i = BtId;
337
338         while( i-- )
339                 dest[i] = (unsigned char)id, id >>= 8;
340 }
341
342 uid bt_getid(unsigned char *src)
343 {
344 uid id = 0;
345 int i;
346
347         for( i = 0; i < BtId; i++ )
348                 id <<= 8, id |= *src++; 
349
350         return id;
351 }
352
353 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
354 {
355 BtKey ptr;
356
357         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
358         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
359         ptr = keyptr(page, page->cnt);
360         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
361         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
362         return bt->err = err;
363 }
364
365 //      Spin Latch Manager
366
367 //      wait until write lock mode is clear
368 //      and add 1 to the share count
369
370 void bt_spinreadlock(BtSpinLatch *latch)
371 {
372 ushort prev;
373
374   do {
375 #ifdef unix
376         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
377 #else
378         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
379 #endif
380         //  see if exclusive request is granted or pending
381
382         if( !(prev & BOTH) )
383                 return;
384 #ifdef unix
385         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
386 #else
387         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
388 #endif
389 #ifdef  unix
390   } while( sched_yield(), 1 );
391 #else
392   } while( SwitchToThread(), 1 );
393 #endif
394 }
395
396 //      wait for other read and write latches to relinquish
397
398 void bt_spinwritelock(BtSpinLatch *latch)
399 {
400 ushort prev;
401
402   do {
403 #ifdef  unix
404         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
405 #else
406         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
407 #endif
408         if( !(prev & XCL) )
409           if( !(prev & ~BOTH) )
410                 return;
411           else
412 #ifdef unix
413                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
414 #else
415                 _InterlockedAnd16((ushort *)latch, ~XCL);
416 #endif
417 #ifdef  unix
418   } while( sched_yield(), 1 );
419 #else
420   } while( SwitchToThread(), 1 );
421 #endif
422 }
423
424 //      try to obtain write lock
425
426 //      return 1 if obtained,
427 //              0 otherwise
428
429 int bt_spinwritetry(BtSpinLatch *latch)
430 {
431 ushort prev;
432
433 #ifdef  unix
434         prev = __sync_fetch_and_or((ushort *)latch, XCL);
435 #else
436         prev = _InterlockedOr16((ushort *)latch, XCL);
437 #endif
438         //      take write access if all bits are clear
439
440         if( !(prev & XCL) )
441           if( !(prev & ~BOTH) )
442                 return 1;
443           else
444 #ifdef unix
445                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
446 #else
447                 _InterlockedAnd16((ushort *)latch, ~XCL);
448 #endif
449         return 0;
450 }
451
452 //      clear write mode
453
454 void bt_spinreleasewrite(BtSpinLatch *latch)
455 {
456 #ifdef unix
457         __sync_fetch_and_and((ushort *)latch, ~BOTH);
458 #else
459         _InterlockedAnd16((ushort *)latch, ~BOTH);
460 #endif
461 }
462
463 //      decrement reader count
464
465 void bt_spinreleaseread(BtSpinLatch *latch)
466 {
467 #ifdef unix
468         __sync_fetch_and_add((ushort *)latch, -SHARE);
469 #else
470         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
471 #endif
472 }
473
474 //      link latch table entry into latch hash table
475
476 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
477 {
478 BtLatchSet *latch = bt->latchsets + victim;
479
480         if( latch->next = bt->latchmgr->table[hashidx].slot )
481                 bt->latchsets[latch->next].prev = victim;
482
483         bt->latchmgr->table[hashidx].slot = victim;
484         latch->page_no = page_no;
485         latch->hash = hashidx;
486         latch->prev = 0;
487 }
488
489 //      release latch pin
490
491 void bt_unpinlatch (BtLatchSet *latch)
492 {
493 #ifdef unix
494         __sync_fetch_and_add(&latch->pin, -1);
495 #else
496         _InterlockedDecrement16 (&latch->pin);
497 #endif
498 }
499
500 //      find existing latchset or inspire new one
501 //      return with latchset pinned
502
503 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
504 {
505 ushort hashidx = page_no % bt->latchmgr->latchhash;
506 ushort slot, avail = 0, victim, idx;
507 BtLatchSet *latch;
508
509         //  obtain read lock on hash table entry
510
511         bt_spinreadlock(bt->latchmgr->table[hashidx].latch);
512
513         if( slot = bt->latchmgr->table[hashidx].slot ) do
514         {
515                 latch = bt->latchsets + slot;
516                 if( page_no == latch->page_no )
517                         break;
518         } while( slot = latch->next );
519
520         if( slot ) {
521 #ifdef unix
522                 __sync_fetch_and_add(&latch->pin, 1);
523 #else
524                 _InterlockedIncrement16 (&latch->pin);
525 #endif
526         }
527
528     bt_spinreleaseread (bt->latchmgr->table[hashidx].latch);
529
530         if( slot )
531                 return latch;
532
533   //  try again, this time with write lock
534
535   bt_spinwritelock(bt->latchmgr->table[hashidx].latch);
536
537   if( slot = bt->latchmgr->table[hashidx].slot ) do
538   {
539         latch = bt->latchsets + slot;
540         if( page_no == latch->page_no )
541                 break;
542         if( !latch->pin && !avail )
543                 avail = slot;
544   } while( slot = latch->next );
545
546   //  found our entry, or take over an unpinned one
547
548   if( slot || (slot = avail) ) {
549         latch = bt->latchsets + slot;
550 #ifdef unix
551         __sync_fetch_and_add(&latch->pin, 1);
552 #else
553         _InterlockedIncrement16 (&latch->pin);
554 #endif
555         latch->page_no = page_no;
556         bt_spinreleasewrite(bt->latchmgr->table[hashidx].latch);
557         return latch;
558   }
559
560         //  see if there are any unused entries
561 #ifdef unix
562         victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
563 #else
564         victim = _InterlockedIncrement16 (&bt->latchmgr->latchdeployed);
565 #endif
566
567         if( victim < bt->latchmgr->latchtotal ) {
568                 latch = bt->latchsets + victim;
569 #ifdef unix
570                 __sync_fetch_and_add(&latch->pin, 1);
571 #else
572                 _InterlockedIncrement16 (&latch->pin);
573 #endif
574                 bt_latchlink (bt, hashidx, victim, page_no);
575                 bt_spinreleasewrite (bt->latchmgr->table[hashidx].latch);
576                 return latch;
577         }
578
579 #ifdef unix
580         victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
581 #else
582         victim = _InterlockedDecrement16 (&bt->latchmgr->latchdeployed);
583 #endif
584   //  find and reuse previous lock entry
585
586   while( 1 ) {
587 #ifdef unix
588         victim = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
589 #else
590         victim = _InterlockedIncrement16 (&bt->latchmgr->latchvictim) - 1;
591 #endif
592         //      we don't use slot zero
593
594         if( victim %= bt->latchmgr->latchtotal )
595                 latch = bt->latchsets + victim;
596         else
597                 continue;
598
599         //      take control of our slot
600         //      from other threads
601
602         if( latch->pin || !bt_spinwritetry (latch->busy) )
603                 continue;
604
605         idx = latch->hash;
606
607         // try to get write lock on hash chain
608         //      skip entry if not obtained
609         //      or has outstanding locks
610
611         if( !bt_spinwritetry (bt->latchmgr->table[idx].latch) ) {
612                 bt_spinreleasewrite (latch->busy);
613                 continue;
614         }
615
616         if( latch->pin ) {
617                 bt_spinreleasewrite (latch->busy);
618                 bt_spinreleasewrite (bt->latchmgr->table[idx].latch);
619                 continue;
620         }
621
622         //  unlink our available victim from its hash chain
623
624         if( latch->prev )
625                 bt->latchsets[latch->prev].next = latch->next;
626         else
627                 bt->latchmgr->table[idx].slot = latch->next;
628
629         if( latch->next )
630                 bt->latchsets[latch->next].prev = latch->prev;
631
632         bt_spinreleasewrite (bt->latchmgr->table[idx].latch);
633 #ifdef unix
634         __sync_fetch_and_add(&latch->pin, 1);
635 #else
636         _InterlockedIncrement16 (&latch->pin);
637 #endif
638         bt_latchlink (bt, hashidx, victim, page_no);
639         bt_spinreleasewrite (bt->latchmgr->table[hashidx].latch);
640         bt_spinreleasewrite (latch->busy);
641         return latch;
642   }
643 }
644
645 //      close and release memory
646
647 void bt_close (BtDb *bt)
648 {
649 BtHash *hash;
650 #ifdef unix
651         munmap (bt->latchsets, bt->latchmgr->nlatchpage * bt->page_size);
652         munmap (bt->latchmgr, bt->page_size);
653 #else
654         FlushViewOfFile(bt->latchmgr, 0);
655         UnmapViewOfFile(bt->latchmgr);
656         CloseHandle(bt->halloc);
657 #endif
658 #ifdef unix
659         // release mapped pages
660
661         if( hash = bt->lrufirst )
662                 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
663                 while(hash = hash->lrunext);
664
665         if( bt->mem )
666                 free (bt->mem);
667         close (bt->idx);
668         free (bt->cache);
669         free (bt);
670 #else
671         if( hash = bt->lrufirst )
672           do
673           {
674                 FlushViewOfFile(hash->page, 0);
675                 UnmapViewOfFile(hash->page);
676                 CloseHandle(hash->hmap);
677           } while(hash = hash->lrunext);
678
679         if( bt->mem)
680                 VirtualFree (bt->mem, 0, MEM_RELEASE);
681         FlushFileBuffers(bt->idx);
682         CloseHandle(bt->idx);
683         GlobalFree (bt->cache);
684         GlobalFree (bt);
685 #endif
686 }
687 //  open/create new btree
688
689 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
690 //              size of mapped page pool (e.g. 8192)
691
692 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint segsize, uint hashsize)
693 {
694 uint lvl, attr, cacheblk, last, slot, idx;
695 uint nlatchpage, latchhash;
696 BtLatchMgr *latchmgr;
697 off64_t size;
698 uint amt[1];
699 BtKey key;
700 BtDb* bt;
701 int flag;
702
703 #ifndef unix
704 SYSTEM_INFO sysinfo[1];
705 OVERLAPPED ovl[1];
706 uint len, flags;
707 #else
708 struct flock lock[1];
709 #endif
710
711         // determine sanity of page size and buffer pool
712
713         if( bits > BT_maxbits )
714                 bits = BT_maxbits;
715         else if( bits < BT_minbits )
716                 bits = BT_minbits;
717
718 #ifdef unix
719         bt = calloc (1, sizeof(BtDb));
720
721         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
722
723         if( bt->idx == -1 )
724                 return free(bt), NULL;
725         
726         cacheblk = 4096;        // minimum mmap segment size for unix
727
728 #else
729         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
730         attr = FILE_ATTRIBUTE_NORMAL;
731         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
732
733         if( bt->idx == INVALID_HANDLE_VALUE )
734                 return GlobalFree(bt), NULL;
735
736         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
737         GetSystemInfo(sysinfo);
738         cacheblk = sysinfo->dwAllocationGranularity;
739 #endif
740
741 #ifdef unix
742         memset (lock, 0, sizeof(lock));
743
744         lock->l_type = F_WRLCK;
745         lock->l_len = sizeof(struct BtPage_);
746         lock->l_whence = 0;
747
748         if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
749                 return bt_close (bt), NULL;
750 #else
751         memset (ovl, 0, sizeof(ovl));
752         len = sizeof(struct BtPage_);
753
754         //      use large offsets to
755         //      simulate advisory locking
756
757         ovl->OffsetHigh |= 0x80000000;
758
759         if( mode == BtLockDelete || mode == BtLockWrite || mode == BtLockParent )
760                 flags |= LOCKFILE_EXCLUSIVE_LOCK;
761
762         if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
763                 return bt_close (bt), NULL;
764 #endif 
765 #ifdef unix
766         latchmgr = malloc (BT_maxpage);
767         *amt = 0;
768
769         // read minimum page size to get root info
770
771         if( size = lseek (bt->idx, 0L, 2) ) {
772                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
773                         bits = latchmgr->alloc->bits;
774                 else
775                         return free(bt), free(latchmgr), NULL;
776         } else if( mode == BT_ro )
777                 return free(latchmgr), bt_close (bt), NULL;
778 #else
779         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
780         size = GetFileSize(bt->idx, amt);
781
782         if( size || *amt ) {
783                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
784                         return bt_close (bt), NULL;
785                 bits = latchmgr->alloc->bits;
786         } else if( mode == BT_ro )
787                 return bt_close (bt), NULL;
788 #endif
789
790         bt->page_size = 1 << bits;
791         bt->page_bits = bits;
792
793         bt->mode = mode;
794
795         if( cacheblk < bt->page_size )
796                 cacheblk = bt->page_size;
797
798         //  mask for partial memmaps
799
800         bt->hashmask = (cacheblk >> bits) - 1;
801
802         //      see if requested size of pages per memmap is greater
803
804         if( (1 << segsize) > bt->hashmask )
805                 bt->hashmask = (1 << segsize) - 1;
806
807         bt->seg_bits = 0;
808
809         while( (1 << bt->seg_bits) <= bt->hashmask )
810                 bt->seg_bits++;
811
812         bt->hashsize = hashsize;
813
814         if( bt->nodemax = nodemax++ ) {
815 #ifdef unix
816           bt->nodes = calloc (nodemax, sizeof(BtHash));
817           bt->cache = calloc (hashsize, sizeof(ushort));
818 #else
819           bt->nodes = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, nodemax * sizeof(BtHash));
820           bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
821 #endif
822           bt->mapped_io = 1;
823         }
824
825         if( size || *amt ) {
826                 goto btlatch;
827         }
828
829         // initialize an empty b-tree with latch page, root page, page of leaves
830         // and page(s) of latches
831
832         memset (latchmgr, 0, 1 << bits);
833
834         nlatchpage = BT_latchtable;
835         if( nlatchpage > nodemax )
836                 nlatchpage = nodemax;
837         nlatchpage *= sizeof(BtLatchSet);
838         nlatchpage += bt->page_size - 1;
839         nlatchpage /= bt->page_size;
840
841         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
842         latchmgr->alloc->bits = bt->page_bits;
843
844         latchmgr->nlatchpage = nlatchpage;
845         latchmgr->latchtotal = nlatchpage * bt->page_size / sizeof(BtLatchSet);
846
847         //  initialize latch manager
848
849         latchhash = (bt->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
850
851         //      size of hash table = total number of latchsets
852
853         if( latchhash > latchmgr->latchtotal )
854                 latchhash = latchmgr->latchtotal;
855
856         latchmgr->latchhash = latchhash;
857
858 #ifdef unix
859         if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
860                 return bt_close (bt), NULL;
861 #else
862         if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
863                 return bt_close (bt), NULL;
864
865         if( *amt < bt->page_size )
866                 return bt_close (bt), NULL;
867 #endif
868
869         memset (latchmgr, 0, 1 << bits);
870         latchmgr->alloc->bits = bt->page_bits;
871
872         for( lvl=MIN_lvl; lvl--; ) {
873                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
874                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
875                 key = keyptr(latchmgr->alloc, 1);
876                 key->len = 2;           // create stopper key
877                 key->key[0] = 0xff;
878                 key->key[1] = 0xff;
879                 latchmgr->alloc->min = bt->page_size - 3;
880                 latchmgr->alloc->lvl = lvl;
881                 latchmgr->alloc->cnt = 1;
882                 latchmgr->alloc->act = 1;
883 #ifdef unix
884                 if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
885                         return bt_close (bt), NULL;
886 #else
887                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
888                         return bt_close (bt), NULL;
889
890                 if( *amt < bt->page_size )
891                         return bt_close (bt), NULL;
892 #endif
893         }
894
895         // clear out latch manager locks
896         //      and rest of pages to round out segment
897
898         memset(latchmgr, 0, bt->page_size);
899         last = MIN_lvl + 1;
900
901         while( last <= ((MIN_lvl + 1 + nlatchpage) | bt->hashmask) ) {
902 #ifdef unix
903                 pwrite(bt->idx, latchmgr, bt->page_size, last << bt->page_bits);
904 #else
905                 SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN);
906                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
907                         return bt_close (bt), NULL;
908                 if( *amt < bt->page_size )
909                         return bt_close (bt), NULL;
910 #endif
911                 last++;
912         }
913
914 btlatch:
915 #ifdef unix
916         lock->l_type = F_UNLCK;
917         if( fcntl (bt->idx, F_SETLK, lock) < 0 )
918                 return bt_close (bt), NULL;
919 #else
920         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) )
921                 return bt_close (bt), NULL;
922 #endif
923 #ifdef unix
924         flag = PROT_READ | PROT_WRITE;
925         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
926         if( bt->latchmgr == MAP_FAILED )
927                 return bt_close (bt), NULL;
928         bt->latchsets = (BtLatchSet *)mmap (0, bt->latchmgr->nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
929         if( bt->latchsets == MAP_FAILED )
930                 return bt_close (bt), NULL;
931 #else
932         flag = PAGE_READWRITE;
933         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, (BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * bt->page_size, NULL);
934         if( !bt->halloc )
935                 return bt_close (bt), NULL;
936
937         flag = FILE_MAP_WRITE;
938         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, (BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * bt->page_size);
939         if( !bt->latchmgr )
940                 return GetLastError(), bt_close (bt), NULL;
941
942         bt->latchsets = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
943 #endif
944
945 #ifdef unix
946         free (latchmgr);
947 #else
948         VirtualFree (latchmgr, 0, MEM_RELEASE);
949 #endif
950
951 #ifdef unix
952         bt->mem = malloc (6 * bt->page_size);
953 #else
954         bt->mem = VirtualAlloc(NULL, 6 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
955 #endif
956         bt->frame = (BtPage)bt->mem;
957         bt->cursor = (BtPage)(bt->mem + bt->page_size);
958         bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
959         bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
960         bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
961         bt->zero = (BtPage)(bt->mem + 5 * bt->page_size);
962
963         memset (bt->zero, 0, bt->page_size);
964         return bt;
965 }
966
967 // place write, read, or parent lock on requested page_no.
968
969 void bt_lockpage(BtLock mode, BtLatchSet *latch)
970 {
971         switch( mode ) {
972         case BtLockRead:
973                 bt_spinreadlock (latch->readwr);
974                 break;
975         case BtLockWrite:
976                 bt_spinwritelock (latch->readwr);
977                 break;
978         case BtLockAccess:
979                 bt_spinreadlock (latch->access);
980                 break;
981         case BtLockDelete:
982                 bt_spinwritelock (latch->access);
983                 break;
984         case BtLockParent:
985                 bt_spinwritelock (latch->parent);
986                 break;
987         }
988 }
989
990 // remove write, read, or parent lock on requested page
991
992 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
993 {
994         switch( mode ) {
995         case BtLockRead:
996                 bt_spinreleaseread (latch->readwr);
997                 break;
998         case BtLockWrite:
999                 bt_spinreleasewrite (latch->readwr);
1000                 break;
1001         case BtLockAccess:
1002                 bt_spinreleaseread (latch->access);
1003                 break;
1004         case BtLockDelete:
1005                 bt_spinreleasewrite (latch->access);
1006                 break;
1007         case BtLockParent:
1008                 bt_spinreleasewrite (latch->parent);
1009                 break;
1010         }
1011 }
1012
1013 //      allocate a new page and write page into it
1014
1015 uid bt_newpage(BtDb *bt, BtPage page)
1016 {
1017 uid new_page;
1018 int reuse;
1019
1020         //      lock allocation page
1021
1022         bt_spinwritelock(bt->latchmgr->lock);
1023
1024         // use empty chain first
1025         // else allocate empty page
1026
1027         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
1028                 if( bt_mappage (bt, &bt->temp, new_page) )
1029                         return 0;
1030                 bt_putid(bt->latchmgr->alloc[1].right, bt_getid(bt->temp->right));
1031                 reuse = 1;
1032         } else {
1033                 new_page = bt_getid(bt->latchmgr->alloc->right);
1034                 bt_putid(bt->latchmgr->alloc->right, new_page+1);
1035                 reuse = 0;
1036         }
1037
1038         bt_spinreleasewrite(bt->latchmgr->lock);
1039
1040         if( !bt->mapped_io )
1041           if( bt_update(bt, page, new_page) )
1042                 return 0;       //don't unlock on error
1043           else
1044                 return new_page;
1045
1046 #ifdef unix
1047         if( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
1048                 return bt->err = BTERR_wrt, 0;
1049
1050         // if writing first page of pool block, zero last page in the block
1051
1052         if( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
1053         {
1054                 // use zero buffer to write zeros
1055                 if( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
1056                         return bt->err = BTERR_wrt, 0;
1057         }
1058 #else
1059         //      bring new page into pool and copy page.
1060         //      this will extend the file into the new pages.
1061
1062         if( bt_mappage (bt, &bt->temp, new_page) )
1063                 return 0;
1064
1065         memcpy(bt->temp, page, bt->page_size);
1066 #endif
1067         return new_page;
1068 }
1069
1070 //  compare two keys, returning > 0, = 0, or < 0
1071 //  as the comparison value
1072
1073 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1074 {
1075 uint len1 = key1->len;
1076 int ans;
1077
1078         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1079                 return ans;
1080
1081         if( len1 > len2 )
1082                 return 1;
1083         if( len1 < len2 )
1084                 return -1;
1085
1086         return 0;
1087 }
1088
1089 //  Update current page of btree by writing file contents
1090 //      or flushing mapped area to disk.
1091
1092 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
1093 {
1094 off64_t off = page_no << bt->page_bits;
1095
1096 #ifdef unix
1097     if( !bt->mapped_io )
1098          if( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
1099                  return bt->err = BTERR_wrt;
1100 #else
1101 uint amt[1];
1102         if( !bt->mapped_io )
1103         {
1104                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
1105                 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
1106                         return GetLastError(), bt->err = BTERR_wrt;
1107
1108                 if( *amt < bt->page_size )
1109                         return GetLastError(), bt->err = BTERR_wrt;
1110         } 
1111         else if( bt->mode == BT_fl ) {
1112                         FlushViewOfFile(page, bt->page_size);
1113                         FlushFileBuffers(bt->idx);
1114         }
1115 #endif
1116         return 0;
1117 }
1118
1119 // find page in cache 
1120
1121 BtHash *bt_findhash(BtDb *bt, uid page_no)
1122 {
1123 BtHash *hash;
1124 uint idx;
1125
1126         // compute cache block first page and hash idx 
1127
1128         page_no &= ~bt->hashmask;
1129         idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
1130
1131         if( bt->cache[idx] ) 
1132                 hash = bt->nodes + bt->cache[idx];
1133         else
1134                 return NULL;
1135
1136         do if( hash->page_no == page_no )
1137                  break;
1138         while(hash = hash->hashnext );
1139
1140         return hash;
1141 }
1142
1143 // add page cache entry to hash index
1144
1145 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
1146 {
1147 uint idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
1148 BtHash *hash;
1149
1150         if( bt->cache[idx] ) {
1151                 node->hashnext = hash = bt->nodes + bt->cache[idx];
1152                 hash->hashprev = node;
1153         }
1154
1155         node->hashprev = NULL;
1156         bt->cache[idx] = (ushort)(node - bt->nodes);
1157 }
1158
1159 // remove cache entry from hash table
1160
1161 void bt_unlinkhash(BtDb *bt, BtHash *node)
1162 {
1163 uint idx = (uint)(node->page_no >> bt->seg_bits) % bt->hashsize;
1164 BtHash *hash;
1165
1166         // unlink node
1167         if( hash = node->hashprev )
1168                 hash->hashnext = node->hashnext;
1169         else if( hash = node->hashnext )
1170                 bt->cache[idx] = (ushort)(hash - bt->nodes);
1171         else
1172                 bt->cache[idx] = 0;
1173
1174         if( hash = node->hashnext )
1175                 hash->hashprev = node->hashprev;
1176 }
1177
1178 // add cache page to lru chain and map pages
1179
1180 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
1181 {
1182 int flag;
1183 off64_t off = (page_no & ~(uid)bt->hashmask) << bt->page_bits;
1184 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
1185 BtHash *node;
1186
1187         memset(hash, 0, sizeof(BtHash));
1188         hash->page_no = (page_no & ~(uid)bt->hashmask);
1189         bt_linkhash(bt, hash, page_no);
1190
1191         if( node = hash->lrunext = bt->lrufirst )
1192                 node->lruprev = hash;
1193         else
1194                 bt->lrulast = hash;
1195
1196         bt->lrufirst = hash;
1197
1198 #ifdef unix
1199         flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
1200         hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
1201         if( hash->page == MAP_FAILED )
1202                 return bt->err = BTERR_map, (BtPage)NULL;
1203
1204 #else
1205         flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1206         hash->hmap = CreateFileMapping(bt->idx, NULL, flag,     (DWORD)(limit >> 32), (DWORD)limit, NULL);
1207         if( !hash->hmap )
1208                 return bt->err = BTERR_map, NULL;
1209
1210         flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1211         hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
1212         if( !hash->page )
1213                 return bt->err = BTERR_map, NULL;
1214 #endif
1215
1216         return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
1217 }
1218
1219 //      find or place requested page in page-cache
1220 //      return memory address where page is located.
1221
1222 BtPage bt_hashpage(BtDb *bt, uid page_no)
1223 {
1224 BtHash *hash, *node, *next;
1225 BtPage page;
1226
1227         // find page in cache and move to top of lru list  
1228
1229         if( hash = bt_findhash(bt, page_no) ) {
1230                 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
1231                 // swap node in lru list
1232                 if( node = hash->lruprev ) {
1233                         if( next = node->lrunext = hash->lrunext )
1234                                 next->lruprev = node;
1235                         else
1236                                 bt->lrulast = node;
1237
1238                         if( next = hash->lrunext = bt->lrufirst )
1239                                 next->lruprev = hash;
1240                         else
1241                                 return bt->err = BTERR_hash, (BtPage)NULL;
1242
1243                         hash->lruprev = NULL;
1244                         bt->lrufirst = hash;
1245                 }
1246                 return page;
1247         }
1248
1249         // map pages and add to cache entry
1250
1251         if( bt->nodecnt < bt->nodemax ) {
1252                 hash = bt->nodes + ++bt->nodecnt;
1253                 return bt_linklru(bt, hash, page_no);
1254         }
1255
1256         // hash table is already full, replace last lru entry from the cache
1257
1258         if( hash = bt->lrulast ) {
1259                 // unlink from lru list
1260                 if( node = bt->lrulast = hash->lruprev )
1261                         node->lrunext = NULL;
1262                 else
1263                         return bt->err = BTERR_hash, (BtPage)NULL;
1264
1265 #ifdef unix
1266                 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
1267 #else
1268 //              FlushViewOfFile(hash->page, 0);
1269                 UnmapViewOfFile(hash->page);
1270                 CloseHandle(hash->hmap);
1271 #endif
1272                 // unlink from hash table
1273
1274                 bt_unlinkhash(bt, hash);
1275
1276                 // map and add to cache
1277
1278                 return bt_linklru(bt, hash, page_no);
1279         }
1280
1281         return bt->err = BTERR_hash, (BtPage)NULL;
1282 }
1283
1284 //  map a btree page onto current page
1285
1286 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
1287 {
1288 off64_t off = page_no << bt->page_bits;
1289 #ifndef unix
1290 int amt[1];
1291 #endif
1292         
1293         if( bt->mapped_io ) {
1294                 bt->err = 0;
1295                 *page = bt_hashpage(bt, page_no);
1296                 return bt->err;
1297         }
1298 #ifdef unix
1299         if( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
1300                 return bt->err = BTERR_map;
1301 #else
1302         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
1303
1304         if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
1305                 return bt->err = BTERR_map;
1306
1307         if( *amt <  bt->page_size )
1308                 return bt->err = BTERR_map;
1309 #endif
1310         return 0;
1311 }
1312
1313 //      deallocate a deleted page 
1314 //      place on free chain out of allocator page
1315 //      call with page latched for Writing and Deleting
1316
1317 BTERR bt_freepage(BtDb *bt, uid page_no, BtPage page, BtLatchSet *latch)
1318 {
1319         if( bt_mappage (bt, &page, page_no) )
1320                 return bt->err;
1321
1322         //      lock allocation page
1323
1324         bt_spinwritelock (bt->latchmgr->lock);
1325
1326         //      store chain in second right
1327         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1328         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1329         page->free = 1;
1330
1331         if( bt_update(bt, page, page_no) )
1332                 return bt->err;
1333
1334         // unlock released page
1335
1336         bt_unlockpage (BtLockDelete, latch);
1337         bt_unlockpage (BtLockWrite, latch);
1338         bt_unpinlatch (latch);
1339
1340         // unlock allocation page
1341
1342         bt_spinreleasewrite (bt->latchmgr->lock);
1343         return 0;
1344 }
1345
1346 //  find slot in page for given key at a given level
1347
1348 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1349 {
1350 uint diff, higher = bt->page->cnt, low = 1, slot;
1351 uint good = 0;
1352
1353         //      make stopper key an infinite fence value
1354
1355         if( bt_getid (bt->page->right) )
1356                 higher++;
1357         else
1358                 good++;
1359
1360         //      low is the lowest candidate, higher is already
1361         //      tested as .ge. the given key, loop ends when they meet
1362
1363         while( diff = higher - low ) {
1364                 slot = low + ( diff >> 1 );
1365                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1366                         low = slot + 1;
1367                 else
1368                         higher = slot, good++;
1369         }
1370
1371         //      return zero if key is on right link page
1372
1373         return good ? higher : 0;
1374 }
1375
1376 //  find and load page at given level for given key
1377 //      leave page rd or wr locked as requested
1378
1379 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1380 {
1381 uid page_no = ROOT_page, prevpage = 0;
1382 uint drill = 0xff, slot;
1383 BtLatchSet *prevlatch;
1384 uint mode, prevmode;
1385
1386   //  start at root of btree and drill down
1387
1388   do {
1389         // determine lock mode of drill level
1390         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1391
1392         bt->latch = bt_pinlatch(bt, page_no);
1393         bt->page_no = page_no;
1394
1395         // obtain access lock using lock chaining
1396
1397         if( page_no > ROOT_page )
1398                 bt_lockpage(BtLockAccess, bt->latch);
1399
1400         if( prevpage ) {
1401                 bt_unlockpage(prevmode, prevlatch);
1402                 bt_unpinlatch(prevlatch);
1403                 prevpage = 0;
1404         }
1405
1406         // obtain read lock using lock chaining
1407
1408         bt_lockpage(mode, bt->latch);
1409
1410         if( page_no > ROOT_page )
1411                 bt_unlockpage(BtLockAccess, bt->latch);
1412
1413         //      map/obtain page contents
1414
1415         if( bt_mappage (bt, &bt->page, page_no) )
1416                 return 0;
1417
1418         // re-read and re-lock root after determining actual level of root
1419
1420         if( bt->page->lvl != drill) {
1421                 if( bt->page_no != ROOT_page )
1422                         return bt->err = BTERR_struct, 0;
1423                         
1424                 drill = bt->page->lvl;
1425
1426                 if( lock != BtLockRead && drill == lvl ) {
1427                         bt_unlockpage(mode, bt->latch);
1428                         bt_unpinlatch(bt->latch);
1429                         continue;
1430                 }
1431         }
1432
1433         prevpage = bt->page_no;
1434         prevlatch = bt->latch;
1435         prevmode = mode;
1436
1437         //  find key on page at this level
1438         //  and descend to requested level
1439
1440         if( !bt->page->kill )
1441          if( slot = bt_findslot (bt, key, len) ) {
1442           if( drill == lvl )
1443                 return slot;
1444
1445           while( slotptr(bt->page, slot)->dead )
1446                 if( slot++ < bt->page->cnt )
1447                         continue;
1448                 else
1449                         goto slideright;
1450
1451           page_no = bt_getid(slotptr(bt->page, slot)->id);
1452           drill--;
1453           continue;
1454          }
1455
1456         //  or slide right into next page
1457
1458 slideright:
1459         page_no = bt_getid(bt->page->right);
1460
1461   } while( page_no );
1462
1463   // return error on end of right chain
1464
1465   bt->err = BTERR_eof;
1466   return 0;     // return error
1467 }
1468
1469 //      a fence key was deleted from a page
1470 //      push new fence value upwards
1471
1472 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1473 {
1474 unsigned char leftkey[256], rightkey[256];
1475 BtLatchSet *latch = bt->latch;
1476 BtKey ptr;
1477
1478         // remove deleted key, the old fence value
1479
1480         ptr = keyptr(bt->page, bt->page->cnt);
1481         memcpy(rightkey, ptr, ptr->len + 1);
1482
1483         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1484         bt->page->dirty = 1;
1485
1486         ptr = keyptr(bt->page, bt->page->cnt);
1487         memcpy(leftkey, ptr, ptr->len + 1);
1488
1489         if( bt_update (bt, bt->page, page_no) )
1490                 return bt->err;
1491
1492         bt_lockpage (BtLockParent, latch);
1493         bt_unlockpage (BtLockWrite, latch);
1494
1495         //  insert new (now smaller) fence key
1496
1497         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1498                 return bt->err;
1499
1500         //  remove old (larger) fence key
1501
1502         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1503                 return bt->err;
1504
1505         bt_unlockpage (BtLockParent, latch);
1506         bt_unpinlatch (latch);
1507         return 0;
1508 }
1509
1510 //      root has a single child
1511 //      collapse a level from the btree
1512 //      call with root locked in bt->page
1513
1514 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1515 {
1516 BtLatchSet *latch;
1517 uid child;
1518 uint idx;
1519
1520         // find the child entry
1521         //      and promote to new root
1522
1523   do {
1524         for( idx = 0; idx++ < root->cnt; )
1525           if( !slotptr(root, idx)->dead )
1526                 break;
1527
1528         child = bt_getid (slotptr(root, idx)->id);
1529         latch = bt_pinlatch (bt, child);
1530
1531         bt_lockpage (BtLockDelete, latch);
1532         bt_lockpage (BtLockWrite, latch);
1533
1534         if( bt_mappage (bt, &bt->temp, child) )
1535                 return bt->err;
1536
1537         memcpy (root, bt->temp, bt->page_size);
1538
1539         if( bt_update (bt, root, ROOT_page) )
1540                 return bt->err;
1541
1542         if( bt_freepage (bt, child, bt->temp, latch) )
1543                 return bt->err;
1544
1545   } while( root->lvl > 1 && root->act == 1 );
1546
1547   bt_unlockpage (BtLockWrite, bt->latch);
1548   bt_unpinlatch (bt->latch);
1549   return 0;
1550 }
1551
1552 //  find and delete key on page by marking delete flag bit
1553 //  when page becomes empty, delete it
1554
1555 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1556 {
1557 unsigned char lowerkey[256], higherkey[256];
1558 uint slot, dirty = 0, idx, fence, found;
1559 BtLatchSet *latch, *rlatch;
1560 uid page_no, right;
1561 BtKey ptr;
1562
1563         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1564                 ptr = keyptr(bt->page, slot);
1565         else
1566                 return bt->err;
1567
1568         // are we deleting a fence slot?
1569
1570         fence = slot == bt->page->cnt;
1571
1572         // if key is found delete it, otherwise ignore request
1573
1574         if( found = !keycmp (ptr, key, len) )
1575           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1576                 dirty = slotptr(bt->page,slot)->dead = 1;
1577                 bt->page->dirty = 1;
1578                 bt->page->act--;
1579
1580                 // collapse empty slots
1581
1582                 while( idx = bt->page->cnt - 1 )
1583                   if( slotptr(bt->page, idx)->dead ) {
1584                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1585                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1586                   } else
1587                         break;
1588           }
1589
1590         right = bt_getid(bt->page->right);
1591         page_no = bt->page_no;
1592         latch = bt->latch;
1593
1594         if( !dirty ) {
1595           if( lvl )
1596                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1597           bt_unlockpage(BtLockWrite, latch);
1598           bt_unpinlatch (latch);
1599           return bt->found = found, 0;
1600         }
1601
1602         //      did we delete a fence key in an upper level?
1603
1604         if( lvl && bt->page->act && fence )
1605           if( bt_fixfence (bt, page_no, lvl) )
1606                 return bt->err;
1607           else
1608                 return bt->found = found, 0;
1609
1610         //      is this a collapsed root?
1611
1612         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1613           if( bt_collapseroot (bt, bt->page) )
1614                 return bt->err;
1615           else
1616                 return bt->found = found, 0;
1617
1618         // return if page is not empty
1619
1620         if( bt->page->act ) {
1621           if( bt_update(bt, bt->page, page_no) )
1622                 return bt->err;
1623           bt_unlockpage(BtLockWrite, latch);
1624           bt_unpinlatch (latch);
1625           return bt->found = found, 0;
1626         }
1627
1628         // cache copy of fence key
1629         //      in order to find parent
1630
1631         ptr = keyptr(bt->page, bt->page->cnt);
1632         memcpy(lowerkey, ptr, ptr->len + 1);
1633
1634         // obtain lock on right page
1635
1636         rlatch = bt_pinlatch (bt, right);
1637         bt_lockpage(BtLockWrite, rlatch);
1638
1639         if( bt_mappage (bt, &bt->temp, right) )
1640                 return bt->err;
1641
1642         if( bt->temp->kill ) {
1643                 bt_abort(bt, bt->temp, right, 0);
1644                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1645         }
1646
1647         // pull contents of next page into current empty page 
1648
1649         memcpy (bt->page, bt->temp, bt->page_size);
1650
1651         //      cache copy of key to update
1652
1653         ptr = keyptr(bt->temp, bt->temp->cnt);
1654         memcpy(higherkey, ptr, ptr->len + 1);
1655
1656         //  Mark right page as deleted and point it to left page
1657         //      until we can post updates at higher level.
1658
1659         bt_putid(bt->temp->right, page_no);
1660         bt->temp->kill = 1;
1661
1662         if( bt_update(bt, bt->page, page_no) )
1663                 return bt->err;
1664
1665         if( bt_update(bt, bt->temp, right) )
1666                 return bt->err;
1667
1668         bt_lockpage(BtLockParent, latch);
1669         bt_unlockpage(BtLockWrite, latch);
1670
1671         bt_lockpage(BtLockParent, rlatch);
1672         bt_unlockpage(BtLockWrite, rlatch);
1673
1674         //  redirect higher key directly to consolidated node
1675
1676         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1677                 return bt->err;
1678
1679         //  delete old lower key to consolidated node
1680
1681         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1682                 return bt->err;
1683
1684         //  obtain write & delete lock on deleted node
1685         //      add right block to free chain
1686
1687         bt_lockpage(BtLockDelete, rlatch);
1688         bt_lockpage(BtLockWrite, rlatch);
1689         bt_unlockpage(BtLockParent, rlatch);
1690
1691         if( bt_freepage (bt, right, bt->temp, rlatch) )
1692                 return bt->err;
1693
1694         bt_unlockpage(BtLockParent, latch);
1695         bt_unpinlatch(latch);
1696         return 0;
1697 }
1698
1699 //      find key in leaf level and return row-id
1700
1701 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1702 {
1703 uint  slot;
1704 BtKey ptr;
1705 uid id;
1706
1707         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1708                 ptr = keyptr(bt->page, slot);
1709         else
1710                 return 0;
1711
1712         // if key exists, return row-id
1713         //      otherwise return 0
1714
1715         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1716                 id = bt_getid(slotptr(bt->page,slot)->id);
1717         else
1718                 id = 0;
1719
1720         bt_unlockpage (BtLockRead, bt->latch);
1721         bt_unpinlatch (bt->latch);
1722         return id;
1723 }
1724
1725 //      check page for space available,
1726 //      clean if necessary and return
1727 //      0 - page needs splitting
1728 //      >0 - go ahead with new slot
1729  
1730 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1731 {
1732 uint nxt = bt->page_size;
1733 BtPage page = bt->page;
1734 uint cnt = 0, idx = 0;
1735 uint max = page->cnt;
1736 uint newslot = slot;
1737 BtKey key;
1738 int ret;
1739
1740         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1741                 return slot;
1742
1743         //      skip cleanup if nothing to reclaim
1744
1745         if( !page->dirty )
1746                 return 0;
1747
1748         memcpy (bt->frame, page, bt->page_size);
1749
1750         // skip page info and set rest of page to zero
1751
1752         memset (page+1, 0, bt->page_size - sizeof(*page));
1753         page->act = 0;
1754
1755         while( cnt++ < max ) {
1756                 if( cnt == slot )
1757                         newslot = idx + 1;
1758                 // always leave fence key in list
1759                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1760                         continue;
1761
1762                 // copy key
1763                 key = keyptr(bt->frame, cnt);
1764                 nxt -= key->len + 1;
1765                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1766
1767                 // copy slot
1768                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1769                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1770                         page->act++;
1771                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1772                 slotptr(page, idx)->off = nxt;
1773         }
1774
1775         page->min = nxt;
1776         page->cnt = idx;
1777
1778         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1779                 return newslot;
1780
1781         return 0;
1782 }
1783
1784 // split the root and raise the height of the btree
1785
1786 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1787 {
1788 uint nxt = bt->page_size;
1789 BtPage root = bt->page;
1790 uid right;
1791
1792         //  Obtain an empty page to use, and copy the current
1793         //  root contents into it
1794
1795         if( !(right = bt_newpage(bt, root)) )
1796                 return bt->err;
1797
1798         // preserve the page info at the bottom
1799         // and set rest to zero
1800
1801         memset(root+1, 0, bt->page_size - sizeof(*root));
1802
1803         // insert first key on newroot page
1804
1805         nxt -= *leftkey + 1;
1806         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1807         bt_putid(slotptr(root, 1)->id, right);
1808         slotptr(root, 1)->off = nxt;
1809         
1810         // insert second key on newroot page
1811         // and increase the root height
1812
1813         nxt -= 3;
1814         ((unsigned char *)root)[nxt] = 2;
1815         ((unsigned char *)root)[nxt+1] = 0xff;
1816         ((unsigned char *)root)[nxt+2] = 0xff;
1817         bt_putid(slotptr(root, 2)->id, page_no2);
1818         slotptr(root, 2)->off = nxt;
1819
1820         bt_putid(root->right, 0);
1821         root->min = nxt;                // reset lowest used offset and key count
1822         root->cnt = 2;
1823         root->act = 2;
1824         root->lvl++;
1825
1826         // update and release root (bt->page)
1827
1828         if( bt_update(bt, root, bt->page_no) )
1829                 return bt->err;
1830
1831         bt_unlockpage(BtLockWrite, bt->latch);
1832         bt_unpinlatch(bt->latch);
1833         return 0;
1834 }
1835
1836 //  split already locked full node
1837 //      return unlocked.
1838
1839 BTERR bt_splitpage (BtDb *bt)
1840 {
1841 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1842 unsigned char fencekey[256], rightkey[256];
1843 uid page_no = bt->page_no, right;
1844 BtLatchSet *latch, *rlatch;
1845 BtPage page = bt->page;
1846 uint lvl = page->lvl;
1847 BtKey key;
1848
1849         latch = bt->latch;
1850
1851         //  split higher half of keys to bt->frame
1852         //      the last key (fence key) might be dead
1853
1854         memset (bt->frame, 0, bt->page_size);
1855         max = page->cnt;
1856         cnt = max / 2;
1857         idx = 0;
1858
1859         while( cnt++ < max ) {
1860                 key = keyptr(page, cnt);
1861                 nxt -= key->len + 1;
1862                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1863                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1864                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1865                         bt->frame->act++;
1866                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1867                 slotptr(bt->frame, idx)->off = nxt;
1868         }
1869
1870         //      remember fence key for new right page
1871
1872         memcpy (rightkey, key, key->len + 1);
1873
1874         bt->frame->bits = bt->page_bits;
1875         bt->frame->min = nxt;
1876         bt->frame->cnt = idx;
1877         bt->frame->lvl = lvl;
1878
1879         // link right node
1880
1881         if( page_no > ROOT_page )
1882                 memcpy (bt->frame->right, page->right, BtId);
1883
1884         //      get new free page and write frame to it.
1885
1886         if( !(right = bt_newpage(bt, bt->frame)) )
1887                 return bt->err;
1888
1889         //      update lower keys to continue in old page
1890
1891         memcpy (bt->frame, page, bt->page_size);
1892         memset (page+1, 0, bt->page_size - sizeof(*page));
1893         nxt = bt->page_size;
1894         page->dirty = 0;
1895         page->act = 0;
1896         cnt = 0;
1897         idx = 0;
1898
1899         //  assemble page of smaller keys
1900         //      (they're all active keys)
1901
1902         while( cnt++ < max / 2 ) {
1903                 key = keyptr(bt->frame, cnt);
1904                 nxt -= key->len + 1;
1905                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1906                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1907                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1908                 slotptr(page, idx)->off = nxt;
1909                 page->act++;
1910         }
1911
1912         // remember fence key for smaller page
1913
1914         memcpy (fencekey, key, key->len + 1);
1915
1916         bt_putid(page->right, right);
1917         page->min = nxt;
1918         page->cnt = idx;
1919
1920         // if current page is the root page, split it
1921
1922         if( page_no == ROOT_page )
1923                 return bt_splitroot (bt, fencekey, right);
1924
1925         //      lock right page
1926
1927         rlatch = bt_pinlatch (bt, right);
1928         bt_lockpage (BtLockParent, rlatch);
1929
1930         // update left (containing) node
1931
1932         if( bt_update(bt, page, page_no) )
1933                 return bt->err;
1934
1935         bt_lockpage (BtLockParent, latch);
1936         bt_unlockpage (BtLockWrite, latch);
1937
1938         // insert new fence for reformulated left block
1939
1940         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1941                 return bt->err;
1942
1943         //      switch fence for right block of larger keys to new right page
1944
1945         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1946                 return bt->err;
1947
1948         bt_unlockpage (BtLockParent, latch);
1949         bt_unlockpage (BtLockParent, rlatch);
1950
1951         bt_unpinlatch (rlatch);
1952         bt_unpinlatch (latch);
1953         return 0;
1954 }
1955
1956 //  Insert new key into the btree at requested level.
1957 //  Pages are unlocked at exit.
1958
1959 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1960 {
1961 uint slot, idx;
1962 BtPage page;
1963 BtKey ptr;
1964
1965   while( 1 ) {
1966         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1967                 ptr = keyptr(bt->page, slot);
1968         else
1969         {
1970                 if( !bt->err )
1971                         bt->err = BTERR_ovflw;
1972                 return bt->err;
1973         }
1974
1975         // if key already exists, update id and return
1976
1977         page = bt->page;
1978
1979         if( !keycmp (ptr, key, len) ) {
1980           if( slotptr(page, slot)->dead )
1981                 page->act++;
1982           slotptr(page, slot)->dead = 0;
1983           slotptr(page, slot)->tod = tod;
1984           bt_putid(slotptr(page,slot)->id, id);
1985           if( bt_update(bt, bt->page, bt->page_no) )
1986                 return bt->err;
1987           bt_unlockpage(BtLockWrite, bt->latch);
1988           bt_unpinlatch (bt->latch);
1989           return 0;
1990         }
1991
1992         // check if page has enough space
1993
1994         if( slot = bt_cleanpage (bt, len, slot) )
1995                 break;
1996
1997         if( bt_splitpage (bt) )
1998                 return bt->err;
1999   }
2000
2001   // calculate next available slot and copy key into page
2002
2003   page->min -= len + 1; // reset lowest used offset
2004   ((unsigned char *)page)[page->min] = len;
2005   memcpy ((unsigned char *)page + page->min +1, key, len );
2006
2007   for( idx = slot; idx < page->cnt; idx++ )
2008         if( slotptr(page, idx)->dead )
2009                 break;
2010
2011   // now insert key into array before slot
2012   // preserving the fence slot
2013
2014   if( idx == page->cnt )
2015         idx++, page->cnt++;
2016
2017   page->act++;
2018
2019   while( idx > slot )
2020         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
2021
2022   bt_putid(slotptr(page,slot)->id, id);
2023   slotptr(page, slot)->off = page->min;
2024   slotptr(page, slot)->tod = tod;
2025   slotptr(page, slot)->dead = 0;
2026
2027   if( bt_update(bt, bt->page, bt->page_no) )
2028           return bt->err;
2029
2030   bt_unlockpage(BtLockWrite, bt->latch);
2031   bt_unpinlatch(bt->latch);
2032   return 0;
2033 }
2034
2035 //  cache page of keys into cursor and return starting slot for given key
2036
2037 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2038 {
2039 uint slot;
2040
2041         // cache page for retrieval
2042
2043         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2044           memcpy (bt->cursor, bt->page, bt->page_size);
2045         else
2046           return 0;
2047
2048         bt_unlockpage(BtLockRead, bt->latch);
2049         bt->cursor_page = bt->page_no;
2050         bt_unpinlatch (bt->latch);
2051         return slot;
2052 }
2053
2054 //  return next slot for cursor page
2055 //  or slide cursor right into next page
2056
2057 uint bt_nextkey (BtDb *bt, uint slot)
2058 {
2059 BtLatchSet *latch;
2060 off64_t right;
2061
2062   do {
2063         right = bt_getid(bt->cursor->right);
2064
2065         while( slot++ < bt->cursor->cnt )
2066           if( slotptr(bt->cursor,slot)->dead )
2067                 continue;
2068           else if( right || (slot < bt->cursor->cnt))
2069                 return slot;
2070           else
2071                 break;
2072
2073         if( !right )
2074                 break;
2075
2076         bt->cursor_page = right;
2077         latch = bt_pinlatch (bt, right);
2078     bt_lockpage(BtLockRead, latch);
2079
2080         if( bt_mappage (bt, &bt->page, right) )
2081                 return 0;
2082
2083         memcpy (bt->cursor, bt->page, bt->page_size);
2084         bt_unlockpage(BtLockRead, latch);
2085         bt_unpinlatch (latch);
2086         slot = 0;
2087   } while( 1 );
2088
2089   return bt->err = 0;
2090 }
2091
2092 BtKey bt_key(BtDb *bt, uint slot)
2093 {
2094         return keyptr(bt->cursor, slot);
2095 }
2096
2097 uid bt_uid(BtDb *bt, uint slot)
2098 {
2099         return bt_getid(slotptr(bt->cursor,slot)->id);
2100 }
2101
2102 uint bt_tod(BtDb *bt, uint slot)
2103 {
2104         return slotptr(bt->cursor,slot)->tod;
2105 }
2106
2107
2108 #ifdef STANDALONE
2109
2110 uint bt_audit (BtDb *bt)
2111 {
2112 ushort idx, hashidx;
2113 uid next, page_no;
2114 BtLatchSet *latch;
2115 uint cnt = 0;
2116 BtKey ptr;
2117
2118 #ifdef unix
2119         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2120 #endif
2121         if( *(ushort *)(bt->latchmgr->lock) )
2122                 fprintf(stderr, "Alloc page locked\n");
2123         *(ushort *)(bt->latchmgr->lock) = 0;
2124
2125         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
2126                 latch = bt->latchsets + idx;
2127                 if( *(ushort *)latch->readwr )
2128                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2129                 *(ushort *)latch->readwr = 0;
2130
2131                 if( *(ushort *)latch->access )
2132                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2133                 *(ushort *)latch->access = 0;
2134
2135                 if( *(ushort *)latch->parent )
2136                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2137                 *(ushort *)latch->parent = 0;
2138
2139                 if( latch->pin ) {
2140                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2141                         latch->pin = 0;
2142                 }
2143         }
2144
2145         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
2146           if( *(ushort *)(bt->latchmgr->table[hashidx].latch) )
2147                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2148
2149           *(ushort *)(bt->latchmgr->table[hashidx].latch) = 0;
2150
2151           if( idx = bt->latchmgr->table[hashidx].slot ) do {
2152                 latch = bt->latchsets + idx;
2153                 if( *(ushort *)latch->busy )
2154                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2155                 *(ushort *)latch->busy = 0;
2156                 if( latch->hash != hashidx )
2157                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
2158                 if( latch->pin )
2159                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2160           } while( idx = latch->next );
2161         }
2162
2163         next = bt->latchmgr->nlatchpage + LATCH_page;
2164         page_no = LEAF_page;
2165
2166         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2167         off64_t off = page_no << bt->page_bits;
2168 #ifdef unix
2169                 pread (bt->idx, bt->frame, bt->page_size, off);
2170 #else
2171                 DWORD amt[1];
2172
2173                   SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2174
2175                   if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
2176                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2177
2178                   if( *amt <  bt->page_size )
2179                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2180 #endif
2181                 if( !bt->frame->free ) {
2182                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2183                   ptr = keyptr(bt->frame, idx+1);
2184                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2185                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2186                  }
2187                  if( !bt->frame->lvl )
2188                         cnt += bt->frame->act;
2189                 }
2190
2191                 if( page_no > LEAF_page )
2192                         next = page_no + 1;
2193                 page_no = next;
2194         }
2195         return cnt - 1;
2196 }
2197
2198 #ifndef unix
2199 double getCpuTime(int type)
2200 {
2201 FILETIME crtime[1];
2202 FILETIME xittime[1];
2203 FILETIME systime[1];
2204 FILETIME usrtime[1];
2205 SYSTEMTIME timeconv[1];
2206 double ans = 0;
2207
2208         memset (timeconv, 0, sizeof(SYSTEMTIME));
2209
2210         switch( type ) {
2211         case 0:
2212                 GetSystemTimeAsFileTime (xittime);
2213                 FileTimeToSystemTime (xittime, timeconv);
2214                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2215                 break;
2216         case 1:
2217                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2218                 FileTimeToSystemTime (usrtime, timeconv);
2219                 break;
2220         case 2:
2221                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2222                 FileTimeToSystemTime (systime, timeconv);
2223                 break;
2224         }
2225
2226         ans += (double)timeconv->wHour * 3600;
2227         ans += (double)timeconv->wMinute * 60;
2228         ans += (double)timeconv->wSecond;
2229         ans += (double)timeconv->wMilliseconds / 1000;
2230         return ans;
2231 }
2232 #else
2233 #include <time.h>
2234 #include <sys/resource.h>
2235
2236 double getCpuTime(int type)
2237 {
2238 struct rusage used[1];
2239 struct timeval tv[1];
2240
2241         switch( type ) {
2242         case 0:
2243                 gettimeofday(tv, NULL);
2244                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2245
2246         case 1:
2247                 getrusage(RUSAGE_SELF, used);
2248                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2249
2250         case 2:
2251                 getrusage(RUSAGE_SELF, used);
2252                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2253         }
2254
2255         return 0;
2256 }
2257 #endif
2258
2259 //  standalone program to index file of keys
2260 //  then list them onto std-out
2261
2262 int main (int argc, char **argv)
2263 {
2264 uint slot, line = 0, off = 0, found = 0;
2265 int ch, cnt = 0, bits = 12;
2266 unsigned char key[256];
2267 double done, start;
2268 uid next, page_no;
2269 uint pgblk = 0;
2270 float elapsed;
2271 time_t tod[1];
2272 uint scan = 0;
2273 uint len = 0;
2274 uint map = 0;
2275 BtKey ptr;
2276 BtDb *bt;
2277 FILE *in;
2278
2279         if( argc < 4 ) {
2280                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_segments pages_per_segment start_line_number]\n", argv[0]);
2281                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2282                 fprintf (stderr, "  mapped_pool_segments: size of buffer pool in segments\n");
2283                 fprintf (stderr, "  pages_per_segment: size of buffer pool segment in pages in bits\n");
2284                 exit(0);
2285         }
2286
2287         start = getCpuTime(0);
2288         time(tod);
2289
2290         if( argc > 4 )
2291                 bits = atoi(argv[4]);
2292
2293         if( argc > 5 )
2294                 map = atoi(argv[5]);
2295
2296         if( map > 65536 )
2297                 fprintf (stderr, "Warning: buffer_pool > 65536 segments\n");
2298
2299         if( map && map < 8 )
2300                 fprintf (stderr, "Buffer_pool too small\n");
2301
2302         if( argc > 6 )
2303                 pgblk = atoi(argv[6]);
2304
2305         if( bits + pgblk > 30 )
2306                 fprintf (stderr, "Warning: very large buffer pool segment size\n");
2307
2308         if( argc > 7 )
2309                 off = atoi(argv[7]);
2310
2311         bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk, map /  8);
2312
2313         if( !bt ) {
2314                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2315                 exit (1);
2316         }
2317
2318         switch(argv[3][0]| 0x20)
2319         {
2320         case 'a':
2321                 fprintf(stderr, "started audit for %s\n", argv[2]);
2322                 cnt = bt_audit (bt);
2323                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[2], cnt);
2324                 break;
2325
2326         case 'w':
2327                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2328                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2329                   while( ch = getc(in), ch != EOF )
2330                         if( ch == '\n' )
2331                         {
2332                           if( off )
2333                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2334
2335                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2336                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2337                           len = 0;
2338                         }
2339                         else if( len < 245 )
2340                                 key[len++] = ch;
2341                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2342                 break;
2343
2344         case 'd':
2345                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2346                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2347                   while( ch = getc(in), ch != EOF )
2348                         if( ch == '\n' )
2349                         {
2350                           if( off )
2351                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2352                           line++;
2353                           if( bt_deletekey (bt, key, len, 0) )
2354                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2355                           len = 0;
2356                         }
2357                         else if( len < 245 )
2358                                 key[len++] = ch;
2359                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2360                 break;
2361
2362         case 'f':
2363                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2364                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2365                   while( ch = getc(in), ch != EOF )
2366                         if( ch == '\n' )
2367                         {
2368                           if( off )
2369                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2370                           line++;
2371                           if( bt_findkey (bt, key, len) )
2372                                 found++;
2373                           else if( bt->err )
2374                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2375                           len = 0;
2376                         }
2377                         else if( len < 245 )
2378                                 key[len++] = ch;
2379                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2380                 break;
2381
2382         case 's':
2383                 fprintf(stderr, "started scaning\n");
2384                 cnt = len = key[0] = 0;
2385
2386                 if( slot = bt_startkey (bt, key, len) )
2387                   slot--;
2388                 else
2389                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2390
2391                 while( slot = bt_nextkey (bt, slot) ) {
2392                         ptr = bt_key(bt, slot);
2393                         fwrite (ptr->key, ptr->len, 1, stdout);
2394                         fputc ('\n', stdout);
2395                         cnt++;
2396                 }
2397
2398                 fprintf(stderr, " Total keys read %d\n", cnt - 1);
2399                 break;
2400
2401         case 'c':
2402           fprintf(stderr, "started counting\n");
2403
2404           next = bt->latchmgr->nlatchpage + LATCH_page;
2405           page_no = LEAF_page;
2406           cnt = 0;
2407
2408           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2409           uid off = page_no << bt->page_bits;
2410 #ifdef unix
2411                 pread (bt->idx, bt->frame, bt->page_size, off);
2412 #else
2413                 DWORD amt[1];
2414
2415                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2416
2417                 if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
2418                         fprintf (stderr, "unable to read page %.8x", page_no);
2419
2420                 if( *amt <  bt->page_size )
2421                         fprintf (stderr, "unable to read page %.8x", page_no);
2422 #endif
2423                 if( !bt->frame->free && !bt->frame->lvl )
2424                         cnt += bt->frame->act;
2425                 if( page_no > LEAF_page )
2426                         next = page_no + 1;
2427                 page_no = next;
2428           }
2429                 
2430           cnt--;        // remove stopper key
2431           fprintf(stderr, " Total keys read %d\n", cnt);
2432           break;
2433         }
2434
2435         done = getCpuTime(0);
2436         elapsed = (float)(done - start);
2437         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2438         elapsed = getCpuTime(1);
2439         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2440         elapsed = getCpuTime(2);
2441         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2442         return 0;
2443 }
2444
2445 #endif  //STANDALONE