]> pd.if.org Git - btree/blob - btree2v.c
Introduce linux only futex contention resolution btree2v.c
[btree] / btree2v.c
1 // btree version 2v  linux futex contention
2 //      with combined latch & pool manager
3 //      and phase-fair reader writer lock
4 // 12 MAR 2014
5
6 // author: karl malbrain, malbrain@cal.berkeley.edu
7
8 /*
9 This work, including the source code, documentation
10 and related data, is placed into the public domain.
11
12 The orginal author is Karl Malbrain.
13
14 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
15 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
16 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
17 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
18 RESULTING FROM THE USE, MODIFICATION, OR
19 REDISTRIBUTION OF THIS SOFTWARE.
20 */
21
22 // Please see the project home page for documentation
23 // code.google.com/p/high-concurrency-btree
24
25 #define _FILE_OFFSET_BITS 64
26 #define _LARGEFILE64_SOURCE
27
28 #ifdef linux
29 #define _GNU_SOURCE
30 #include <linux/futex.h>
31 #include <limits.h>
32 #define SYS_futex 202
33 #endif
34
35 #ifdef unix
36 #include <unistd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <time.h>
40 #include <fcntl.h>
41 #include <sys/mman.h>
42 #include <errno.h>
43 #else
44 #define WIN32_LEAN_AND_MEAN
45 #include <windows.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <time.h>
49 #include <fcntl.h>
50 #include <io.h>
51 #endif
52
53 #include <memory.h>
54 #include <string.h>
55
56 typedef unsigned long long      uid;
57
58 #ifndef unix
59 typedef unsigned long long      off64_t;
60 typedef unsigned short          ushort;
61 typedef unsigned int            uint;
62 #endif
63
64 #define BT_ro 0x6f72    // ro
65 #define BT_rw 0x7772    // rw
66 #define BT_fl 0x6c66    // fl
67
68 #define BT_maxbits              15                                      // maximum page size in bits
69 #define BT_minbits              12                                      // minimum page size in bits
70 #define BT_minpage              (1 << BT_minbits)       // minimum page size
71 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
72
73 //  BTree page number constants
74 #define ALLOC_page              0
75 #define ROOT_page               1
76 #define LEAF_page               2
77 #define LATCH_page              3
78
79 //      Number of levels to create in a new BTree
80
81 #define MIN_lvl                 2
82 #define MAX_lvl                 15
83
84 /*
85 There are five lock types for each node in three independent sets: 
86 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
87 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
88 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
89 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
90 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
91 */
92
93 typedef enum{
94         BtLockAccess,
95         BtLockDelete,
96         BtLockRead,
97         BtLockWrite,
98         BtLockParent
99 }BtLock;
100
101 enum {
102         QueRd = 1,              // reader queue
103         QueWr = 2               // writer queue
104 } RWQueue;
105
106 volatile typedef struct {
107         ushort rin[1];          // readers in count
108         ushort rout[1];         // readers out count
109         ushort ticket[1];       // writers in count
110         ushort serving[1];      // writers out count
111 } RWLock;
112
113 //      define bits at bottom of rin
114
115 #define PHID 0x1        // writer phase (0/1)
116 #define PRES 0x2        // writer present
117 #define MASK 0x3        // both write bits
118 #define RINC 0x4        // reader increment
119
120 //      lite weight spin latch
121
122 typedef struct {
123   union {
124         struct {
125           uint xlock:1;         // writer has exclusive lock
126           uint share:15;        // count of readers with lock
127           uint read:1;          // readers are waiting
128           uint wrt:15;          // count of writers waiting
129         } bits[1];
130         uint value[1];
131   };
132 } BtSpinLatch;
133
134 #define XCL 1
135 #define SHARE 2
136 #define READ (SHARE * 32768)
137 #define WRT (READ * 2)
138
139 //      Define the length of the page and key pointers
140
141 #define BtId 6
142
143 //      Page key slot definition.
144
145 //      If BT_maxbits is 15 or less, you can save 2 bytes
146 //      for each key stored by making the first two uints
147 //      into ushorts.  You can also save 4 bytes by removing
148 //      the tod field from the key.
149
150 //      Keys are marked dead, but remain on the page until
151 //      cleanup is called. The fence key (highest key) for
152 //      the page is always present, even if dead.
153
154 typedef struct {
155 #ifdef USETOD
156         uint tod;                                       // time-stamp for key
157 #endif
158         ushort off:BT_maxbits;          // page offset for key start
159         ushort dead:1;                          // set for deleted key
160         unsigned char id[BtId];         // id associated with key
161 } BtSlot;
162
163 //      The key structure occupies space at the upper end of
164 //      each page.  It's a length byte followed by the value
165 //      bytes.
166
167 typedef struct {
168         unsigned char len;
169         unsigned char key[0];
170 } *BtKey;
171
172 //      The first part of an index page.
173 //      It is immediately followed
174 //      by the BtSlot array of keys.
175
176 typedef struct BtPage_ {
177         uint cnt;                                       // count of keys in page
178         uint act;                                       // count of active keys
179         uint min;                                       // next key offset
180         unsigned char bits:6;           // page size in bits
181         unsigned char free:1;           // page is on free list
182         unsigned char dirty:1;          // page is dirty in cache
183         unsigned char lvl:6;            // level of page
184         unsigned char kill:1;           // page is being deleted
185         unsigned char clean:1;          // page needs cleaning
186         unsigned char right[BtId];      // page number to right
187 } *BtPage;
188
189 typedef struct {
190         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
191         BtSpinLatch lock[1];            // allocation area lite latch
192         volatile uint latchdeployed;// highest number of latch entries deployed
193         volatile uint nlatchpage;       // number of latch pages at BT_latch
194         volatile uint latchtotal;       // number of page latch entries
195         volatile uint latchhash;        // number of latch hash table slots
196         volatile uint latchvictim;      // next latch hash entry to examine
197         volatile uint safelevel;        // safe page level in cache
198         volatile uint cache[MAX_lvl];// cache census counts by btree level
199 } BtLatchMgr;
200
201 //  latch hash table entries
202
203 typedef struct {
204         volatile uint slot;             // Latch table entry at head of collision chain
205         BtSpinLatch latch[1];   // lock for the collision chain
206 } BtHashEntry;
207
208 //      latch manager table structure
209
210 typedef struct {
211         volatile uid page_no;   // latch set page number on disk
212         RWLock readwr[1];               // read/write page lock
213         RWLock access[1];               // Access Intent/Page delete
214         RWLock parent[1];               // Posting of fence key in parent
215         volatile ushort pin;    // number of pins/level/clock bits
216         volatile uint next;             // next entry in hash table chain
217         volatile uint prev;             // prev entry in hash table chain
218 } BtLatchSet;
219
220 #define CLOCK_mask 0xe000
221 #define CLOCK_unit 0x2000
222 #define PIN_mask 0x07ff
223 #define LVL_mask 0x1800
224 #define LVL_shift 11
225
226 //      The object structure for Btree access
227
228 typedef struct _BtDb {
229         uint page_size;         // each page size       
230         uint page_bits;         // each page size in bits       
231         uid page_no;            // current page number  
232         uid cursor_page;        // current cursor page number   
233         int  err;
234         uint mode;                      // read-write mode
235         BtPage cursor;          // cached frame for start/next (never mapped)
236         BtPage frame;           // spare frame for the page split (never mapped)
237         BtPage page;            // current mapped page in buffer pool
238         BtLatchSet *latch;                      // current page latch
239         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
240         BtLatchSet *latchsets;          // mapped latch set from latch pages
241         unsigned char *pagepool;        // cached page pool set
242         BtHashEntry *table;     // the hash table
243 #ifdef unix
244         int idx;
245 #else
246         HANDLE idx;
247         HANDLE halloc;          // allocation and latch table handle
248 #endif
249         unsigned char *mem;     // frame, cursor, memory buffers
250         uint found;                     // last deletekey found key
251 } BtDb;
252
253 typedef enum {
254 BTERR_ok = 0,
255 BTERR_notfound,
256 BTERR_struct,
257 BTERR_ovflw,
258 BTERR_read,
259 BTERR_lock,
260 BTERR_hash,
261 BTERR_kill,
262 BTERR_map,
263 BTERR_wrt,
264 BTERR_eof
265 } BTERR;
266
267 // B-Tree functions
268 extern void bt_close (BtDb *bt);
269 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
270 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
271 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
272 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
273 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
274 extern uint bt_nextkey   (BtDb *bt, uint slot);
275
276 //      internal functions
277 void bt_update (BtDb *bt, BtPage page);
278 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
279 //  Helper functions to return slot values
280
281 extern BtKey bt_key (BtDb *bt, uint slot);
282 extern uid bt_uid (BtDb *bt, uint slot);
283 #ifdef USETOD
284 extern uint bt_tod (BtDb *bt, uint slot);
285 #endif
286
287 //  The page is allocated from low and hi ends.
288 //  The key offsets and row-id's are allocated
289 //  from the bottom, while the text of the key
290 //  is allocated from the top.  When the two
291 //  areas meet, the page is split into two.
292
293 //  A key consists of a length byte, two bytes of
294 //  index number (0 - 65534), and up to 253 bytes
295 //  of key value.  Duplicate keys are discarded.
296 //  Associated with each key is a 48 bit row-id.
297
298 //  The b-tree root is always located at page 1.
299 //      The first leaf page of level zero is always
300 //      located on page 2.
301
302 //      The b-tree pages are linked with right
303 //      pointers to facilitate enumerators,
304 //      and provide for concurrency.
305
306 //      When to root page fills, it is split in two and
307 //      the tree height is raised by a new root at page
308 //      one with two keys.
309
310 //      Deleted keys are marked with a dead bit until
311 //      page cleanup The fence key for a node is always
312 //      present, even after deletion and cleanup.
313
314 //  Deleted leaf pages are reclaimed  on a free list.
315 //      The upper levels of the btree are fixed on creation.
316
317 //  To achieve maximum concurrency one page is locked at a time
318 //  as the tree is traversed to find leaf key in question. The right
319 //  page numbers are used in cases where the page is being split,
320 //      or consolidated.
321
322 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
323 //      and chains empty leaf pages together for reuse.
324
325 //      Parent locks are obtained to prevent resplitting or deleting a node
326 //      before its fence is posted into its upper level.
327
328 //      A special open mode of BT_fl is provided to safely access files on
329 //      WIN32 networks. WIN32 network operations should not use memory mapping.
330 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
331 //      to prevent local caching of network file contents.
332
333 //      Access macros to address slot and key values from the page.
334 //      Page slots use 1 based indexing.
335
336 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
337 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
338
339 void bt_putid(unsigned char *dest, uid id)
340 {
341 int i = BtId;
342
343         while( i-- )
344                 dest[i] = (unsigned char)id, id >>= 8;
345 }
346
347 uid bt_getid(unsigned char *src)
348 {
349 uid id = 0;
350 int i;
351
352         for( i = 0; i < BtId; i++ )
353                 id <<= 8, id |= *src++; 
354
355         return id;
356 }
357
358 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
359 {
360 BtKey ptr;
361
362         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
363         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
364         ptr = keyptr(page, page->cnt);
365         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
366         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
367         return bt->err = err;
368 }
369
370 //      Phase-Fair reader/writer lock implementation
371 //      with futex calls on contention
372
373 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
374 {
375         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
376 }
377
378 //  a phase fair reader/writer lock implementation
379
380 void WriteLock (RWLock *lock)
381 {
382 ushort w, r, tix;
383 uint prev;
384
385         tix = __sync_fetch_and_add (lock->ticket, 1);
386
387         // wait for our ticket to come up
388
389         while( 1 ) {
390                 prev = *lock->ticket | *lock->serving << 16;
391                 if( tix == prev >> 16 )
392                   break;
393                 sys_futex( (uint *)lock->ticket, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
394         }
395
396         w = PRES | (tix & PHID);
397         r = __sync_fetch_and_add (lock->rin, w);
398
399         while( 1 ) {
400                 prev = *lock->rin | *lock->rout << 16;
401                 if( r == prev >> 16 )
402                   break;
403                 sys_futex( (uint *)lock->rin, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
404         }
405 }
406
407 void WriteRelease (RWLock *lock)
408 {
409         __sync_fetch_and_and (lock->rin, ~MASK);
410         lock->serving[0]++;
411
412         if( (*lock->rin & ~MASK) != (*lock->rout & ~MASK) )
413           if( sys_futex( (uint *)lock->rin, FUTEX_WAKE_BITSET, INT_MAX, NULL, NULL, QueRd ) )
414                 return;
415
416         if( *lock->ticket != *lock->serving )
417           sys_futex( (uint *)lock->ticket, FUTEX_WAKE_BITSET, INT_MAX, NULL, NULL, QueWr );
418 }
419
420 void ReadLock (RWLock *lock)
421 {
422 uint prev;
423 ushort w;
424
425         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
426
427         if( w )
428           while( 1 ) {
429                 prev = *lock->rin | *lock->rout << 16;
430                 if( w != (prev & MASK) )
431                   break;
432                 sys_futex( (uint *)lock->rin, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueRd );
433           }
434 }
435
436 void ReadRelease (RWLock *lock)
437 {
438         __sync_fetch_and_add (lock->rout, RINC);
439
440         if( *lock->ticket == *lock->serving )
441                 return;
442
443         if( *lock->rin & PRES )
444           if( sys_futex( (uint *)lock->rin, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ) )
445                 return;
446
447         sys_futex( (uint *)lock->ticket, FUTEX_WAKE_BITSET, INT_MAX, NULL, NULL, QueWr );
448 }
449
450 //      lite weight spin lock Latch Manager
451
452 //      wait until write lock mode is clear
453 //      and add 1 to the share count
454
455 void bt_spinreadlock(BtSpinLatch *latch)
456 {
457 BtSpinLatch prev[1];
458 uint slept = 0;
459
460   while( 1 ) {
461         *prev->value = __sync_fetch_and_add(latch->value, SHARE);
462
463         //  see if exclusive request is already granted
464         //       or if it is reader phase
465
466         if( slept || !prev->bits->wrt )
467           if( !prev->bits->xlock )
468                 return;
469
470         slept = 1;
471         prev->bits->read = 1;
472         __sync_fetch_and_or (latch->value, READ);
473         __sync_fetch_and_sub (latch->value, SHARE);
474         sys_futex( latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueRd );
475   }
476 }
477
478 //      wait for other read and write latches to relinquish
479
480 void bt_spinwritelock(BtSpinLatch *latch)
481 {
482 BtSpinLatch prev[1];
483 uint slept = 0;
484
485   while( 1 ) {
486         *prev->value = __sync_fetch_and_or(latch->value, XCL);
487
488         if( !prev->bits->xlock )                        // did we set XCL bit?
489           if( !(prev->bits->share) )    {               // any readers?
490             if( slept )
491                   __sync_fetch_and_sub(latch->value, WRT);
492                 return;
493           } else
494                 __sync_fetch_and_and(latch->value, ~XCL);
495
496         if( !slept ) {
497                 prev->bits->wrt++;
498                 __sync_fetch_and_add(latch->value, WRT);
499         }
500
501         sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
502         slept = 1;
503   }
504 }
505
506 //      try to obtain write lock
507
508 //      return 1 if obtained,
509 //              0 otherwise
510
511 int bt_spinwritetry(BtSpinLatch *latch)
512 {
513 BtSpinLatch prev[1];
514
515         *prev->value = __sync_fetch_and_or(latch->value, XCL);
516
517         //      take write access if all bits are clear
518
519         if( !prev->bits->xlock ) {
520           if( !prev->bits->share )
521                 return 1;
522         } else
523                 __sync_fetch_and_and(latch->value, ~XCL);
524
525         return 0;
526 }
527
528 //      clear write mode
529 //      wake up sleeping readers
530
531 void bt_spinreleasewrite(BtSpinLatch *latch)
532 {
533 BtSpinLatch prev[1];
534
535         *prev->value = __sync_fetch_and_and(latch->value, ~(XCL | READ));
536
537         //      alternate read/write phases
538
539         if( prev->bits->read )
540           if( sys_futex( latch->value, FUTEX_WAKE_BITSET, INT_MAX, NULL, NULL, QueRd ) )
541                 return;
542
543         if( prev->bits->wrt )
544           sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
545 }
546
547 //      decrement reader count
548 //      wake up sleeping writers
549
550 void bt_spinreleaseread(BtSpinLatch *latch)
551 {
552 BtSpinLatch prev[1];
553
554         *prev->value = __sync_sub_and_fetch(latch->value, SHARE);
555
556         //      alternate read/write phases
557
558         if( prev->bits->wrt ) {
559           if( !prev->bits->share )
560                 sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
561           return;
562         }
563
564         if( prev->bits->read ) {
565           __sync_fetch_and_and(latch->value, ~READ);
566           sys_futex (latch->value, FUTEX_WAKE_BITSET, INT_MAX, NULL, NULL, QueRd);
567         }
568 }
569
570 //      read page from permanent location in Btree file
571
572 BTERR bt_readpage (BtDb *bt, BtPage page, uid page_no)
573 {
574 off64_t off = page_no << bt->page_bits;
575
576 #ifdef unix
577         if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size ) {
578                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
579                 return bt->err = BTERR_read;
580         }
581 #else
582 OVERLAPPED ovl[1];
583 uint amt[1];
584
585         memset (ovl, 0, sizeof(OVERLAPPED));
586         ovl->Offset = off;
587         ovl->OffsetHigh = off >> 32;
588
589         if( !ReadFile(bt->idx, page, bt->page_size, amt, ovl)) {
590                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
591                 return bt->err = BTERR_read;
592         }
593         if( *amt <  bt->page_size ) {
594                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
595                 return bt->err = BTERR_read;
596         }
597 #endif
598         return 0;
599 }
600
601 //      write page to permanent location in Btree file
602 //      clear the dirty bit
603
604 BTERR bt_writepage (BtDb *bt, BtPage page, uid page_no)
605 {
606 off64_t off = page_no << bt->page_bits;
607
608 #ifdef unix
609         page->dirty = 0;
610
611         if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
612                 return bt->err = BTERR_wrt;
613 #else
614 OVERLAPPED ovl[1];
615 uint amt[1];
616
617         memset (ovl, 0, sizeof(OVERLAPPED));
618         ovl->Offset = off;
619         ovl->OffsetHigh = off >> 32;
620         page->dirty = 0;
621
622         if( !WriteFile(bt->idx, page, bt->page_size, amt, ovl) )
623                 return bt->err = BTERR_wrt;
624
625         if( *amt <  bt->page_size )
626                 return bt->err = BTERR_wrt;
627 #endif
628         return 0;
629 }
630
631 //      link latch table entry into head of latch hash table
632
633 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
634 {
635 BtPage page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
636 BtLatchSet *latch = bt->latchsets + slot;
637 int lvl;
638
639         if( latch->next = bt->table[hashidx].slot )
640                 bt->latchsets[latch->next].prev = slot;
641
642         bt->table[hashidx].slot = slot;
643         latch->page_no = page_no;
644         latch->prev = 0;
645         latch->pin = 1;
646
647         if( bt_readpage (bt, page, page_no) )
648                 return bt->err;
649
650         lvl = page->lvl << LVL_shift;
651         if( lvl > LVL_mask )
652                 lvl = LVL_mask;
653         latch->pin |= lvl;              // store lvl
654         latch->pin |= lvl << 3; // initialize clock
655
656 #ifdef unix
657         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], 1);
658 #else
659         _InterlockedAdd(&bt->latchmgr->cache[page->lvl], 1);
660 #endif
661         return bt->err = 0;
662 }
663
664 //      release latch pin
665
666 void bt_unpinlatch (BtLatchSet *latch)
667 {
668 #ifdef unix
669         __sync_fetch_and_add(&latch->pin, -1);
670 #else
671         _InterlockedDecrement16 (&latch->pin);
672 #endif
673 }
674
675 //      find existing latchset or inspire new one
676 //      return with latchset pinned
677
678 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
679 {
680 uint hashidx = page_no % bt->latchmgr->latchhash;
681 BtLatchSet *latch;
682 uint slot, idx;
683 uint lvl, cnt;
684 off64_t off;
685 uint amt[1];
686 BtPage page;
687
688   //  try to find our entry
689
690   bt_spinwritelock(bt->table[hashidx].latch);
691
692   if( slot = bt->table[hashidx].slot ) do
693   {
694         latch = bt->latchsets + slot;
695         if( page_no == latch->page_no )
696                 break;
697   } while( slot = latch->next );
698
699   //  found our entry
700   //  increment clock
701
702   if( slot ) {
703         latch = bt->latchsets + slot;
704         lvl = (latch->pin & LVL_mask) >> LVL_shift;
705         lvl *= CLOCK_unit * 2;
706         lvl |= CLOCK_unit;
707 #ifdef unix
708         __sync_fetch_and_add(&latch->pin, 1);
709         __sync_fetch_and_or(&latch->pin, lvl);
710 #else
711         _InterlockedIncrement16 (&latch->pin);
712         _InterlockedOr16 (&latch->pin, lvl);
713 #endif
714         bt_spinreleasewrite(bt->table[hashidx].latch);
715         return latch;
716   }
717
718         //  see if there are any unused pool entries
719 #ifdef unix
720         slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
721 #else
722         slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
723 #endif
724
725         if( slot < bt->latchmgr->latchtotal ) {
726                 latch = bt->latchsets + slot;
727                 if( bt_latchlink (bt, hashidx, slot, page_no) )
728                         return NULL;
729                 bt_spinreleasewrite (bt->table[hashidx].latch);
730                 return latch;
731         }
732
733 #ifdef unix
734         __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
735 #else
736         _InterlockedDecrement (&bt->latchmgr->latchdeployed);
737 #endif
738   //  find and reuse previous entry on victim
739
740   while( 1 ) {
741 #ifdef unix
742         slot = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
743 #else
744         slot = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
745 #endif
746         // try to get write lock on hash chain
747         //      skip entry if not obtained
748         //      or has outstanding pins
749
750         slot %= bt->latchmgr->latchtotal;
751
752         //      on slot wraparound, check census
753         //      count and increment safe level
754
755         cnt = bt->latchmgr->cache[bt->latchmgr->safelevel];
756
757         if( !slot ) {
758           if( cnt < bt->latchmgr->latchtotal / 10 )
759 #ifdef unix
760                 __sync_fetch_and_add(&bt->latchmgr->safelevel, 1);
761 #else
762                 _InterlockedIncrement (&bt->latchmgr->safelevel);
763 #endif
764           continue;
765         }
766
767         latch = bt->latchsets + slot;
768         idx = latch->page_no % bt->latchmgr->latchhash;
769         lvl = (latch->pin & LVL_mask) >> LVL_shift;
770
771         //      see if we are evicting this level yet
772         //      or if we are on same chain as hashidx
773
774         if( idx == hashidx || lvl > bt->latchmgr->safelevel )
775                 continue;
776
777         if( !bt_spinwritetry (bt->table[idx].latch) )
778                 continue;
779
780         if( latch->pin & ~LVL_mask ) {
781           if( latch->pin & CLOCK_mask )
782 #ifdef unix
783                 __sync_fetch_and_add(&latch->pin, -CLOCK_unit);
784 #else
785                 _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_unit);
786 #endif
787           bt_spinreleasewrite (bt->table[idx].latch);
788           continue;
789         }
790
791         //  update permanent page area in btree
792
793         page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
794 #ifdef unix
795         posix_fadvise (bt->idx, page_no << bt->page_bits, bt->page_size, POSIX_FADV_WILLNEED);
796         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], -1);
797 #else
798         _InterlockedAdd(&bt->latchmgr->cache[page->lvl], -1);
799 #endif
800         if( page->dirty )
801           if( bt_writepage (bt, page, latch->page_no) )
802                 return NULL;
803
804         //  unlink our available slot from its hash chain
805
806         if( latch->prev )
807                 bt->latchsets[latch->prev].next = latch->next;
808         else
809                 bt->table[idx].slot = latch->next;
810
811         if( latch->next )
812                 bt->latchsets[latch->next].prev = latch->prev;
813
814         bt_spinreleasewrite (bt->table[idx].latch);
815
816         if( bt_latchlink (bt, hashidx, slot, page_no) )
817                 return NULL;
818
819         bt_spinreleasewrite (bt->table[hashidx].latch);
820         return latch;
821   }
822 }
823
824 //      close and release memory
825
826 void bt_close (BtDb *bt)
827 {
828 #ifdef unix
829         munmap (bt->table, bt->latchmgr->nlatchpage * bt->page_size);
830         munmap (bt->latchmgr, bt->page_size);
831 #else
832         FlushViewOfFile(bt->latchmgr, 0);
833         UnmapViewOfFile(bt->latchmgr);
834         CloseHandle(bt->halloc);
835 #endif
836 #ifdef unix
837         if( bt->mem )
838                 free (bt->mem);
839         close (bt->idx);
840         free (bt);
841 #else
842         if( bt->mem)
843                 VirtualFree (bt->mem, 0, MEM_RELEASE);
844         FlushFileBuffers(bt->idx);
845         CloseHandle(bt->idx);
846         GlobalFree (bt);
847 #endif
848 }
849 //  open/create new btree
850
851 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
852 //              size of mapped page pool (e.g. 8192)
853
854 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
855 {
856 uint lvl, attr, last, slot, idx;
857 uint nlatchpage, latchhash;
858 BtLatchMgr *latchmgr;
859 off64_t size, off;
860 uint amt[1];
861 BtKey key;
862 BtDb* bt;
863 int flag;
864
865 #ifndef unix
866 OVERLAPPED ovl[1];
867 #else
868 struct flock lock[1];
869 #endif
870
871         // determine sanity of page size and buffer pool
872
873         if( bits > BT_maxbits )
874                 bits = BT_maxbits;
875         else if( bits < BT_minbits )
876                 bits = BT_minbits;
877
878         if( mode == BT_ro ) {
879                 fprintf(stderr, "ReadOnly mode not supported: %s\n", name);
880                 return NULL;
881         }
882 #ifdef unix
883         bt = calloc (1, sizeof(BtDb));
884
885         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
886         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_RANDOM);
887  
888         if( bt->idx == -1 ) {
889                 fprintf(stderr, "unable to open %s\n", name);
890                 return free(bt), NULL;
891         }
892 #else
893         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
894         attr = FILE_ATTRIBUTE_NORMAL;
895         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
896
897         if( bt->idx == INVALID_HANDLE_VALUE ) {
898                 fprintf(stderr, "unable to open %s\n", name);
899                 return GlobalFree(bt), NULL;
900         }
901 #endif
902 #ifdef unix
903         memset (lock, 0, sizeof(lock));
904         lock->l_len = sizeof(struct BtPage_);
905         lock->l_type = F_WRLCK;
906
907         if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) {
908                 fprintf(stderr, "unable to lock record zero %s\n", name);
909                 return bt_close (bt), NULL;
910         }
911 #else
912         memset (ovl, 0, sizeof(ovl));
913
914         //      use large offsets to
915         //      simulate advisory locking
916
917         ovl->OffsetHigh |= 0x80000000;
918
919         if( !LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) ) {
920                 fprintf(stderr, "unable to lock record zero %s, GetLastError = %d\n", name, GetLastError());
921                 return bt_close (bt), NULL;
922         }
923 #endif 
924
925 #ifdef unix
926         latchmgr = valloc (BT_maxpage);
927         *amt = 0;
928
929         // read minimum page size to get root info
930
931         if( size = lseek (bt->idx, 0L, 2) ) {
932                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
933                         bits = latchmgr->alloc->bits;
934                 else {
935                         fprintf(stderr, "Unable to read page zero\n");
936                         return free(bt), free(latchmgr), NULL;
937                 }
938         }
939 #else
940         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
941         size = GetFileSize(bt->idx, amt);
942
943         if( size || *amt ) {
944                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) {
945                         fprintf(stderr, "Unable to read page zero\n");
946                         return bt_close (bt), NULL;
947                 } else
948                         bits = latchmgr->alloc->bits;
949         }
950 #endif
951
952         bt->page_size = 1 << bits;
953         bt->page_bits = bits;
954
955         bt->mode = mode;
956
957         if( size || *amt ) {
958                 nlatchpage = latchmgr->nlatchpage;
959                 goto btlatch;
960         }
961
962         if( nodemax < 16 ) {
963                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
964                 return bt_close(bt), NULL;
965         }
966
967         // initialize an empty b-tree with latch page, root page, page of leaves
968         // and page(s) of latches and page pool cache
969
970         memset (latchmgr, 0, 1 << bits);
971         latchmgr->alloc->bits = bt->page_bits;
972
973         //  calculate number of latch hash table entries
974
975         nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
976         latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
977
978         nlatchpage += nodemax;          // size of the buffer pool in pages
979         nlatchpage += (sizeof(BtLatchSet) * nodemax + bt->page_size - 1)/bt->page_size;
980
981         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
982         latchmgr->nlatchpage = nlatchpage;
983         latchmgr->latchtotal = nodemax;
984         latchmgr->latchhash = latchhash;
985
986         if( bt_writepage (bt, latchmgr->alloc, 0) ) {
987                 fprintf (stderr, "Unable to create btree page zero\n");
988                 return bt_close (bt), NULL;
989         }
990
991         memset (latchmgr, 0, 1 << bits);
992         latchmgr->alloc->bits = bt->page_bits;
993
994         for( lvl=MIN_lvl; lvl--; ) {
995                 last = MIN_lvl - lvl;   // page number
996                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
997                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? last + 1 : 0);
998                 key = keyptr(latchmgr->alloc, 1);
999                 key->len = 2;           // create stopper key
1000                 key->key[0] = 0xff;
1001                 key->key[1] = 0xff;
1002
1003                 latchmgr->alloc->min = bt->page_size - 3;
1004                 latchmgr->alloc->lvl = lvl;
1005                 latchmgr->alloc->cnt = 1;
1006                 latchmgr->alloc->act = 1;
1007
1008                 if( bt_writepage (bt, latchmgr->alloc, last) ) {
1009                         fprintf (stderr, "Unable to create btree page %.8x\n", last);
1010                         return bt_close (bt), NULL;
1011                 }
1012         }
1013
1014         // clear out buffer pool pages
1015
1016         memset(latchmgr, 0, bt->page_size);
1017         last = MIN_lvl + nlatchpage;
1018
1019         if( bt_writepage (bt, latchmgr->alloc, last) ) {
1020                 fprintf (stderr, "Unable to write buffer pool page %.8x\n", last);
1021                 return bt_close (bt), NULL;
1022         }
1023                 
1024 #ifdef unix
1025         free (latchmgr);
1026 #else
1027         VirtualFree (latchmgr, 0, MEM_RELEASE);
1028 #endif
1029
1030 btlatch:
1031 #ifdef unix
1032         lock->l_type = F_UNLCK;
1033         if( fcntl (bt->idx, F_SETLK, lock) < 0 ) {
1034                 fprintf (stderr, "Unable to unlock page zero\n");
1035                 return bt_close (bt), NULL;
1036         }
1037 #else
1038         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) {
1039                 fprintf (stderr, "Unable to unlock page zero, GetLastError = %d\n", GetLastError());
1040                 return bt_close (bt), NULL;
1041         }
1042 #endif
1043 #ifdef unix
1044         flag = PROT_READ | PROT_WRITE;
1045         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
1046         if( bt->latchmgr == MAP_FAILED ) {
1047                 fprintf (stderr, "Unable to mmap page zero, errno = %d", errno);
1048                 return bt_close (bt), NULL;
1049         }
1050         bt->table = (void *)mmap (0, (uid)nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
1051         if( bt->table == MAP_FAILED ) {
1052                 fprintf (stderr, "Unable to mmap buffer pool, errno = %d", errno);
1053                 return bt_close (bt), NULL;
1054         }
1055         madvise (bt->table, (uid)nlatchpage << bt->page_bits, MADV_RANDOM | MADV_WILLNEED);
1056 #else
1057         flag = PAGE_READWRITE;
1058         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size, NULL);
1059         if( !bt->halloc ) {
1060                 fprintf (stderr, "Unable to create file mapping for buffer pool mgr, GetLastError = %d\n", GetLastError());
1061                 return bt_close (bt), NULL;
1062         }
1063
1064         flag = FILE_MAP_WRITE;
1065         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size);
1066         if( !bt->latchmgr ) {
1067                 fprintf (stderr, "Unable to map buffer pool, GetLastError = %d\n", GetLastError());
1068                 return bt_close (bt), NULL;
1069         }
1070
1071         bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
1072 #endif
1073         bt->pagepool = (unsigned char *)bt->table + (uid)(nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
1074         bt->latchsets = (BtLatchSet *)(bt->pagepool - (uid)bt->latchmgr->latchtotal * sizeof(BtLatchSet));
1075
1076 #ifdef unix
1077         bt->mem = valloc (2 * bt->page_size);
1078 #else
1079         bt->mem = VirtualAlloc(NULL, 2 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
1080 #endif
1081         bt->frame = (BtPage)bt->mem;
1082         bt->cursor = (BtPage)(bt->mem + bt->page_size);
1083         return bt;
1084 }
1085
1086 // place write, read, or parent lock on requested page_no.
1087
1088 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1089 {
1090         switch( mode ) {
1091         case BtLockRead:
1092                 ReadLock (latch->readwr);
1093                 break;
1094         case BtLockWrite:
1095                 WriteLock (latch->readwr);
1096                 break;
1097         case BtLockAccess:
1098                 ReadLock (latch->access);
1099                 break;
1100         case BtLockDelete:
1101                 WriteLock (latch->access);
1102                 break;
1103         case BtLockParent:
1104                 WriteLock (latch->parent);
1105                 break;
1106         }
1107 }
1108
1109 // remove write, read, or parent lock on requested page
1110
1111 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1112 {
1113         switch( mode ) {
1114         case BtLockRead:
1115                 ReadRelease (latch->readwr);
1116                 break;
1117         case BtLockWrite:
1118                 WriteRelease (latch->readwr);
1119                 break;
1120         case BtLockAccess:
1121                 ReadRelease (latch->access);
1122                 break;
1123         case BtLockDelete:
1124                 WriteRelease (latch->access);
1125                 break;
1126         case BtLockParent:
1127                 WriteRelease (latch->parent);
1128                 break;
1129         }
1130 }
1131
1132 //      allocate a new page and write page into it
1133
1134 uid bt_newpage(BtDb *bt, BtPage page)
1135 {
1136 BtLatchSet *latch;
1137 uid new_page;
1138 BtPage temp;
1139
1140         //      lock allocation page
1141
1142         bt_spinwritelock(bt->latchmgr->lock);
1143
1144         // use empty chain first
1145         // else allocate empty page
1146
1147         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
1148           if( latch = bt_pinlatch (bt, new_page) )
1149                 temp = bt_mappage (bt, latch);
1150           else
1151                 return 0;
1152
1153           bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
1154           bt_spinreleasewrite(bt->latchmgr->lock);
1155           memcpy (temp, page, bt->page_size);
1156
1157           bt_update (bt, temp);
1158           bt_unpinlatch (latch);
1159           return new_page;
1160         } else {
1161           new_page = bt_getid(bt->latchmgr->alloc->right);
1162           bt_putid(bt->latchmgr->alloc->right, new_page+1);
1163           bt_spinreleasewrite(bt->latchmgr->lock);
1164
1165           if( bt_writepage (bt, page, new_page) )
1166                 return 0;
1167         }
1168
1169         bt_update (bt, bt->latchmgr->alloc);
1170         return new_page;
1171 }
1172
1173 //  compare two keys, returning > 0, = 0, or < 0
1174 //  as the comparison value
1175
1176 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1177 {
1178 uint len1 = key1->len;
1179 int ans;
1180
1181         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1182                 return ans;
1183
1184         if( len1 > len2 )
1185                 return 1;
1186         if( len1 < len2 )
1187                 return -1;
1188
1189         return 0;
1190 }
1191
1192 //  Update current page of btree by
1193 //      flushing mapped area to disk backing of cache pool.
1194 //      mark page as dirty for rewrite to permanent location
1195
1196 void bt_update (BtDb *bt, BtPage page)
1197 {
1198 #ifdef unix
1199         msync (page, bt->page_size, MS_ASYNC);
1200 #else
1201 //      FlushViewOfFile (page, bt->page_size);
1202 #endif
1203         page->dirty = 1;
1204 }
1205
1206 //  map the btree cached page onto current page
1207
1208 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1209 {
1210         return (BtPage)((uid)(latch - bt->latchsets) * bt->page_size + bt->pagepool);
1211 }
1212
1213 //      deallocate a deleted page 
1214 //      place on free chain out of allocator page
1215 //      call with page latched for Writing and Deleting
1216
1217 BTERR bt_freepage(BtDb *bt, uid page_no, BtLatchSet *latch)
1218 {
1219 BtPage page = bt_mappage (bt, latch);
1220
1221         //      lock allocation page
1222
1223         bt_spinwritelock (bt->latchmgr->lock);
1224
1225         //      store chain in second right
1226         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1227         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1228
1229         page->free = 1;
1230         bt_update(bt, page);
1231
1232         // unlock released page
1233
1234         bt_unlockpage (BtLockDelete, latch);
1235         bt_unlockpage (BtLockWrite, latch);
1236         bt_unpinlatch (latch);
1237
1238         // unlock allocation page
1239
1240         bt_spinreleasewrite (bt->latchmgr->lock);
1241         bt_update (bt, bt->latchmgr->alloc);
1242         return 0;
1243 }
1244
1245 //  find slot in page for given key at a given level
1246
1247 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1248 {
1249 uint diff, higher = bt->page->cnt, low = 1, slot;
1250 uint good = 0;
1251
1252         //      make stopper key an infinite fence value
1253
1254         if( bt_getid (bt->page->right) )
1255                 higher++;
1256         else
1257                 good++;
1258
1259         //      low is the lowest candidate, higher is already
1260         //      tested as .ge. the given key, loop ends when they meet
1261
1262         while( diff = higher - low ) {
1263                 slot = low + ( diff >> 1 );
1264                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1265                         low = slot + 1;
1266                 else
1267                         higher = slot, good++;
1268         }
1269
1270         //      return zero if key is on right link page
1271
1272         return good ? higher : 0;
1273 }
1274
1275 //  find and load page at given level for given key
1276 //      leave page rd or wr locked as requested
1277
1278 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1279 {
1280 uid page_no = ROOT_page, prevpage = 0;
1281 uint drill = 0xff, slot;
1282 BtLatchSet *prevlatch;
1283 uint mode, prevmode;
1284
1285   //  start at root of btree and drill down
1286
1287   do {
1288         // determine lock mode of drill level
1289         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1290
1291         if( bt->latch = bt_pinlatch(bt, page_no) )
1292                 bt->page_no = page_no;
1293         else
1294                 return 0;
1295
1296         // obtain access lock using lock chaining
1297
1298         if( page_no > ROOT_page )
1299                 bt_lockpage(BtLockAccess, bt->latch);
1300
1301         if( prevpage ) {
1302                 bt_unlockpage(prevmode, prevlatch);
1303                 bt_unpinlatch(prevlatch);
1304                 prevpage = 0;
1305         }
1306
1307         // obtain read lock using lock chaining
1308
1309         bt_lockpage(mode, bt->latch);
1310
1311         if( page_no > ROOT_page )
1312                 bt_unlockpage(BtLockAccess, bt->latch);
1313
1314         //      map/obtain page contents
1315
1316         bt->page = bt_mappage (bt, bt->latch);
1317
1318         // re-read and re-lock root after determining actual level of root
1319
1320         if( bt->page->lvl != drill) {
1321                 if( bt->page_no != ROOT_page )
1322                         return bt->err = BTERR_struct, 0;
1323                         
1324                 drill = bt->page->lvl;
1325
1326                 if( lock != BtLockRead && drill == lvl ) {
1327                         bt_unlockpage(mode, bt->latch);
1328                         bt_unpinlatch(bt->latch);
1329                         continue;
1330                 }
1331         }
1332
1333         prevpage = bt->page_no;
1334         prevlatch = bt->latch;
1335         prevmode = mode;
1336
1337         //  find key on page at this level
1338         //  and descend to requested level
1339
1340         if( !bt->page->kill )
1341          if( slot = bt_findslot (bt, key, len) ) {
1342           if( drill == lvl )
1343                 return slot;
1344
1345           while( slotptr(bt->page, slot)->dead )
1346                 if( slot++ < bt->page->cnt )
1347                         continue;
1348                 else
1349                         goto slideright;
1350
1351           page_no = bt_getid(slotptr(bt->page, slot)->id);
1352           drill--;
1353           continue;
1354          }
1355
1356         //  or slide right into next page
1357
1358 slideright:
1359         page_no = bt_getid(bt->page->right);
1360
1361   } while( page_no );
1362
1363   // return error on end of right chain
1364
1365   bt->err = BTERR_eof;
1366   return 0;     // return error
1367 }
1368
1369 //      a fence key was deleted from a page
1370 //      push new fence value upwards
1371
1372 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1373 {
1374 unsigned char leftkey[256], rightkey[256];
1375 BtLatchSet *latch = bt->latch;
1376 BtKey ptr;
1377
1378         // remove deleted key, the old fence value
1379
1380         ptr = keyptr(bt->page, bt->page->cnt);
1381         memcpy(rightkey, ptr, ptr->len + 1);
1382
1383         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1384         bt->page->clean = 1;
1385
1386         ptr = keyptr(bt->page, bt->page->cnt);
1387         memcpy(leftkey, ptr, ptr->len + 1);
1388
1389         bt_update (bt, bt->page);
1390         bt_lockpage (BtLockParent, latch);
1391         bt_unlockpage (BtLockWrite, latch);
1392
1393         //  insert new (now smaller) fence key
1394
1395         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1396                 return bt->err;
1397
1398         //  remove old (larger) fence key
1399
1400         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1401                 return bt->err;
1402
1403         bt_unlockpage (BtLockParent, latch);
1404         bt_unpinlatch (latch);
1405         return 0;
1406 }
1407
1408 //      root has a single child
1409 //      collapse a level from the btree
1410 //      call with root locked in bt->page
1411
1412 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1413 {
1414 BtLatchSet *latch;
1415 BtPage temp;
1416 uid child;
1417 uint idx;
1418
1419         // find the child entry
1420         //      and promote to new root
1421
1422   do {
1423         for( idx = 0; idx++ < root->cnt; )
1424           if( !slotptr(root, idx)->dead )
1425                 break;
1426
1427         child = bt_getid (slotptr(root, idx)->id);
1428         if( latch = bt_pinlatch (bt, child) )
1429                 temp = bt_mappage (bt, latch);
1430         else
1431                 return bt->err;
1432
1433         bt_lockpage (BtLockDelete, latch);
1434         bt_lockpage (BtLockWrite, latch);
1435         memcpy (root, temp, bt->page_size);
1436
1437         bt_update (bt, root);
1438
1439         if( bt_freepage (bt, child, latch) )
1440                 return bt->err;
1441
1442   } while( root->lvl > 1 && root->act == 1 );
1443
1444   bt_unlockpage (BtLockWrite, bt->latch);
1445   bt_unpinlatch (bt->latch);
1446   return 0;
1447 }
1448
1449 //  find and delete key on page by marking delete flag bit
1450 //  when page becomes empty, delete it
1451
1452 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1453 {
1454 unsigned char lowerkey[256], higherkey[256];
1455 uint slot, dirty = 0, idx, fence, found;
1456 BtLatchSet *latch, *rlatch;
1457 uid page_no, right;
1458 BtPage temp;
1459 BtKey ptr;
1460
1461         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1462                 ptr = keyptr(bt->page, slot);
1463         else
1464                 return bt->err;
1465
1466         // are we deleting a fence slot?
1467
1468         fence = slot == bt->page->cnt;
1469
1470         // if key is found delete it, otherwise ignore request
1471
1472         if( found = !keycmp (ptr, key, len) )
1473           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1474                 dirty = slotptr(bt->page,slot)->dead = 1;
1475                 bt->page->clean = 1;
1476                 bt->page->act--;
1477
1478                 // collapse empty slots
1479
1480                 while( idx = bt->page->cnt - 1 )
1481                   if( slotptr(bt->page, idx)->dead ) {
1482                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1483                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1484                   } else
1485                         break;
1486           }
1487
1488         right = bt_getid(bt->page->right);
1489         page_no = bt->page_no;
1490         latch = bt->latch;
1491
1492         if( !dirty ) {
1493           if( lvl )
1494                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1495           bt_unlockpage(BtLockWrite, latch);
1496           bt_unpinlatch (latch);
1497           return bt->found = found, 0;
1498         }
1499
1500         //      did we delete a fence key in an upper level?
1501
1502         if( lvl && bt->page->act && fence )
1503           if( bt_fixfence (bt, page_no, lvl) )
1504                 return bt->err;
1505           else
1506                 return bt->found = found, 0;
1507
1508         //      is this a collapsed root?
1509
1510         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1511           if( bt_collapseroot (bt, bt->page) )
1512                 return bt->err;
1513           else
1514                 return bt->found = found, 0;
1515
1516         // return if page is not empty
1517
1518         if( bt->page->act ) {
1519           bt_update(bt, bt->page);
1520           bt_unlockpage(BtLockWrite, latch);
1521           bt_unpinlatch (latch);
1522           return bt->found = found, 0;
1523         }
1524
1525         // cache copy of fence key
1526         //      in order to find parent
1527
1528         ptr = keyptr(bt->page, bt->page->cnt);
1529         memcpy(lowerkey, ptr, ptr->len + 1);
1530
1531         // obtain lock on right page
1532
1533         if( rlatch = bt_pinlatch (bt, right) )
1534                 temp = bt_mappage (bt, rlatch);
1535         else
1536                 return bt->err;
1537
1538         bt_lockpage(BtLockWrite, rlatch);
1539
1540         if( temp->kill ) {
1541                 bt_abort(bt, temp, right, 0);
1542                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1543         }
1544
1545         // pull contents of next page into current empty page 
1546
1547         memcpy (bt->page, temp, bt->page_size);
1548
1549         //      cache copy of key to update
1550
1551         ptr = keyptr(temp, temp->cnt);
1552         memcpy(higherkey, ptr, ptr->len + 1);
1553
1554         //  Mark right page as deleted and point it to left page
1555         //      until we can post updates at higher level.
1556
1557         bt_putid(temp->right, page_no);
1558         temp->kill = 1;
1559
1560         bt_update(bt, bt->page);
1561         bt_update(bt, temp);
1562
1563         bt_lockpage(BtLockParent, latch);
1564         bt_unlockpage(BtLockWrite, latch);
1565
1566         bt_lockpage(BtLockParent, rlatch);
1567         bt_unlockpage(BtLockWrite, rlatch);
1568
1569         //  redirect higher key directly to consolidated node
1570
1571         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1572                 return bt->err;
1573
1574         //  delete old lower key to consolidated node
1575
1576         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1577                 return bt->err;
1578
1579         //  obtain write & delete lock on deleted node
1580         //      add right block to free chain
1581
1582         bt_lockpage(BtLockDelete, rlatch);
1583         bt_lockpage(BtLockWrite, rlatch);
1584         bt_unlockpage(BtLockParent, rlatch);
1585
1586         if( bt_freepage (bt, right, rlatch) )
1587                 return bt->err;
1588
1589         bt_unlockpage(BtLockParent, latch);
1590         bt_unpinlatch(latch);
1591         return 0;
1592 }
1593
1594 //      find key in leaf level and return row-id
1595
1596 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1597 {
1598 uint  slot;
1599 BtKey ptr;
1600 uid id;
1601
1602         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1603                 ptr = keyptr(bt->page, slot);
1604         else
1605                 return 0;
1606
1607         // if key exists, return row-id
1608         //      otherwise return 0
1609
1610         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1611                 id = bt_getid(slotptr(bt->page,slot)->id);
1612         else
1613                 id = 0;
1614
1615         bt_unlockpage (BtLockRead, bt->latch);
1616         bt_unpinlatch (bt->latch);
1617         return id;
1618 }
1619
1620 //      check page for space available,
1621 //      clean if necessary and return
1622 //      0 - page needs splitting
1623 //      >0 - go ahead with new slot
1624  
1625 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1626 {
1627 uint nxt = bt->page_size;
1628 BtPage page = bt->page;
1629 uint cnt = 0, idx = 0;
1630 uint max = page->cnt;
1631 uint newslot = slot;
1632 BtKey key;
1633 int ret;
1634
1635         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1636                 return slot;
1637
1638         //      skip cleanup if nothing to reclaim
1639
1640         if( !page->clean )
1641                 return 0;
1642
1643         memcpy (bt->frame, page, bt->page_size);
1644
1645         // skip page info and set rest of page to zero
1646
1647         memset (page+1, 0, bt->page_size - sizeof(*page));
1648         page->act = 0;
1649
1650         while( cnt++ < max ) {
1651                 if( cnt == slot )
1652                         newslot = idx + 1;
1653                 // always leave fence key in list
1654                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1655                         continue;
1656
1657                 // copy key
1658                 key = keyptr(bt->frame, cnt);
1659                 nxt -= key->len + 1;
1660                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1661
1662                 // copy slot
1663                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1664                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1665                         page->act++;
1666 #ifdef USETOD
1667                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1668 #endif
1669                 slotptr(page, idx)->off = nxt;
1670         }
1671
1672         page->min = nxt;
1673         page->cnt = idx;
1674
1675         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1676                 return newslot;
1677
1678         return 0;
1679 }
1680
1681 // split the root and raise the height of the btree
1682
1683 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1684 {
1685 uint nxt = bt->page_size;
1686 BtPage root = bt->page;
1687 uid right;
1688
1689         //  Obtain an empty page to use, and copy the current
1690         //  root contents into it
1691
1692         if( !(right = bt_newpage(bt, root)) )
1693                 return bt->err;
1694
1695         // preserve the page info at the bottom
1696         // and set rest to zero
1697
1698         memset(root+1, 0, bt->page_size - sizeof(*root));
1699
1700         // insert first key on newroot page
1701
1702         nxt -= *leftkey + 1;
1703         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1704         bt_putid(slotptr(root, 1)->id, right);
1705         slotptr(root, 1)->off = nxt;
1706         
1707         // insert second key on newroot page
1708         // and increase the root height
1709
1710         nxt -= 3;
1711         ((unsigned char *)root)[nxt] = 2;
1712         ((unsigned char *)root)[nxt+1] = 0xff;
1713         ((unsigned char *)root)[nxt+2] = 0xff;
1714         bt_putid(slotptr(root, 2)->id, page_no2);
1715         slotptr(root, 2)->off = nxt;
1716
1717         bt_putid(root->right, 0);
1718         root->min = nxt;                // reset lowest used offset and key count
1719         root->cnt = 2;
1720         root->act = 2;
1721         root->lvl++;
1722
1723         // update and release root (bt->page)
1724
1725         bt_update(bt, root);
1726
1727         bt_unlockpage(BtLockWrite, bt->latch);
1728         bt_unpinlatch(bt->latch);
1729         return 0;
1730 }
1731
1732 //  split already locked full node
1733 //      return unlocked.
1734
1735 BTERR bt_splitpage (BtDb *bt)
1736 {
1737 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1738 unsigned char fencekey[256], rightkey[256];
1739 uid page_no = bt->page_no, right;
1740 BtLatchSet *latch, *rlatch;
1741 BtPage page = bt->page;
1742 uint lvl = page->lvl;
1743 BtKey key;
1744
1745         latch = bt->latch;
1746
1747         //  split higher half of keys to bt->frame
1748         //      the last key (fence key) might be dead
1749
1750         memset (bt->frame, 0, bt->page_size);
1751         max = page->cnt;
1752         cnt = max / 2;
1753         idx = 0;
1754
1755         while( cnt++ < max ) {
1756                 key = keyptr(page, cnt);
1757                 nxt -= key->len + 1;
1758                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1759                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1760                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1761                         bt->frame->act++;
1762 #ifdef USETOD
1763                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1764 #endif
1765                 slotptr(bt->frame, idx)->off = nxt;
1766         }
1767
1768         //      remember fence key for new right page
1769
1770         memcpy (rightkey, key, key->len + 1);
1771
1772         bt->frame->bits = bt->page_bits;
1773         bt->frame->min = nxt;
1774         bt->frame->cnt = idx;
1775         bt->frame->lvl = lvl;
1776
1777         // link right node
1778
1779         if( page_no > ROOT_page )
1780                 memcpy (bt->frame->right, page->right, BtId);
1781
1782         //      get new free page and write frame to it.
1783
1784         if( !(right = bt_newpage(bt, bt->frame)) )
1785                 return bt->err;
1786
1787         //      update lower keys to continue in old page
1788
1789         memcpy (bt->frame, page, bt->page_size);
1790         memset (page+1, 0, bt->page_size - sizeof(*page));
1791         nxt = bt->page_size;
1792         page->clean = 0;
1793         page->act = 0;
1794         cnt = 0;
1795         idx = 0;
1796
1797         //  assemble page of smaller keys
1798         //      (they're all active keys)
1799
1800         while( cnt++ < max / 2 ) {
1801                 key = keyptr(bt->frame, cnt);
1802                 nxt -= key->len + 1;
1803                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1804                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1805 #ifdef USETOD
1806                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1807 #endif
1808                 slotptr(page, idx)->off = nxt;
1809                 page->act++;
1810         }
1811
1812         // remember fence key for smaller page
1813
1814         memcpy (fencekey, key, key->len + 1);
1815
1816         bt_putid(page->right, right);
1817         page->min = nxt;
1818         page->cnt = idx;
1819
1820         // if current page is the root page, split it
1821
1822         if( page_no == ROOT_page )
1823                 return bt_splitroot (bt, fencekey, right);
1824
1825         //      lock right page
1826
1827         if( rlatch = bt_pinlatch (bt, right) )
1828                 bt_lockpage (BtLockParent, rlatch);
1829         else
1830                 return bt->err;
1831
1832         // update left (containing) node
1833
1834         bt_update(bt, page);
1835
1836         bt_lockpage (BtLockParent, latch);
1837         bt_unlockpage (BtLockWrite, latch);
1838
1839         // insert new fence for reformulated left block
1840
1841         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1842                 return bt->err;
1843
1844         //      switch fence for right block of larger keys to new right page
1845
1846         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1847                 return bt->err;
1848
1849         bt_unlockpage (BtLockParent, latch);
1850         bt_unlockpage (BtLockParent, rlatch);
1851
1852         bt_unpinlatch (rlatch);
1853         bt_unpinlatch (latch);
1854         return 0;
1855 }
1856
1857 //  Insert new key into the btree at requested level.
1858 //  Pages are unlocked at exit.
1859
1860 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1861 {
1862 uint slot, idx;
1863 BtPage page;
1864 BtKey ptr;
1865
1866   while( 1 ) {
1867         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1868                 ptr = keyptr(bt->page, slot);
1869         else
1870         {
1871                 if( !bt->err )
1872                         bt->err = BTERR_ovflw;
1873                 return bt->err;
1874         }
1875
1876         // if key already exists, update id and return
1877
1878         page = bt->page;
1879
1880         if( !keycmp (ptr, key, len) ) {
1881           if( slotptr(page, slot)->dead )
1882                 page->act++;
1883           slotptr(page, slot)->dead = 0;
1884 #ifdef USETOD
1885           slotptr(page, slot)->tod = tod;
1886 #endif
1887           bt_putid(slotptr(page,slot)->id, id);
1888           bt_update(bt, bt->page);
1889           bt_unlockpage(BtLockWrite, bt->latch);
1890           bt_unpinlatch (bt->latch);
1891           return 0;
1892         }
1893
1894         // check if page has enough space
1895
1896         if( slot = bt_cleanpage (bt, len, slot) )
1897                 break;
1898
1899         if( bt_splitpage (bt) )
1900                 return bt->err;
1901   }
1902
1903   // calculate next available slot and copy key into page
1904
1905   page->min -= len + 1; // reset lowest used offset
1906   ((unsigned char *)page)[page->min] = len;
1907   memcpy ((unsigned char *)page + page->min +1, key, len );
1908
1909   for( idx = slot; idx < page->cnt; idx++ )
1910         if( slotptr(page, idx)->dead )
1911                 break;
1912
1913   // now insert key into array before slot
1914   // preserving the fence slot
1915
1916   if( idx == page->cnt )
1917         idx++, page->cnt++;
1918
1919   page->act++;
1920
1921   while( idx > slot )
1922         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1923
1924   bt_putid(slotptr(page,slot)->id, id);
1925   slotptr(page, slot)->off = page->min;
1926 #ifdef USETOD
1927   slotptr(page, slot)->tod = tod;
1928 #endif
1929   slotptr(page, slot)->dead = 0;
1930
1931   bt_update(bt, bt->page);
1932
1933   bt_unlockpage(BtLockWrite, bt->latch);
1934   bt_unpinlatch(bt->latch);
1935   return 0;
1936 }
1937
1938 //  cache page of keys into cursor and return starting slot for given key
1939
1940 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1941 {
1942 uint slot;
1943
1944         // cache page for retrieval
1945
1946         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1947           memcpy (bt->cursor, bt->page, bt->page_size);
1948         else
1949           return 0;
1950
1951         bt_unlockpage(BtLockRead, bt->latch);
1952         bt->cursor_page = bt->page_no;
1953         bt_unpinlatch (bt->latch);
1954         return slot;
1955 }
1956
1957 //  return next slot for cursor page
1958 //  or slide cursor right into next page
1959
1960 uint bt_nextkey (BtDb *bt, uint slot)
1961 {
1962 BtLatchSet *latch;
1963 off64_t right;
1964
1965   do {
1966         right = bt_getid(bt->cursor->right);
1967
1968         while( slot++ < bt->cursor->cnt )
1969           if( slotptr(bt->cursor,slot)->dead )
1970                 continue;
1971           else if( right || (slot < bt->cursor->cnt))
1972                 return slot;
1973           else
1974                 break;
1975
1976         if( !right )
1977                 break;
1978
1979         bt->cursor_page = right;
1980
1981         if( latch = bt_pinlatch (bt, right) )
1982         bt_lockpage(BtLockRead, latch);
1983         else
1984                 return 0;
1985
1986         bt->page = bt_mappage (bt, latch);
1987         memcpy (bt->cursor, bt->page, bt->page_size);
1988         bt_unlockpage(BtLockRead, latch);
1989         bt_unpinlatch (latch);
1990         slot = 0;
1991   } while( 1 );
1992
1993   return bt->err = 0;
1994 }
1995
1996 BtKey bt_key(BtDb *bt, uint slot)
1997 {
1998         return keyptr(bt->cursor, slot);
1999 }
2000
2001 uid bt_uid(BtDb *bt, uint slot)
2002 {
2003         return bt_getid(slotptr(bt->cursor,slot)->id);
2004 }
2005
2006 #ifdef USETOD
2007 uint bt_tod(BtDb *bt, uint slot)
2008 {
2009         return slotptr(bt->cursor,slot)->tod;
2010 }
2011 #endif
2012
2013 #ifdef STANDALONE
2014
2015 uint bt_audit (BtDb *bt)
2016 {
2017 uint idx, hashidx;
2018 uid next, page_no;
2019 BtLatchSet *latch;
2020 uint blks[64];
2021 uint cnt = 0;
2022 BtPage page;
2023 uint amt[1];
2024 BtKey ptr;
2025
2026 #ifdef unix
2027         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2028 #endif
2029         if( *(ushort *)(bt->latchmgr->lock) )
2030                 fprintf(stderr, "Alloc page locked\n");
2031         *(ushort *)(bt->latchmgr->lock) = 0;
2032
2033         memset (blks, 0, sizeof(blks));
2034
2035         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
2036                 latch = bt->latchsets + idx;
2037                 if( *(ushort *)latch->readwr )
2038                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2039                 *(ushort *)latch->readwr = 0;
2040
2041                 if( *(ushort *)latch->access )
2042                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2043                 *(ushort *)latch->access = 0;
2044
2045                 if( *(ushort *)latch->parent )
2046                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2047                 *(ushort *)latch->parent = 0;
2048
2049                 if( latch->pin & PIN_mask ) {
2050                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2051                         latch->pin = 0;
2052                 }
2053                 page = (BtPage)((uid)idx * bt->page_size + bt->pagepool);
2054                 blks[page->lvl]++;
2055
2056             if( page->dirty )
2057                  if( bt_writepage (bt, page, latch->page_no) )
2058                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
2059         }
2060
2061         for( idx = 0; blks[idx]; idx++ )
2062                 fprintf(stderr, "cache: %d lvl %d blocks\n", blks[idx], idx);
2063
2064         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
2065           if( *(ushort *)(bt->table[hashidx].latch) )
2066                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2067
2068           *(ushort *)(bt->table[hashidx].latch) = 0;
2069         }
2070
2071         memset (blks, 0, sizeof(blks));
2072
2073         next = bt->latchmgr->nlatchpage + LATCH_page;
2074         page_no = LEAF_page;
2075
2076         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2077                 if( bt_readpage (bt, bt->frame, page_no) )
2078                   fprintf(stderr, "page %.8x unreadable\n", page_no);
2079                 if( !bt->frame->free ) {
2080                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2081                   ptr = keyptr(bt->frame, idx+1);
2082                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2083                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2084                  }
2085                  if( !bt->frame->lvl )
2086                         cnt += bt->frame->act;
2087                  blks[bt->frame->lvl]++;
2088                 }
2089
2090                 if( page_no > LEAF_page )
2091                         next = page_no + 1;
2092                 page_no = next;
2093         }
2094
2095         for( idx = 0; blks[idx]; idx++ )
2096                 fprintf(stderr, "btree: %d lvl %d blocks\n", blks[idx], idx);
2097
2098         return cnt - 1;
2099 }
2100
2101 #ifndef unix
2102 double getCpuTime(int type)
2103 {
2104 FILETIME crtime[1];
2105 FILETIME xittime[1];
2106 FILETIME systime[1];
2107 FILETIME usrtime[1];
2108 SYSTEMTIME timeconv[1];
2109 double ans = 0;
2110
2111         memset (timeconv, 0, sizeof(SYSTEMTIME));
2112
2113         switch( type ) {
2114         case 0:
2115                 GetSystemTimeAsFileTime (xittime);
2116                 FileTimeToSystemTime (xittime, timeconv);
2117                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2118                 break;
2119         case 1:
2120                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2121                 FileTimeToSystemTime (usrtime, timeconv);
2122                 break;
2123         case 2:
2124                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2125                 FileTimeToSystemTime (systime, timeconv);
2126                 break;
2127         }
2128
2129         ans += (double)timeconv->wHour * 3600;
2130         ans += (double)timeconv->wMinute * 60;
2131         ans += (double)timeconv->wSecond;
2132         ans += (double)timeconv->wMilliseconds / 1000;
2133         return ans;
2134 }
2135 #else
2136 #include <time.h>
2137 #include <sys/resource.h>
2138
2139 double getCpuTime(int type)
2140 {
2141 struct rusage used[1];
2142 struct timeval tv[1];
2143
2144         switch( type ) {
2145         case 0:
2146                 gettimeofday(tv, NULL);
2147                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2148
2149         case 1:
2150                 getrusage(RUSAGE_SELF, used);
2151                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2152
2153         case 2:
2154                 getrusage(RUSAGE_SELF, used);
2155                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2156         }
2157
2158         return 0;
2159 }
2160 #endif
2161
2162 //  standalone program to index file of keys
2163 //  then list them onto std-out
2164
2165 int main (int argc, char **argv)
2166 {
2167 uint slot, line = 0, off = 0, found = 0;
2168 int ch, cnt = 0, bits = 12, idx;
2169 unsigned char key[256];
2170 double done, start;
2171 uid next, page_no;
2172 BtLatchSet *latch;
2173 float elapsed;
2174 time_t tod[1];
2175 uint scan = 0;
2176 uint len = 0;
2177 uint map = 0;
2178 BtPage page;
2179 BtKey ptr;
2180 BtDb *bt;
2181 FILE *in;
2182
2183 #ifdef WIN32
2184         _setmode (1, _O_BINARY);
2185 #endif
2186         if( argc < 4 ) {
2187                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]);
2188                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2189                 fprintf (stderr, "  mapped_pool_pages: number of pages in buffer pool\n");
2190                 exit(0);
2191         }
2192
2193         start = getCpuTime(0);
2194         time(tod);
2195
2196         if( argc > 4 )
2197                 bits = atoi(argv[4]);
2198
2199         if( argc > 5 )
2200                 map = atoi(argv[5]);
2201
2202         if( argc > 6 )
2203                 off = atoi(argv[6]);
2204
2205         bt = bt_open ((argv[1]), BT_rw, bits, map);
2206
2207         if( !bt ) {
2208                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2209                 exit (1);
2210         }
2211
2212         switch(argv[3][0]| 0x20)
2213         {
2214         case 'p':       // display page
2215                 if( latch = bt_pinlatch (bt, off) )
2216                         page = bt_mappage (bt, latch);
2217                 else
2218                         fprintf(stderr, "unable to read page %.8x\n", off);
2219
2220                 write (1, page, bt->page_size);
2221                 break;
2222
2223         case 'a':       // buffer pool audit
2224                 fprintf(stderr, "started audit for %s\n", argv[1]);
2225                 cnt = bt_audit (bt);
2226                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[1], cnt);
2227                 break;
2228
2229         case 'w':       // write keys
2230                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2231                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2232 #ifdef unix
2233                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2234 #endif
2235                   while( ch = getc(in), ch != EOF )
2236                         if( ch == '\n' )
2237                         {
2238                           if( off )
2239                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2240
2241                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2242                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2243                           len = 0;
2244                         }
2245                         else if( len < 245 )
2246                                 key[len++] = ch;
2247                   }
2248                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2249                 break;
2250
2251         case 'd':       // delete keys
2252                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2253                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2254 #ifdef unix
2255                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2256 #endif
2257                   while( ch = getc(in), ch != EOF )
2258                         if( ch == '\n' )
2259                         {
2260                           if( off )
2261                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2262                           line++;
2263                           if( bt_deletekey (bt, key, len, 0) )
2264                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2265                           len = 0;
2266                         }
2267                         else if( len < 245 )
2268                                 key[len++] = ch;
2269                   }
2270                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2271                 break;
2272
2273         case 'f':       // find keys
2274                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2275                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2276 #ifdef unix
2277                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2278 #endif
2279                   while( ch = getc(in), ch != EOF )
2280                         if( ch == '\n' )
2281                         {
2282                           if( off )
2283                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2284                           line++;
2285                           if( bt_findkey (bt, key, len) )
2286                                 found++;
2287                           else if( bt->err )
2288                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2289                           len = 0;
2290                         }
2291                         else if( len < 245 )
2292                                 key[len++] = ch;
2293                   }
2294                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2295                 break;
2296
2297         case 's':       // scan and print keys
2298                 fprintf(stderr, "started scaning\n");
2299                 cnt = len = key[0] = 0;
2300
2301                 if( slot = bt_startkey (bt, key, len) )
2302                   slot--;
2303                 else
2304                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2305
2306                 while( slot = bt_nextkey (bt, slot) ) {
2307                         ptr = bt_key(bt, slot);
2308                         fwrite (ptr->key, ptr->len, 1, stdout);
2309                         fputc ('\n', stdout);
2310                         cnt++;
2311                 }
2312
2313                 fprintf(stderr, " Total keys read %d\n", cnt - 1);
2314                 break;
2315
2316         case 'c':       // count keys
2317           fprintf(stderr, "started counting\n");
2318           cnt = 0;
2319
2320           next = bt->latchmgr->nlatchpage + LATCH_page;
2321           page_no = LEAF_page;
2322
2323           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2324                 if( latch = bt_pinlatch (bt, page_no) )
2325                         page = bt_mappage (bt, latch);
2326                 if( !page->free && !page->lvl )
2327                         cnt += page->act;
2328                 if( page_no > LEAF_page )
2329                         next = page_no + 1;
2330                 if( scan )
2331                  for( idx = 0; idx++ < page->cnt; ) {
2332                   if( slotptr(page, idx)->dead )
2333                         continue;
2334                   ptr = keyptr(page, idx);
2335                   if( idx != page->cnt && bt_getid (page->right) ) {
2336                         fwrite (ptr->key, ptr->len, 1, stdout);
2337                         fputc ('\n', stdout);
2338                   }
2339                  }
2340                 bt_unpinlatch (latch);
2341                 page_no = next;
2342           }
2343                 
2344           cnt--;        // remove stopper key
2345           fprintf(stderr, " Total keys read %d\n", cnt);
2346           break;
2347         }
2348
2349         done = getCpuTime(0);
2350         elapsed = (float)(done - start);
2351         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2352         elapsed = getCpuTime(1);
2353         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2354         elapsed = getCpuTime(2);
2355         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2356         return 0;
2357 }
2358
2359 #endif  //STANDALONE