]> pd.if.org Git - btree/blob - btree2u.c
Explain new Atomic/Consistent transaction for inserting and deleting keys.
[btree] / btree2u.c
1 // btree version 2u  sched_yield locks
2 //      with combined latch & pool manager
3 //      and phase-fair reader writer lock
4 // 12 MAR 2014
5
6 // author: karl malbrain, malbrain@cal.berkeley.edu
7
8 /*
9 This work, including the source code, documentation
10 and related data, is placed into the public domain.
11
12 The orginal author is Karl Malbrain.
13
14 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
15 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
16 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
17 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
18 RESULTING FROM THE USE, MODIFICATION, OR
19 REDISTRIBUTION OF THIS SOFTWARE.
20 */
21
22 // Please see the project home page for documentation
23 // code.google.com/p/high-concurrency-btree
24
25 #define _FILE_OFFSET_BITS 64
26 #define _LARGEFILE64_SOURCE
27
28 #ifdef linux
29 #define _GNU_SOURCE
30 #endif
31
32 #ifdef unix
33 #include <unistd.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <time.h>
37 #include <fcntl.h>
38 #include <sys/mman.h>
39 #include <errno.h>
40 #else
41 #define WIN32_LEAN_AND_MEAN
42 #include <windows.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <time.h>
46 #include <fcntl.h>
47 #include <io.h>
48 #endif
49
50 #include <memory.h>
51 #include <string.h>
52
53 typedef unsigned long long      uid;
54
55 #ifndef unix
56 typedef unsigned long long      off64_t;
57 typedef unsigned short          ushort;
58 typedef unsigned int            uint;
59 #endif
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63 #define BT_fl 0x6c66    // fl
64
65 #define BT_maxbits              15                                      // maximum page size in bits
66 #define BT_minbits              12                                      // minimum page size in bits
67 #define BT_minpage              (1 << BT_minbits)       // minimum page size
68 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
69
70 //  BTree page number constants
71 #define ALLOC_page              0
72 #define ROOT_page               1
73 #define LEAF_page               2
74 #define LATCH_page              3
75
76 //      Number of levels to create in a new BTree
77
78 #define MIN_lvl                 2
79 #define MAX_lvl                 15
80
81 /*
82 There are five lock types for each node in three independent sets: 
83 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
84 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
85 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
86 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
87 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
88 */
89
90 typedef enum{
91         BtLockAccess,
92         BtLockDelete,
93         BtLockRead,
94         BtLockWrite,
95         BtLockParent
96 }BtLock;
97
98 //      definition for latch implementation
99
100 volatile typedef struct {
101         ushort lock[1];
102 } BtSpinLatch;
103
104 #define XCL 1
105 #define PEND 2
106 #define BOTH 3
107 #define SHARE 4
108
109 volatile typedef struct {
110         ushort rin[1];          // readers in count
111         ushort rout[1];         // readers out count
112         ushort serving[1];      // writers out count
113         ushort ticket[1];       // writers in count
114 } RWLock;
115
116 //      define bits at bottom of rin
117
118 #define PHID 0x1        // writer phase (0/1)
119 #define PRES 0x2        // writer present
120 #define MASK 0x3        // both write bits
121 #define RINC 0x4        // reader increment
122
123 //      Define the length of the page and key pointers
124
125 #define BtId 6
126
127 //      Page key slot definition.
128
129 //      If BT_maxbits is 15 or less, you can save 2 bytes
130 //      for each key stored by making the first two uints
131 //      into ushorts.  You can also save 4 bytes by removing
132 //      the tod field from the key.
133
134 //      Keys are marked dead, but remain on the page until
135 //      cleanup is called. The fence key (highest key) for
136 //      the page is always present, even if dead.
137
138 typedef struct {
139 #ifdef USETOD
140         uint tod;                                       // time-stamp for key
141 #endif
142         ushort off:BT_maxbits;          // page offset for key start
143         ushort dead:1;                          // set for deleted key
144         unsigned char id[BtId];         // id associated with key
145 } BtSlot;
146
147 //      The key structure occupies space at the upper end of
148 //      each page.  It's a length byte followed by the value
149 //      bytes.
150
151 typedef struct {
152         unsigned char len;
153         unsigned char key[0];
154 } *BtKey;
155
156 //      The first part of an index page.
157 //      It is immediately followed
158 //      by the BtSlot array of keys.
159
160 typedef struct BtPage_ {
161         uint cnt;                                       // count of keys in page
162         uint act;                                       // count of active keys
163         uint min;                                       // next key offset
164         unsigned char bits:6;           // page size in bits
165         unsigned char free:1;           // page is on free list
166         unsigned char dirty:1;          // page is dirty in cache
167         unsigned char lvl:6;            // level of page
168         unsigned char kill:1;           // page is being deleted
169         unsigned char clean:1;          // page needs cleaning
170         unsigned char right[BtId];      // page number to right
171 } *BtPage;
172
173 typedef struct {
174         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
175         BtSpinLatch lock[1];            // allocation area lite latch
176         volatile uint latchdeployed;// highest number of latch entries deployed
177         volatile uint nlatchpage;       // number of latch pages at BT_latch
178         volatile uint latchtotal;       // number of page latch entries
179         volatile uint latchhash;        // number of latch hash table slots
180         volatile uint latchvictim;      // next latch hash entry to examine
181         volatile uint safelevel;        // safe page level in cache
182         volatile uint cache[MAX_lvl];// cache census counts by btree level
183 } BtLatchMgr;
184
185 //  latch hash table entries
186
187 typedef struct {
188         volatile uint slot;             // Latch table entry at head of collision chain
189         BtSpinLatch latch[1];   // lock for the collision chain
190 } BtHashEntry;
191
192 //      latch manager table structure
193
194 typedef struct {
195         volatile uid page_no;   // latch set page number on disk
196         RWLock readwr[1];               // read/write page lock
197         RWLock access[1];               // Access Intent/Page delete
198         RWLock parent[1];               // Posting of fence key in parent
199         volatile ushort pin;    // number of pins/level/clock bits
200         volatile uint next;             // next entry in hash table chain
201         volatile uint prev;             // prev entry in hash table chain
202 } BtLatchSet;
203
204 #define CLOCK_mask 0xe000
205 #define CLOCK_unit 0x2000
206 #define PIN_mask 0x07ff
207 #define LVL_mask 0x1800
208 #define LVL_shift 11
209
210 //      The object structure for Btree access
211
212 typedef struct _BtDb {
213         uint page_size;         // each page size       
214         uint page_bits;         // each page size in bits       
215         uid page_no;            // current page number  
216         uid cursor_page;        // current cursor page number   
217         int  err;
218         uint mode;                      // read-write mode
219         BtPage cursor;          // cached frame for start/next (never mapped)
220         BtPage frame;           // spare frame for the page split (never mapped)
221         BtPage page;            // current mapped page in buffer pool
222         BtLatchSet *latch;                      // current page latch
223         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
224         BtLatchSet *latchsets;          // mapped latch set from latch pages
225         unsigned char *pagepool;        // cached page pool set
226         BtHashEntry *table;     // the hash table
227 #ifdef unix
228         int idx;
229 #else
230         HANDLE idx;
231         HANDLE halloc;          // allocation and latch table handle
232 #endif
233         unsigned char *mem;     // frame, cursor, memory buffers
234         uint found;                     // last deletekey found key
235 } BtDb;
236
237 typedef enum {
238 BTERR_ok = 0,
239 BTERR_notfound,
240 BTERR_struct,
241 BTERR_ovflw,
242 BTERR_read,
243 BTERR_lock,
244 BTERR_hash,
245 BTERR_kill,
246 BTERR_map,
247 BTERR_wrt,
248 BTERR_eof
249 } BTERR;
250
251 // B-Tree functions
252 extern void bt_close (BtDb *bt);
253 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
254 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
255 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
256 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
257 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
258 extern uint bt_nextkey   (BtDb *bt, uint slot);
259
260 //      internal functions
261 void bt_update (BtDb *bt, BtPage page);
262 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
263 //  Helper functions to return slot values
264
265 extern BtKey bt_key (BtDb *bt, uint slot);
266 extern uid bt_uid (BtDb *bt, uint slot);
267 #ifdef USETOD
268 extern uint bt_tod (BtDb *bt, uint slot);
269 #endif
270
271 //  The page is allocated from low and hi ends.
272 //  The key offsets and row-id's are allocated
273 //  from the bottom, while the text of the key
274 //  is allocated from the top.  When the two
275 //  areas meet, the page is split into two.
276
277 //  A key consists of a length byte, two bytes of
278 //  index number (0 - 65534), and up to 253 bytes
279 //  of key value.  Duplicate keys are discarded.
280 //  Associated with each key is a 48 bit row-id.
281
282 //  The b-tree root is always located at page 1.
283 //      The first leaf page of level zero is always
284 //      located on page 2.
285
286 //      The b-tree pages are linked with right
287 //      pointers to facilitate enumerators,
288 //      and provide for concurrency.
289
290 //      When to root page fills, it is split in two and
291 //      the tree height is raised by a new root at page
292 //      one with two keys.
293
294 //      Deleted keys are marked with a dead bit until
295 //      page cleanup The fence key for a node is always
296 //      present, even after deletion and cleanup.
297
298 //  Deleted leaf pages are reclaimed  on a free list.
299 //      The upper levels of the btree are fixed on creation.
300
301 //  To achieve maximum concurrency one page is locked at a time
302 //  as the tree is traversed to find leaf key in question. The right
303 //  page numbers are used in cases where the page is being split,
304 //      or consolidated.
305
306 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
307 //      and chains empty leaf pages together for reuse.
308
309 //      Parent locks are obtained to prevent resplitting or deleting a node
310 //      before its fence is posted into its upper level.
311
312 //      A special open mode of BT_fl is provided to safely access files on
313 //      WIN32 networks. WIN32 network operations should not use memory mapping.
314 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
315 //      to prevent local caching of network file contents.
316
317 //      Access macros to address slot and key values from the page.
318 //      Page slots use 1 based indexing.
319
320 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
321 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
322
323 void bt_putid(unsigned char *dest, uid id)
324 {
325 int i = BtId;
326
327         while( i-- )
328                 dest[i] = (unsigned char)id, id >>= 8;
329 }
330
331 uid bt_getid(unsigned char *src)
332 {
333 uid id = 0;
334 int i;
335
336         for( i = 0; i < BtId; i++ )
337                 id <<= 8, id |= *src++; 
338
339         return id;
340 }
341
342 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
343 {
344 BtKey ptr;
345
346         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
347         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
348         ptr = keyptr(page, page->cnt);
349         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
350         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
351         return bt->err = err;
352 }
353
354 //      Phase-Fair reader/writer lock implementation
355
356 void WriteLock (RWLock *lock)
357 {
358 ushort w, r, tix;
359
360 #ifdef unix
361         tix = __sync_fetch_and_add (lock->ticket, 1);
362 #else
363         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
364 #endif
365         // wait for our ticket to come up
366
367         while( tix != lock->serving[0] )
368 #ifdef unix
369                 sched_yield();
370 #else
371                 SwitchToThread ();
372 #endif
373
374         w = PRES | (tix & PHID);
375 #ifdef  unix
376         r = __sync_fetch_and_add (lock->rin, w);
377 #else
378         r = _InterlockedExchangeAdd16 (lock->rin, w);
379 #endif
380         while( r != *lock->rout )
381 #ifdef unix
382                 sched_yield();
383 #else
384                 SwitchToThread();
385 #endif
386 }
387
388 void WriteRelease (RWLock *lock)
389 {
390 #ifdef unix
391         __sync_fetch_and_and (lock->rin, ~MASK);
392 #else
393         _InterlockedAnd16 (lock->rin, ~MASK);
394 #endif
395         lock->serving[0]++;
396 }
397
398 void ReadLock (RWLock *lock)
399 {
400 ushort w;
401 #ifdef unix
402         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
403 #else
404         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
405 #endif
406         if( w )
407           while( w == (*lock->rin & MASK) )
408 #ifdef unix
409                 sched_yield ();
410 #else
411                 SwitchToThread ();
412 #endif
413 }
414
415 void ReadRelease (RWLock *lock)
416 {
417 #ifdef unix
418         __sync_fetch_and_add (lock->rout, RINC);
419 #else
420         _InterlockedExchangeAdd16 (lock->rout, RINC);
421 #endif
422 }
423
424 //      Spin Latch Manager
425
426 //      wait until write lock mode is clear
427 //      and add 1 to the share count
428
429 void bt_spinreadlock(BtSpinLatch *latch)
430 {
431 ushort prev;
432
433   do {
434 #ifdef unix
435         prev = __sync_fetch_and_add (latch->lock, SHARE);
436 #else
437         prev = _InterlockedExchangeAdd16(latch->lock, SHARE);
438 #endif
439         //  see if exclusive request is granted or pending
440
441         if( !(prev & BOTH) )
442                 return;
443 #ifdef unix
444         prev = __sync_fetch_and_add (latch->lock, -SHARE);
445 #else
446         prev = _InterlockedExchangeAdd16(latch->lock, -SHARE);
447 #endif
448 #ifdef  unix
449   } while( sched_yield(), 1 );
450 #else
451   } while( SwitchToThread(), 1 );
452 #endif
453 }
454
455 //      wait for other read and write latches to relinquish
456
457 void bt_spinwritelock(BtSpinLatch *latch)
458 {
459 ushort prev;
460
461   do {
462 #ifdef  unix
463         prev = __sync_fetch_and_or(latch->lock, PEND | XCL);
464 #else
465         prev = _InterlockedOr16(latch->lock, PEND | XCL);
466 #endif
467         if( !(prev & XCL) )
468           if( !(prev & ~BOTH) )
469                 return;
470           else
471 #ifdef unix
472                 __sync_fetch_and_and (latch->lock, ~XCL);
473 #else
474                 _InterlockedAnd16(latch->lock, ~XCL);
475 #endif
476 #ifdef  unix
477   } while( sched_yield(), 1 );
478 #else
479   } while( SwitchToThread(), 1 );
480 #endif
481 }
482
483 //      try to obtain write lock
484
485 //      return 1 if obtained,
486 //              0 otherwise
487
488 int bt_spinwritetry(BtSpinLatch *latch)
489 {
490 ushort prev;
491
492 #ifdef  unix
493         prev = __sync_fetch_and_or(latch->lock, XCL);
494 #else
495         prev = _InterlockedOr16(latch->lock, XCL);
496 #endif
497         //      take write access if all bits are clear
498
499         if( !(prev & XCL) )
500           if( !(prev & ~BOTH) )
501                 return 1;
502           else
503 #ifdef unix
504                 __sync_fetch_and_and (latch->lock, ~XCL);
505 #else
506                 _InterlockedAnd16(latch->lock, ~XCL);
507 #endif
508         return 0;
509 }
510
511 //      clear write mode
512
513 void bt_spinreleasewrite(BtSpinLatch *latch)
514 {
515 #ifdef unix
516         __sync_fetch_and_and(latch->lock, ~BOTH);
517 #else
518         _InterlockedAnd16(latch->lock, ~BOTH);
519 #endif
520 }
521
522 //      decrement reader count
523
524 void bt_spinreleaseread(BtSpinLatch *latch)
525 {
526 #ifdef unix
527         __sync_fetch_and_add(latch->lock, -SHARE);
528 #else
529         _InterlockedExchangeAdd16(latch->lock, -SHARE);
530 #endif
531 }
532
533 //      read page from permanent location in Btree file
534
535 BTERR bt_readpage (BtDb *bt, BtPage page, uid page_no)
536 {
537 off64_t off = page_no << bt->page_bits;
538
539 #ifdef unix
540         if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size ) {
541                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
542                 return bt->err = BTERR_read;
543         }
544 #else
545 OVERLAPPED ovl[1];
546 uint amt[1];
547
548         memset (ovl, 0, sizeof(OVERLAPPED));
549         ovl->Offset = off;
550         ovl->OffsetHigh = off >> 32;
551
552         if( !ReadFile(bt->idx, page, bt->page_size, amt, ovl)) {
553                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
554                 return bt->err = BTERR_read;
555         }
556         if( *amt <  bt->page_size ) {
557                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
558                 return bt->err = BTERR_read;
559         }
560 #endif
561         return 0;
562 }
563
564 //      write page to permanent location in Btree file
565 //      clear the dirty bit
566
567 BTERR bt_writepage (BtDb *bt, BtPage page, uid page_no)
568 {
569 off64_t off = page_no << bt->page_bits;
570
571 #ifdef unix
572         page->dirty = 0;
573
574         if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
575                 return bt->err = BTERR_wrt;
576 #else
577 OVERLAPPED ovl[1];
578 uint amt[1];
579
580         memset (ovl, 0, sizeof(OVERLAPPED));
581         ovl->Offset = off;
582         ovl->OffsetHigh = off >> 32;
583         page->dirty = 0;
584
585         if( !WriteFile(bt->idx, page, bt->page_size, amt, ovl) )
586                 return bt->err = BTERR_wrt;
587
588         if( *amt <  bt->page_size )
589                 return bt->err = BTERR_wrt;
590 #endif
591         return 0;
592 }
593
594 //      link latch table entry into head of latch hash table
595
596 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
597 {
598 BtPage page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
599 BtLatchSet *latch = bt->latchsets + slot;
600 int lvl;
601
602         if( latch->next = bt->table[hashidx].slot )
603                 bt->latchsets[latch->next].prev = slot;
604
605         bt->table[hashidx].slot = slot;
606         latch->page_no = page_no;
607         latch->prev = 0;
608         latch->pin = 1;
609
610         if( bt_readpage (bt, page, page_no) )
611                 return bt->err;
612
613         lvl = page->lvl << LVL_shift;
614         if( lvl > LVL_mask )
615                 lvl = LVL_mask;
616         latch->pin |= lvl;              // store lvl
617         latch->pin |= lvl << 3; // initialize clock
618
619 #ifdef unix
620         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], 1);
621 #else
622         _InterlockedExchangeAdd(&bt->latchmgr->cache[page->lvl], 1);
623 #endif
624         return bt->err = 0;
625 }
626
627 //      release latch pin
628
629 void bt_unpinlatch (BtLatchSet *latch)
630 {
631 #ifdef unix
632         __sync_fetch_and_add(&latch->pin, -1);
633 #else
634         _InterlockedDecrement16 (&latch->pin);
635 #endif
636 }
637
638 //      find existing latchset or inspire new one
639 //      return with latchset pinned
640
641 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
642 {
643 uint hashidx = page_no % bt->latchmgr->latchhash;
644 BtLatchSet *latch;
645 uint slot, idx;
646 uint lvl, cnt;
647 off64_t off;
648 uint amt[1];
649 BtPage page;
650
651   //  try to find our entry
652
653   bt_spinwritelock(bt->table[hashidx].latch);
654
655   if( slot = bt->table[hashidx].slot ) do
656   {
657         latch = bt->latchsets + slot;
658         if( page_no == latch->page_no )
659                 break;
660   } while( slot = latch->next );
661
662   //  found our entry
663   //  increment clock
664
665   if( slot ) {
666         latch = bt->latchsets + slot;
667         lvl = (latch->pin & LVL_mask) >> LVL_shift;
668         lvl *= CLOCK_unit * 2;
669         lvl |= CLOCK_unit;
670 #ifdef unix
671         __sync_fetch_and_add(&latch->pin, 1);
672         __sync_fetch_and_or(&latch->pin, lvl);
673 #else
674         _InterlockedIncrement16 (&latch->pin);
675         _InterlockedOr16 (&latch->pin, lvl);
676 #endif
677         bt_spinreleasewrite(bt->table[hashidx].latch);
678         return latch;
679   }
680
681         //  see if there are any unused pool entries
682 #ifdef unix
683         slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
684 #else
685         slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
686 #endif
687
688         if( slot < bt->latchmgr->latchtotal ) {
689                 latch = bt->latchsets + slot;
690                 if( bt_latchlink (bt, hashidx, slot, page_no) )
691                         return NULL;
692                 bt_spinreleasewrite (bt->table[hashidx].latch);
693                 return latch;
694         }
695
696 #ifdef unix
697         __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
698 #else
699         _InterlockedDecrement (&bt->latchmgr->latchdeployed);
700 #endif
701   //  find and reuse previous entry on victim
702
703   while( 1 ) {
704 #ifdef unix
705         slot = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
706 #else
707         slot = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
708 #endif
709         // try to get write lock on hash chain
710         //      skip entry if not obtained
711         //      or has outstanding pins
712
713         slot %= bt->latchmgr->latchtotal;
714
715         //      on slot wraparound, check census
716         //      count and increment safe level
717
718         cnt = bt->latchmgr->cache[bt->latchmgr->safelevel];
719
720         if( !slot ) {
721           if( cnt < bt->latchmgr->latchtotal / 10 )
722 #ifdef unix
723                 __sync_fetch_and_add(&bt->latchmgr->safelevel, 1);
724 #else
725                 _InterlockedIncrement (&bt->latchmgr->safelevel);
726 #endif
727           continue;
728         }
729
730         latch = bt->latchsets + slot;
731         idx = latch->page_no % bt->latchmgr->latchhash;
732         lvl = (latch->pin & LVL_mask) >> LVL_shift;
733
734         //      see if we are evicting this level yet
735         //      or if we are on same chain as hashidx
736
737         if( idx == hashidx || lvl > bt->latchmgr->safelevel )
738                 continue;
739
740         if( !bt_spinwritetry (bt->table[idx].latch) )
741                 continue;
742
743         if( latch->pin & ~LVL_mask ) {
744           if( latch->pin & CLOCK_mask )
745 #ifdef unix
746                 __sync_fetch_and_add(&latch->pin, -CLOCK_unit);
747 #else
748                 _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_unit);
749 #endif
750           bt_spinreleasewrite (bt->table[idx].latch);
751           continue;
752         }
753
754         //  update permanent page area in btree
755
756         page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
757 #ifdef unix
758         posix_fadvise (bt->idx, page_no << bt->page_bits, bt->page_size, POSIX_FADV_WILLNEED);
759         __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], -1);
760 #else
761         _InterlockedExchangeAdd(&bt->latchmgr->cache[page->lvl], -1);
762 #endif
763         if( page->dirty )
764           if( bt_writepage (bt, page, latch->page_no) )
765                 return NULL;
766
767         //  unlink our available slot from its hash chain
768
769         if( latch->prev )
770                 bt->latchsets[latch->prev].next = latch->next;
771         else
772                 bt->table[idx].slot = latch->next;
773
774         if( latch->next )
775                 bt->latchsets[latch->next].prev = latch->prev;
776
777         bt_spinreleasewrite (bt->table[idx].latch);
778
779         if( bt_latchlink (bt, hashidx, slot, page_no) )
780                 return NULL;
781
782         bt_spinreleasewrite (bt->table[hashidx].latch);
783         return latch;
784   }
785 }
786
787 //      close and release memory
788
789 void bt_close (BtDb *bt)
790 {
791 #ifdef unix
792         munmap (bt->table, bt->latchmgr->nlatchpage * bt->page_size);
793         munmap (bt->latchmgr, bt->page_size);
794 #else
795         FlushViewOfFile(bt->latchmgr, 0);
796         UnmapViewOfFile(bt->latchmgr);
797         CloseHandle(bt->halloc);
798 #endif
799 #ifdef unix
800         if( bt->mem )
801                 free (bt->mem);
802         close (bt->idx);
803         free (bt);
804 #else
805         if( bt->mem)
806                 VirtualFree (bt->mem, 0, MEM_RELEASE);
807         FlushFileBuffers(bt->idx);
808         CloseHandle(bt->idx);
809         GlobalFree (bt);
810 #endif
811 }
812 //  open/create new btree
813
814 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
815 //              size of mapped page pool (e.g. 8192)
816
817 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
818 {
819 uint lvl, attr, last, slot, idx;
820 uint nlatchpage, latchhash;
821 BtLatchMgr *latchmgr;
822 off64_t size, off;
823 uint amt[1];
824 BtKey key;
825 BtDb* bt;
826 int flag;
827
828 #ifndef unix
829 OVERLAPPED ovl[1];
830 #else
831 struct flock lock[1];
832 #endif
833
834         // determine sanity of page size and buffer pool
835
836         if( bits > BT_maxbits )
837                 bits = BT_maxbits;
838         else if( bits < BT_minbits )
839                 bits = BT_minbits;
840
841         if( mode == BT_ro ) {
842                 fprintf(stderr, "ReadOnly mode not supported: %s\n", name);
843                 return NULL;
844         }
845 #ifdef unix
846         bt = calloc (1, sizeof(BtDb));
847
848         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
849         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_RANDOM);
850  
851         if( bt->idx == -1 ) {
852                 fprintf(stderr, "unable to open %s\n", name);
853                 return free(bt), NULL;
854         }
855 #else
856         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
857         attr = FILE_ATTRIBUTE_NORMAL;
858         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
859
860         if( bt->idx == INVALID_HANDLE_VALUE ) {
861                 fprintf(stderr, "unable to open %s\n", name);
862                 return GlobalFree(bt), NULL;
863         }
864 #endif
865 #ifdef unix
866         memset (lock, 0, sizeof(lock));
867         lock->l_len = sizeof(struct BtPage_);
868         lock->l_type = F_WRLCK;
869
870         if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) {
871                 fprintf(stderr, "unable to lock record zero %s\n", name);
872                 return bt_close (bt), NULL;
873         }
874 #else
875         memset (ovl, 0, sizeof(ovl));
876
877         //      use large offsets to
878         //      simulate advisory locking
879
880         ovl->OffsetHigh |= 0x80000000;
881
882         if( !LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) ) {
883                 fprintf(stderr, "unable to lock record zero %s, GetLastError = %d\n", name, GetLastError());
884                 return bt_close (bt), NULL;
885         }
886 #endif 
887
888 #ifdef unix
889         latchmgr = valloc (BT_maxpage);
890         *amt = 0;
891
892         // read minimum page size to get root info
893
894         if( size = lseek (bt->idx, 0L, 2) ) {
895                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
896                         bits = latchmgr->alloc->bits;
897                 else {
898                         fprintf(stderr, "Unable to read page zero\n");
899                         return free(bt), free(latchmgr), NULL;
900                 }
901         }
902 #else
903         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
904         size = GetFileSize(bt->idx, amt);
905
906         if( size || *amt ) {
907                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) {
908                         fprintf(stderr, "Unable to read page zero\n");
909                         return bt_close (bt), NULL;
910                 } else
911                         bits = latchmgr->alloc->bits;
912         }
913 #endif
914
915         bt->page_size = 1 << bits;
916         bt->page_bits = bits;
917
918         bt->mode = mode;
919
920         if( size || *amt ) {
921                 nlatchpage = latchmgr->nlatchpage;
922                 goto btlatch;
923         }
924
925         if( nodemax < 16 ) {
926                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
927                 return bt_close(bt), NULL;
928         }
929
930         // initialize an empty b-tree with latch page, root page, page of leaves
931         // and page(s) of latches and page pool cache
932
933         memset (latchmgr, 0, 1 << bits);
934         latchmgr->alloc->bits = bt->page_bits;
935
936         //  calculate number of latch hash table entries
937
938         nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
939         latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
940
941         nlatchpage += nodemax;          // size of the buffer pool in pages
942         nlatchpage += (sizeof(BtLatchSet) * nodemax + bt->page_size - 1)/bt->page_size;
943
944         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
945         latchmgr->nlatchpage = nlatchpage;
946         latchmgr->latchtotal = nodemax;
947         latchmgr->latchhash = latchhash;
948
949         if( bt_writepage (bt, latchmgr->alloc, 0) ) {
950                 fprintf (stderr, "Unable to create btree page zero\n");
951                 return bt_close (bt), NULL;
952         }
953
954         memset (latchmgr, 0, 1 << bits);
955         latchmgr->alloc->bits = bt->page_bits;
956
957         for( lvl=MIN_lvl; lvl--; ) {
958                 last = MIN_lvl - lvl;   // page number
959                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
960                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? last + 1 : 0);
961                 key = keyptr(latchmgr->alloc, 1);
962                 key->len = 2;           // create stopper key
963                 key->key[0] = 0xff;
964                 key->key[1] = 0xff;
965
966                 latchmgr->alloc->min = bt->page_size - 3;
967                 latchmgr->alloc->lvl = lvl;
968                 latchmgr->alloc->cnt = 1;
969                 latchmgr->alloc->act = 1;
970
971                 if( bt_writepage (bt, latchmgr->alloc, last) ) {
972                         fprintf (stderr, "Unable to create btree page %.8x\n", last);
973                         return bt_close (bt), NULL;
974                 }
975         }
976
977         // clear out buffer pool pages
978
979         memset(latchmgr, 0, bt->page_size);
980         last = MIN_lvl + nlatchpage;
981
982         if( bt_writepage (bt, latchmgr->alloc, last) ) {
983                 fprintf (stderr, "Unable to write buffer pool page %.8x\n", last);
984                 return bt_close (bt), NULL;
985         }
986                 
987 #ifdef unix
988         free (latchmgr);
989 #else
990         VirtualFree (latchmgr, 0, MEM_RELEASE);
991 #endif
992
993 btlatch:
994 #ifdef unix
995         lock->l_type = F_UNLCK;
996         if( fcntl (bt->idx, F_SETLK, lock) < 0 ) {
997                 fprintf (stderr, "Unable to unlock page zero\n");
998                 return bt_close (bt), NULL;
999         }
1000 #else
1001         if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) {
1002                 fprintf (stderr, "Unable to unlock page zero, GetLastError = %d\n", GetLastError());
1003                 return bt_close (bt), NULL;
1004         }
1005 #endif
1006 #ifdef unix
1007         flag = PROT_READ | PROT_WRITE;
1008         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
1009         if( bt->latchmgr == MAP_FAILED ) {
1010                 fprintf (stderr, "Unable to mmap page zero, errno = %d", errno);
1011                 return bt_close (bt), NULL;
1012         }
1013         bt->table = (void *)mmap (0, (uid)nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
1014         if( bt->table == MAP_FAILED ) {
1015                 fprintf (stderr, "Unable to mmap buffer pool, errno = %d", errno);
1016                 return bt_close (bt), NULL;
1017         }
1018         madvise (bt->table, (uid)nlatchpage << bt->page_bits, MADV_RANDOM | MADV_WILLNEED);
1019 #else
1020         flag = PAGE_READWRITE;
1021         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size, NULL);
1022         if( !bt->halloc ) {
1023                 fprintf (stderr, "Unable to create file mapping for buffer pool mgr, GetLastError = %d\n", GetLastError());
1024                 return bt_close (bt), NULL;
1025         }
1026
1027         flag = FILE_MAP_WRITE;
1028         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size);
1029         if( !bt->latchmgr ) {
1030                 fprintf (stderr, "Unable to map buffer pool, GetLastError = %d\n", GetLastError());
1031                 return bt_close (bt), NULL;
1032         }
1033
1034         bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
1035 #endif
1036         bt->pagepool = (unsigned char *)bt->table + (uid)(nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
1037         bt->latchsets = (BtLatchSet *)(bt->pagepool - (uid)bt->latchmgr->latchtotal * sizeof(BtLatchSet));
1038
1039 #ifdef unix
1040         bt->mem = valloc (2 * bt->page_size);
1041 #else
1042         bt->mem = VirtualAlloc(NULL, 2 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
1043 #endif
1044         bt->frame = (BtPage)bt->mem;
1045         bt->cursor = (BtPage)(bt->mem + bt->page_size);
1046         return bt;
1047 }
1048
1049 // place write, read, or parent lock on requested page_no.
1050
1051 void bt_lockpage(BtLock mode, BtLatchSet *latch)
1052 {
1053         switch( mode ) {
1054         case BtLockRead:
1055                 ReadLock (latch->readwr);
1056                 break;
1057         case BtLockWrite:
1058                 WriteLock (latch->readwr);
1059                 break;
1060         case BtLockAccess:
1061                 ReadLock (latch->access);
1062                 break;
1063         case BtLockDelete:
1064                 WriteLock (latch->access);
1065                 break;
1066         case BtLockParent:
1067                 WriteLock (latch->parent);
1068                 break;
1069         }
1070 }
1071
1072 // remove write, read, or parent lock on requested page
1073
1074 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1075 {
1076         switch( mode ) {
1077         case BtLockRead:
1078                 ReadRelease (latch->readwr);
1079                 break;
1080         case BtLockWrite:
1081                 WriteRelease (latch->readwr);
1082                 break;
1083         case BtLockAccess:
1084                 ReadRelease (latch->access);
1085                 break;
1086         case BtLockDelete:
1087                 WriteRelease (latch->access);
1088                 break;
1089         case BtLockParent:
1090                 WriteRelease (latch->parent);
1091                 break;
1092         }
1093 }
1094
1095 //      allocate a new page and write page into it
1096
1097 uid bt_newpage(BtDb *bt, BtPage page)
1098 {
1099 BtLatchSet *latch;
1100 uid new_page;
1101 BtPage temp;
1102
1103         //      lock allocation page
1104
1105         bt_spinwritelock(bt->latchmgr->lock);
1106
1107         // use empty chain first
1108         // else allocate empty page
1109
1110         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
1111           if( latch = bt_pinlatch (bt, new_page) )
1112                 temp = bt_mappage (bt, latch);
1113           else
1114                 return 0;
1115
1116           bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
1117           bt_spinreleasewrite(bt->latchmgr->lock);
1118           memcpy (temp, page, bt->page_size);
1119
1120           bt_update (bt, temp);
1121           bt_unpinlatch (latch);
1122           return new_page;
1123         } else {
1124           new_page = bt_getid(bt->latchmgr->alloc->right);
1125           bt_putid(bt->latchmgr->alloc->right, new_page+1);
1126           bt_spinreleasewrite(bt->latchmgr->lock);
1127
1128           if( bt_writepage (bt, page, new_page) )
1129                 return 0;
1130         }
1131
1132         bt_update (bt, bt->latchmgr->alloc);
1133         return new_page;
1134 }
1135
1136 //  compare two keys, returning > 0, = 0, or < 0
1137 //  as the comparison value
1138
1139 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1140 {
1141 uint len1 = key1->len;
1142 int ans;
1143
1144         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1145                 return ans;
1146
1147         if( len1 > len2 )
1148                 return 1;
1149         if( len1 < len2 )
1150                 return -1;
1151
1152         return 0;
1153 }
1154
1155 //  Update current page of btree by
1156 //      flushing mapped area to disk backing of cache pool.
1157 //      mark page as dirty for rewrite to permanent location
1158
1159 void bt_update (BtDb *bt, BtPage page)
1160 {
1161 #ifdef unix
1162         msync (page, bt->page_size, MS_ASYNC);
1163 #else
1164 //      FlushViewOfFile (page, bt->page_size);
1165 #endif
1166         page->dirty = 1;
1167 }
1168
1169 //  map the btree cached page onto current page
1170
1171 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1172 {
1173         return (BtPage)((uid)(latch - bt->latchsets) * bt->page_size + bt->pagepool);
1174 }
1175
1176 //      deallocate a deleted page 
1177 //      place on free chain out of allocator page
1178 //      call with page latched for Writing and Deleting
1179
1180 BTERR bt_freepage(BtDb *bt, uid page_no, BtLatchSet *latch)
1181 {
1182 BtPage page = bt_mappage (bt, latch);
1183
1184         //      lock allocation page
1185
1186         bt_spinwritelock (bt->latchmgr->lock);
1187
1188         //      store chain in second right
1189         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1190         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1191
1192         page->free = 1;
1193         bt_update(bt, page);
1194
1195         // unlock released page
1196
1197         bt_unlockpage (BtLockDelete, latch);
1198         bt_unlockpage (BtLockWrite, latch);
1199         bt_unpinlatch (latch);
1200
1201         // unlock allocation page
1202
1203         bt_spinreleasewrite (bt->latchmgr->lock);
1204         bt_update (bt, bt->latchmgr->alloc);
1205         return 0;
1206 }
1207
1208 //  find slot in page for given key at a given level
1209
1210 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1211 {
1212 uint diff, higher = bt->page->cnt, low = 1, slot;
1213 uint good = 0;
1214
1215         //      make stopper key an infinite fence value
1216
1217         if( bt_getid (bt->page->right) )
1218                 higher++;
1219         else
1220                 good++;
1221
1222         //      low is the lowest candidate, higher is already
1223         //      tested as .ge. the given key, loop ends when they meet
1224
1225         while( diff = higher - low ) {
1226                 slot = low + ( diff >> 1 );
1227                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1228                         low = slot + 1;
1229                 else
1230                         higher = slot, good++;
1231         }
1232
1233         //      return zero if key is on right link page
1234
1235         return good ? higher : 0;
1236 }
1237
1238 //  find and load page at given level for given key
1239 //      leave page rd or wr locked as requested
1240
1241 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1242 {
1243 uid page_no = ROOT_page, prevpage = 0;
1244 uint drill = 0xff, slot;
1245 BtLatchSet *prevlatch;
1246 uint mode, prevmode;
1247
1248   //  start at root of btree and drill down
1249
1250   do {
1251         // determine lock mode of drill level
1252         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1253
1254         if( bt->latch = bt_pinlatch(bt, page_no) )
1255                 bt->page_no = page_no;
1256         else
1257                 return 0;
1258
1259         // obtain access lock using lock chaining
1260
1261         if( page_no > ROOT_page )
1262                 bt_lockpage(BtLockAccess, bt->latch);
1263
1264         if( prevpage ) {
1265                 bt_unlockpage(prevmode, prevlatch);
1266                 bt_unpinlatch(prevlatch);
1267                 prevpage = 0;
1268         }
1269
1270         // obtain read lock using lock chaining
1271
1272         bt_lockpage(mode, bt->latch);
1273
1274         if( page_no > ROOT_page )
1275                 bt_unlockpage(BtLockAccess, bt->latch);
1276
1277         //      map/obtain page contents
1278
1279         bt->page = bt_mappage (bt, bt->latch);
1280
1281         // re-read and re-lock root after determining actual level of root
1282
1283         if( bt->page->lvl != drill) {
1284                 if( bt->page_no != ROOT_page )
1285                         return bt->err = BTERR_struct, 0;
1286                         
1287                 drill = bt->page->lvl;
1288
1289                 if( lock != BtLockRead && drill == lvl ) {
1290                         bt_unlockpage(mode, bt->latch);
1291                         bt_unpinlatch(bt->latch);
1292                         continue;
1293                 }
1294         }
1295
1296         prevpage = bt->page_no;
1297         prevlatch = bt->latch;
1298         prevmode = mode;
1299
1300         //  find key on page at this level
1301         //  and descend to requested level
1302
1303         if( !bt->page->kill )
1304          if( slot = bt_findslot (bt, key, len) ) {
1305           if( drill == lvl )
1306                 return slot;
1307
1308           while( slotptr(bt->page, slot)->dead )
1309                 if( slot++ < bt->page->cnt )
1310                         continue;
1311                 else
1312                         goto slideright;
1313
1314           page_no = bt_getid(slotptr(bt->page, slot)->id);
1315           drill--;
1316           continue;
1317          }
1318
1319         //  or slide right into next page
1320
1321 slideright:
1322         page_no = bt_getid(bt->page->right);
1323
1324   } while( page_no );
1325
1326   // return error on end of right chain
1327
1328   bt->err = BTERR_eof;
1329   return 0;     // return error
1330 }
1331
1332 //      a fence key was deleted from a page
1333 //      push new fence value upwards
1334
1335 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1336 {
1337 unsigned char leftkey[256], rightkey[256];
1338 BtLatchSet *latch = bt->latch;
1339 BtKey ptr;
1340
1341         // remove deleted key, the old fence value
1342
1343         ptr = keyptr(bt->page, bt->page->cnt);
1344         memcpy(rightkey, ptr, ptr->len + 1);
1345
1346         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1347         bt->page->clean = 1;
1348
1349         ptr = keyptr(bt->page, bt->page->cnt);
1350         memcpy(leftkey, ptr, ptr->len + 1);
1351
1352         bt_update (bt, bt->page);
1353         bt_lockpage (BtLockParent, latch);
1354         bt_unlockpage (BtLockWrite, latch);
1355
1356         //  insert new (now smaller) fence key
1357
1358         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1359                 return bt->err;
1360
1361         //  remove old (larger) fence key
1362
1363         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1364                 return bt->err;
1365
1366         bt_unlockpage (BtLockParent, latch);
1367         bt_unpinlatch (latch);
1368         return 0;
1369 }
1370
1371 //      root has a single child
1372 //      collapse a level from the btree
1373 //      call with root locked in bt->page
1374
1375 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1376 {
1377 BtLatchSet *latch;
1378 BtPage temp;
1379 uid child;
1380 uint idx;
1381
1382         // find the child entry
1383         //      and promote to new root
1384
1385   do {
1386         for( idx = 0; idx++ < root->cnt; )
1387           if( !slotptr(root, idx)->dead )
1388                 break;
1389
1390         child = bt_getid (slotptr(root, idx)->id);
1391         if( latch = bt_pinlatch (bt, child) )
1392                 temp = bt_mappage (bt, latch);
1393         else
1394                 return bt->err;
1395
1396         bt_lockpage (BtLockDelete, latch);
1397         bt_lockpage (BtLockWrite, latch);
1398         memcpy (root, temp, bt->page_size);
1399
1400         bt_update (bt, root);
1401
1402         if( bt_freepage (bt, child, latch) )
1403                 return bt->err;
1404
1405   } while( root->lvl > 1 && root->act == 1 );
1406
1407   bt_unlockpage (BtLockWrite, bt->latch);
1408   bt_unpinlatch (bt->latch);
1409   return 0;
1410 }
1411
1412 //  find and delete key on page by marking delete flag bit
1413 //  when page becomes empty, delete it
1414
1415 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1416 {
1417 unsigned char lowerkey[256], higherkey[256];
1418 uint slot, dirty = 0, idx, fence, found;
1419 BtLatchSet *latch, *rlatch;
1420 uid page_no, right;
1421 BtPage temp;
1422 BtKey ptr;
1423
1424         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1425                 ptr = keyptr(bt->page, slot);
1426         else
1427                 return bt->err;
1428
1429         // are we deleting a fence slot?
1430
1431         fence = slot == bt->page->cnt;
1432
1433         // if key is found delete it, otherwise ignore request
1434
1435         if( found = !keycmp (ptr, key, len) )
1436           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1437                 dirty = slotptr(bt->page,slot)->dead = 1;
1438                 bt->page->clean = 1;
1439                 bt->page->act--;
1440
1441                 // collapse empty slots
1442
1443                 while( idx = bt->page->cnt - 1 )
1444                   if( slotptr(bt->page, idx)->dead ) {
1445                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1446                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1447                   } else
1448                         break;
1449           }
1450
1451         right = bt_getid(bt->page->right);
1452         page_no = bt->page_no;
1453         latch = bt->latch;
1454
1455         if( !dirty ) {
1456           if( lvl )
1457                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1458           bt_unlockpage(BtLockWrite, latch);
1459           bt_unpinlatch (latch);
1460           return bt->found = found, 0;
1461         }
1462
1463         //      did we delete a fence key in an upper level?
1464
1465         if( lvl && bt->page->act && fence )
1466           if( bt_fixfence (bt, page_no, lvl) )
1467                 return bt->err;
1468           else
1469                 return bt->found = found, 0;
1470
1471         //      is this a collapsed root?
1472
1473         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1474           if( bt_collapseroot (bt, bt->page) )
1475                 return bt->err;
1476           else
1477                 return bt->found = found, 0;
1478
1479         // return if page is not empty
1480
1481         if( bt->page->act ) {
1482           bt_update(bt, bt->page);
1483           bt_unlockpage(BtLockWrite, latch);
1484           bt_unpinlatch (latch);
1485           return bt->found = found, 0;
1486         }
1487
1488         // cache copy of fence key
1489         //      in order to find parent
1490
1491         ptr = keyptr(bt->page, bt->page->cnt);
1492         memcpy(lowerkey, ptr, ptr->len + 1);
1493
1494         // obtain lock on right page
1495
1496         if( rlatch = bt_pinlatch (bt, right) )
1497                 temp = bt_mappage (bt, rlatch);
1498         else
1499                 return bt->err;
1500
1501         bt_lockpage(BtLockWrite, rlatch);
1502
1503         if( temp->kill ) {
1504                 bt_abort(bt, temp, right, 0);
1505                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1506         }
1507
1508         // pull contents of next page into current empty page 
1509
1510         memcpy (bt->page, temp, bt->page_size);
1511
1512         //      cache copy of key to update
1513
1514         ptr = keyptr(temp, temp->cnt);
1515         memcpy(higherkey, ptr, ptr->len + 1);
1516
1517         //  Mark right page as deleted and point it to left page
1518         //      until we can post updates at higher level.
1519
1520         bt_putid(temp->right, page_no);
1521         temp->kill = 1;
1522
1523         bt_update(bt, bt->page);
1524         bt_update(bt, temp);
1525
1526         bt_lockpage(BtLockParent, latch);
1527         bt_unlockpage(BtLockWrite, latch);
1528
1529         bt_lockpage(BtLockParent, rlatch);
1530         bt_unlockpage(BtLockWrite, rlatch);
1531
1532         //  redirect higher key directly to consolidated node
1533
1534         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1535                 return bt->err;
1536
1537         //  delete old lower key to consolidated node
1538
1539         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1540                 return bt->err;
1541
1542         //  obtain write & delete lock on deleted node
1543         //      add right block to free chain
1544
1545         bt_lockpage(BtLockDelete, rlatch);
1546         bt_lockpage(BtLockWrite, rlatch);
1547         bt_unlockpage(BtLockParent, rlatch);
1548
1549         if( bt_freepage (bt, right, rlatch) )
1550                 return bt->err;
1551
1552         bt_unlockpage(BtLockParent, latch);
1553         bt_unpinlatch(latch);
1554         return 0;
1555 }
1556
1557 //      find key in leaf level and return row-id
1558
1559 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1560 {
1561 uint  slot;
1562 BtKey ptr;
1563 uid id;
1564
1565         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1566                 ptr = keyptr(bt->page, slot);
1567         else
1568                 return 0;
1569
1570         // if key exists, return row-id
1571         //      otherwise return 0
1572
1573         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1574                 id = bt_getid(slotptr(bt->page,slot)->id);
1575         else
1576                 id = 0;
1577
1578         bt_unlockpage (BtLockRead, bt->latch);
1579         bt_unpinlatch (bt->latch);
1580         return id;
1581 }
1582
1583 //      check page for space available,
1584 //      clean if necessary and return
1585 //      0 - page needs splitting
1586 //      >0 - go ahead with new slot
1587  
1588 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1589 {
1590 uint nxt = bt->page_size;
1591 BtPage page = bt->page;
1592 uint cnt = 0, idx = 0;
1593 uint max = page->cnt;
1594 uint newslot = slot;
1595 BtKey key;
1596 int ret;
1597
1598         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1599                 return slot;
1600
1601         //      skip cleanup if nothing to reclaim
1602
1603         if( !page->clean )
1604                 return 0;
1605
1606         memcpy (bt->frame, page, bt->page_size);
1607
1608         // skip page info and set rest of page to zero
1609
1610         memset (page+1, 0, bt->page_size - sizeof(*page));
1611         page->act = 0;
1612
1613         while( cnt++ < max ) {
1614                 if( cnt == slot )
1615                         newslot = idx + 1;
1616                 // always leave fence key in list
1617                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1618                         continue;
1619
1620                 // copy key
1621                 key = keyptr(bt->frame, cnt);
1622                 nxt -= key->len + 1;
1623                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1624
1625                 // copy slot
1626                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1627                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1628                         page->act++;
1629 #ifdef USETOD
1630                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1631 #endif
1632                 slotptr(page, idx)->off = nxt;
1633         }
1634
1635         page->min = nxt;
1636         page->cnt = idx;
1637
1638         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1639                 return newslot;
1640
1641         return 0;
1642 }
1643
1644 // split the root and raise the height of the btree
1645
1646 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1647 {
1648 uint nxt = bt->page_size;
1649 BtPage root = bt->page;
1650 uid right;
1651
1652         //  Obtain an empty page to use, and copy the current
1653         //  root contents into it
1654
1655         if( !(right = bt_newpage(bt, root)) )
1656                 return bt->err;
1657
1658         // preserve the page info at the bottom
1659         // and set rest to zero
1660
1661         memset(root+1, 0, bt->page_size - sizeof(*root));
1662
1663         // insert first key on newroot page
1664
1665         nxt -= *leftkey + 1;
1666         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1667         bt_putid(slotptr(root, 1)->id, right);
1668         slotptr(root, 1)->off = nxt;
1669         
1670         // insert second key on newroot page
1671         // and increase the root height
1672
1673         nxt -= 3;
1674         ((unsigned char *)root)[nxt] = 2;
1675         ((unsigned char *)root)[nxt+1] = 0xff;
1676         ((unsigned char *)root)[nxt+2] = 0xff;
1677         bt_putid(slotptr(root, 2)->id, page_no2);
1678         slotptr(root, 2)->off = nxt;
1679
1680         bt_putid(root->right, 0);
1681         root->min = nxt;                // reset lowest used offset and key count
1682         root->cnt = 2;
1683         root->act = 2;
1684         root->lvl++;
1685
1686         // update and release root (bt->page)
1687
1688         bt_update(bt, root);
1689
1690         bt_unlockpage(BtLockWrite, bt->latch);
1691         bt_unpinlatch(bt->latch);
1692         return 0;
1693 }
1694
1695 //  split already locked full node
1696 //      return unlocked.
1697
1698 BTERR bt_splitpage (BtDb *bt)
1699 {
1700 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1701 unsigned char fencekey[256], rightkey[256];
1702 uid page_no = bt->page_no, right;
1703 BtLatchSet *latch, *rlatch;
1704 BtPage page = bt->page;
1705 uint lvl = page->lvl;
1706 BtKey key;
1707
1708         latch = bt->latch;
1709
1710         //  split higher half of keys to bt->frame
1711         //      the last key (fence key) might be dead
1712
1713         memset (bt->frame, 0, bt->page_size);
1714         max = page->cnt;
1715         cnt = max / 2;
1716         idx = 0;
1717
1718         while( cnt++ < max ) {
1719                 key = keyptr(page, cnt);
1720                 nxt -= key->len + 1;
1721                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1722                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1723                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1724                         bt->frame->act++;
1725 #ifdef USETOD
1726                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1727 #endif
1728                 slotptr(bt->frame, idx)->off = nxt;
1729         }
1730
1731         //      remember fence key for new right page
1732
1733         memcpy (rightkey, key, key->len + 1);
1734
1735         bt->frame->bits = bt->page_bits;
1736         bt->frame->min = nxt;
1737         bt->frame->cnt = idx;
1738         bt->frame->lvl = lvl;
1739
1740         // link right node
1741
1742         if( page_no > ROOT_page )
1743                 memcpy (bt->frame->right, page->right, BtId);
1744
1745         //      get new free page and write frame to it.
1746
1747         if( !(right = bt_newpage(bt, bt->frame)) )
1748                 return bt->err;
1749
1750         //      update lower keys to continue in old page
1751
1752         memcpy (bt->frame, page, bt->page_size);
1753         memset (page+1, 0, bt->page_size - sizeof(*page));
1754         nxt = bt->page_size;
1755         page->clean = 0;
1756         page->act = 0;
1757         cnt = 0;
1758         idx = 0;
1759
1760         //  assemble page of smaller keys
1761         //      (they're all active keys)
1762
1763         while( cnt++ < max / 2 ) {
1764                 key = keyptr(bt->frame, cnt);
1765                 nxt -= key->len + 1;
1766                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1767                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1768 #ifdef USETOD
1769                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1770 #endif
1771                 slotptr(page, idx)->off = nxt;
1772                 page->act++;
1773         }
1774
1775         // remember fence key for smaller page
1776
1777         memcpy (fencekey, key, key->len + 1);
1778
1779         bt_putid(page->right, right);
1780         page->min = nxt;
1781         page->cnt = idx;
1782
1783         // if current page is the root page, split it
1784
1785         if( page_no == ROOT_page )
1786                 return bt_splitroot (bt, fencekey, right);
1787
1788         //      lock right page
1789
1790         if( rlatch = bt_pinlatch (bt, right) )
1791                 bt_lockpage (BtLockParent, rlatch);
1792         else
1793                 return bt->err;
1794
1795         // update left (containing) node
1796
1797         bt_update(bt, page);
1798
1799         bt_lockpage (BtLockParent, latch);
1800         bt_unlockpage (BtLockWrite, latch);
1801
1802         // insert new fence for reformulated left block
1803
1804         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1805                 return bt->err;
1806
1807         //      switch fence for right block of larger keys to new right page
1808
1809         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1810                 return bt->err;
1811
1812         bt_unlockpage (BtLockParent, latch);
1813         bt_unlockpage (BtLockParent, rlatch);
1814
1815         bt_unpinlatch (rlatch);
1816         bt_unpinlatch (latch);
1817         return 0;
1818 }
1819
1820 //  Insert new key into the btree at requested level.
1821 //  Pages are unlocked at exit.
1822
1823 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1824 {
1825 uint slot, idx;
1826 BtPage page;
1827 BtKey ptr;
1828
1829   while( 1 ) {
1830         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1831                 ptr = keyptr(bt->page, slot);
1832         else
1833         {
1834                 if( !bt->err )
1835                         bt->err = BTERR_ovflw;
1836                 return bt->err;
1837         }
1838
1839         // if key already exists, update id and return
1840
1841         page = bt->page;
1842
1843         if( !keycmp (ptr, key, len) ) {
1844           if( slotptr(page, slot)->dead )
1845                 page->act++;
1846           slotptr(page, slot)->dead = 0;
1847 #ifdef USETOD
1848           slotptr(page, slot)->tod = tod;
1849 #endif
1850           bt_putid(slotptr(page,slot)->id, id);
1851           bt_update(bt, bt->page);
1852           bt_unlockpage(BtLockWrite, bt->latch);
1853           bt_unpinlatch (bt->latch);
1854           return 0;
1855         }
1856
1857         // check if page has enough space
1858
1859         if( slot = bt_cleanpage (bt, len, slot) )
1860                 break;
1861
1862         if( bt_splitpage (bt) )
1863                 return bt->err;
1864   }
1865
1866   // calculate next available slot and copy key into page
1867
1868   page->min -= len + 1; // reset lowest used offset
1869   ((unsigned char *)page)[page->min] = len;
1870   memcpy ((unsigned char *)page + page->min +1, key, len );
1871
1872   for( idx = slot; idx < page->cnt; idx++ )
1873         if( slotptr(page, idx)->dead )
1874                 break;
1875
1876   // now insert key into array before slot
1877   // preserving the fence slot
1878
1879   if( idx == page->cnt )
1880         idx++, page->cnt++;
1881
1882   page->act++;
1883
1884   while( idx > slot )
1885         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1886
1887   bt_putid(slotptr(page,slot)->id, id);
1888   slotptr(page, slot)->off = page->min;
1889 #ifdef USETOD
1890   slotptr(page, slot)->tod = tod;
1891 #endif
1892   slotptr(page, slot)->dead = 0;
1893
1894   bt_update(bt, bt->page);
1895
1896   bt_unlockpage(BtLockWrite, bt->latch);
1897   bt_unpinlatch(bt->latch);
1898   return 0;
1899 }
1900
1901 //  cache page of keys into cursor and return starting slot for given key
1902
1903 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1904 {
1905 uint slot;
1906
1907         // cache page for retrieval
1908
1909         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1910           memcpy (bt->cursor, bt->page, bt->page_size);
1911         else
1912           return 0;
1913
1914         bt_unlockpage(BtLockRead, bt->latch);
1915         bt->cursor_page = bt->page_no;
1916         bt_unpinlatch (bt->latch);
1917         return slot;
1918 }
1919
1920 //  return next slot for cursor page
1921 //  or slide cursor right into next page
1922
1923 uint bt_nextkey (BtDb *bt, uint slot)
1924 {
1925 BtLatchSet *latch;
1926 off64_t right;
1927
1928   do {
1929         right = bt_getid(bt->cursor->right);
1930
1931         while( slot++ < bt->cursor->cnt )
1932           if( slotptr(bt->cursor,slot)->dead )
1933                 continue;
1934           else if( right || (slot < bt->cursor->cnt))
1935                 return slot;
1936           else
1937                 break;
1938
1939         if( !right )
1940                 break;
1941
1942         bt->cursor_page = right;
1943
1944         if( latch = bt_pinlatch (bt, right) )
1945         bt_lockpage(BtLockRead, latch);
1946         else
1947                 return 0;
1948
1949         bt->page = bt_mappage (bt, latch);
1950         memcpy (bt->cursor, bt->page, bt->page_size);
1951         bt_unlockpage(BtLockRead, latch);
1952         bt_unpinlatch (latch);
1953         slot = 0;
1954   } while( 1 );
1955
1956   return bt->err = 0;
1957 }
1958
1959 BtKey bt_key(BtDb *bt, uint slot)
1960 {
1961         return keyptr(bt->cursor, slot);
1962 }
1963
1964 uid bt_uid(BtDb *bt, uint slot)
1965 {
1966         return bt_getid(slotptr(bt->cursor,slot)->id);
1967 }
1968
1969 #ifdef USETOD
1970 uint bt_tod(BtDb *bt, uint slot)
1971 {
1972         return slotptr(bt->cursor,slot)->tod;
1973 }
1974 #endif
1975
1976 #ifdef STANDALONE
1977
1978 uint bt_audit (BtDb *bt)
1979 {
1980 uint idx, hashidx;
1981 uid next, page_no;
1982 BtLatchSet *latch;
1983 uint blks[64];
1984 uint cnt = 0;
1985 BtPage page;
1986 uint amt[1];
1987 BtKey ptr;
1988
1989 #ifdef unix
1990         posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
1991 #endif
1992         if( *(ushort *)(bt->latchmgr->lock) )
1993                 fprintf(stderr, "Alloc page locked\n");
1994         *(ushort *)(bt->latchmgr->lock) = 0;
1995
1996         memset (blks, 0, sizeof(blks));
1997
1998         for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
1999                 latch = bt->latchsets + idx;
2000                 if( *(ushort *)latch->readwr )
2001                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2002                 *(ushort *)latch->readwr = 0;
2003
2004                 if( *(ushort *)latch->access )
2005                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2006                 *(ushort *)latch->access = 0;
2007
2008                 if( *(ushort *)latch->parent )
2009                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2010                 *(ushort *)latch->parent = 0;
2011
2012                 if( latch->pin & PIN_mask ) {
2013                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2014                         latch->pin = 0;
2015                 }
2016                 page = (BtPage)((uid)idx * bt->page_size + bt->pagepool);
2017                 blks[page->lvl]++;
2018
2019             if( page->dirty )
2020                  if( bt_writepage (bt, page, latch->page_no) )
2021                         fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
2022         }
2023
2024         for( idx = 0; blks[idx]; idx++ )
2025                 fprintf(stderr, "cache: %d lvl %d blocks\n", blks[idx], idx);
2026
2027         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
2028           if( *(ushort *)(bt->table[hashidx].latch) )
2029                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2030
2031           *(ushort *)(bt->table[hashidx].latch) = 0;
2032         }
2033
2034         memset (blks, 0, sizeof(blks));
2035
2036         next = bt->latchmgr->nlatchpage + LATCH_page;
2037         page_no = LEAF_page;
2038
2039         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2040                 if( bt_readpage (bt, bt->frame, page_no) )
2041                   fprintf(stderr, "page %.8x unreadable\n", page_no);
2042                 if( !bt->frame->free ) {
2043                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2044                   ptr = keyptr(bt->frame, idx+1);
2045                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2046                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2047                  }
2048                  if( !bt->frame->lvl )
2049                         cnt += bt->frame->act;
2050                  blks[bt->frame->lvl]++;
2051                 }
2052
2053                 if( page_no > LEAF_page )
2054                         next = page_no + 1;
2055                 page_no = next;
2056         }
2057
2058         for( idx = 0; blks[idx]; idx++ )
2059                 fprintf(stderr, "btree: %d lvl %d blocks\n", blks[idx], idx);
2060
2061         return cnt - 1;
2062 }
2063
2064 #ifndef unix
2065 double getCpuTime(int type)
2066 {
2067 FILETIME crtime[1];
2068 FILETIME xittime[1];
2069 FILETIME systime[1];
2070 FILETIME usrtime[1];
2071 SYSTEMTIME timeconv[1];
2072 double ans = 0;
2073
2074         memset (timeconv, 0, sizeof(SYSTEMTIME));
2075
2076         switch( type ) {
2077         case 0:
2078                 GetSystemTimeAsFileTime (xittime);
2079                 FileTimeToSystemTime (xittime, timeconv);
2080                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2081                 break;
2082         case 1:
2083                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2084                 FileTimeToSystemTime (usrtime, timeconv);
2085                 break;
2086         case 2:
2087                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2088                 FileTimeToSystemTime (systime, timeconv);
2089                 break;
2090         }
2091
2092         ans += (double)timeconv->wHour * 3600;
2093         ans += (double)timeconv->wMinute * 60;
2094         ans += (double)timeconv->wSecond;
2095         ans += (double)timeconv->wMilliseconds / 1000;
2096         return ans;
2097 }
2098 #else
2099 #include <time.h>
2100 #include <sys/resource.h>
2101
2102 double getCpuTime(int type)
2103 {
2104 struct rusage used[1];
2105 struct timeval tv[1];
2106
2107         switch( type ) {
2108         case 0:
2109                 gettimeofday(tv, NULL);
2110                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2111
2112         case 1:
2113                 getrusage(RUSAGE_SELF, used);
2114                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2115
2116         case 2:
2117                 getrusage(RUSAGE_SELF, used);
2118                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2119         }
2120
2121         return 0;
2122 }
2123 #endif
2124
2125 //  standalone program to index file of keys
2126 //  then list them onto std-out
2127
2128 int main (int argc, char **argv)
2129 {
2130 uint slot, line = 0, off = 0, found = 0;
2131 int ch, cnt = 0, bits = 12, idx;
2132 unsigned char key[256];
2133 double done, start;
2134 uid next, page_no;
2135 BtLatchSet *latch;
2136 float elapsed;
2137 time_t tod[1];
2138 uint scan = 0;
2139 uint len = 0;
2140 uint map = 0;
2141 BtPage page;
2142 BtKey ptr;
2143 BtDb *bt;
2144 FILE *in;
2145
2146 #ifdef WIN32
2147         _setmode (1, _O_BINARY);
2148 #endif
2149         if( argc < 4 ) {
2150                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]);
2151                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2152                 fprintf (stderr, "  mapped_pool_pages: number of pages in buffer pool\n");
2153                 exit(0);
2154         }
2155
2156         start = getCpuTime(0);
2157         time(tod);
2158
2159         if( argc > 4 )
2160                 bits = atoi(argv[4]);
2161
2162         if( argc > 5 )
2163                 map = atoi(argv[5]);
2164
2165         if( argc > 6 )
2166                 off = atoi(argv[6]);
2167
2168         bt = bt_open ((argv[1]), BT_rw, bits, map);
2169
2170         if( !bt ) {
2171                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2172                 exit (1);
2173         }
2174
2175         switch(argv[3][0]| 0x20)
2176         {
2177         case 'p':       // display page
2178                 if( latch = bt_pinlatch (bt, off) )
2179                         page = bt_mappage (bt, latch);
2180                 else
2181                         fprintf(stderr, "unable to read page %.8x\n", off);
2182
2183                 write (1, page, bt->page_size);
2184                 break;
2185
2186         case 'a':       // buffer pool audit
2187                 fprintf(stderr, "started audit for %s\n", argv[1]);
2188                 cnt = bt_audit (bt);
2189                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[1], cnt);
2190                 break;
2191
2192         case 'w':       // write keys
2193                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2194                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2195 #ifdef unix
2196                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2197 #endif
2198                   while( ch = getc(in), ch != EOF )
2199                         if( ch == '\n' )
2200                         {
2201                           if( off )
2202                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2203
2204                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2205                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2206                           len = 0;
2207                         }
2208                         else if( len < 245 )
2209                                 key[len++] = ch;
2210                   }
2211                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2212                 break;
2213
2214         case 'd':       // delete keys
2215                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2216                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2217 #ifdef unix
2218                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2219 #endif
2220                   while( ch = getc(in), ch != EOF )
2221                         if( ch == '\n' )
2222                         {
2223                           if( off )
2224                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2225                           line++;
2226                           if( bt_deletekey (bt, key, len, 0) )
2227                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2228                           len = 0;
2229                         }
2230                         else if( len < 245 )
2231                                 key[len++] = ch;
2232                   }
2233                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2234                 break;
2235
2236         case 'f':       // find keys
2237                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2238                 if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
2239 #ifdef unix
2240                   posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
2241 #endif
2242                   while( ch = getc(in), ch != EOF )
2243                         if( ch == '\n' )
2244                         {
2245                           if( off )
2246                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2247                           line++;
2248                           if( bt_findkey (bt, key, len) )
2249                                 found++;
2250                           else if( bt->err )
2251                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2252                           len = 0;
2253                         }
2254                         else if( len < 245 )
2255                                 key[len++] = ch;
2256                   }
2257                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2258                 break;
2259
2260         case 's':       // scan and print keys
2261                 fprintf(stderr, "started scaning\n");
2262                 cnt = len = key[0] = 0;
2263
2264                 if( slot = bt_startkey (bt, key, len) )
2265                   slot--;
2266                 else
2267                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2268
2269                 while( slot = bt_nextkey (bt, slot) ) {
2270                         ptr = bt_key(bt, slot);
2271                         fwrite (ptr->key, ptr->len, 1, stdout);
2272                         fputc ('\n', stdout);
2273                         cnt++;
2274                 }
2275
2276                 fprintf(stderr, " Total keys read %d\n", cnt - 1);
2277                 break;
2278
2279         case 'c':       // count keys
2280           fprintf(stderr, "started counting\n");
2281           cnt = 0;
2282
2283           next = bt->latchmgr->nlatchpage + LATCH_page;
2284           page_no = LEAF_page;
2285
2286           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2287                 if( latch = bt_pinlatch (bt, page_no) )
2288                         page = bt_mappage (bt, latch);
2289                 if( !page->free && !page->lvl )
2290                         cnt += page->act;
2291                 if( page_no > LEAF_page )
2292                         next = page_no + 1;
2293                 if( scan )
2294                  for( idx = 0; idx++ < page->cnt; ) {
2295                   if( slotptr(page, idx)->dead )
2296                         continue;
2297                   ptr = keyptr(page, idx);
2298                   if( idx != page->cnt && bt_getid (page->right) ) {
2299                         fwrite (ptr->key, ptr->len, 1, stdout);
2300                         fputc ('\n', stdout);
2301                   }
2302                  }
2303                 bt_unpinlatch (latch);
2304                 page_no = next;
2305           }
2306                 
2307           cnt--;        // remove stopper key
2308           fprintf(stderr, " Total keys read %d\n", cnt);
2309           break;
2310         }
2311
2312         done = getCpuTime(0);
2313         elapsed = (float)(done - start);
2314         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2315         elapsed = getCpuTime(1);
2316         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2317         elapsed = getCpuTime(2);
2318         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2319         return 0;
2320 }
2321
2322 #endif  //STANDALONE