]> pd.if.org Git - btree/blob - threadskv5.c
Initialize empty reverse cursor in threadskv5.c
[btree] / threadskv5.c
1 // btree version threadskv5 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      and bi-directional cursors
7
8 // 08 SEP 2014
9
10 // author: karl malbrain, malbrain@cal.berkeley.edu
11
12 /*
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
15
16 The orginal author is Karl Malbrain.
17
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
24 */
25
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
28
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
31
32 #ifdef linux
33 #define _GNU_SOURCE
34 #endif
35
36 #ifdef unix
37 #include <unistd.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <fcntl.h>
41 #include <sys/time.h>
42 #include <sys/mman.h>
43 #include <errno.h>
44 #include <pthread.h>
45 #else
46 #define WIN32_LEAN_AND_MEAN
47 #include <windows.h>
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <time.h>
51 #include <fcntl.h>
52 #include <process.h>
53 #include <intrin.h>
54 #endif
55
56 #include <memory.h>
57 #include <string.h>
58 #include <stddef.h>
59
60 typedef unsigned long long      uid;
61
62 #ifndef unix
63 typedef unsigned long long      off64_t;
64 typedef unsigned short          ushort;
65 typedef unsigned int            uint;
66 #endif
67
68 #define BT_latchtable   128                                     // number of latch manager slots
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 /*
79 There are five lock types for each node in three independent sets: 
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
85 */
86
87 typedef enum{
88         BtLockAccess,
89         BtLockDelete,
90         BtLockRead,
91         BtLockWrite,
92         BtLockParent
93 } BtLock;
94
95 //      definition for phase-fair reader/writer lock implementation
96
97 typedef struct {
98         ushort rin[1];
99         ushort rout[1];
100         ushort ticket[1];
101         ushort serving[1];
102 } RWLock;
103
104 #define PHID 0x1
105 #define PRES 0x2
106 #define MASK 0x3
107 #define RINC 0x4
108
109 //      definition for spin latch implementation
110
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
114
115 volatile typedef struct {
116         ushort exclusive:1;
117         ushort pending:1;
118         ushort share:14;
119 } BtSpinLatch;
120
121 #define XCL 1
122 #define PEND 2
123 #define BOTH 3
124 #define SHARE 4
125
126 //  hash table entries
127
128 typedef struct {
129         BtSpinLatch latch[1];
130         volatile ushort slot;           // Latch table entry at head of chain
131 } BtHashEntry;
132
133 //      latch manager table structure
134
135 typedef struct {
136         RWLock readwr[1];               // read/write page lock
137         RWLock access[1];               // Access Intent/Page delete
138         RWLock parent[1];               // Posting of fence key in parent
139         BtSpinLatch busy[1];            // slot is being moved between chains
140         volatile ushort next;           // next entry in hash table chain
141         volatile ushort prev;           // prev entry in hash table chain
142         volatile ushort pin;            // number of outstanding locks
143         volatile ushort hash;           // hash slot entry is under
144         volatile uid page_no;           // latch set page number
145 } BtLatchSet;
146
147 //      Define the length of the page and key pointers
148
149 #define BtId 6
150
151 //      Page key slot definition.
152
153 //      Keys are marked dead, but remain on the page until
154 //      it cleanup is called. The fence key (highest key) for
155 //      a leaf page is always present, even after cleanup.
156
157 //      Slot types
158
159 //      In addition to the Unique keys that occupy slots
160 //      there are Librarian and Duplicate key
161 //      slots occupying the key slot array.
162
163 //      The Librarian slots are dead keys that
164 //      serve as filler, available to add new Unique
165 //      or Dup slots that are inserted into the B-tree.
166
167 //      The Duplicate slots have had their key bytes extended
168 //      by 6 bytes to contain a binary duplicate key uniqueifier.
169
170 typedef enum {
171         Unique,
172         Librarian,
173         Duplicate
174 } BtSlotType;
175
176 typedef struct {
177         uint off:BT_maxbits;    // page offset for key start
178         uint type:3;                    // type of slot
179         uint dead:1;                    // set for deleted slot
180 } BtSlot;
181
182 //      The key structure occupies space at the upper end of
183 //      each page.  It's a length byte followed by the key
184 //      bytes.
185
186 typedef struct {
187         unsigned char len;
188         unsigned char key[1];
189 } *BtKey;
190
191 //      the value structure also occupies space at the upper
192 //      end of the page. Each key is immediately followed by a value.
193
194 typedef struct {
195         unsigned char len;
196         unsigned char value[1];
197 } *BtVal;
198
199 //      The first part of an index page.
200 //      It is immediately followed
201 //      by the BtSlot array of keys.
202
203 //      note that this structure size
204 //      must be a multiple of 8 bytes
205 //      in order to place dups correctly.
206
207 typedef struct BtPage_ {
208         uint cnt;                                       // count of keys in page
209         uint act;                                       // count of active keys
210         uint min;                                       // next key offset
211         uint garbage;                           // page garbage in bytes
212         unsigned char bits:7;           // page size in bits
213         unsigned char free:1;           // page is on free chain
214         unsigned char lvl:7;            // level of page
215         unsigned char kill:1;           // page is being deleted
216         unsigned char left[BtId];       // page number to left
217         unsigned char filler[2];        // padding to multiple of 8
218         unsigned char right[BtId];      // page number to right
219 } *BtPage;
220
221 //      The memory mapping pool table buffer manager entry
222
223 typedef struct {
224         uid  basepage;                          // mapped base page number
225         char *map;                                      // mapped memory pointer
226         ushort slot;                            // slot index in this array
227         ushort pin;                                     // mapped page pin counter
228         void *hashprev;                         // previous pool entry for the same hash idx
229         void *hashnext;                         // next pool entry for the same hash idx
230 #ifndef unix
231         HANDLE hmap;                            // Windows memory mapping handle
232 #endif
233 } BtPool;
234
235 #define CLOCK_bit 0x8000                // bit in pool->pin
236
237 //  The loadpage interface object
238
239 typedef struct {
240         uid page_no;            // current page number
241         BtPage page;            // current page pointer
242         BtPool *pool;           // current page pool
243         BtLatchSet *latch;      // current page latch set
244 } BtPageSet;
245
246 //      structure for latch manager on ALLOC_page
247
248 typedef struct {
249         struct BtPage_ alloc[1];        // next page_no in right ptr
250         unsigned long long dups[1];     // global duplicate key uniqueifier
251         unsigned char chain[BtId];      // head of free page_nos chain
252         BtSpinLatch lock[1];            // allocation area lite latch
253         ushort latchdeployed;           // highest number of latch entries deployed
254         ushort nlatchpage;                      // number of latch pages at BT_latch
255         ushort latchtotal;                      // number of page latch entries
256         ushort latchhash;                       // number of latch hash table slots
257         ushort latchvictim;                     // next latch entry to examine
258         BtHashEntry table[0];           // the hash table
259 } BtLatchMgr;
260
261 //      The object structure for Btree access
262
263 typedef struct {
264         uint page_size;                         // page size    
265         uint page_bits;                         // page size in bits    
266         uint seg_bits;                          // seg size in pages in bits
267         uint mode;                                      // read-write mode
268 #ifdef unix
269         int idx;
270 #else
271         HANDLE idx;
272 #endif
273         ushort poolcnt;                         // highest page pool node in use
274         ushort poolmax;                         // highest page pool node allocated
275         ushort poolmask;                        // total number of pages in mmap segment - 1
276         ushort hashsize;                        // size of Hash Table for pool entries
277         volatile uint evicted;          // last evicted hash table slot
278         ushort *hash;                           // pool index for hash entries
279         BtSpinLatch *latch;                     // latches for hash table slots
280         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
281         BtLatchSet *latchsets;          // mapped latch set from latch pages
282         BtPool *pool;                           // memory pool page segments
283 #ifndef unix
284         HANDLE halloc;                          // allocation and latch table handle
285 #endif
286 } BtMgr;
287
288 typedef struct {
289         BtMgr *mgr;                             // buffer manager for thread
290         BtPage cursor;                  // cached frame for start/next (never mapped)
291         BtPage frame;                   // spare frame for the page split (never mapped)
292         uid cursor_page;                        // current cursor page number   
293         unsigned char *mem;             // frame, cursor, page memory buffer
294         unsigned char key[256]; // last found complete key
295         int found;                              // last delete or insert was found
296         int err;                                // last error
297 } BtDb;
298
299 typedef enum {
300         BTERR_ok = 0,
301         BTERR_struct,
302         BTERR_ovflw,
303         BTERR_lock,
304         BTERR_map,
305         BTERR_wrt,
306         BTERR_hash
307 } BTERR;
308
309 // B-Tree functions
310 extern void bt_close (BtDb *bt);
311 extern BtDb *bt_open (BtMgr *mgr);
312 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
313 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
314 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
315 extern BtKey bt_foundkey (BtDb *bt);
316 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
317 extern uint bt_nextkey   (BtDb *bt, uint slot);
318
319 //      manager functions
320 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
321 void bt_mgrclose (BtMgr *mgr);
322
323 //  Helper functions to return slot values
324 //      from the cursor page.
325
326 extern BtKey bt_key (BtDb *bt, uint slot);
327 extern BtVal bt_val (BtDb *bt, uint slot);
328
329 //  BTree page number constants
330 #define ALLOC_page              0       // allocation & latch manager hash table
331 #define ROOT_page               1       // root of the btree
332 #define LEAF_page               2       // first page of leaves
333 #define LATCH_page              3       // pages for latch manager
334
335 //      Number of levels to create in a new BTree
336
337 #define MIN_lvl                 2
338
339 //  The page is allocated from low and hi ends.
340 //  The key slots are allocated from the bottom,
341 //      while the text and value of the key
342 //  are allocated from the top.  When the two
343 //  areas meet, the page is split into two.
344
345 //  A key consists of a length byte, two bytes of
346 //  index number (0 - 65534), and up to 253 bytes
347 //  of key value.
348
349 //  Associated with each key is a value byte string
350 //      containing any value desired.
351
352 //  The b-tree root is always located at page 1.
353 //      The first leaf page of level zero is always
354 //      located on page 2.
355
356 //      The b-tree pages are linked with next
357 //      pointers to facilitate enumerators,
358 //      and provide for concurrency.
359
360 //      When to root page fills, it is split in two and
361 //      the tree height is raised by a new root at page
362 //      one with two keys.
363
364 //      Deleted keys are marked with a dead bit until
365 //      page cleanup. The fence key for a leaf node is
366 //      always present
367
368 //  Groups of pages called segments from the btree are optionally
369 //  cached with a memory mapped pool. A hash table is used to keep
370 //  track of the cached segments.  This behaviour is controlled
371 //  by the cache block size parameter to bt_open.
372
373 //  To achieve maximum concurrency one page is locked at a time
374 //  as the tree is traversed to find leaf key in question. The right
375 //  page numbers are used in cases where the page is being split,
376 //      or consolidated.
377
378 //  Page 0 is dedicated to lock for new page extensions,
379 //      and chains empty pages together for reuse. It also
380 //      contains the latch manager hash table.
381
382 //      The ParentModification lock on a node is obtained to serialize posting
383 //      or changing the fence key for a node.
384
385 //      Empty pages are chained together through the ALLOC page and reused.
386
387 //      Access macros to address slot and key values from the page
388 //      Page slots use 1 based indexing.
389
390 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
391 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
392 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
393
394 void bt_putid(unsigned char *dest, uid id)
395 {
396 int i = BtId;
397
398         while( i-- )
399                 dest[i] = (unsigned char)id, id >>= 8;
400 }
401
402 uid bt_getid(unsigned char *src)
403 {
404 uid id = 0;
405 int i;
406
407         for( i = 0; i < BtId; i++ )
408                 id <<= 8, id |= *src++; 
409
410         return id;
411 }
412
413 uid bt_newdup (BtDb *bt)
414 {
415 #ifdef unix
416         return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
417 #else
418         return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
419 #endif
420 }
421
422 //      Phase-Fair reader/writer lock implementation
423
424 void WriteLock (RWLock *lock)
425 {
426 ushort w, r, tix;
427
428 #ifdef unix
429         tix = __sync_fetch_and_add (lock->ticket, 1);
430 #else
431         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
432 #endif
433         // wait for our ticket to come up
434
435         while( tix != lock->serving[0] )
436 #ifdef unix
437                 sched_yield();
438 #else
439                 SwitchToThread ();
440 #endif
441
442         w = PRES | (tix & PHID);
443 #ifdef  unix
444         r = __sync_fetch_and_add (lock->rin, w);
445 #else
446         r = _InterlockedExchangeAdd16 (lock->rin, w);
447 #endif
448         while( r != *lock->rout )
449 #ifdef unix
450                 sched_yield();
451 #else
452                 SwitchToThread();
453 #endif
454 }
455
456 void WriteRelease (RWLock *lock)
457 {
458 #ifdef unix
459         __sync_fetch_and_and (lock->rin, ~MASK);
460 #else
461         _InterlockedAnd16 (lock->rin, ~MASK);
462 #endif
463         lock->serving[0]++;
464 }
465
466 void ReadLock (RWLock *lock)
467 {
468 ushort w;
469 #ifdef unix
470         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
471 #else
472         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
473 #endif
474         if( w )
475           while( w == (*lock->rin & MASK) )
476 #ifdef unix
477                 sched_yield ();
478 #else
479                 SwitchToThread ();
480 #endif
481 }
482
483 void ReadRelease (RWLock *lock)
484 {
485 #ifdef unix
486         __sync_fetch_and_add (lock->rout, RINC);
487 #else
488         _InterlockedExchangeAdd16 (lock->rout, RINC);
489 #endif
490 }
491
492 //      Spin Latch Manager
493
494 //      wait until write lock mode is clear
495 //      and add 1 to the share count
496
497 void bt_spinreadlock(BtSpinLatch *latch)
498 {
499 ushort prev;
500
501   do {
502 #ifdef unix
503         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
504 #else
505         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
506 #endif
507         //  see if exclusive request is granted or pending
508
509         if( !(prev & BOTH) )
510                 return;
511 #ifdef unix
512         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
513 #else
514         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
515 #endif
516 #ifdef  unix
517   } while( sched_yield(), 1 );
518 #else
519   } while( SwitchToThread(), 1 );
520 #endif
521 }
522
523 //      wait for other read and write latches to relinquish
524
525 void bt_spinwritelock(BtSpinLatch *latch)
526 {
527 ushort prev;
528
529   do {
530 #ifdef  unix
531         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
532 #else
533         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
534 #endif
535         if( !(prev & XCL) )
536           if( !(prev & ~BOTH) )
537                 return;
538           else
539 #ifdef unix
540                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
541 #else
542                 _InterlockedAnd16((ushort *)latch, ~XCL);
543 #endif
544 #ifdef  unix
545   } while( sched_yield(), 1 );
546 #else
547   } while( SwitchToThread(), 1 );
548 #endif
549 }
550
551 //      try to obtain write lock
552
553 //      return 1 if obtained,
554 //              0 otherwise
555
556 int bt_spinwritetry(BtSpinLatch *latch)
557 {
558 ushort prev;
559
560 #ifdef  unix
561         prev = __sync_fetch_and_or((ushort *)latch, XCL);
562 #else
563         prev = _InterlockedOr16((ushort *)latch, XCL);
564 #endif
565         //      take write access if all bits are clear
566
567         if( !(prev & XCL) )
568           if( !(prev & ~BOTH) )
569                 return 1;
570           else
571 #ifdef unix
572                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
573 #else
574                 _InterlockedAnd16((ushort *)latch, ~XCL);
575 #endif
576         return 0;
577 }
578
579 //      clear write mode
580
581 void bt_spinreleasewrite(BtSpinLatch *latch)
582 {
583 #ifdef unix
584         __sync_fetch_and_and((ushort *)latch, ~BOTH);
585 #else
586         _InterlockedAnd16((ushort *)latch, ~BOTH);
587 #endif
588 }
589
590 //      decrement reader count
591
592 void bt_spinreleaseread(BtSpinLatch *latch)
593 {
594 #ifdef unix
595         __sync_fetch_and_add((ushort *)latch, -SHARE);
596 #else
597         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
598 #endif
599 }
600
601 //      link latch table entry into latch hash table
602
603 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
604 {
605 BtLatchSet *set = bt->mgr->latchsets + victim;
606
607         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
608                 bt->mgr->latchsets[set->next].prev = victim;
609
610         bt->mgr->latchmgr->table[hashidx].slot = victim;
611         set->page_no = page_no;
612         set->hash = hashidx;
613         set->prev = 0;
614 }
615
616 //      release latch pin
617
618 void bt_unpinlatch (BtLatchSet *set)
619 {
620 #ifdef unix
621         __sync_fetch_and_add(&set->pin, -1);
622 #else
623         _InterlockedDecrement16 (&set->pin);
624 #endif
625 }
626
627 //      find existing latchset or inspire new one
628 //      return with latchset pinned
629
630 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
631 {
632 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
633 ushort slot, avail = 0, victim, idx;
634 BtLatchSet *set;
635
636         //  obtain read lock on hash table entry
637
638         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
639
640         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
641         {
642                 set = bt->mgr->latchsets + slot;
643                 if( page_no == set->page_no )
644                         break;
645         } while( slot = set->next );
646
647         if( slot ) {
648 #ifdef unix
649                 __sync_fetch_and_add(&set->pin, 1);
650 #else
651                 _InterlockedIncrement16 (&set->pin);
652 #endif
653         }
654
655     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
656
657         if( slot )
658                 return set;
659
660   //  try again, this time with write lock
661
662   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
663
664   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
665   {
666         set = bt->mgr->latchsets + slot;
667         if( page_no == set->page_no )
668                 break;
669         if( !set->pin && !avail )
670                 avail = slot;
671   } while( slot = set->next );
672
673   //  found our entry, or take over an unpinned one
674
675   if( slot || (slot = avail) ) {
676         set = bt->mgr->latchsets + slot;
677 #ifdef unix
678         __sync_fetch_and_add(&set->pin, 1);
679 #else
680         _InterlockedIncrement16 (&set->pin);
681 #endif
682         set->page_no = page_no;
683         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
684         return set;
685   }
686
687         //  see if there are any unused entries
688 #ifdef unix
689         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
690 #else
691         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
692 #endif
693
694         if( victim < bt->mgr->latchmgr->latchtotal ) {
695                 set = bt->mgr->latchsets + victim;
696 #ifdef unix
697                 __sync_fetch_and_add(&set->pin, 1);
698 #else
699                 _InterlockedIncrement16 (&set->pin);
700 #endif
701                 bt_latchlink (bt, hashidx, victim, page_no);
702                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
703                 return set;
704         }
705
706 #ifdef unix
707         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
708 #else
709         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
710 #endif
711   //  find and reuse previous lock entry
712
713   while( 1 ) {
714 #ifdef unix
715         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
716 #else
717         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
718 #endif
719         //      we don't use slot zero
720
721         if( victim %= bt->mgr->latchmgr->latchtotal )
722                 set = bt->mgr->latchsets + victim;
723         else
724                 continue;
725
726         //      take control of our slot
727         //      from other threads
728
729         if( set->pin || !bt_spinwritetry (set->busy) )
730                 continue;
731
732         idx = set->hash;
733
734         // try to get write lock on hash chain
735         //      skip entry if not obtained
736         //      or has outstanding locks
737
738         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
739                 bt_spinreleasewrite (set->busy);
740                 continue;
741         }
742
743         if( set->pin ) {
744                 bt_spinreleasewrite (set->busy);
745                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
746                 continue;
747         }
748
749         //  unlink our available victim from its hash chain
750
751         if( set->prev )
752                 bt->mgr->latchsets[set->prev].next = set->next;
753         else
754                 bt->mgr->latchmgr->table[idx].slot = set->next;
755
756         if( set->next )
757                 bt->mgr->latchsets[set->next].prev = set->prev;
758
759         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
760 #ifdef unix
761         __sync_fetch_and_add(&set->pin, 1);
762 #else
763         _InterlockedIncrement16 (&set->pin);
764 #endif
765         bt_latchlink (bt, hashidx, victim, page_no);
766         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
767         bt_spinreleasewrite (set->busy);
768         return set;
769   }
770 }
771
772 void bt_mgrclose (BtMgr *mgr)
773 {
774 BtPool *pool;
775 uint slot;
776
777         // release mapped pages
778         //      note that slot zero is never used
779
780         for( slot = 1; slot < mgr->poolmax; slot++ ) {
781                 pool = mgr->pool + slot;
782                 if( pool->slot )
783 #ifdef unix
784                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
785 #else
786                 {
787                         FlushViewOfFile(pool->map, 0);
788                         UnmapViewOfFile(pool->map);
789                         CloseHandle(pool->hmap);
790                 }
791 #endif
792         }
793
794 #ifdef unix
795         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
796         munmap (mgr->latchmgr, 2 * mgr->page_size);
797 #else
798         FlushViewOfFile(mgr->latchmgr, 0);
799         UnmapViewOfFile(mgr->latchmgr);
800         CloseHandle(mgr->halloc);
801 #endif
802 #ifdef unix
803         close (mgr->idx);
804         free (mgr->pool);
805         free (mgr->hash);
806         free ((void *)mgr->latch);
807         free (mgr);
808 #else
809         FlushFileBuffers(mgr->idx);
810         CloseHandle(mgr->idx);
811         GlobalFree (mgr->pool);
812         GlobalFree (mgr->hash);
813         GlobalFree ((void *)mgr->latch);
814         GlobalFree (mgr);
815 #endif
816 }
817
818 //      close and release memory
819
820 void bt_close (BtDb *bt)
821 {
822 #ifdef unix
823         if( bt->mem )
824                 free (bt->mem);
825 #else
826         if( bt->mem)
827                 VirtualFree (bt->mem, 0, MEM_RELEASE);
828 #endif
829         free (bt);
830 }
831
832 //  open/create new btree buffer manager
833
834 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
835 //              size of mapped page pool (e.g. 8192)
836
837 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
838 {
839 uint lvl, attr, cacheblk, last, slot, idx;
840 uint nlatchpage, latchhash;
841 unsigned char value[BtId];
842 BtLatchMgr *latchmgr;
843 off64_t size;
844 uint amt[1];
845 BtMgr* mgr;
846 BtKey key;
847 BtVal val;
848 int flag;
849
850 #ifndef unix
851 SYSTEM_INFO sysinfo[1];
852 #endif
853
854         // determine sanity of page size and buffer pool
855
856         if( bits > BT_maxbits )
857                 bits = BT_maxbits;
858         else if( bits < BT_minbits )
859                 bits = BT_minbits;
860
861         if( !poolmax )
862                 return NULL;    // must have buffer pool
863
864 #ifdef unix
865         mgr = calloc (1, sizeof(BtMgr));
866
867         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
868
869         if( mgr->idx == -1 )
870                 return free(mgr), NULL;
871         
872         cacheblk = 4096;        // minimum mmap segment size for unix
873
874 #else
875         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
876         attr = FILE_ATTRIBUTE_NORMAL;
877         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
878
879         if( mgr->idx == INVALID_HANDLE_VALUE )
880                 return GlobalFree(mgr), NULL;
881
882         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
883         GetSystemInfo(sysinfo);
884         cacheblk = sysinfo->dwAllocationGranularity;
885 #endif
886
887 #ifdef unix
888         latchmgr = malloc (BT_maxpage);
889         *amt = 0;
890
891         // read minimum page size to get root info
892
893         if( size = lseek (mgr->idx, 0L, 2) ) {
894                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
895                         bits = latchmgr->alloc->bits;
896                 else
897                         return free(mgr), free(latchmgr), NULL;
898         } else if( mode == BT_ro )
899                 return free(latchmgr), bt_mgrclose (mgr), NULL;
900 #else
901         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
902         size = GetFileSize(mgr->idx, amt);
903
904         if( size || *amt ) {
905                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
906                         return bt_mgrclose (mgr), NULL;
907                 bits = latchmgr->alloc->bits;
908         } else if( mode == BT_ro )
909                 return bt_mgrclose (mgr), NULL;
910 #endif
911
912         mgr->page_size = 1 << bits;
913         mgr->page_bits = bits;
914
915         mgr->poolmax = poolmax;
916         mgr->mode = mode;
917
918         if( cacheblk < mgr->page_size )
919                 cacheblk = mgr->page_size;
920
921         //  mask for partial memmaps
922
923         mgr->poolmask = (cacheblk >> bits) - 1;
924
925         //      see if requested size of pages per memmap is greater
926
927         if( (1 << segsize) > mgr->poolmask )
928                 mgr->poolmask = (1 << segsize) - 1;
929
930         mgr->seg_bits = 0;
931
932         while( (1 << mgr->seg_bits) <= mgr->poolmask )
933                 mgr->seg_bits++;
934
935         mgr->hashsize = hashsize;
936
937 #ifdef unix
938         mgr->pool = calloc (poolmax, sizeof(BtPool));
939         mgr->hash = calloc (hashsize, sizeof(ushort));
940         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
941 #else
942         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
943         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
944         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
945 #endif
946
947         if( size || *amt )
948                 goto mgrlatch;
949
950         // initialize an empty b-tree with latch page, root page, page of leaves
951         // and page(s) of latches
952
953         memset (latchmgr, 0, 1 << bits);
954         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
955         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
956         latchmgr->alloc->bits = mgr->page_bits;
957
958         latchmgr->nlatchpage = nlatchpage;
959         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
960
961         //  initialize latch manager
962
963         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
964
965         //      size of hash table = total number of latchsets
966
967         if( latchhash > latchmgr->latchtotal )
968                 latchhash = latchmgr->latchtotal;
969
970         latchmgr->latchhash = latchhash;
971
972         //  initialize left-most LEAF page in
973         //      alloc->left.
974
975         bt_putid (latchmgr->alloc->left, LEAF_page);
976
977 #ifdef unix
978         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
979                 return bt_mgrclose (mgr), NULL;
980 #else
981         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
982                 return bt_mgrclose (mgr), NULL;
983
984         if( *amt < mgr->page_size )
985                 return bt_mgrclose (mgr), NULL;
986 #endif
987
988         memset (latchmgr, 0, 1 << bits);
989         latchmgr->alloc->bits = mgr->page_bits;
990
991         for( lvl=MIN_lvl; lvl--; ) {
992                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
993                 key = keyptr(latchmgr->alloc, 1);
994                 key->len = 2;           // create stopper key
995                 key->key[0] = 0xff;
996                 key->key[1] = 0xff;
997
998                 bt_putid(value, MIN_lvl - lvl + 1);
999                 val = valptr(latchmgr->alloc, 1);
1000                 val->len = lvl ? BtId : 0;
1001                 memcpy (val->value, value, val->len);
1002
1003                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
1004                 latchmgr->alloc->lvl = lvl;
1005                 latchmgr->alloc->cnt = 1;
1006                 latchmgr->alloc->act = 1;
1007 #ifdef unix
1008                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
1009                         return bt_mgrclose (mgr), NULL;
1010 #else
1011                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1012                         return bt_mgrclose (mgr), NULL;
1013
1014                 if( *amt < mgr->page_size )
1015                         return bt_mgrclose (mgr), NULL;
1016 #endif
1017         }
1018
1019         // clear out latch manager locks
1020         //      and rest of pages to round out segment
1021
1022         memset(latchmgr, 0, mgr->page_size);
1023         last = MIN_lvl + 1;
1024
1025         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1026 #ifdef unix
1027                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1028 #else
1029                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1030                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1031                         return bt_mgrclose (mgr), NULL;
1032                 if( *amt < mgr->page_size )
1033                         return bt_mgrclose (mgr), NULL;
1034 #endif
1035                 last++;
1036         }
1037
1038 mgrlatch:
1039 #ifdef unix
1040         // mlock the root page and the latchmgr page
1041
1042         flag = PROT_READ | PROT_WRITE;
1043         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1044         if( mgr->latchmgr == MAP_FAILED )
1045                 return bt_mgrclose (mgr), NULL;
1046         mlock (mgr->latchmgr, 2 * mgr->page_size);
1047
1048         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1049         if( mgr->latchsets == MAP_FAILED )
1050                 return bt_mgrclose (mgr), NULL;
1051         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1052 #else
1053         flag = PAGE_READWRITE;
1054         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1055         if( !mgr->halloc )
1056                 return bt_mgrclose (mgr), NULL;
1057
1058         flag = FILE_MAP_WRITE;
1059         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1060         if( !mgr->latchmgr )
1061                 return GetLastError(), bt_mgrclose (mgr), NULL;
1062
1063         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1064 #endif
1065
1066 #ifdef unix
1067         free (latchmgr);
1068 #else
1069         VirtualFree (latchmgr, 0, MEM_RELEASE);
1070 #endif
1071         return mgr;
1072 }
1073
1074 //      open BTree access method
1075 //      based on buffer manager
1076
1077 BtDb *bt_open (BtMgr *mgr)
1078 {
1079 BtDb *bt = malloc (sizeof(*bt));
1080
1081         memset (bt, 0, sizeof(*bt));
1082         bt->mgr = mgr;
1083 #ifdef unix
1084         bt->mem = malloc (2 *mgr->page_size);
1085 #else
1086         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1087 #endif
1088         bt->frame = (BtPage)bt->mem;
1089         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1090         return bt;
1091 }
1092
1093 //  compare two keys, returning > 0, = 0, or < 0
1094 //  as the comparison value
1095
1096 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1097 {
1098 uint len1 = key1->len;
1099 int ans;
1100
1101         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1102                 return ans;
1103
1104         if( len1 > len2 )
1105                 return 1;
1106         if( len1 < len2 )
1107                 return -1;
1108
1109         return 0;
1110 }
1111
1112 //      Buffer Pool mgr
1113
1114 // find segment in pool
1115 // must be called with hashslot idx locked
1116 //      return NULL if not there
1117 //      otherwise return node
1118
1119 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1120 {
1121 BtPool *pool;
1122 uint slot;
1123
1124         // compute start of hash chain in pool
1125
1126         if( slot = bt->mgr->hash[idx] ) 
1127                 pool = bt->mgr->pool + slot;
1128         else
1129                 return NULL;
1130
1131         page_no &= ~bt->mgr->poolmask;
1132
1133         while( pool->basepage != page_no )
1134           if( pool = pool->hashnext )
1135                 continue;
1136           else
1137                 return NULL;
1138
1139         return pool;
1140 }
1141
1142 // add segment to hash table
1143
1144 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1145 {
1146 BtPool *node;
1147 uint slot;
1148
1149         pool->hashprev = pool->hashnext = NULL;
1150         pool->basepage = page_no & ~bt->mgr->poolmask;
1151         pool->pin = CLOCK_bit + 1;
1152
1153         if( slot = bt->mgr->hash[idx] ) {
1154                 node = bt->mgr->pool + slot;
1155                 pool->hashnext = node;
1156                 node->hashprev = pool;
1157         }
1158
1159         bt->mgr->hash[idx] = pool->slot;
1160 }
1161
1162 //  map new buffer pool segment to virtual memory
1163
1164 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1165 {
1166 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1167 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1168 int flag;
1169
1170 #ifdef unix
1171         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1172         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1173         if( pool->map == MAP_FAILED )
1174                 return bt->err = BTERR_map;
1175
1176 #else
1177         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1178         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1179         if( !pool->hmap )
1180                 return bt->err = BTERR_map;
1181
1182         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1183         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1184         if( !pool->map )
1185                 return bt->err = BTERR_map;
1186 #endif
1187         return bt->err = 0;
1188 }
1189
1190 //      calculate page within pool
1191
1192 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1193 {
1194 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1195 BtPage page;
1196
1197         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1198 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1199         return page;
1200 }
1201
1202 //  release pool pin
1203
1204 void bt_unpinpool (BtPool *pool)
1205 {
1206 #ifdef unix
1207         __sync_fetch_and_add(&pool->pin, -1);
1208 #else
1209         _InterlockedDecrement16 (&pool->pin);
1210 #endif
1211 }
1212
1213 //      find or place requested page in segment-pool
1214 //      return pool table entry, incrementing pin
1215
1216 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1217 {
1218 uint slot, hashidx, idx, victim;
1219 BtPool *pool, *node, *next;
1220
1221         //      lock hash table chain
1222
1223         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1224         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1225
1226         //      look up in hash table
1227
1228         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1229 #ifdef unix
1230                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1231                 __sync_fetch_and_add(&pool->pin, 1);
1232 #else
1233                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1234                 _InterlockedIncrement16 (&pool->pin);
1235 #endif
1236                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1237                 return pool;
1238         }
1239
1240         // allocate a new pool node
1241         // and add to hash table
1242
1243 #ifdef unix
1244         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1245 #else
1246         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1247 #endif
1248
1249         if( ++slot < bt->mgr->poolmax ) {
1250                 pool = bt->mgr->pool + slot;
1251                 pool->slot = slot;
1252
1253                 if( bt_mapsegment(bt, pool, page_no) )
1254                         return NULL;
1255
1256                 bt_linkhash(bt, pool, page_no, hashidx);
1257                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1258                 return pool;
1259         }
1260
1261         // pool table is full
1262         //      find best pool entry to evict
1263
1264 #ifdef unix
1265         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1266 #else
1267         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1268 #endif
1269
1270         while( 1 ) {
1271 #ifdef unix
1272                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1273 #else
1274                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1275 #endif
1276                 victim %= bt->mgr->poolmax;
1277                 pool = bt->mgr->pool + victim;
1278                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1279
1280                 if( !victim )
1281                         continue;
1282
1283                 // try to get write lock
1284                 //      skip entry if not obtained
1285
1286                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1287                         continue;
1288
1289                 //      skip this entry if
1290                 //      page is pinned
1291                 //  or clock bit is set
1292
1293                 if( pool->pin ) {
1294 #ifdef unix
1295                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1296 #else
1297                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1298 #endif
1299                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1300                         continue;
1301                 }
1302
1303                 // unlink victim pool node from hash table
1304
1305                 if( node = pool->hashprev )
1306                         node->hashnext = pool->hashnext;
1307                 else if( node = pool->hashnext )
1308                         bt->mgr->hash[idx] = node->slot;
1309                 else
1310                         bt->mgr->hash[idx] = 0;
1311
1312                 if( node = pool->hashnext )
1313                         node->hashprev = pool->hashprev;
1314
1315                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1316
1317                 //      remove old file mapping
1318 #ifdef unix
1319                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1320 #else
1321 //              FlushViewOfFile(pool->map, 0);
1322                 UnmapViewOfFile(pool->map);
1323                 CloseHandle(pool->hmap);
1324 #endif
1325                 pool->map = NULL;
1326
1327                 //  create new pool mapping
1328                 //  and link into hash table
1329
1330                 if( bt_mapsegment(bt, pool, page_no) )
1331                         return NULL;
1332
1333                 bt_linkhash(bt, pool, page_no, hashidx);
1334                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1335                 return pool;
1336         }
1337 }
1338
1339 // place write, read, or parent lock on requested page_no.
1340
1341 void bt_lockpage(BtLock mode, BtLatchSet *set)
1342 {
1343         switch( mode ) {
1344         case BtLockRead:
1345                 ReadLock (set->readwr);
1346                 break;
1347         case BtLockWrite:
1348                 WriteLock (set->readwr);
1349                 break;
1350         case BtLockAccess:
1351                 ReadLock (set->access);
1352                 break;
1353         case BtLockDelete:
1354                 WriteLock (set->access);
1355                 break;
1356         case BtLockParent:
1357                 WriteLock (set->parent);
1358                 break;
1359         }
1360 }
1361
1362 // remove write, read, or parent lock on requested page
1363
1364 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1365 {
1366         switch( mode ) {
1367         case BtLockRead:
1368                 ReadRelease (set->readwr);
1369                 break;
1370         case BtLockWrite:
1371                 WriteRelease (set->readwr);
1372                 break;
1373         case BtLockAccess:
1374                 ReadRelease (set->access);
1375                 break;
1376         case BtLockDelete:
1377                 WriteRelease (set->access);
1378                 break;
1379         case BtLockParent:
1380                 WriteRelease (set->parent);
1381                 break;
1382         }
1383 }
1384
1385 //      allocate a new page
1386
1387 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1388 {
1389 int blk;
1390
1391         //      lock allocation page
1392
1393         bt_spinwritelock(bt->mgr->latchmgr->lock);
1394
1395         // use empty chain first
1396         // else allocate empty page
1397
1398         if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1399                 if( set->pool = bt_pinpool (bt, set->page_no) )
1400                         set->page = bt_page (bt, set->pool, set->page_no);
1401                 else
1402                         return bt->err = BTERR_struct;
1403
1404                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1405         } else {
1406                 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1407                 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1408 #ifdef unix
1409                 // if writing first page of pool block, set file length thru last page
1410
1411                 if( (set->page_no & bt->mgr->poolmask) == 0 )
1412                         ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1413 #endif
1414                 if( set->pool = bt_pinpool (bt, set->page_no) )
1415                         set->page = bt_page (bt, set->pool, set->page_no);
1416                 else
1417                         return bt->err;
1418         }
1419
1420         // unlock allocation latch
1421
1422         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1423         return 0;
1424 }
1425
1426 //  find slot in page for given key at a given level
1427
1428 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1429 {
1430 uint diff, higher = set->page->cnt, low = 1, slot;
1431 uint good = 0;
1432
1433         //        make stopper key an infinite fence value
1434
1435         if( bt_getid (set->page->right) )
1436                 higher++;
1437         else
1438                 good++;
1439
1440         //      low is the lowest candidate.
1441         //  loop ends when they meet
1442
1443         //  higher is already
1444         //      tested as .ge. the passed key.
1445
1446         while( diff = higher - low ) {
1447                 slot = low + ( diff >> 1 );
1448                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1449                         low = slot + 1;
1450                 else
1451                         higher = slot, good++;
1452         }
1453
1454         //      return zero if key is on right link page
1455
1456         return good ? higher : 0;
1457 }
1458
1459 //  find and load page at given level for given key
1460 //      leave page rd or wr locked as requested
1461
1462 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1463 {
1464 uid page_no = ROOT_page, prevpage = 0;
1465 uint drill = 0xff, slot;
1466 BtLatchSet *prevlatch;
1467 uint mode, prevmode;
1468 BtPool *prevpool;
1469
1470   //  start at root of btree and drill down
1471
1472   do {
1473         // determine lock mode of drill level
1474         mode = (drill == lvl) ? lock : BtLockRead; 
1475
1476         set->latch = bt_pinlatch (bt, page_no);
1477         set->page_no = page_no;
1478
1479         // pin page contents
1480
1481         if( set->pool = bt_pinpool (bt, page_no) )
1482                 set->page = bt_page (bt, set->pool, page_no);
1483         else
1484                 return 0;
1485
1486         // obtain access lock using lock chaining with Access mode
1487
1488         if( page_no > ROOT_page )
1489           bt_lockpage(BtLockAccess, set->latch);
1490
1491         //      release & unpin parent page
1492
1493         if( prevpage ) {
1494           bt_unlockpage(prevmode, prevlatch);
1495           bt_unpinlatch (prevlatch);
1496           bt_unpinpool (prevpool);
1497           prevpage = 0;
1498         }
1499
1500         // obtain read lock using lock chaining
1501
1502         bt_lockpage(mode, set->latch);
1503
1504         if( set->page->free )
1505                 return bt->err = BTERR_struct, 0;
1506
1507         if( page_no > ROOT_page )
1508           bt_unlockpage(BtLockAccess, set->latch);
1509
1510         // re-read and re-lock root after determining actual level of root
1511
1512         if( set->page->lvl != drill) {
1513                 if( set->page_no != ROOT_page )
1514                         return bt->err = BTERR_struct, 0;
1515                         
1516                 drill = set->page->lvl;
1517
1518                 if( lock != BtLockRead && drill == lvl ) {
1519                   bt_unlockpage(mode, set->latch);
1520                   bt_unpinlatch (set->latch);
1521                   bt_unpinpool (set->pool);
1522                   continue;
1523                 }
1524         }
1525
1526         prevpage = set->page_no;
1527         prevlatch = set->latch;
1528         prevpool = set->pool;
1529         prevmode = mode;
1530
1531         //  find key on page at this level
1532         //  and descend to requested level
1533
1534         if( set->page->kill )
1535           goto slideright;
1536
1537         if( slot = bt_findslot (set, key, len) ) {
1538           if( drill == lvl )
1539                 return slot;
1540
1541           while( slotptr(set->page, slot)->dead )
1542                 if( slot++ < set->page->cnt )
1543                         continue;
1544                 else
1545                         goto slideright;
1546
1547           page_no = bt_getid(valptr(set->page, slot)->value);
1548           drill--;
1549           continue;
1550         }
1551
1552         //  or slide right into next page
1553
1554 slideright:
1555         page_no = bt_getid(set->page->right);
1556
1557   } while( page_no );
1558
1559   // return error on end of right chain
1560
1561   bt->err = BTERR_struct;
1562   return 0;     // return error
1563 }
1564
1565 //      return page to free list
1566 //      page must be delete & write locked
1567
1568 void bt_freepage (BtDb *bt, BtPageSet *set)
1569 {
1570         //      lock allocation page
1571
1572         bt_spinwritelock (bt->mgr->latchmgr->lock);
1573
1574         //      store chain
1575         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1576         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1577         set->page->free = 1;
1578
1579         // unlock released page
1580
1581         bt_unlockpage (BtLockDelete, set->latch);
1582         bt_unlockpage (BtLockWrite, set->latch);
1583         bt_unpinlatch (set->latch);
1584         bt_unpinpool (set->pool);
1585
1586         // unlock allocation page
1587
1588         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1589 }
1590
1591 //      a fence key was deleted from a page
1592 //      push new fence value upwards
1593
1594 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1595 {
1596 unsigned char leftkey[256], rightkey[256];
1597 unsigned char value[BtId];
1598 BtKey ptr;
1599 uint idx;
1600
1601         // collapse empty slots beneath our slot
1602
1603         while( idx = set->page->cnt - 1 )
1604           if( slotptr(set->page, idx)->dead ) {
1605                 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1606                 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1607           } else
1608                 break;
1609
1610         //      remove the old fence value
1611
1612         ptr = keyptr(set->page, set->page->cnt);
1613         memcpy (rightkey, ptr, ptr->len + 1);
1614         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1615
1616         //  cache new fence value
1617
1618         ptr = keyptr(set->page, set->page->cnt);
1619         memcpy (leftkey, ptr, ptr->len + 1);
1620
1621         bt_lockpage (BtLockParent, set->latch);
1622         bt_unlockpage (BtLockWrite, set->latch);
1623
1624         //      insert new (now smaller) fence key
1625
1626         bt_putid (value, set->page_no);
1627
1628         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId, 1) )
1629           return bt->err;
1630
1631         //      now delete old fence key
1632
1633         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1634                 return bt->err;
1635
1636         bt_unlockpage (BtLockParent, set->latch);
1637         bt_unpinlatch(set->latch);
1638         bt_unpinpool (set->pool);
1639         return 0;
1640 }
1641
1642 //      root has a single child
1643 //      collapse a level from the tree
1644
1645 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1646 {
1647 BtPageSet child[1];
1648 uint idx;
1649
1650   // find the child entry and promote as new root contents
1651
1652   do {
1653         for( idx = 0; idx++ < root->page->cnt; )
1654           if( !slotptr(root->page, idx)->dead )
1655                 break;
1656
1657         child->page_no = bt_getid (valptr(root->page, idx)->value);
1658
1659         child->latch = bt_pinlatch (bt, child->page_no);
1660         bt_lockpage (BtLockDelete, child->latch);
1661         bt_lockpage (BtLockWrite, child->latch);
1662
1663         if( child->pool = bt_pinpool (bt, child->page_no) )
1664                 child->page = bt_page (bt, child->pool, child->page_no);
1665         else
1666                 return bt->err;
1667
1668         memcpy (root->page, child->page, bt->mgr->page_size);
1669         bt_freepage (bt, child);
1670
1671   } while( root->page->lvl > 1 && root->page->act == 1 );
1672
1673   bt_unlockpage (BtLockWrite, root->latch);
1674   bt_unpinlatch (root->latch);
1675   bt_unpinpool (root->pool);
1676   return 0;
1677 }
1678
1679 //      maintain reverse scan pointers by
1680 //      linking left pointer of far right node
1681
1682 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1683 {
1684 BtPageSet right2[1];
1685
1686         //      keep track of rightmost leaf page
1687
1688         if( !right_page_no ) {
1689           bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1690           return 0;
1691         }
1692
1693         //      link right page to left page
1694
1695         right2->latch = bt_pinlatch (bt, right_page_no);
1696         bt_lockpage (BtLockWrite, right2->latch);
1697
1698         if( right2->pool = bt_pinpool (bt, right_page_no) )
1699                 right2->page = bt_page (bt, right2->pool, right_page_no);
1700         else
1701                 return bt->err;
1702
1703         bt_putid(right2->page->left, left_page_no);
1704         bt_unlockpage (BtLockWrite, right2->latch);
1705         bt_unpinlatch (right2->latch);
1706         return 0;
1707 }
1708
1709 //  find and delete key on page by marking delete flag bit
1710 //  if page becomes empty, delete it from the btree
1711
1712 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1713 {
1714 unsigned char lowerfence[256], higherfence[256];
1715 BtPageSet set[1], right[1], right2[1];
1716 unsigned char value[BtId];
1717 uint slot, idx, found;
1718 BtKey ptr, tst;
1719 BtVal val;
1720
1721         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1722                 ptr = keyptr(set->page, slot);
1723         else
1724                 return bt->err;
1725
1726         // if librarian slot, advance to real slot
1727
1728         if( slotptr(set->page, slot)->type == Librarian )
1729                 ptr = keyptr(set->page, ++slot);
1730
1731         // if key is found delete it, otherwise ignore request
1732
1733         if( found = !keycmp (ptr, key, len) )
1734           if( found = slotptr(set->page, slot)->dead == 0 ) {
1735                 val = valptr(set->page,slot);
1736                 slotptr(set->page, slot)->dead = 1;
1737                 set->page->garbage += ptr->len + val->len + 2;
1738                 set->page->act--;
1739           }
1740
1741         //      did we delete a fence key in an upper level?
1742
1743         if( found && lvl && set->page->act && slot == set->page->cnt )
1744           if( bt_fixfence (bt, set, lvl) )
1745                 return bt->err;
1746           else
1747                 return bt->found = found, 0;
1748
1749         //      do we need to collapse root?
1750
1751         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1752           if( bt_collapseroot (bt, set) )
1753                 return bt->err;
1754           else
1755                 return bt->found = found, 0;
1756
1757         //      return if page is not empty
1758
1759         if( set->page->act ) {
1760                 bt_unlockpage(BtLockWrite, set->latch);
1761                 bt_unpinlatch (set->latch);
1762                 bt_unpinpool (set->pool);
1763                 return bt->found = found, 0;
1764         }
1765
1766         //      cache copy of fence key
1767         //      to post in parent
1768
1769         ptr = keyptr(set->page, set->page->cnt);
1770         memcpy (lowerfence, ptr, ptr->len + 1);
1771
1772         //      obtain lock on right page
1773
1774         right->page_no = bt_getid(set->page->right);
1775         right->latch = bt_pinlatch (bt, right->page_no);
1776         bt_lockpage (BtLockWrite, right->latch);
1777
1778         // pin page contents
1779
1780         if( right->pool = bt_pinpool (bt, right->page_no) )
1781                 right->page = bt_page (bt, right->pool, right->page_no);
1782         else
1783                 return bt->err;
1784
1785         if( right->page->kill )
1786                 return bt->err = BTERR_struct;
1787
1788         // transfer left link
1789
1790         memcpy (right->page->left, set->page->left, BtId);
1791
1792         // pull contents of right peer into our empty page
1793
1794         memcpy (set->page, right->page, bt->mgr->page_size);
1795
1796         // update left link
1797
1798         if( !lvl )
1799           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1800                 return bt->err;
1801
1802         // cache copy of key to update
1803
1804         ptr = keyptr(right->page, right->page->cnt);
1805         memcpy (higherfence, ptr, ptr->len + 1);
1806
1807         // mark right page deleted and point it to left page
1808         //      until we can post parent updates
1809
1810         bt_putid (right->page->right, set->page_no);
1811         right->page->kill = 1;
1812
1813         bt_lockpage (BtLockParent, right->latch);
1814         bt_unlockpage (BtLockWrite, right->latch);
1815
1816         bt_lockpage (BtLockParent, set->latch);
1817         bt_unlockpage (BtLockWrite, set->latch);
1818
1819         // redirect higher key directly to our new node contents
1820
1821         bt_putid (value, set->page_no);
1822
1823         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId, 1) )
1824           return bt->err;
1825
1826         //      delete old lower key to our node
1827
1828         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1829           return bt->err;
1830
1831         //      obtain delete and write locks to right node
1832
1833         bt_unlockpage (BtLockParent, right->latch);
1834         bt_lockpage (BtLockDelete, right->latch);
1835         bt_lockpage (BtLockWrite, right->latch);
1836         bt_freepage (bt, right);
1837
1838         bt_unlockpage (BtLockParent, set->latch);
1839         bt_unpinlatch (set->latch);
1840         bt_unpinpool (set->pool);
1841         bt->found = found;
1842         return 0;
1843 }
1844
1845 BtKey bt_foundkey (BtDb *bt)
1846 {
1847         return (BtKey)(bt->key);
1848 }
1849
1850 //      advance to next slot
1851
1852 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1853 {
1854 BtLatchSet *prevlatch;
1855 BtPool *prevpool;
1856 uid page_no;
1857
1858         if( slot < set->page->cnt )
1859                 return slot + 1;
1860
1861         prevlatch = set->latch;
1862         prevpool = set->pool;
1863
1864         if( page_no = bt_getid(set->page->right) )
1865                 set->latch = bt_pinlatch (bt, page_no);
1866         else
1867                 return bt->err = BTERR_struct, 0;
1868
1869         // pin page contents
1870
1871         if( set->pool = bt_pinpool (bt, page_no) )
1872                 set->page = bt_page (bt, set->pool, page_no);
1873         else
1874                 return 0;
1875
1876         // obtain access lock using lock chaining with Access mode
1877
1878         bt_lockpage(BtLockAccess, set->latch);
1879
1880         bt_unlockpage(BtLockRead, prevlatch);
1881         bt_unpinlatch (prevlatch);
1882         bt_unpinpool (prevpool);
1883
1884         bt_lockpage(BtLockRead, set->latch);
1885         bt_unlockpage(BtLockAccess, set->latch);
1886
1887         set->page_no = page_no;
1888         return 1;
1889 }
1890
1891 //      find unique key or first duplicate key in
1892 //      leaf level and return number of value bytes
1893 //      or (-1) if not found.  Setup key for bt_foundkey
1894
1895 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1896 {
1897 BtPageSet set[1];
1898 uint len, slot;
1899 int ret = -1;
1900 BtKey ptr;
1901 BtVal val;
1902
1903   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1904    do {
1905         ptr = keyptr(set->page, slot);
1906
1907         //      skip librarian slot place holder
1908
1909         if( slotptr(set->page, slot)->type == Librarian )
1910                 ptr = keyptr(set->page, ++slot);
1911
1912         //      return actual key found
1913
1914         memcpy (bt->key, ptr, ptr->len + 1);
1915         len = ptr->len;
1916
1917         if( slotptr(set->page, slot)->type == Duplicate )
1918                 len -= BtId;
1919
1920         // if key exists, return >= 0 value bytes copied
1921         //      otherwise return (-1)
1922
1923         if( slotptr(set->page, slot)->dead )
1924                 continue;
1925
1926         if( keylen == len )
1927           if( !memcmp (ptr->key, key, len) ) {
1928                 val = valptr (set->page,slot);
1929                 if( valmax > val->len )
1930                         valmax = val->len;
1931                 memcpy (value, val->value, valmax);
1932                 ret = valmax;
1933           }
1934
1935         break;
1936
1937    } while( slot = bt_findnext (bt, set, slot) );
1938
1939   bt_unlockpage (BtLockRead, set->latch);
1940   bt_unpinlatch (set->latch);
1941   bt_unpinpool (set->pool);
1942   return ret;
1943 }
1944
1945 //      check page for space available,
1946 //      clean if necessary and return
1947 //      0 - page needs splitting
1948 //      >0  new slot value
1949
1950 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1951 {
1952 uint nxt = bt->mgr->page_size;
1953 uint cnt = 0, idx = 0;
1954 uint max = page->cnt;
1955 uint newslot = max;
1956 BtKey key;
1957 BtVal val;
1958
1959         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1960                 return slot;
1961
1962         //      skip cleanup and proceed to split
1963         //      if there's not enough garbage
1964         //      to bother with.
1965
1966         if( page->garbage < nxt / 5 )
1967                 return 0;
1968
1969         memcpy (bt->frame, page, bt->mgr->page_size);
1970
1971         // skip page info and set rest of page to zero
1972
1973         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1974         page->garbage = 0;
1975         page->act = 0;
1976
1977         // clean up page first by
1978         // removing deleted keys
1979
1980         while( cnt++ < max ) {
1981                 if( cnt == slot )
1982                         newslot = idx + 2;
1983                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1984                         continue;
1985
1986                 // copy the value across
1987
1988                 val = valptr(bt->frame, cnt);
1989                 nxt -= val->len + 1;
1990                 ((unsigned char *)page)[nxt] = val->len;
1991                 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1992
1993                 // copy the key across
1994
1995                 key = keyptr(bt->frame, cnt);
1996                 nxt -= key->len + 1;
1997                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1998
1999                 // make a librarian slot
2000
2001                 if( idx ) {
2002                         slotptr(page, ++idx)->off = nxt;
2003                         slotptr(page, idx)->type = Librarian;
2004                         slotptr(page, idx)->dead = 1;
2005                 }
2006
2007                 // set up the slot
2008
2009                 slotptr(page, ++idx)->off = nxt;
2010                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2011
2012                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2013                         page->act++;
2014         }
2015
2016         page->min = nxt;
2017         page->cnt = idx;
2018
2019         //      see if page has enough space now, or does it need splitting?
2020
2021         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
2022                 return newslot;
2023
2024         return 0;
2025 }
2026
2027 // split the root and raise the height of the btree
2028
2029 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, BtPageSet *right)
2030 {
2031 uint nxt = bt->mgr->page_size;
2032 unsigned char value[BtId];
2033 BtPageSet left[1];
2034
2035         //  Obtain an empty page to use, and copy the current
2036         //  root contents into it, e.g. lower keys
2037
2038         if( bt_newpage(bt, left) )
2039                 return bt->err;
2040
2041         memcpy(left->page, root->page, bt->mgr->page_size);
2042         bt_unpinpool (left->pool);
2043
2044         // set left from higher to lower page
2045
2046         bt_putid (right->page->left, left->page_no);
2047         bt_unpinpool (right->pool);
2048
2049         // preserve the page info at the bottom
2050         // of higher keys and set rest to zero
2051
2052         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2053
2054         // insert lower keys page fence key on newroot page as first key
2055
2056         nxt -= BtId + 1;
2057         bt_putid (value, left->page_no);
2058         ((unsigned char *)root->page)[nxt] = BtId;
2059         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
2060
2061         nxt -= *leftkey + 1;
2062         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
2063         slotptr(root->page, 1)->off = nxt;
2064         
2065         // insert stopper key on newroot page
2066         // and increase the root height
2067
2068         nxt -= 3 + BtId + 1;
2069         ((unsigned char *)root->page)[nxt] = 2;
2070         ((unsigned char *)root->page)[nxt+1] = 0xff;
2071         ((unsigned char *)root->page)[nxt+2] = 0xff;
2072
2073         bt_putid (value, right->page_no);
2074         ((unsigned char *)root->page)[nxt+3] = BtId;
2075         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
2076         slotptr(root->page, 2)->off = nxt;
2077
2078         bt_putid(root->page->right, 0);
2079         root->page->min = nxt;          // reset lowest used offset and key count
2080         root->page->cnt = 2;
2081         root->page->act = 2;
2082         root->page->lvl++;
2083
2084         // release and unpin root
2085
2086         bt_unlockpage(BtLockWrite, root->latch);
2087         bt_unpinlatch (root->latch);
2088         bt_unpinpool (root->pool);
2089         return 0;
2090 }
2091
2092 //  split already locked full node
2093 //      return unlocked.
2094
2095 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2096 {
2097 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2098 unsigned char fencekey[256], rightkey[256];
2099 unsigned char value[BtId];
2100 uint lvl = set->page->lvl;
2101 BtPageSet right[1];
2102 uid right2;
2103 uint prev;
2104 BtKey key;
2105 BtVal val;
2106
2107         //  split higher half of keys to bt->frame
2108
2109         memset (bt->frame, 0, bt->mgr->page_size);
2110         max = set->page->cnt;
2111         cnt = max / 2;
2112         idx = 0;
2113
2114         while( cnt++ < max ) {
2115                 if( slotptr(set->page, cnt)->dead && cnt < max )
2116                         continue;
2117                 val = valptr(set->page, cnt);
2118                 nxt -= val->len + 1;
2119                 ((unsigned char *)bt->frame)[nxt] = val->len;
2120                 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
2121
2122                 key = keyptr(set->page, cnt);
2123                 nxt -= key->len + 1;
2124                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2125
2126                 //      add librarian slot
2127
2128                 if( idx ) {
2129                         slotptr(bt->frame, ++idx)->off = nxt;
2130                         slotptr(bt->frame, idx)->type = Librarian;
2131                         slotptr(bt->frame, idx)->dead = 1;
2132                 }
2133
2134                 //  add actual slot
2135
2136                 slotptr(bt->frame, ++idx)->off = nxt;
2137                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2138
2139                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2140                         bt->frame->act++;
2141         }
2142
2143         // remember existing fence key for new page to the right
2144
2145         memcpy (rightkey, key, key->len + 1);
2146
2147         bt->frame->bits = bt->mgr->page_bits;
2148         bt->frame->min = nxt;
2149         bt->frame->cnt = idx;
2150         bt->frame->lvl = lvl;
2151
2152         // link right node
2153
2154         if( set->page_no > ROOT_page ) {
2155                 bt_putid (bt->frame->right, bt_getid (set->page->right));
2156                 bt_putid(bt->frame->left, set->page_no);
2157         }
2158
2159         //      get new free page and write higher keys to it.
2160
2161         if( bt_newpage(bt, right) )
2162                 return bt->err;
2163
2164         // link left node
2165
2166         if( set->page_no > ROOT_page && !lvl )
2167           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2168                 return bt->err;
2169
2170         memcpy (right->page, bt->frame, bt->mgr->page_size);
2171         bt_unpinpool (right->pool);
2172
2173         //      update lower keys to continue in old page
2174
2175         memcpy (bt->frame, set->page, bt->mgr->page_size);
2176         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2177         nxt = bt->mgr->page_size;
2178         set->page->garbage = 0;
2179         set->page->act = 0;
2180         max /= 2;
2181         cnt = 0;
2182         idx = 0;
2183
2184         if( slotptr(bt->frame, max)->type == Librarian )
2185                 max--;
2186
2187         //  assemble page of smaller keys
2188
2189         while( cnt++ < max ) {
2190                 if( slotptr(bt->frame, cnt)->dead )
2191                         continue;
2192                 val = valptr(bt->frame, cnt);
2193                 nxt -= val->len + 1;
2194                 ((unsigned char *)set->page)[nxt] = val->len;
2195                 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2196
2197                 key = keyptr(bt->frame, cnt);
2198                 nxt -= key->len + 1;
2199                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2200
2201                 //      add librarian slot
2202
2203                 if( idx ) {
2204                         slotptr(set->page, ++idx)->off = nxt;
2205                         slotptr(set->page, idx)->type = Librarian;
2206                         slotptr(set->page, idx)->dead = 1;
2207                 }
2208
2209                 //      add actual slot
2210
2211                 slotptr(set->page, ++idx)->off = nxt;
2212                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2213                 set->page->act++;
2214         }
2215
2216         // remember fence key for smaller page
2217
2218         memcpy(fencekey, key, key->len + 1);
2219
2220         bt_putid(set->page->right, right->page_no);
2221         set->page->min = nxt;
2222         set->page->cnt = idx;
2223
2224         // if current page is the root page, split it
2225
2226         if( set->page_no == ROOT_page )
2227                 return bt_splitroot (bt, set, fencekey, right);
2228
2229         // insert new fences in their parent pages
2230
2231         right->latch = bt_pinlatch (bt, right->page_no);
2232         bt_lockpage (BtLockParent, right->latch);
2233
2234         bt_lockpage (BtLockParent, set->latch);
2235         bt_unlockpage (BtLockWrite, set->latch);
2236
2237         // insert new fence for reformulated left block of smaller keys
2238
2239         bt_putid (value, set->page_no);
2240
2241         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2242                 return bt->err;
2243
2244         // switch fence for right block of larger keys to new right page
2245
2246         bt_putid (value, right->page_no);
2247
2248         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2249                 return bt->err;
2250
2251         bt_unlockpage (BtLockParent, set->latch);
2252         bt_unpinlatch (set->latch);
2253         bt_unpinpool (set->pool);
2254
2255         bt_unlockpage (BtLockParent, right->latch);
2256         bt_unpinlatch (right->latch);
2257         return 0;
2258 }
2259
2260 //      install new key and value onto page
2261 //      page must already be checked for
2262 //      adequate space
2263
2264 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2265 {
2266 uint idx, librarian;
2267 BtSlot *node;
2268
2269         //      if found slot > desired slot and previous slot
2270         //      is a librarian slot, use it
2271
2272         if( slot > 1 )
2273           if( slotptr(set->page, slot-1)->type == Librarian )
2274                 slot--;
2275
2276         // copy value onto page
2277
2278         set->page->min -= vallen + 1; // reset lowest used offset
2279         ((unsigned char *)set->page)[set->page->min] = vallen;
2280         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2281
2282         // copy key onto page
2283
2284         set->page->min -= keylen + 1; // reset lowest used offset
2285         ((unsigned char *)set->page)[set->page->min] = keylen;
2286         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2287
2288         //      find first empty slot
2289
2290         for( idx = slot; idx < set->page->cnt; idx++ )
2291           if( slotptr(set->page, idx)->dead )
2292                 break;
2293
2294         // now insert key into array before slot
2295
2296         if( idx == set->page->cnt )
2297                 idx += 2, set->page->cnt += 2, librarian = 2;
2298         else
2299                 librarian = 1;
2300
2301         set->page->act++;
2302
2303         while( idx > slot + librarian - 1 )
2304                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2305
2306         //      add librarian slot
2307
2308         if( librarian > 1 ) {
2309                 node = slotptr(set->page, slot++);
2310                 node->off = set->page->min;
2311                 node->type = Librarian;
2312                 node->dead = 1;
2313         }
2314
2315         //      fill in new slot
2316
2317         node = slotptr(set->page, slot);
2318         node->off = set->page->min;
2319         node->type = type;
2320         node->dead = 0;
2321
2322         bt_unlockpage (BtLockWrite, set->latch);
2323         bt_unpinlatch (set->latch);
2324         bt_unpinpool (set->pool);
2325         return 0;
2326 }
2327
2328 //  Insert new key into the btree at given level.
2329 //      either add a new key or update/add an existing one
2330
2331 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2332 {
2333 unsigned char newkey[256];
2334 uint slot, idx, len;
2335 BtPageSet set[1];
2336 uid sequence;
2337 BtKey ptr;
2338 BtVal val;
2339 uint type;
2340
2341   // set up the key we're working on
2342
2343   memcpy (newkey + 1, key, keylen);
2344   newkey[0] = keylen;
2345
2346   // is this a non-unique index value?
2347
2348   if( unique )
2349         type = Unique;
2350   else {
2351         type = Duplicate;
2352         sequence = bt_newdup (bt);
2353         bt_putid (newkey + *newkey + 1, sequence);
2354         *newkey += BtId;
2355   }
2356
2357   while( 1 ) { // find the page and slot for the current key
2358         if( slot = bt_loadpage (bt, set, newkey + 1, *newkey, lvl, BtLockWrite) )
2359                 ptr = keyptr(set->page, slot);
2360         else {
2361                 if( !bt->err )
2362                         bt->err = BTERR_ovflw;
2363                 return bt->err;
2364         }
2365
2366         // if librarian slot == found slot, advance to real slot
2367
2368         if( slotptr(set->page, slot)->type == Librarian )
2369           if( !keycmp (ptr, key, keylen) )
2370                 ptr = keyptr(set->page, ++slot);
2371
2372         len = ptr->len;
2373
2374         if( slotptr(set->page, slot)->type == Duplicate )
2375                 len -= BtId;
2376
2377         //  if inserting a duplicate key or unique key
2378         //      check for adequate space on the page
2379         //      and insert the new key before slot.
2380
2381         if( unique && (len != *newkey || memcmp (ptr->key, newkey+1, *newkey)) || !unique ) {
2382           if( !(slot = bt_cleanpage (bt, set->page, *newkey, slot, vallen)) )
2383                 if( bt_splitpage (bt, set) )
2384                   return bt->err;
2385                 else
2386                   continue;
2387
2388           return bt_insertslot (bt, set, slot, newkey + 1, *newkey, value, vallen, type);
2389         }
2390
2391         // if key already exists, update value and return
2392
2393         if( val = valptr(set->page, slot), val->len >= vallen ) {
2394                 if( slotptr(set->page, slot)->dead )
2395                         set->page->act++;
2396                 set->page->garbage += val->len - vallen;
2397                 slotptr(set->page, slot)->dead = 0;
2398                 val->len = vallen;
2399                 memcpy (val->value, value, vallen);
2400                 bt_unlockpage(BtLockWrite, set->latch);
2401                 bt_unpinlatch (set->latch);
2402                 bt_unpinpool (set->pool);
2403                 return 0;
2404         }
2405
2406         //      new update value doesn't fit in existing value area
2407
2408         if( !slotptr(set->page, slot)->dead )
2409                 set->page->garbage += val->len + ptr->len + 2;
2410         else {
2411                 slotptr(set->page, slot)->dead = 0;
2412                 set->page->act++;
2413         }
2414
2415         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2416           if( bt_splitpage (bt, set) )
2417                 return bt->err;
2418           else
2419                 continue;
2420
2421         set->page->min -= vallen + 1;
2422         ((unsigned char *)set->page)[set->page->min] = vallen;
2423         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
2424
2425         set->page->min -= keylen + 1;
2426         ((unsigned char *)set->page)[set->page->min] = keylen;
2427         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
2428         
2429         slotptr(set->page, slot)->off = set->page->min;
2430         bt_unlockpage(BtLockWrite, set->latch);
2431         bt_unpinlatch (set->latch);
2432         bt_unpinpool (set->pool);
2433         return 0;
2434   }
2435   return 0;
2436 }
2437
2438 //      set cursor to highest slot on highest page
2439
2440 uint bt_lastkey (BtDb *bt)
2441 {
2442 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2443 BtPageSet set[1];
2444 uint slot;
2445
2446         if( set->pool = bt_pinpool (bt, page_no) )
2447                 set->page = bt_page (bt, set->pool, page_no);
2448         else
2449                 return 0;
2450
2451         set->latch = bt_pinlatch (bt, page_no);
2452     bt_lockpage(BtLockRead, set->latch);
2453
2454         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2455         slot = set->page->cnt;
2456
2457     bt_unlockpage(BtLockRead, set->latch);
2458         bt_unpinlatch (set->latch);
2459         bt_unpinpool (set->pool);
2460
2461         return slot;
2462 }
2463
2464 //      return previous slot on cursor page
2465
2466 uint bt_prevkey (BtDb *bt, uint slot)
2467 {
2468 BtPageSet set[1];
2469 uid left;
2470
2471         if( --slot )
2472                 return slot;
2473
2474         if( left = bt_getid(bt->cursor->left) )
2475                 bt->cursor_page = left;
2476         else
2477                 return 0;
2478
2479         if( set->pool = bt_pinpool (bt, left) )
2480                 set->page = bt_page (bt, set->pool, left);
2481         else
2482                 return 0;
2483
2484         set->latch = bt_pinlatch (bt, left);
2485     bt_lockpage(BtLockRead, set->latch);
2486
2487         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2488
2489         bt_unlockpage(BtLockRead, set->latch);
2490         bt_unpinlatch (set->latch);
2491         bt_unpinpool (set->pool);
2492         return bt->cursor->cnt;
2493 }
2494
2495 //  return next slot on cursor page
2496 //  or slide cursor right into next page
2497
2498 uint bt_nextkey (BtDb *bt, uint slot)
2499 {
2500 BtPageSet set[1];
2501 uid right;
2502
2503   do {
2504         right = bt_getid(bt->cursor->right);
2505
2506         while( slot++ < bt->cursor->cnt )
2507           if( slotptr(bt->cursor,slot)->dead )
2508                 continue;
2509           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2510                 return slot;
2511           else
2512                 break;
2513
2514         if( !right )
2515                 break;
2516
2517         bt->cursor_page = right;
2518
2519         if( set->pool = bt_pinpool (bt, right) )
2520                 set->page = bt_page (bt, set->pool, right);
2521         else
2522                 return 0;
2523
2524         set->latch = bt_pinlatch (bt, right);
2525     bt_lockpage(BtLockRead, set->latch);
2526
2527         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2528
2529         bt_unlockpage(BtLockRead, set->latch);
2530         bt_unpinlatch (set->latch);
2531         bt_unpinpool (set->pool);
2532         slot = 0;
2533
2534   } while( 1 );
2535
2536   return bt->err = 0;
2537 }
2538
2539 //  cache page of keys into cursor and return starting slot for given key
2540
2541 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2542 {
2543 BtPageSet set[1];
2544 uint slot;
2545
2546         // cache page for retrieval
2547
2548         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2549           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2550         else
2551           return 0;
2552
2553         bt->cursor_page = set->page_no;
2554
2555         bt_unlockpage(BtLockRead, set->latch);
2556         bt_unpinlatch (set->latch);
2557         bt_unpinpool (set->pool);
2558         return slot;
2559 }
2560
2561 BtKey bt_key(BtDb *bt, uint slot)
2562 {
2563         return keyptr(bt->cursor, slot);
2564 }
2565
2566 BtVal bt_val(BtDb *bt, uint slot)
2567 {
2568         return valptr(bt->cursor,slot);
2569 }
2570
2571 #ifdef STANDALONE
2572
2573 #ifndef unix
2574 double getCpuTime(int type)
2575 {
2576 FILETIME crtime[1];
2577 FILETIME xittime[1];
2578 FILETIME systime[1];
2579 FILETIME usrtime[1];
2580 SYSTEMTIME timeconv[1];
2581 double ans = 0;
2582
2583         memset (timeconv, 0, sizeof(SYSTEMTIME));
2584
2585         switch( type ) {
2586         case 0:
2587                 GetSystemTimeAsFileTime (xittime);
2588                 FileTimeToSystemTime (xittime, timeconv);
2589                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2590                 break;
2591         case 1:
2592                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2593                 FileTimeToSystemTime (usrtime, timeconv);
2594                 break;
2595         case 2:
2596                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2597                 FileTimeToSystemTime (systime, timeconv);
2598                 break;
2599         }
2600
2601         ans += (double)timeconv->wHour * 3600;
2602         ans += (double)timeconv->wMinute * 60;
2603         ans += (double)timeconv->wSecond;
2604         ans += (double)timeconv->wMilliseconds / 1000;
2605         return ans;
2606 }
2607 #else
2608 #include <time.h>
2609 #include <sys/resource.h>
2610
2611 double getCpuTime(int type)
2612 {
2613 struct rusage used[1];
2614 struct timeval tv[1];
2615
2616         switch( type ) {
2617         case 0:
2618                 gettimeofday(tv, NULL);
2619                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2620
2621         case 1:
2622                 getrusage(RUSAGE_SELF, used);
2623                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2624
2625         case 2:
2626                 getrusage(RUSAGE_SELF, used);
2627                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2628         }
2629
2630         return 0;
2631 }
2632 #endif
2633
2634 uint bt_latchaudit (BtDb *bt)
2635 {
2636 ushort idx, hashidx;
2637 uid next, page_no;
2638 BtLatchSet *latch;
2639 uint cnt = 0;
2640 BtKey ptr;
2641
2642 #ifdef unix
2643         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2644 #endif
2645         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2646                 fprintf(stderr, "Alloc page locked\n");
2647         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2648
2649         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2650                 latch = bt->mgr->latchsets + idx;
2651                 if( *latch->readwr->rin & MASK )
2652                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2653                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2654
2655                 if( *latch->access->rin & MASK )
2656                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2657                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2658
2659                 if( *latch->parent->rin & MASK )
2660                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2661                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2662
2663                 if( latch->pin ) {
2664                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2665                         latch->pin = 0;
2666                 }
2667         }
2668
2669         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2670           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2671                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2672
2673           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2674
2675           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2676                 latch = bt->mgr->latchsets + idx;
2677                 if( *(ushort *)latch->busy )
2678                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2679                 *(ushort *)latch->busy = 0;
2680                 if( latch->pin )
2681                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2682           } while( idx = latch->next );
2683         }
2684
2685         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2686         page_no = LEAF_page;
2687
2688         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2689         off64_t off = page_no << bt->mgr->page_bits;
2690 #ifdef unix
2691                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2692 #else
2693                 DWORD amt[1];
2694
2695                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2696
2697                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2698                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2699
2700                   if( *amt <  bt->mgr->page_size )
2701                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2702 #endif
2703                 if( !bt->frame->free ) {
2704                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2705                   ptr = keyptr(bt->frame, idx+1);
2706                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2707                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2708                  }
2709                  if( !bt->frame->lvl )
2710                         cnt += bt->frame->act;
2711                 }
2712                 if( page_no > LEAF_page )
2713                         next = page_no + 1;
2714                 page_no = next;
2715         }
2716         return cnt - 1;
2717 }
2718
2719 typedef struct {
2720         char idx;
2721         char *type;
2722         char *infile;
2723         BtMgr *mgr;
2724         int num;
2725 } ThreadArg;
2726
2727 //  standalone program to index file of keys
2728 //  then list them onto std-out
2729
2730 #ifdef unix
2731 void *index_file (void *arg)
2732 #else
2733 uint __stdcall index_file (void *arg)
2734 #endif
2735 {
2736 int line = 0, found = 0, cnt = 0;
2737 uid next, page_no = LEAF_page;  // start on first page of leaves
2738 unsigned char key[256];
2739 ThreadArg *args = arg;
2740 int ch, len = 0, slot;
2741 BtPageSet set[1];
2742 BtKey ptr;
2743 BtDb *bt;
2744 FILE *in;
2745
2746         bt = bt_open (args->mgr);
2747
2748         switch(args->type[0] | 0x20)
2749         {
2750         case 'a':
2751                 fprintf(stderr, "started latch mgr audit\n");
2752                 cnt = bt_latchaudit (bt);
2753                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2754                 break;
2755
2756         case 'p':
2757                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2758                 if( in = fopen (args->infile, "rb") )
2759                   while( ch = getc(in), ch != EOF )
2760                         if( ch == '\n' )
2761                         {
2762                           line++;
2763
2764                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 0) )
2765                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2766                           len = 0;
2767                         }
2768                         else if( len < 255 )
2769                                 key[len++] = ch;
2770                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2771                 break;
2772
2773         case 'w':
2774                 fprintf(stderr, "started indexing for %s\n", args->infile);
2775                 if( in = fopen (args->infile, "rb") )
2776                   while( ch = getc(in), ch != EOF )
2777                         if( ch == '\n' )
2778                         {
2779                           line++;
2780
2781                           if( args->num == 1 )
2782                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2783
2784                           else if( args->num )
2785                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2786
2787                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 0) )
2788                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2789                           len = 0;
2790                         }
2791                         else if( len < 255 )
2792                                 key[len++] = ch;
2793                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2794                 break;
2795
2796         case 'd':
2797                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2798                 if( in = fopen (args->infile, "rb") )
2799                   while( ch = getc(in), ch != EOF )
2800                         if( ch == '\n' )
2801                         {
2802                           line++;
2803                           if( args->num == 1 )
2804                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2805
2806                           else if( args->num )
2807                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2808
2809                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2810                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2811                           ptr = (BtKey)(bt->key);
2812                           found++;
2813
2814                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2815                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2816                           len = 0;
2817                         }
2818                         else if( len < 255 )
2819                                 key[len++] = ch;
2820                 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2821                 break;
2822
2823         case 'f':
2824                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2825                 if( in = fopen (args->infile, "rb") )
2826                   while( ch = getc(in), ch != EOF )
2827                         if( ch == '\n' )
2828                         {
2829                           line++;
2830                           if( args->num == 1 )
2831                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2832
2833                           else if( args->num )
2834                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2835
2836                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2837                                 found++;
2838                           else if( bt->err )
2839                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2840                           len = 0;
2841                         }
2842                         else if( len < 255 )
2843                                 key[len++] = ch;
2844                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2845                 break;
2846
2847         case 's':
2848                 fprintf(stderr, "started scanning\n");
2849                 do {
2850                         if( set->pool = bt_pinpool (bt, page_no) )
2851                                 set->page = bt_page (bt, set->pool, page_no);
2852                         else
2853                                 break;
2854                         set->latch = bt_pinlatch (bt, page_no);
2855                         bt_lockpage (BtLockRead, set->latch);
2856                         next = bt_getid (set->page->right);
2857
2858                         for( slot = 0; slot++ < set->page->cnt; )
2859                          if( next || slot < set->page->cnt )
2860                           if( !slotptr(set->page, slot)->dead ) {
2861                                 ptr = keyptr(set->page, slot);
2862                                 len = ptr->len;
2863
2864                             if( slotptr(set->page, slot)->type == Duplicate )
2865                                         len -= BtId;
2866
2867                                 fwrite (ptr->key, len, 1, stdout);
2868                                 fputc ('\n', stdout);
2869                                 cnt++;
2870                            }
2871
2872                         bt_unlockpage (BtLockRead, set->latch);
2873                         bt_unpinlatch (set->latch);
2874                         bt_unpinpool (set->pool);
2875                 } while( page_no = next );
2876
2877                 fprintf(stderr, " Total keys read %d\n", cnt);
2878                 break;
2879
2880         case 'r':
2881                 fprintf(stderr, "started reverse scan\n");
2882                 if( slot = bt_lastkey (bt) )
2883                    while( slot = bt_prevkey (bt, slot) ) {
2884                         if( slotptr(bt->cursor, slot)->dead )
2885                           continue;
2886
2887                         ptr = keyptr(bt->cursor, slot);
2888                         len = ptr->len;
2889
2890                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2891                                 len -= BtId;
2892
2893                         fwrite (ptr->key, len, 1, stdout);
2894                         fputc ('\n', stdout);
2895                         cnt++;
2896                   }
2897
2898                 fprintf(stderr, " Total keys read %d\n", cnt);
2899                 break;
2900
2901         case 'c':
2902 #ifdef unix
2903                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2904 #endif
2905                 fprintf(stderr, "started counting\n");
2906                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2907                 page_no = LEAF_page;
2908
2909                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2910                 uid off = page_no << bt->mgr->page_bits;
2911 #ifdef unix
2912                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2913 #else
2914                 DWORD amt[1];
2915
2916                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2917
2918                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2919                         return bt->err = BTERR_map;
2920
2921                   if( *amt <  bt->mgr->page_size )
2922                         return bt->err = BTERR_map;
2923 #endif
2924                         if( !bt->frame->free && !bt->frame->lvl )
2925                                 cnt += bt->frame->act;
2926                         if( page_no > LEAF_page )
2927                                 next = page_no + 1;
2928                         page_no = next;
2929                 }
2930                 
2931                 cnt--;  // remove stopper key
2932                 fprintf(stderr, " Total keys read %d\n", cnt);
2933                 break;
2934         }
2935
2936         bt_close (bt);
2937 #ifdef unix
2938         return NULL;
2939 #else
2940         return 0;
2941 #endif
2942 }
2943
2944 typedef struct timeval timer;
2945
2946 int main (int argc, char **argv)
2947 {
2948 int idx, cnt, len, slot, err;
2949 int segsize, bits = 16;
2950 double start, stop;
2951 #ifdef unix
2952 pthread_t *threads;
2953 #else
2954 HANDLE *threads;
2955 #endif
2956 ThreadArg *args;
2957 uint poolsize = 0;
2958 float elapsed;
2959 int num = 0;
2960 char key[1];
2961 BtMgr *mgr;
2962 BtKey ptr;
2963 BtDb *bt;
2964
2965         if( argc < 3 ) {
2966                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2967                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2968                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2969                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2970                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2971                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2972                 exit(0);
2973         }
2974
2975         start = getCpuTime(0);
2976
2977         if( argc > 3 )
2978                 bits = atoi(argv[3]);
2979
2980         if( argc > 4 )
2981                 poolsize = atoi(argv[4]);
2982
2983         if( !poolsize )
2984                 fprintf (stderr, "Warning: no mapped_pool\n");
2985
2986         if( poolsize > 65535 )
2987                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2988
2989         if( argc > 5 )
2990                 segsize = atoi(argv[5]);
2991         else
2992                 segsize = 4;    // 16 pages per mmap segment
2993
2994         if( argc > 6 )
2995                 num = atoi(argv[6]);
2996
2997         cnt = argc - 7;
2998 #ifdef unix
2999         threads = malloc (cnt * sizeof(pthread_t));
3000 #else
3001         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3002 #endif
3003         args = malloc (cnt * sizeof(ThreadArg));
3004
3005         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3006
3007         if( !mgr ) {
3008                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3009                 exit (1);
3010         }
3011
3012         //      fire off threads
3013
3014         for( idx = 0; idx < cnt; idx++ ) {
3015                 args[idx].infile = argv[idx + 7];
3016                 args[idx].type = argv[2];
3017                 args[idx].mgr = mgr;
3018                 args[idx].num = num;
3019                 args[idx].idx = idx;
3020 #ifdef unix
3021                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3022                         fprintf(stderr, "Error creating thread %d\n", err);
3023 #else
3024                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3025 #endif
3026         }
3027
3028         //      wait for termination
3029
3030 #ifdef unix
3031         for( idx = 0; idx < cnt; idx++ )
3032                 pthread_join (threads[idx], NULL);
3033 #else
3034         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3035
3036         for( idx = 0; idx < cnt; idx++ )
3037                 CloseHandle(threads[idx]);
3038
3039 #endif
3040         elapsed = getCpuTime(0) - start;
3041         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3042         elapsed = getCpuTime(1);
3043         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3044         elapsed = getCpuTime(2);
3045         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3046
3047         bt_mgrclose (mgr);
3048 }
3049
3050 #endif  //STANDALONE