]> pd.if.org Git - btree/blob - threadskv5.c
clean up atomic delete deadlock problem
[btree] / threadskv5.c
1 // btree version threadskv5 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      and bi-directional cursors
7
8 // 09 SEP 2014
9
10 // author: karl malbrain, malbrain@cal.berkeley.edu
11
12 /*
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
15
16 The orginal author is Karl Malbrain.
17
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
24 */
25
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
28
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
31
32 #ifdef linux
33 #define _GNU_SOURCE
34 #endif
35
36 #ifdef unix
37 #include <unistd.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <fcntl.h>
41 #include <sys/time.h>
42 #include <sys/mman.h>
43 #include <errno.h>
44 #include <pthread.h>
45 #else
46 #define WIN32_LEAN_AND_MEAN
47 #include <windows.h>
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <time.h>
51 #include <fcntl.h>
52 #include <process.h>
53 #include <intrin.h>
54 #endif
55
56 #include <memory.h>
57 #include <string.h>
58 #include <stddef.h>
59
60 typedef unsigned long long      uid;
61
62 #ifndef unix
63 typedef unsigned long long      off64_t;
64 typedef unsigned short          ushort;
65 typedef unsigned int            uint;
66 #endif
67
68 #define BT_latchtable   128                                     // number of latch manager slots
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 /*
79 There are five lock types for each node in three independent sets: 
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
85 */
86
87 typedef enum{
88         BtLockAccess,
89         BtLockDelete,
90         BtLockRead,
91         BtLockWrite,
92         BtLockParent
93 } BtLock;
94
95 //      definition for phase-fair reader/writer lock implementation
96
97 typedef struct {
98         ushort rin[1];
99         ushort rout[1];
100         ushort ticket[1];
101         ushort serving[1];
102 } RWLock;
103
104 #define PHID 0x1
105 #define PRES 0x2
106 #define MASK 0x3
107 #define RINC 0x4
108
109 //      definition for spin latch implementation
110
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
114
115 volatile typedef struct {
116         ushort exclusive:1;
117         ushort pending:1;
118         ushort share:14;
119 } BtSpinLatch;
120
121 #define XCL 1
122 #define PEND 2
123 #define BOTH 3
124 #define SHARE 4
125
126 //  hash table entries
127
128 typedef struct {
129         BtSpinLatch latch[1];
130         volatile ushort slot;           // Latch table entry at head of chain
131 } BtHashEntry;
132
133 //      latch manager table structure
134
135 typedef struct {
136         RWLock readwr[1];               // read/write page lock
137         RWLock access[1];               // Access Intent/Page delete
138         RWLock parent[1];               // Posting of fence key in parent
139         BtSpinLatch busy[1];            // slot is being moved between chains
140         volatile ushort next;           // next entry in hash table chain
141         volatile ushort prev;           // prev entry in hash table chain
142         volatile ushort pin;            // number of outstanding locks
143         volatile ushort hash;           // hash slot entry is under
144         volatile uid page_no;           // latch set page number
145 } BtLatchSet;
146
147 //      Define the length of the page record numbers
148
149 #define BtId 6
150
151 //      Page key slot definition.
152
153 //      Keys are marked dead, but remain on the page until
154 //      it cleanup is called. The fence key (highest key) for
155 //      a leaf page is always present, even after cleanup.
156
157 //      Slot types
158
159 //      In addition to the Unique keys that occupy slots
160 //      there are Librarian and Duplicate key
161 //      slots occupying the key slot array.
162
163 //      The Librarian slots are dead keys that
164 //      serve as filler, available to add new Unique
165 //      or Dup slots that are inserted into the B-tree.
166
167 //      The Duplicate slots have had their key bytes extended
168 //      by 6 bytes to contain a binary duplicate key uniqueifier.
169
170 typedef enum {
171         Unique,
172         Librarian,
173         Duplicate
174 } BtSlotType;
175
176 typedef struct {
177         uint off:BT_maxbits;    // page offset for key start
178         uint type:3;                    // type of slot
179         uint dead:1;                    // set for deleted slot
180 } BtSlot;
181
182 //      The key structure occupies space at the upper end of
183 //      each page.  It's a length byte followed by the key
184 //      bytes.
185
186 typedef struct {
187         unsigned char len;              // this can be changed to a ushort or uint
188         unsigned char key[0];
189 } BtKey;
190
191 //      the value structure also occupies space at the upper
192 //      end of the page. Each key is immediately followed by a value.
193
194 typedef struct {
195         unsigned char len;              // this can be changed to a ushort or uint
196         unsigned char value[0];
197 } BtVal;
198
199 #define BT_maxkey       255             // maximum number of bytes in a key
200 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
201
202 //      The first part of an index page.
203 //      It is immediately followed
204 //      by the BtSlot array of keys.
205
206 //      note that this structure size
207 //      must be a multiple of 8 bytes
208 //      in order to place dups correctly.
209
210 typedef struct BtPage_ {
211         uint cnt;                                       // count of keys in page
212         uint act;                                       // count of active keys
213         uint min;                                       // next key offset
214         uint garbage;                           // page garbage in bytes
215         unsigned char bits:7;           // page size in bits
216         unsigned char free:1;           // page is on free chain
217         unsigned char lvl:7;            // level of page
218         unsigned char kill:1;           // page is being deleted
219         unsigned char left[BtId];       // page number to left
220         unsigned char filler[2];        // padding to multiple of 8
221         unsigned char right[BtId];      // page number to right
222 } *BtPage;
223
224 //      The memory mapping pool table buffer manager entry
225
226 typedef struct {
227         uid  basepage;                          // mapped base page number
228         char *map;                                      // mapped memory pointer
229         ushort slot;                            // slot index in this array
230         ushort pin;                                     // mapped page pin counter
231         void *hashprev;                         // previous pool entry for the same hash idx
232         void *hashnext;                         // next pool entry for the same hash idx
233 #ifndef unix
234         HANDLE hmap;                            // Windows memory mapping handle
235 #endif
236 } BtPool;
237
238 #define CLOCK_bit 0x8000                // bit in pool->pin
239
240 //  The loadpage interface object
241
242 typedef struct {
243         uid page_no;            // current page number
244         BtPage page;            // current page pointer
245         BtPool *pool;           // current page pool
246         BtLatchSet *latch;      // current page latch set
247 } BtPageSet;
248
249 //      structure for latch manager on ALLOC_page
250
251 typedef struct {
252         struct BtPage_ alloc[1];        // next page_no in right ptr
253         unsigned long long dups[1];     // global duplicate key uniqueifier
254         unsigned char chain[BtId];      // head of free page_nos chain
255         BtSpinLatch lock[1];            // allocation area lite latch
256         ushort latchdeployed;           // highest number of latch entries deployed
257         ushort nlatchpage;                      // number of latch pages at BT_latch
258         ushort latchtotal;                      // number of page latch entries
259         ushort latchhash;                       // number of latch hash table slots
260         ushort latchvictim;                     // next latch entry to examine
261         BtHashEntry table[0];           // the hash table
262 } BtLatchMgr;
263
264 //      The object structure for Btree access
265
266 typedef struct {
267         uint page_size;                         // page size    
268         uint page_bits;                         // page size in bits    
269         uint seg_bits;                          // seg size in pages in bits
270         uint mode;                                      // read-write mode
271 #ifdef unix
272         int idx;
273 #else
274         HANDLE idx;
275 #endif
276         ushort poolcnt;                         // highest page pool node in use
277         ushort poolmax;                         // highest page pool node allocated
278         ushort poolmask;                        // total number of pages in mmap segment - 1
279         ushort hashsize;                        // size of Hash Table for pool entries
280         volatile uint evicted;          // last evicted hash table slot
281         ushort *hash;                           // pool index for hash entries
282         BtSpinLatch *latch;                     // latches for hash table slots
283         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
284         BtLatchSet *latchsets;          // mapped latch set from latch pages
285         BtPool *pool;                           // memory pool page segments
286 #ifndef unix
287         HANDLE halloc;                          // allocation and latch table handle
288 #endif
289 } BtMgr;
290
291 typedef struct {
292         BtMgr *mgr;                             // buffer manager for thread
293         BtPage cursor;                  // cached frame for start/next (never mapped)
294         BtPage frame;                   // spare frame for the page split (never mapped)
295         uid cursor_page;                                // current cursor page number   
296         unsigned char *mem;                             // frame, cursor, page memory buffer
297         unsigned char key[BT_keyarray]; // last found complete key
298         int found;                              // last delete or insert was found
299         int err;                                // last error
300 } BtDb;
301
302 typedef enum {
303         BTERR_ok = 0,
304         BTERR_struct,
305         BTERR_ovflw,
306         BTERR_lock,
307         BTERR_map,
308         BTERR_wrt,
309         BTERR_hash
310 } BTERR;
311
312 // B-Tree functions
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey   (BtDb *bt, uint slot);
321
322 //      manager functions
323 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
324 void bt_mgrclose (BtMgr *mgr);
325
326 //  Helper functions to return slot values
327 //      from the cursor page.
328
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
331
332 //  BTree page number constants
333 #define ALLOC_page              0       // allocation & latch manager hash table
334 #define ROOT_page               1       // root of the btree
335 #define LEAF_page               2       // first page of leaves
336 #define LATCH_page              3       // pages for latch manager
337
338 //      Number of levels to create in a new BTree
339
340 #define MIN_lvl                 2
341
342 //  The page is allocated from low and hi ends.
343 //  The key slots are allocated from the bottom,
344 //      while the text and value of the key
345 //  are allocated from the top.  When the two
346 //  areas meet, the page is split into two.
347
348 //  A key consists of a length byte, two bytes of
349 //  index number (0 - 65534), and up to 253 bytes
350 //  of key value.
351
352 //  Associated with each key is a value byte string
353 //      containing any value desired.
354
355 //  The b-tree root is always located at page 1.
356 //      The first leaf page of level zero is always
357 //      located on page 2.
358
359 //      The b-tree pages are linked with next
360 //      pointers to facilitate enumerators,
361 //      and provide for concurrency.
362
363 //      When to root page fills, it is split in two and
364 //      the tree height is raised by a new root at page
365 //      one with two keys.
366
367 //      Deleted keys are marked with a dead bit until
368 //      page cleanup. The fence key for a leaf node is
369 //      always present
370
371 //  Groups of pages called segments from the btree are optionally
372 //  cached with a memory mapped pool. A hash table is used to keep
373 //  track of the cached segments.  This behaviour is controlled
374 //  by the cache block size parameter to bt_open.
375
376 //  To achieve maximum concurrency one page is locked at a time
377 //  as the tree is traversed to find leaf key in question. The right
378 //  page numbers are used in cases where the page is being split,
379 //      or consolidated.
380
381 //  Page 0 is dedicated to lock for new page extensions,
382 //      and chains empty pages together for reuse. It also
383 //      contains the latch manager hash table.
384
385 //      The ParentModification lock on a node is obtained to serialize posting
386 //      or changing the fence key for a node.
387
388 //      Empty pages are chained together through the ALLOC page and reused.
389
390 //      Access macros to address slot and key values from the page
391 //      Page slots use 1 based indexing.
392
393 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
394 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
395 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
396
397 void bt_putid(unsigned char *dest, uid id)
398 {
399 int i = BtId;
400
401         while( i-- )
402                 dest[i] = (unsigned char)id, id >>= 8;
403 }
404
405 uid bt_getid(unsigned char *src)
406 {
407 uid id = 0;
408 int i;
409
410         for( i = 0; i < BtId; i++ )
411                 id <<= 8, id |= *src++; 
412
413         return id;
414 }
415
416 uid bt_newdup (BtDb *bt)
417 {
418 #ifdef unix
419         return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
420 #else
421         return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
422 #endif
423 }
424
425 //      Phase-Fair reader/writer lock implementation
426
427 void WriteLock (RWLock *lock)
428 {
429 ushort w, r, tix;
430
431 #ifdef unix
432         tix = __sync_fetch_and_add (lock->ticket, 1);
433 #else
434         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
435 #endif
436         // wait for our ticket to come up
437
438         while( tix != lock->serving[0] )
439 #ifdef unix
440                 sched_yield();
441 #else
442                 SwitchToThread ();
443 #endif
444
445         w = PRES | (tix & PHID);
446 #ifdef  unix
447         r = __sync_fetch_and_add (lock->rin, w);
448 #else
449         r = _InterlockedExchangeAdd16 (lock->rin, w);
450 #endif
451         while( r != *lock->rout )
452 #ifdef unix
453                 sched_yield();
454 #else
455                 SwitchToThread();
456 #endif
457 }
458
459 void WriteRelease (RWLock *lock)
460 {
461 #ifdef unix
462         __sync_fetch_and_and (lock->rin, ~MASK);
463 #else
464         _InterlockedAnd16 (lock->rin, ~MASK);
465 #endif
466         lock->serving[0]++;
467 }
468
469 void ReadLock (RWLock *lock)
470 {
471 ushort w;
472 #ifdef unix
473         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
474 #else
475         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
476 #endif
477         if( w )
478           while( w == (*lock->rin & MASK) )
479 #ifdef unix
480                 sched_yield ();
481 #else
482                 SwitchToThread ();
483 #endif
484 }
485
486 void ReadRelease (RWLock *lock)
487 {
488 #ifdef unix
489         __sync_fetch_and_add (lock->rout, RINC);
490 #else
491         _InterlockedExchangeAdd16 (lock->rout, RINC);
492 #endif
493 }
494
495 //      Spin Latch Manager
496
497 //      wait until write lock mode is clear
498 //      and add 1 to the share count
499
500 void bt_spinreadlock(BtSpinLatch *latch)
501 {
502 ushort prev;
503
504   do {
505 #ifdef unix
506         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
507 #else
508         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
509 #endif
510         //  see if exclusive request is granted or pending
511
512         if( !(prev & BOTH) )
513                 return;
514 #ifdef unix
515         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
516 #else
517         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
518 #endif
519 #ifdef  unix
520   } while( sched_yield(), 1 );
521 #else
522   } while( SwitchToThread(), 1 );
523 #endif
524 }
525
526 //      wait for other read and write latches to relinquish
527
528 void bt_spinwritelock(BtSpinLatch *latch)
529 {
530 ushort prev;
531
532   do {
533 #ifdef  unix
534         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
535 #else
536         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
537 #endif
538         if( !(prev & XCL) )
539           if( !(prev & ~BOTH) )
540                 return;
541           else
542 #ifdef unix
543                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
544 #else
545                 _InterlockedAnd16((ushort *)latch, ~XCL);
546 #endif
547 #ifdef  unix
548   } while( sched_yield(), 1 );
549 #else
550   } while( SwitchToThread(), 1 );
551 #endif
552 }
553
554 //      try to obtain write lock
555
556 //      return 1 if obtained,
557 //              0 otherwise
558
559 int bt_spinwritetry(BtSpinLatch *latch)
560 {
561 ushort prev;
562
563 #ifdef  unix
564         prev = __sync_fetch_and_or((ushort *)latch, XCL);
565 #else
566         prev = _InterlockedOr16((ushort *)latch, XCL);
567 #endif
568         //      take write access if all bits are clear
569
570         if( !(prev & XCL) )
571           if( !(prev & ~BOTH) )
572                 return 1;
573           else
574 #ifdef unix
575                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
576 #else
577                 _InterlockedAnd16((ushort *)latch, ~XCL);
578 #endif
579         return 0;
580 }
581
582 //      clear write mode
583
584 void bt_spinreleasewrite(BtSpinLatch *latch)
585 {
586 #ifdef unix
587         __sync_fetch_and_and((ushort *)latch, ~BOTH);
588 #else
589         _InterlockedAnd16((ushort *)latch, ~BOTH);
590 #endif
591 }
592
593 //      decrement reader count
594
595 void bt_spinreleaseread(BtSpinLatch *latch)
596 {
597 #ifdef unix
598         __sync_fetch_and_add((ushort *)latch, -SHARE);
599 #else
600         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
601 #endif
602 }
603
604 //      link latch table entry into latch hash table
605
606 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
607 {
608 BtLatchSet *set = bt->mgr->latchsets + victim;
609
610         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
611                 bt->mgr->latchsets[set->next].prev = victim;
612
613         bt->mgr->latchmgr->table[hashidx].slot = victim;
614         set->page_no = page_no;
615         set->hash = hashidx;
616         set->prev = 0;
617 }
618
619 //      release latch pin
620
621 void bt_unpinlatch (BtLatchSet *set)
622 {
623 #ifdef unix
624         __sync_fetch_and_add(&set->pin, -1);
625 #else
626         _InterlockedDecrement16 (&set->pin);
627 #endif
628 }
629
630 //      find existing latchset or inspire new one
631 //      return with latchset pinned
632
633 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
634 {
635 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
636 ushort slot, avail = 0, victim, idx;
637 BtLatchSet *set;
638
639         //  obtain read lock on hash table entry
640
641         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
642
643         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
644         {
645                 set = bt->mgr->latchsets + slot;
646                 if( page_no == set->page_no )
647                         break;
648         } while( slot = set->next );
649
650         if( slot ) {
651 #ifdef unix
652                 __sync_fetch_and_add(&set->pin, 1);
653 #else
654                 _InterlockedIncrement16 (&set->pin);
655 #endif
656         }
657
658     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
659
660         if( slot )
661                 return set;
662
663   //  try again, this time with write lock
664
665   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
666
667   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
668   {
669         set = bt->mgr->latchsets + slot;
670         if( page_no == set->page_no )
671                 break;
672         if( !set->pin && !avail )
673                 avail = slot;
674   } while( slot = set->next );
675
676   //  found our entry, or take over an unpinned one
677
678   if( slot || (slot = avail) ) {
679         set = bt->mgr->latchsets + slot;
680 #ifdef unix
681         __sync_fetch_and_add(&set->pin, 1);
682 #else
683         _InterlockedIncrement16 (&set->pin);
684 #endif
685         set->page_no = page_no;
686         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
687         return set;
688   }
689
690         //  see if there are any unused entries
691 #ifdef unix
692         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
693 #else
694         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
695 #endif
696
697         if( victim < bt->mgr->latchmgr->latchtotal ) {
698                 set = bt->mgr->latchsets + victim;
699 #ifdef unix
700                 __sync_fetch_and_add(&set->pin, 1);
701 #else
702                 _InterlockedIncrement16 (&set->pin);
703 #endif
704                 bt_latchlink (bt, hashidx, victim, page_no);
705                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
706                 return set;
707         }
708
709 #ifdef unix
710         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
711 #else
712         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
713 #endif
714   //  find and reuse previous lock entry
715
716   while( 1 ) {
717 #ifdef unix
718         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
719 #else
720         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
721 #endif
722         //      we don't use slot zero
723
724         if( victim %= bt->mgr->latchmgr->latchtotal )
725                 set = bt->mgr->latchsets + victim;
726         else
727                 continue;
728
729         //      take control of our slot
730         //      from other threads
731
732         if( set->pin || !bt_spinwritetry (set->busy) )
733                 continue;
734
735         idx = set->hash;
736
737         // try to get write lock on hash chain
738         //      skip entry if not obtained
739         //      or has outstanding locks
740
741         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
742                 bt_spinreleasewrite (set->busy);
743                 continue;
744         }
745
746         if( set->pin ) {
747                 bt_spinreleasewrite (set->busy);
748                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
749                 continue;
750         }
751
752         //  unlink our available victim from its hash chain
753
754         if( set->prev )
755                 bt->mgr->latchsets[set->prev].next = set->next;
756         else
757                 bt->mgr->latchmgr->table[idx].slot = set->next;
758
759         if( set->next )
760                 bt->mgr->latchsets[set->next].prev = set->prev;
761
762         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
763 #ifdef unix
764         __sync_fetch_and_add(&set->pin, 1);
765 #else
766         _InterlockedIncrement16 (&set->pin);
767 #endif
768         bt_latchlink (bt, hashidx, victim, page_no);
769         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
770         bt_spinreleasewrite (set->busy);
771         return set;
772   }
773 }
774
775 void bt_mgrclose (BtMgr *mgr)
776 {
777 BtPool *pool;
778 uint slot;
779
780         // release mapped pages
781         //      note that slot zero is never used
782
783         for( slot = 1; slot < mgr->poolmax; slot++ ) {
784                 pool = mgr->pool + slot;
785                 if( pool->slot )
786 #ifdef unix
787                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
788 #else
789                 {
790                         FlushViewOfFile(pool->map, 0);
791                         UnmapViewOfFile(pool->map);
792                         CloseHandle(pool->hmap);
793                 }
794 #endif
795         }
796
797 #ifdef unix
798         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
799         munmap (mgr->latchmgr, 2 * mgr->page_size);
800 #else
801         FlushViewOfFile(mgr->latchmgr, 0);
802         UnmapViewOfFile(mgr->latchmgr);
803         CloseHandle(mgr->halloc);
804 #endif
805 #ifdef unix
806         close (mgr->idx);
807         free (mgr->pool);
808         free (mgr->hash);
809         free ((void *)mgr->latch);
810         free (mgr);
811 #else
812         FlushFileBuffers(mgr->idx);
813         CloseHandle(mgr->idx);
814         GlobalFree (mgr->pool);
815         GlobalFree (mgr->hash);
816         GlobalFree ((void *)mgr->latch);
817         GlobalFree (mgr);
818 #endif
819 }
820
821 //      close and release memory
822
823 void bt_close (BtDb *bt)
824 {
825 #ifdef unix
826         if( bt->mem )
827                 free (bt->mem);
828 #else
829         if( bt->mem)
830                 VirtualFree (bt->mem, 0, MEM_RELEASE);
831 #endif
832         free (bt);
833 }
834
835 //  open/create new btree buffer manager
836
837 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
838 //              size of mapped page pool (e.g. 8192)
839
840 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
841 {
842 uint lvl, attr, cacheblk, last, slot, idx;
843 uint nlatchpage, latchhash;
844 unsigned char value[BtId];
845 int flag, initit = 0;
846 BtLatchMgr *latchmgr;
847 off64_t size;
848 uint amt[1];
849 BtMgr* mgr;
850 BtKey* key;
851 BtVal *val;
852
853 #ifndef unix
854 SYSTEM_INFO sysinfo[1];
855 #endif
856
857         // determine sanity of page size and buffer pool
858
859         if( bits > BT_maxbits )
860                 bits = BT_maxbits;
861         else if( bits < BT_minbits )
862                 bits = BT_minbits;
863
864         if( !poolmax )
865                 return NULL;    // must have buffer pool
866
867 #ifdef unix
868         mgr = calloc (1, sizeof(BtMgr));
869
870         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
871
872         if( mgr->idx == -1 )
873                 return free(mgr), NULL;
874         
875         cacheblk = 4096;        // minimum mmap segment size for unix
876
877 #else
878         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
879         attr = FILE_ATTRIBUTE_NORMAL;
880         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
881
882         if( mgr->idx == INVALID_HANDLE_VALUE )
883                 return GlobalFree(mgr), NULL;
884
885         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
886         GetSystemInfo(sysinfo);
887         cacheblk = sysinfo->dwAllocationGranularity;
888 #endif
889
890 #ifdef unix
891         latchmgr = malloc (BT_maxpage);
892         *amt = 0;
893
894         // read minimum page size to get root info
895         //      to support raw disk partition files
896         //      check if bits == 0 on the disk.
897
898         if( size = lseek (mgr->idx, 0L, 2) ) {
899                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
900                         if( latchmgr->alloc->bits )
901                                 bits = latchmgr->alloc->bits;
902                         else
903                                 initit = 1;
904                 else
905                         return free(mgr), free(latchmgr), NULL;
906         } else if( mode == BT_ro )
907                 return free(latchmgr), bt_mgrclose (mgr), NULL;
908         else
909                 initit = 1;
910 #else
911         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
912         size = GetFileSize(mgr->idx, amt);
913
914         if( size || *amt ) {
915                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
916                         return bt_mgrclose (mgr), NULL;
917                 bits = latchmgr->alloc->bits;
918         } else if( mode == BT_ro )
919                 return bt_mgrclose (mgr), NULL;
920         else
921                 initit = 1;
922 #endif
923
924         mgr->page_size = 1 << bits;
925         mgr->page_bits = bits;
926
927         mgr->poolmax = poolmax;
928         mgr->mode = mode;
929
930         if( cacheblk < mgr->page_size )
931                 cacheblk = mgr->page_size;
932
933         //  mask for partial memmaps
934
935         mgr->poolmask = (cacheblk >> bits) - 1;
936
937         //      see if requested size of pages per memmap is greater
938
939         if( (1 << segsize) > mgr->poolmask )
940                 mgr->poolmask = (1 << segsize) - 1;
941
942         mgr->seg_bits = 0;
943
944         while( (1 << mgr->seg_bits) <= mgr->poolmask )
945                 mgr->seg_bits++;
946
947         mgr->hashsize = hashsize;
948
949 #ifdef unix
950         mgr->pool = calloc (poolmax, sizeof(BtPool));
951         mgr->hash = calloc (hashsize, sizeof(ushort));
952         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
953 #else
954         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
955         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
956         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
957 #endif
958
959         if( !initit )
960                 goto mgrlatch;
961
962         // initialize an empty b-tree with latch page, root page, page of leaves
963         // and page(s) of latches
964
965         memset (latchmgr, 0, 1 << bits);
966         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
967         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
968         latchmgr->alloc->bits = mgr->page_bits;
969
970         latchmgr->nlatchpage = nlatchpage;
971         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
972
973         //  initialize latch manager
974
975         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
976
977         //      size of hash table = total number of latchsets
978
979         if( latchhash > latchmgr->latchtotal )
980                 latchhash = latchmgr->latchtotal;
981
982         latchmgr->latchhash = latchhash;
983
984         //  initialize left-most LEAF page in
985         //      alloc->left.
986
987         bt_putid (latchmgr->alloc->left, LEAF_page);
988
989 #ifdef unix
990         if( pwrite (mgr->idx, latchmgr, mgr->page_size, 0) < mgr->page_size )
991                 return bt_mgrclose (mgr), NULL;
992 #else
993         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
994                 return bt_mgrclose (mgr), NULL;
995
996         if( *amt < mgr->page_size )
997                 return bt_mgrclose (mgr), NULL;
998 #endif
999
1000         memset (latchmgr, 0, 1 << bits);
1001         latchmgr->alloc->bits = mgr->page_bits;
1002
1003         for( lvl=MIN_lvl; lvl--; ) {
1004                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1005                 key = keyptr(latchmgr->alloc, 1);
1006                 key->len = 2;           // create stopper key
1007                 key->key[0] = 0xff;
1008                 key->key[1] = 0xff;
1009
1010                 bt_putid(value, MIN_lvl - lvl + 1);
1011                 val = valptr(latchmgr->alloc, 1);
1012                 val->len = lvl ? BtId : 0;
1013                 memcpy (val->value, value, val->len);
1014
1015                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
1016                 latchmgr->alloc->lvl = lvl;
1017                 latchmgr->alloc->cnt = 1;
1018                 latchmgr->alloc->act = 1;
1019 #ifdef unix
1020                 if( pwrite (mgr->idx, latchmgr, mgr->page_size, (MIN_lvl - lvl) << bits) < mgr->page_size )
1021                         return bt_mgrclose (mgr), NULL;
1022 #else
1023                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1024                         return bt_mgrclose (mgr), NULL;
1025
1026                 if( *amt < mgr->page_size )
1027                         return bt_mgrclose (mgr), NULL;
1028 #endif
1029         }
1030
1031         // clear out latch manager locks
1032         //      and rest of pages to round out segment
1033
1034         memset(latchmgr, 0, mgr->page_size);
1035         last = MIN_lvl + 1;
1036
1037         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1038 #ifdef unix
1039                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1040 #else
1041                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1042                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1043                         return bt_mgrclose (mgr), NULL;
1044                 if( *amt < mgr->page_size )
1045                         return bt_mgrclose (mgr), NULL;
1046 #endif
1047                 last++;
1048         }
1049
1050 mgrlatch:
1051 #ifdef unix
1052         // mlock the root page and the latchmgr page
1053
1054         flag = PROT_READ | PROT_WRITE;
1055         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1056         if( mgr->latchmgr == MAP_FAILED )
1057                 return bt_mgrclose (mgr), NULL;
1058         mlock (mgr->latchmgr, 2 * mgr->page_size);
1059
1060         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1061         if( mgr->latchsets == MAP_FAILED )
1062                 return bt_mgrclose (mgr), NULL;
1063         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1064 #else
1065         flag = PAGE_READWRITE;
1066         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1067         if( !mgr->halloc )
1068                 return bt_mgrclose (mgr), NULL;
1069
1070         flag = FILE_MAP_WRITE;
1071         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1072         if( !mgr->latchmgr )
1073                 return GetLastError(), bt_mgrclose (mgr), NULL;
1074
1075         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1076 #endif
1077
1078 #ifdef unix
1079         free (latchmgr);
1080 #else
1081         VirtualFree (latchmgr, 0, MEM_RELEASE);
1082 #endif
1083         return mgr;
1084 }
1085
1086 //      open BTree access method
1087 //      based on buffer manager
1088
1089 BtDb *bt_open (BtMgr *mgr)
1090 {
1091 BtDb *bt = malloc (sizeof(*bt));
1092
1093         memset (bt, 0, sizeof(*bt));
1094         bt->mgr = mgr;
1095 #ifdef unix
1096         bt->mem = malloc (2 *mgr->page_size);
1097 #else
1098         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1099 #endif
1100         bt->frame = (BtPage)bt->mem;
1101         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1102         return bt;
1103 }
1104
1105 //  compare two keys, returning > 0, = 0, or < 0
1106 //  as the comparison value
1107
1108 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1109 {
1110 uint len1 = key1->len;
1111 int ans;
1112
1113         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1114                 return ans;
1115
1116         if( len1 > len2 )
1117                 return 1;
1118         if( len1 < len2 )
1119                 return -1;
1120
1121         return 0;
1122 }
1123
1124 //      Buffer Pool mgr
1125
1126 // find segment in pool
1127 // must be called with hashslot idx locked
1128 //      return NULL if not there
1129 //      otherwise return node
1130
1131 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1132 {
1133 BtPool *pool;
1134 uint slot;
1135
1136         // compute start of hash chain in pool
1137
1138         if( slot = bt->mgr->hash[idx] ) 
1139                 pool = bt->mgr->pool + slot;
1140         else
1141                 return NULL;
1142
1143         page_no &= ~bt->mgr->poolmask;
1144
1145         while( pool->basepage != page_no )
1146           if( pool = pool->hashnext )
1147                 continue;
1148           else
1149                 return NULL;
1150
1151         return pool;
1152 }
1153
1154 // add segment to hash table
1155
1156 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1157 {
1158 BtPool *node;
1159 uint slot;
1160
1161         pool->hashprev = pool->hashnext = NULL;
1162         pool->basepage = page_no & ~bt->mgr->poolmask;
1163         pool->pin = CLOCK_bit + 1;
1164
1165         if( slot = bt->mgr->hash[idx] ) {
1166                 node = bt->mgr->pool + slot;
1167                 pool->hashnext = node;
1168                 node->hashprev = pool;
1169         }
1170
1171         bt->mgr->hash[idx] = pool->slot;
1172 }
1173
1174 //  map new buffer pool segment to virtual memory
1175
1176 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1177 {
1178 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1179 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1180 int flag;
1181
1182 #ifdef unix
1183         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1184         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1185         if( pool->map == MAP_FAILED )
1186                 return bt->err = BTERR_map;
1187
1188 #else
1189         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1190         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1191         if( !pool->hmap )
1192                 return bt->err = BTERR_map;
1193
1194         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1195         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1196         if( !pool->map )
1197                 return bt->err = BTERR_map;
1198 #endif
1199         return bt->err = 0;
1200 }
1201
1202 //      calculate page within pool
1203
1204 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1205 {
1206 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1207 BtPage page;
1208
1209         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1210         return page;
1211 }
1212
1213 //  release pool pin
1214
1215 void bt_unpinpool (BtPool *pool)
1216 {
1217 #ifdef unix
1218         __sync_fetch_and_add(&pool->pin, -1);
1219 #else
1220         _InterlockedDecrement16 (&pool->pin);
1221 #endif
1222 }
1223
1224 //      find or place requested page in segment-pool
1225 //      return pool table entry, incrementing pin
1226
1227 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1228 {
1229 uint slot, hashidx, idx, victim;
1230 BtPool *pool, *node, *next;
1231
1232         //      lock hash table chain
1233
1234         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1235         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1236
1237         //      look up in hash table
1238
1239         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1240 #ifdef unix
1241                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1242                 __sync_fetch_and_add(&pool->pin, 1);
1243 #else
1244                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1245                 _InterlockedIncrement16 (&pool->pin);
1246 #endif
1247                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1248                 return pool;
1249         }
1250
1251         // allocate a new pool node
1252         // and add to hash table
1253
1254 #ifdef unix
1255         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1256 #else
1257         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1258 #endif
1259
1260         if( ++slot < bt->mgr->poolmax ) {
1261                 pool = bt->mgr->pool + slot;
1262                 pool->slot = slot;
1263
1264                 if( bt_mapsegment(bt, pool, page_no) )
1265                         return NULL;
1266
1267                 bt_linkhash(bt, pool, page_no, hashidx);
1268                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1269                 return pool;
1270         }
1271
1272         // pool table is full
1273         //      find best pool entry to evict
1274
1275 #ifdef unix
1276         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1277 #else
1278         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1279 #endif
1280
1281         while( 1 ) {
1282 #ifdef unix
1283                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1284 #else
1285                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1286 #endif
1287                 victim %= bt->mgr->poolmax;
1288                 pool = bt->mgr->pool + victim;
1289                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1290
1291                 if( !victim )
1292                         continue;
1293
1294                 // try to get write lock
1295                 //      skip entry if not obtained
1296
1297                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1298                         continue;
1299
1300                 //      skip this entry if
1301                 //      page is pinned
1302                 //  or clock bit is set
1303
1304                 if( pool->pin ) {
1305 #ifdef unix
1306                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1307 #else
1308                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1309 #endif
1310                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1311                         continue;
1312                 }
1313
1314                 // unlink victim pool node from hash table
1315
1316                 if( node = pool->hashprev )
1317                         node->hashnext = pool->hashnext;
1318                 else if( node = pool->hashnext )
1319                         bt->mgr->hash[idx] = node->slot;
1320                 else
1321                         bt->mgr->hash[idx] = 0;
1322
1323                 if( node = pool->hashnext )
1324                         node->hashprev = pool->hashprev;
1325
1326                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1327
1328                 //      remove old file mapping
1329 #ifdef unix
1330                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1331 #else
1332 //              FlushViewOfFile(pool->map, 0);
1333                 UnmapViewOfFile(pool->map);
1334                 CloseHandle(pool->hmap);
1335 #endif
1336                 pool->map = NULL;
1337
1338                 //  create new pool mapping
1339                 //  and link into hash table
1340
1341                 if( bt_mapsegment(bt, pool, page_no) )
1342                         return NULL;
1343
1344                 bt_linkhash(bt, pool, page_no, hashidx);
1345                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1346                 return pool;
1347         }
1348 }
1349
1350 // place write, read, or parent lock on requested page_no.
1351
1352 void bt_lockpage(BtLock mode, BtLatchSet *set)
1353 {
1354         switch( mode ) {
1355         case BtLockRead:
1356                 ReadLock (set->readwr);
1357                 break;
1358         case BtLockWrite:
1359                 WriteLock (set->readwr);
1360                 break;
1361         case BtLockAccess:
1362                 ReadLock (set->access);
1363                 break;
1364         case BtLockDelete:
1365                 WriteLock (set->access);
1366                 break;
1367         case BtLockParent:
1368                 WriteLock (set->parent);
1369                 break;
1370         }
1371 }
1372
1373 // remove write, read, or parent lock on requested page
1374
1375 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1376 {
1377         switch( mode ) {
1378         case BtLockRead:
1379                 ReadRelease (set->readwr);
1380                 break;
1381         case BtLockWrite:
1382                 WriteRelease (set->readwr);
1383                 break;
1384         case BtLockAccess:
1385                 ReadRelease (set->access);
1386                 break;
1387         case BtLockDelete:
1388                 WriteRelease (set->access);
1389                 break;
1390         case BtLockParent:
1391                 WriteRelease (set->parent);
1392                 break;
1393         }
1394 }
1395
1396 //      allocate a new page
1397 //      return with page pinned.
1398
1399 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1400 {
1401 int blk;
1402
1403         //      lock allocation page
1404
1405         bt_spinwritelock(bt->mgr->latchmgr->lock);
1406
1407         // use empty chain first
1408         // else allocate empty page
1409
1410         if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1411                 if( set->pool = bt_pinpool (bt, set->page_no) )
1412                         set->page = bt_page (bt, set->pool, set->page_no);
1413                 else
1414                         return bt->err = BTERR_struct;
1415
1416                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1417         } else {
1418                 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1419                 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1420 #ifdef unix
1421                 // if writing first page of pool block, set file length thru last page
1422
1423                 if( (set->page_no & bt->mgr->poolmask) == 0 )
1424                         ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1425 #endif
1426                 if( set->pool = bt_pinpool (bt, set->page_no) )
1427                         set->page = bt_page (bt, set->pool, set->page_no);
1428                 else
1429                         return bt->err;
1430         }
1431
1432         // unlock allocation latch
1433
1434         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1435         return 0;
1436 }
1437
1438 //  find slot in page for given key at a given level
1439
1440 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1441 {
1442 uint diff, higher = set->page->cnt, low = 1, slot;
1443 uint good = 0;
1444
1445         //        make stopper key an infinite fence value
1446
1447         if( bt_getid (set->page->right) )
1448                 higher++;
1449         else
1450                 good++;
1451
1452         //      low is the lowest candidate.
1453         //  loop ends when they meet
1454
1455         //  higher is already
1456         //      tested as .ge. the passed key.
1457
1458         while( diff = higher - low ) {
1459                 slot = low + ( diff >> 1 );
1460                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1461                         low = slot + 1;
1462                 else
1463                         higher = slot, good++;
1464         }
1465
1466         //      return zero if key is on right link page
1467
1468         return good ? higher : 0;
1469 }
1470
1471 //  find and load page at given level for given key
1472 //      leave page rd or wr locked as requested
1473
1474 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1475 {
1476 uid page_no = ROOT_page, prevpage = 0;
1477 uint drill = 0xff, slot;
1478 BtLatchSet *prevlatch;
1479 uint mode, prevmode;
1480 BtPool *prevpool;
1481
1482   //  start at root of btree and drill down
1483
1484   do {
1485         // determine lock mode of drill level
1486         mode = (drill == lvl) ? lock : BtLockRead; 
1487
1488         set->latch = bt_pinlatch (bt, page_no);
1489         set->page_no = page_no;
1490
1491         // pin page contents
1492
1493         if( set->pool = bt_pinpool (bt, page_no) )
1494                 set->page = bt_page (bt, set->pool, page_no);
1495         else
1496                 return 0;
1497
1498         // obtain access lock using lock chaining with Access mode
1499
1500         if( page_no > ROOT_page )
1501           bt_lockpage(BtLockAccess, set->latch);
1502
1503         //      release & unpin parent page
1504
1505         if( prevpage ) {
1506           bt_unlockpage(prevmode, prevlatch);
1507           bt_unpinlatch (prevlatch);
1508           bt_unpinpool (prevpool);
1509           prevpage = 0;
1510         }
1511
1512         // obtain read lock using lock chaining
1513
1514         bt_lockpage(mode, set->latch);
1515
1516         if( set->page->free )
1517                 return bt->err = BTERR_struct, 0;
1518
1519         if( page_no > ROOT_page )
1520           bt_unlockpage(BtLockAccess, set->latch);
1521
1522         // re-read and re-lock root after determining actual level of root
1523
1524         if( set->page->lvl != drill) {
1525                 if( set->page_no != ROOT_page )
1526                         return bt->err = BTERR_struct, 0;
1527                         
1528                 drill = set->page->lvl;
1529
1530                 if( lock != BtLockRead && drill == lvl ) {
1531                   bt_unlockpage(mode, set->latch);
1532                   bt_unpinlatch (set->latch);
1533                   bt_unpinpool (set->pool);
1534                   continue;
1535                 }
1536         }
1537
1538         prevpage = set->page_no;
1539         prevlatch = set->latch;
1540         prevpool = set->pool;
1541         prevmode = mode;
1542
1543         //  find key on page at this level
1544         //  and descend to requested level
1545
1546         if( set->page->kill )
1547           goto slideright;
1548
1549         if( slot = bt_findslot (set, key, len) ) {
1550           if( drill == lvl )
1551                 return slot;
1552
1553           while( slotptr(set->page, slot)->dead )
1554                 if( slot++ < set->page->cnt )
1555                         continue;
1556                 else
1557                         goto slideright;
1558
1559           page_no = bt_getid(valptr(set->page, slot)->value);
1560           drill--;
1561           continue;
1562         }
1563
1564         //  or slide right into next page
1565
1566 slideright:
1567         page_no = bt_getid(set->page->right);
1568
1569   } while( page_no );
1570
1571   // return error on end of right chain
1572
1573   bt->err = BTERR_struct;
1574   return 0;     // return error
1575 }
1576
1577 //      return page to free list
1578 //      page must be delete & write locked
1579
1580 void bt_freepage (BtDb *bt, BtPageSet *set)
1581 {
1582         //      lock allocation page
1583
1584         bt_spinwritelock (bt->mgr->latchmgr->lock);
1585
1586         //      store chain
1587         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1588         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1589         set->page->free = 1;
1590
1591         // unlock released page
1592
1593         bt_unlockpage (BtLockDelete, set->latch);
1594         bt_unlockpage (BtLockWrite, set->latch);
1595         bt_unpinlatch (set->latch);
1596         bt_unpinpool (set->pool);
1597
1598         // unlock allocation page
1599
1600         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1601 }
1602
1603 //      a fence key was deleted from a page
1604 //      push new fence value upwards
1605
1606 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1607 {
1608 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1609 unsigned char value[BtId];
1610 BtKey* ptr;
1611 uint idx;
1612
1613         //      remove the old fence value
1614
1615         ptr = keyptr(set->page, set->page->cnt);
1616         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1617         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1618
1619         //  cache new fence value
1620
1621         ptr = keyptr(set->page, set->page->cnt);
1622         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1623
1624         bt_lockpage (BtLockParent, set->latch);
1625         bt_unlockpage (BtLockWrite, set->latch);
1626
1627         //      insert new (now smaller) fence key
1628
1629         bt_putid (value, set->page_no);
1630         ptr = (BtKey*)leftkey;
1631
1632         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1633           return bt->err;
1634
1635         //      now delete old fence key
1636
1637         ptr = (BtKey*)rightkey;
1638
1639         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1640                 return bt->err;
1641
1642         bt_unlockpage (BtLockParent, set->latch);
1643         bt_unpinlatch(set->latch);
1644         bt_unpinpool (set->pool);
1645         return 0;
1646 }
1647
1648 //      root has a single child
1649 //      collapse a level from the tree
1650
1651 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1652 {
1653 BtPageSet child[1];
1654 uint idx;
1655
1656   // find the child entry and promote as new root contents
1657
1658   do {
1659         for( idx = 0; idx++ < root->page->cnt; )
1660           if( !slotptr(root->page, idx)->dead )
1661                 break;
1662
1663         child->page_no = bt_getid (valptr(root->page, idx)->value);
1664
1665         child->latch = bt_pinlatch (bt, child->page_no);
1666         bt_lockpage (BtLockDelete, child->latch);
1667         bt_lockpage (BtLockWrite, child->latch);
1668
1669         if( child->pool = bt_pinpool (bt, child->page_no) )
1670                 child->page = bt_page (bt, child->pool, child->page_no);
1671         else
1672                 return bt->err;
1673
1674         memcpy (root->page, child->page, bt->mgr->page_size);
1675         bt_freepage (bt, child);
1676
1677   } while( root->page->lvl > 1 && root->page->act == 1 );
1678
1679   bt_unlockpage (BtLockWrite, root->latch);
1680   bt_unpinlatch (root->latch);
1681   bt_unpinpool (root->pool);
1682   return 0;
1683 }
1684
1685 //      maintain reverse scan pointers by
1686 //      linking left pointer of far right node
1687
1688 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1689 {
1690 BtPageSet right2[1];
1691
1692         //      keep track of rightmost leaf page
1693
1694         if( !right_page_no ) {
1695           bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1696           return 0;
1697         }
1698
1699         //      link right page to left page
1700
1701         right2->latch = bt_pinlatch (bt, right_page_no);
1702         bt_lockpage (BtLockWrite, right2->latch);
1703
1704         if( right2->pool = bt_pinpool (bt, right_page_no) )
1705                 right2->page = bt_page (bt, right2->pool, right_page_no);
1706         else
1707                 return bt->err;
1708
1709         bt_putid(right2->page->left, left_page_no);
1710         bt_unlockpage (BtLockWrite, right2->latch);
1711         bt_unpinlatch (right2->latch);
1712         bt_unpinpool (right2->pool);
1713         return 0;
1714 }
1715
1716 //  find and delete key on page by marking delete flag bit
1717 //  if page becomes empty, delete it from the btree
1718
1719 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1720 {
1721 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1722 uint slot, idx, found, fence;
1723 BtPageSet set[1], right[1];
1724 unsigned char value[BtId];
1725 BtKey *ptr, *tst;
1726 BtVal *val;
1727
1728         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1729                 ptr = keyptr(set->page, slot);
1730         else
1731                 return bt->err;
1732
1733         // if librarian slot, advance to real slot
1734
1735         if( slotptr(set->page, slot)->type == Librarian )
1736                 ptr = keyptr(set->page, ++slot);
1737
1738         fence = slot == set->page->cnt;
1739
1740         // if key is found delete it, otherwise ignore request
1741
1742         if( found = !keycmp (ptr, key, len) )
1743           if( found = slotptr(set->page, slot)->dead == 0 ) {
1744                 val = valptr(set->page,slot);
1745                 slotptr(set->page, slot)->dead = 1;
1746                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1747                 set->page->act--;
1748
1749                 // collapse empty slots beneath the fence
1750
1751                 while( idx = set->page->cnt - 1 )
1752                   if( slotptr(set->page, idx)->dead ) {
1753                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1754                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1755                   } else
1756                         break;
1757           }
1758
1759         //      did we delete a fence key in an upper level?
1760
1761         if( found && lvl && set->page->act && fence )
1762           if( bt_fixfence (bt, set, lvl) )
1763                 return bt->err;
1764           else
1765                 return bt->found = found, 0;
1766
1767         //      do we need to collapse root?
1768
1769         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1770           if( bt_collapseroot (bt, set) )
1771                 return bt->err;
1772           else
1773                 return bt->found = found, 0;
1774
1775         //      return if page is not empty
1776
1777         if( set->page->act ) {
1778                 bt_unlockpage(BtLockWrite, set->latch);
1779                 bt_unpinlatch (set->latch);
1780                 bt_unpinpool (set->pool);
1781                 return bt->found = found, 0;
1782         }
1783
1784         //      cache copy of fence key
1785         //      to post in parent
1786
1787         ptr = keyptr(set->page, set->page->cnt);
1788         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1789
1790         //      obtain lock on right page
1791
1792         right->page_no = bt_getid(set->page->right);
1793         right->latch = bt_pinlatch (bt, right->page_no);
1794         bt_lockpage (BtLockWrite, right->latch);
1795
1796         // pin page contents
1797
1798         if( right->pool = bt_pinpool (bt, right->page_no) )
1799                 right->page = bt_page (bt, right->pool, right->page_no);
1800         else
1801                 return bt->err;
1802
1803         if( right->page->kill )
1804                 return bt->err = BTERR_struct;
1805
1806         // transfer left link
1807
1808         memcpy (right->page->left, set->page->left, BtId);
1809
1810         // pull contents of right peer into our empty page
1811
1812         memcpy (set->page, right->page, bt->mgr->page_size);
1813
1814         // update left link
1815
1816         if( !lvl )
1817           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1818                 return bt->err;
1819
1820         // cache copy of key to update
1821
1822         ptr = keyptr(right->page, right->page->cnt);
1823         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1824
1825         // mark right page deleted and point it to left page
1826         //      until we can post parent updates
1827
1828         bt_putid (right->page->right, set->page_no);
1829         right->page->kill = 1;
1830
1831         bt_lockpage (BtLockParent, right->latch);
1832         bt_unlockpage (BtLockWrite, right->latch);
1833
1834         bt_lockpage (BtLockParent, set->latch);
1835         bt_unlockpage (BtLockWrite, set->latch);
1836
1837         // redirect higher key directly to our new node contents
1838
1839         bt_putid (value, set->page_no);
1840         ptr = (BtKey*)higherfence;
1841
1842         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1843           return bt->err;
1844
1845         //      delete old lower key to our node
1846
1847         ptr = (BtKey*)lowerfence;
1848
1849         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1850           return bt->err;
1851
1852         //      obtain delete and write locks to right node
1853
1854         bt_unlockpage (BtLockParent, right->latch);
1855         bt_lockpage (BtLockDelete, right->latch);
1856         bt_lockpage (BtLockWrite, right->latch);
1857         bt_freepage (bt, right);
1858
1859         bt_unlockpage (BtLockParent, set->latch);
1860         bt_unpinlatch (set->latch);
1861         bt_unpinpool (set->pool);
1862         bt->found = found;
1863         return 0;
1864 }
1865
1866 BtKey *bt_foundkey (BtDb *bt)
1867 {
1868         return (BtKey*)(bt->key);
1869 }
1870
1871 //      advance to next slot
1872
1873 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1874 {
1875 BtLatchSet *prevlatch;
1876 BtPool *prevpool;
1877 uid page_no;
1878
1879         if( slot < set->page->cnt )
1880                 return slot + 1;
1881
1882         prevlatch = set->latch;
1883         prevpool = set->pool;
1884
1885         if( page_no = bt_getid(set->page->right) )
1886                 set->latch = bt_pinlatch (bt, page_no);
1887         else
1888                 return bt->err = BTERR_struct, 0;
1889
1890         // pin page contents
1891
1892         if( set->pool = bt_pinpool (bt, page_no) )
1893                 set->page = bt_page (bt, set->pool, page_no);
1894         else
1895                 return 0;
1896
1897         // obtain access lock using lock chaining with Access mode
1898
1899         bt_lockpage(BtLockAccess, set->latch);
1900
1901         bt_unlockpage(BtLockRead, prevlatch);
1902         bt_unpinlatch (prevlatch);
1903         bt_unpinpool (prevpool);
1904
1905         bt_lockpage(BtLockRead, set->latch);
1906         bt_unlockpage(BtLockAccess, set->latch);
1907
1908         set->page_no = page_no;
1909         return 1;
1910 }
1911
1912 //      find unique key or first duplicate key in
1913 //      leaf level and return number of value bytes
1914 //      or (-1) if not found.  Setup key for bt_foundkey
1915
1916 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1917 {
1918 BtPageSet set[1];
1919 uint len, slot;
1920 int ret = -1;
1921 BtKey *ptr;
1922 BtVal *val;
1923
1924   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1925    do {
1926         ptr = keyptr(set->page, slot);
1927
1928         //      skip librarian slot place holder
1929
1930         if( slotptr(set->page, slot)->type == Librarian )
1931                 ptr = keyptr(set->page, ++slot);
1932
1933         //      return actual key found
1934
1935         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1936         len = ptr->len;
1937
1938         if( slotptr(set->page, slot)->type == Duplicate )
1939                 len -= BtId;
1940
1941         //      not there if we reach the stopper key
1942
1943         if( slot == set->page->cnt )
1944           if( !bt_getid (set->page->right) )
1945                 break;
1946
1947         // if key exists, return >= 0 value bytes copied
1948         //      otherwise return (-1)
1949
1950         if( slotptr(set->page, slot)->dead )
1951                 continue;
1952
1953         if( keylen == len )
1954           if( !memcmp (ptr->key, key, len) ) {
1955                 val = valptr (set->page,slot);
1956                 if( valmax > val->len )
1957                         valmax = val->len;
1958                 memcpy (value, val->value, valmax);
1959                 ret = valmax;
1960           }
1961
1962         break;
1963
1964    } while( slot = bt_findnext (bt, set, slot) );
1965
1966   bt_unlockpage (BtLockRead, set->latch);
1967   bt_unpinlatch (set->latch);
1968   bt_unpinpool (set->pool);
1969   return ret;
1970 }
1971
1972 //      check page for space available,
1973 //      clean if necessary and return
1974 //      0 - page needs splitting
1975 //      >0  new slot value
1976
1977 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1978 {
1979 uint nxt = bt->mgr->page_size;
1980 uint cnt = 0, idx = 0;
1981 uint max = page->cnt;
1982 uint newslot = max;
1983 BtKey *key;
1984 BtVal *val;
1985
1986         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1987                 return slot;
1988
1989         //      skip cleanup and proceed to split
1990         //      if there's not enough garbage
1991         //      to bother with.
1992
1993         if( page->garbage < nxt / 5 )
1994                 return 0;
1995
1996         memcpy (bt->frame, page, bt->mgr->page_size);
1997
1998         // skip page info and set rest of page to zero
1999
2000         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2001         page->garbage = 0;
2002         page->act = 0;
2003
2004         // clean up page first by
2005         // removing deleted keys
2006
2007         while( cnt++ < max ) {
2008                 if( cnt == slot )
2009                         newslot = idx + 2;
2010                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
2011                         continue;
2012
2013                 // copy the value across
2014
2015                 val = valptr(bt->frame, cnt);
2016                 nxt -= val->len + sizeof(BtVal);
2017                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2018
2019                 // copy the key across
2020
2021                 key = keyptr(bt->frame, cnt);
2022                 nxt -= key->len + sizeof(BtKey);
2023                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2024
2025                 // make a librarian slot
2026
2027                 if( idx ) {
2028                         slotptr(page, ++idx)->off = nxt;
2029                         slotptr(page, idx)->type = Librarian;
2030                         slotptr(page, idx)->dead = 1;
2031                 }
2032
2033                 // set up the slot
2034
2035                 slotptr(page, ++idx)->off = nxt;
2036                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2037
2038                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2039                         page->act++;
2040         }
2041
2042         page->min = nxt;
2043         page->cnt = idx;
2044
2045         //      see if page has enough space now, or does it need splitting?
2046
2047         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2048                 return newslot;
2049
2050         return 0;
2051 }
2052
2053 // split the root and raise the height of the btree
2054
2055 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
2056 {
2057 uint nxt = bt->mgr->page_size;
2058 unsigned char value[BtId];
2059 BtPageSet left[1];
2060 BtKey *ptr;
2061 BtVal *val;
2062
2063         //  Obtain an empty page to use, and copy the current
2064         //  root contents into it, e.g. lower keys
2065
2066         if( bt_newpage(bt, left) )
2067                 return bt->err;
2068
2069         memcpy(left->page, root->page, bt->mgr->page_size);
2070         bt_unpinpool (left->pool);
2071
2072         // set left from higher to lower page
2073
2074         bt_putid (right->page->left, left->page_no);
2075         bt_unpinpool (right->pool);
2076
2077         // preserve the page info at the bottom
2078         // of higher keys and set rest to zero
2079
2080         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2081
2082         // insert stopper key at top of newroot page
2083         // and increase the root height
2084
2085         nxt -= BtId + sizeof(BtVal);
2086         bt_putid (value, right->page_no);
2087         val = (BtVal *)((unsigned char *)root->page + nxt);
2088         memcpy (val->value, value, BtId);
2089         val->len = BtId;
2090
2091         nxt -= 2 + sizeof(BtKey);
2092         slotptr(root->page, 2)->off = nxt;
2093         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2094         ptr->len = 2;
2095         ptr->key[0] = 0xff;
2096         ptr->key[1] = 0xff;
2097
2098         // insert lower keys page fence key on newroot page as first key
2099
2100         nxt -= BtId + sizeof(BtVal);
2101         bt_putid (value, left->page_no);
2102         val = (BtVal *)((unsigned char *)root->page + nxt);
2103         memcpy (val->value, value, BtId);
2104         val->len = BtId;
2105
2106         nxt -= leftkey->len + sizeof(BtKey);
2107         slotptr(root->page, 1)->off = nxt;
2108         memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
2109         
2110         bt_putid(root->page->right, 0);
2111         root->page->min = nxt;          // reset lowest used offset and key count
2112         root->page->cnt = 2;
2113         root->page->act = 2;
2114         root->page->lvl++;
2115
2116         // release and unpin root
2117
2118         bt_unlockpage(BtLockWrite, root->latch);
2119         bt_unpinlatch (root->latch);
2120         bt_unpinpool (root->pool);
2121         return 0;
2122 }
2123
2124 //  split already locked full node
2125 //      return unlocked.
2126
2127 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2128 {
2129 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
2130 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2131 unsigned char value[BtId];
2132 uint lvl = set->page->lvl;
2133 BtPageSet right[1];
2134 BtKey *key, *ptr;
2135 BtVal *val, *src;
2136 uid right2;
2137 uint prev;
2138
2139         //  split higher half of keys to bt->frame
2140
2141         memset (bt->frame, 0, bt->mgr->page_size);
2142         max = set->page->cnt;
2143         cnt = max / 2;
2144         idx = 0;
2145
2146         while( cnt++ < max ) {
2147                 if( slotptr(set->page, cnt)->dead && cnt < max )
2148                         continue;
2149                 src = valptr(set->page, cnt);
2150                 nxt -= src->len + sizeof(BtVal);
2151                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2152
2153                 key = keyptr(set->page, cnt);
2154                 nxt -= key->len + sizeof(BtKey);
2155                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + sizeof(BtVal));
2156
2157                 //      add librarian slot
2158
2159                 if( idx ) {
2160                         slotptr(bt->frame, ++idx)->off = nxt;
2161                         slotptr(bt->frame, idx)->type = Librarian;
2162                         slotptr(bt->frame, idx)->dead = 1;
2163                 }
2164
2165                 //  add actual slot
2166
2167                 slotptr(bt->frame, ++idx)->off = nxt;
2168                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2169
2170                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2171                         bt->frame->act++;
2172         }
2173
2174         // remember existing fence key for new page to the right
2175
2176         memcpy (rightkey, key, key->len + sizeof(BtKey));
2177
2178         bt->frame->bits = bt->mgr->page_bits;
2179         bt->frame->min = nxt;
2180         bt->frame->cnt = idx;
2181         bt->frame->lvl = lvl;
2182
2183         // link right node
2184
2185         if( set->page_no > ROOT_page ) {
2186                 bt_putid (bt->frame->right, bt_getid (set->page->right));
2187                 bt_putid(bt->frame->left, set->page_no);
2188         }
2189
2190         //      get new free page and write higher keys to it.
2191
2192         if( bt_newpage(bt, right) )
2193                 return bt->err;
2194
2195         // link left node
2196
2197         if( set->page_no > ROOT_page && !lvl )
2198           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2199                 return bt->err;
2200
2201         memcpy (right->page, bt->frame, bt->mgr->page_size);
2202
2203         //      update lower keys to continue in old page
2204
2205         memcpy (bt->frame, set->page, bt->mgr->page_size);
2206         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2207         nxt = bt->mgr->page_size;
2208         set->page->garbage = 0;
2209         set->page->act = 0;
2210         max /= 2;
2211         cnt = 0;
2212         idx = 0;
2213
2214         if( slotptr(bt->frame, max)->type == Librarian )
2215                 max--;
2216
2217         //  assemble page of smaller keys
2218
2219         while( cnt++ < max ) {
2220                 if( slotptr(bt->frame, cnt)->dead )
2221                         continue;
2222                 src = valptr(bt->frame, cnt);
2223                 nxt -= src->len + sizeof(BtVal);
2224                 memcpy ((unsigned char *)set->page + nxt, src, src->len + sizeof(BtVal));
2225
2226                 key = keyptr(bt->frame, cnt);
2227                 nxt -= key->len + sizeof(BtKey);
2228                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2229
2230                 //      add librarian slot
2231
2232                 if( idx ) {
2233                         slotptr(set->page, ++idx)->off = nxt;
2234                         slotptr(set->page, idx)->type = Librarian;
2235                         slotptr(set->page, idx)->dead = 1;
2236                 }
2237
2238                 //      add actual slot
2239
2240                 slotptr(set->page, ++idx)->off = nxt;
2241                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2242                 set->page->act++;
2243         }
2244
2245         // remember fence key for smaller page
2246
2247         memcpy(fencekey, key, key->len + sizeof(BtKey));
2248
2249         bt_putid(set->page->right, right->page_no);
2250         set->page->min = nxt;
2251         set->page->cnt = idx;
2252
2253         // if current page is the root page, split it
2254
2255         if( set->page_no == ROOT_page )
2256                 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2257
2258         // insert new fences in their parent pages
2259
2260         right->latch = bt_pinlatch (bt, right->page_no);
2261         bt_lockpage (BtLockParent, right->latch);
2262
2263         bt_lockpage (BtLockParent, set->latch);
2264         bt_unlockpage (BtLockWrite, set->latch);
2265
2266         // insert new fence for reformulated left block of smaller keys
2267
2268         bt_putid (value, set->page_no);
2269         ptr = (BtKey*)fencekey;
2270
2271         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2272                 return bt->err;
2273
2274         // switch fence for right block of larger keys to new right page
2275
2276         bt_putid (value, right->page_no);
2277         ptr = (BtKey*)rightkey;
2278
2279         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2280                 return bt->err;
2281
2282         bt_unlockpage (BtLockParent, set->latch);
2283         bt_unpinlatch (set->latch);
2284         bt_unpinpool (set->pool);
2285
2286         bt_unlockpage (BtLockParent, right->latch);
2287         bt_unpinlatch (right->latch);
2288         bt_unpinpool (right->pool);
2289         return 0;
2290 }
2291
2292 //      install new key and value onto page
2293 //      page must already be checked for
2294 //      adequate space
2295
2296 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2297 {
2298 uint idx, librarian;
2299 BtSlot *node;
2300 BtKey *ptr;
2301 BtVal *val;
2302
2303         //      if found slot > desired slot and previous slot
2304         //      is a librarian slot, use it
2305
2306         if( slot > 1 )
2307           if( slotptr(set->page, slot-1)->type == Librarian )
2308                 slot--;
2309
2310         // copy value onto page
2311
2312         set->page->min -= vallen + sizeof(BtVal);
2313         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2314         memcpy (val->value, value, vallen);
2315         val->len = vallen;
2316
2317         // copy key onto page
2318
2319         set->page->min -= keylen + sizeof(BtKey);
2320         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2321         memcpy (ptr->key, key, keylen);
2322         ptr->len = keylen;
2323         
2324         //      find first empty slot
2325
2326         for( idx = slot; idx < set->page->cnt; idx++ )
2327           if( slotptr(set->page, idx)->dead )
2328                 break;
2329
2330         // now insert key into array before slot
2331
2332         if( idx == set->page->cnt )
2333                 idx += 2, set->page->cnt += 2, librarian = 2;
2334         else
2335                 librarian = 1;
2336
2337         set->page->act++;
2338
2339         while( idx > slot + librarian - 1 )
2340                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2341
2342         //      add librarian slot
2343
2344         if( librarian > 1 ) {
2345                 node = slotptr(set->page, slot++);
2346                 node->off = set->page->min;
2347                 node->type = Librarian;
2348                 node->dead = 1;
2349         }
2350
2351         //      fill in new slot
2352
2353         node = slotptr(set->page, slot);
2354         node->off = set->page->min;
2355         node->type = type;
2356         node->dead = 0;
2357
2358         bt_unlockpage (BtLockWrite, set->latch);
2359         bt_unpinlatch (set->latch);
2360         bt_unpinpool (set->pool);
2361         return 0;
2362 }
2363
2364 //  Insert new key into the btree at given level.
2365 //      either add a new key or update/add an existing one
2366
2367 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2368 {
2369 unsigned char newkey[BT_keyarray];
2370 uint slot, idx, len;
2371 BtPageSet set[1];
2372 BtKey *ptr, *ins;
2373 uid sequence;
2374 BtVal *val;
2375 uint type;
2376
2377   // set up the key we're working on
2378
2379   ins = (BtKey*)newkey;
2380   memcpy (ins->key, key, keylen);
2381   ins->len = keylen;
2382
2383   // is this a non-unique index value?
2384
2385   if( unique )
2386         type = Unique;
2387   else {
2388         type = Duplicate;
2389         sequence = bt_newdup (bt);
2390         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2391         ins->len += BtId;
2392   }
2393
2394   while( 1 ) { // find the page and slot for the current key
2395         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2396                 ptr = keyptr(set->page, slot);
2397         else {
2398                 if( !bt->err )
2399                         bt->err = BTERR_ovflw;
2400                 return bt->err;
2401         }
2402
2403         // if librarian slot == found slot, advance to real slot
2404
2405         if( slotptr(set->page, slot)->type == Librarian )
2406           if( !keycmp (ptr, key, keylen) )
2407                 ptr = keyptr(set->page, ++slot);
2408
2409         len = ptr->len;
2410
2411         if( slotptr(set->page, slot)->type == Duplicate )
2412                 len -= BtId;
2413
2414         //  if inserting a duplicate key or unique key
2415         //      check for adequate space on the page
2416         //      and insert the new key before slot.
2417
2418         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2419           if( !(slot = bt_cleanpage (bt, set->page, ins->len, slot, vallen)) )
2420                 if( bt_splitpage (bt, set) )
2421                   return bt->err;
2422                 else
2423                   continue;
2424
2425           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2426         }
2427
2428         // if key already exists, update value and return
2429
2430         val = valptr(set->page, slot);
2431
2432         if( val->len >= vallen ) {
2433                 if( slotptr(set->page, slot)->dead )
2434                         set->page->act++;
2435                 set->page->garbage += val->len - vallen;
2436                 slotptr(set->page, slot)->dead = 0;
2437                 val->len = vallen;
2438                 memcpy (val->value, value, vallen);
2439                 bt_unlockpage(BtLockWrite, set->latch);
2440                 bt_unpinlatch (set->latch);
2441                 bt_unpinpool (set->pool);
2442                 return 0;
2443         }
2444
2445         //      new update value doesn't fit in existing value area
2446
2447         if( !slotptr(set->page, slot)->dead )
2448                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2449         else {
2450                 slotptr(set->page, slot)->dead = 0;
2451                 set->page->act++;
2452         }
2453
2454         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2455           if( bt_splitpage (bt, set) )
2456                 return bt->err;
2457           else
2458                 continue;
2459
2460         set->page->min -= vallen + sizeof(BtVal);
2461         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2462         memcpy (val->value, value, vallen);
2463         val->len = vallen;
2464
2465         set->page->min -= keylen + sizeof(BtKey);
2466         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2467         memcpy (ptr->key, key, keylen);
2468         ptr->len = keylen;
2469         
2470         slotptr(set->page, slot)->off = set->page->min;
2471         bt_unlockpage(BtLockWrite, set->latch);
2472         bt_unpinlatch (set->latch);
2473         bt_unpinpool (set->pool);
2474         return 0;
2475   }
2476   return 0;
2477 }
2478
2479 //      set cursor to highest slot on highest page
2480
2481 uint bt_lastkey (BtDb *bt)
2482 {
2483 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2484 BtPageSet set[1];
2485 uint slot;
2486
2487         if( set->pool = bt_pinpool (bt, page_no) )
2488                 set->page = bt_page (bt, set->pool, page_no);
2489         else
2490                 return 0;
2491
2492         set->latch = bt_pinlatch (bt, page_no);
2493     bt_lockpage(BtLockRead, set->latch);
2494
2495         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2496         slot = set->page->cnt;
2497
2498     bt_unlockpage(BtLockRead, set->latch);
2499         bt_unpinlatch (set->latch);
2500         bt_unpinpool (set->pool);
2501
2502         return slot;
2503 }
2504
2505 //      return previous slot on cursor page
2506
2507 uint bt_prevkey (BtDb *bt, uint slot)
2508 {
2509 BtPageSet set[1];
2510 uid left;
2511
2512         if( --slot )
2513                 return slot;
2514
2515         if( left = bt_getid(bt->cursor->left) )
2516                 bt->cursor_page = left;
2517         else
2518                 return 0;
2519
2520         if( set->pool = bt_pinpool (bt, left) )
2521                 set->page = bt_page (bt, set->pool, left);
2522         else
2523                 return 0;
2524
2525         set->latch = bt_pinlatch (bt, left);
2526     bt_lockpage(BtLockRead, set->latch);
2527
2528         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2529
2530         bt_unlockpage(BtLockRead, set->latch);
2531         bt_unpinlatch (set->latch);
2532         bt_unpinpool (set->pool);
2533         return bt->cursor->cnt;
2534 }
2535
2536 //  return next slot on cursor page
2537 //  or slide cursor right into next page
2538
2539 uint bt_nextkey (BtDb *bt, uint slot)
2540 {
2541 BtPageSet set[1];
2542 uid right;
2543
2544   do {
2545         right = bt_getid(bt->cursor->right);
2546
2547         while( slot++ < bt->cursor->cnt )
2548           if( slotptr(bt->cursor,slot)->dead )
2549                 continue;
2550           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2551                 return slot;
2552           else
2553                 break;
2554
2555         if( !right )
2556                 break;
2557
2558         bt->cursor_page = right;
2559
2560         if( set->pool = bt_pinpool (bt, right) )
2561                 set->page = bt_page (bt, set->pool, right);
2562         else
2563                 return 0;
2564
2565         set->latch = bt_pinlatch (bt, right);
2566     bt_lockpage(BtLockRead, set->latch);
2567
2568         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2569
2570         bt_unlockpage(BtLockRead, set->latch);
2571         bt_unpinlatch (set->latch);
2572         bt_unpinpool (set->pool);
2573         slot = 0;
2574
2575   } while( 1 );
2576
2577   return bt->err = 0;
2578 }
2579
2580 //  cache page of keys into cursor and return starting slot for given key
2581
2582 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2583 {
2584 BtPageSet set[1];
2585 uint slot;
2586
2587         // cache page for retrieval
2588
2589         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2590           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2591         else
2592           return 0;
2593
2594         bt->cursor_page = set->page_no;
2595
2596         bt_unlockpage(BtLockRead, set->latch);
2597         bt_unpinlatch (set->latch);
2598         bt_unpinpool (set->pool);
2599         return slot;
2600 }
2601
2602 BtKey *bt_key(BtDb *bt, uint slot)
2603 {
2604         return keyptr(bt->cursor, slot);
2605 }
2606
2607 BtVal *bt_val(BtDb *bt, uint slot)
2608 {
2609         return valptr(bt->cursor,slot);
2610 }
2611
2612 #ifdef STANDALONE
2613
2614 #ifndef unix
2615 double getCpuTime(int type)
2616 {
2617 FILETIME crtime[1];
2618 FILETIME xittime[1];
2619 FILETIME systime[1];
2620 FILETIME usrtime[1];
2621 SYSTEMTIME timeconv[1];
2622 double ans = 0;
2623
2624         memset (timeconv, 0, sizeof(SYSTEMTIME));
2625
2626         switch( type ) {
2627         case 0:
2628                 GetSystemTimeAsFileTime (xittime);
2629                 FileTimeToSystemTime (xittime, timeconv);
2630                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2631                 break;
2632         case 1:
2633                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2634                 FileTimeToSystemTime (usrtime, timeconv);
2635                 break;
2636         case 2:
2637                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2638                 FileTimeToSystemTime (systime, timeconv);
2639                 break;
2640         }
2641
2642         ans += (double)timeconv->wHour * 3600;
2643         ans += (double)timeconv->wMinute * 60;
2644         ans += (double)timeconv->wSecond;
2645         ans += (double)timeconv->wMilliseconds / 1000;
2646         return ans;
2647 }
2648 #else
2649 #include <time.h>
2650 #include <sys/resource.h>
2651
2652 double getCpuTime(int type)
2653 {
2654 struct rusage used[1];
2655 struct timeval tv[1];
2656
2657         switch( type ) {
2658         case 0:
2659                 gettimeofday(tv, NULL);
2660                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2661
2662         case 1:
2663                 getrusage(RUSAGE_SELF, used);
2664                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2665
2666         case 2:
2667                 getrusage(RUSAGE_SELF, used);
2668                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2669         }
2670
2671         return 0;
2672 }
2673 #endif
2674
2675 void bt_poolaudit (BtMgr *mgr)
2676 {
2677 uint slot = 0;
2678 BtPool *pool;
2679
2680         while( slot++ < mgr->poolcnt ) {
2681                 pool = mgr->pool + slot;
2682                 if( pool->pin & ~CLOCK_bit )
2683                   fprintf(stderr, "pool slot %d pinned\n", slot);
2684         }
2685 }
2686
2687 uint bt_latchaudit (BtDb *bt)
2688 {
2689 ushort idx, hashidx;
2690 uid next, page_no;
2691 BtLatchSet *latch;
2692 uint cnt = 0;
2693 BtKey *ptr;
2694
2695 #ifdef unix
2696         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2697 #endif
2698         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2699                 fprintf(stderr, "Alloc page locked\n");
2700         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2701
2702         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2703                 latch = bt->mgr->latchsets + idx;
2704                 if( *latch->readwr->rin & MASK )
2705                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2706                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2707
2708                 if( *latch->access->rin & MASK )
2709                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2710                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2711
2712                 if( *latch->parent->rin & MASK )
2713                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2714                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2715
2716                 if( latch->pin ) {
2717                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2718                         latch->pin = 0;
2719                 }
2720         }
2721
2722         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2723           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2724                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2725
2726           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2727
2728           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2729                 latch = bt->mgr->latchsets + idx;
2730                 if( *(ushort *)latch->busy )
2731                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2732                 *(ushort *)latch->busy = 0;
2733                 if( latch->pin )
2734                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2735           } while( idx = latch->next );
2736         }
2737
2738         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2739         page_no = LEAF_page;
2740
2741         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2742         off64_t off = page_no << bt->mgr->page_bits;
2743 #ifdef unix
2744                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2745 #else
2746                 DWORD amt[1];
2747
2748                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2749
2750                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2751                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2752
2753                   if( *amt <  bt->mgr->page_size )
2754                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2755 #endif
2756                 if( !bt->frame->free ) {
2757                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2758                   ptr = keyptr(bt->frame, idx+1);
2759                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) > 0 )
2760                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2761                  }
2762                  if( !bt->frame->lvl )
2763                         cnt += bt->frame->act;
2764                 }
2765                 if( page_no > LEAF_page )
2766                         next = page_no + 1;
2767                 page_no = next;
2768         }
2769         return cnt - 1;
2770 }
2771
2772 typedef struct {
2773         char idx;
2774         char *type;
2775         char *infile;
2776         BtMgr *mgr;
2777         int num;
2778 } ThreadArg;
2779
2780 //  standalone program to index file of keys
2781 //  then list them onto std-out
2782
2783 #ifdef unix
2784 void *index_file (void *arg)
2785 #else
2786 uint __stdcall index_file (void *arg)
2787 #endif
2788 {
2789 int line = 0, found = 0, cnt = 0, unique;
2790 uid next, page_no = LEAF_page;  // start on first page of leaves
2791 unsigned char key[BT_maxkey];
2792 ThreadArg *args = arg;
2793 int ch, len = 0, slot;
2794 BtPageSet set[1];
2795 BtKey *ptr;
2796 BtVal *val;
2797 BtDb *bt;
2798 FILE *in;
2799
2800         bt = bt_open (args->mgr);
2801
2802         unique = (args->type[1] | 0x20) != 'd';
2803
2804         switch(args->type[0] | 0x20)
2805         {
2806         case 'a':
2807                 fprintf(stderr, "started latch mgr audit\n");
2808                 cnt = bt_latchaudit (bt);
2809                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2810                 break;
2811
2812         case 'p':
2813                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2814                 if( in = fopen (args->infile, "rb") )
2815                   while( ch = getc(in), ch != EOF )
2816                         if( ch == '\n' )
2817                         {
2818                           line++;
2819
2820                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2821                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2822                           len = 0;
2823                         }
2824                         else if( len < BT_maxkey )
2825                                 key[len++] = ch;
2826                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2827                 break;
2828
2829         case 'w':
2830                 fprintf(stderr, "started indexing for %s\n", args->infile);
2831                 if( in = fopen (args->infile, "rb") )
2832                   while( ch = getc(in), ch != EOF )
2833                         if( ch == '\n' )
2834                         {
2835                           line++;
2836
2837                           if( args->num == 1 )
2838                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2839
2840                           else if( args->num )
2841                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2842
2843                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2844                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2845                           len = 0;
2846                         }
2847                         else if( len < BT_maxkey )
2848                                 key[len++] = ch;
2849                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2850                 break;
2851
2852         case 'd':
2853                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2854                 if( in = fopen (args->infile, "rb") )
2855                   while( ch = getc(in), ch != EOF )
2856                         if( ch == '\n' )
2857                         {
2858                           line++;
2859                           if( args->num == 1 )
2860                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2861
2862                           else if( args->num )
2863                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2864
2865                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2866                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2867                           ptr = (BtKey*)(bt->key);
2868                           found++;
2869
2870                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2871                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2872                           len = 0;
2873                         }
2874                         else if( len < BT_maxkey )
2875                                 key[len++] = ch;
2876
2877                 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2878                 break;
2879
2880         case 'f':
2881                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2882                 if( in = fopen (args->infile, "rb") )
2883                   while( ch = getc(in), ch != EOF )
2884                         if( ch == '\n' )
2885                         {
2886                           line++;
2887                           if( args->num == 1 )
2888                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2889
2890                           else if( args->num )
2891                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2892
2893                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2894                                 found++;
2895                           else if( bt->err )
2896                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2897                           len = 0;
2898                         }
2899                         else if( len < BT_maxkey )
2900                                 key[len++] = ch;
2901                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2902                 break;
2903
2904         case 's':
2905                 fprintf(stderr, "started scanning\n");
2906                 do {
2907                         if( set->pool = bt_pinpool (bt, page_no) )
2908                                 set->page = bt_page (bt, set->pool, page_no);
2909                         else
2910                                 break;
2911                         madvise (set->page, bt->mgr->page_size, MADV_WILLNEED);
2912                         set->latch = bt_pinlatch (bt, page_no);
2913                         bt_lockpage (BtLockRead, set->latch);
2914                         next = bt_getid (set->page->right);
2915
2916                         for( slot = 0; slot++ < set->page->cnt; )
2917                          if( next || slot < set->page->cnt )
2918                           if( !slotptr(set->page, slot)->dead ) {
2919                                 ptr = keyptr(set->page, slot);
2920                                 len = ptr->len;
2921
2922                             if( slotptr(set->page, slot)->type == Duplicate )
2923                                         len -= BtId;
2924
2925                                 fwrite (ptr->key, len, 1, stdout);
2926                                 val = valptr(set->page, slot);
2927                                 fwrite (val->value, val->len, 1, stdout);
2928                                 fputc ('\n', stdout);
2929                                 cnt++;
2930                            }
2931
2932                         bt_unlockpage (BtLockRead, set->latch);
2933                         bt_unpinlatch (set->latch);
2934                         bt_unpinpool (set->pool);
2935                 } while( page_no = next );
2936
2937                 fprintf(stderr, " Total keys read %d\n", cnt);
2938                 break;
2939
2940         case 'r':
2941                 fprintf(stderr, "started reverse scan\n");
2942                 if( slot = bt_lastkey (bt) )
2943                    while( slot = bt_prevkey (bt, slot) ) {
2944                         if( slotptr(bt->cursor, slot)->dead )
2945                           continue;
2946
2947                         ptr = keyptr(bt->cursor, slot);
2948                         len = ptr->len;
2949
2950                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2951                                 len -= BtId;
2952
2953                         fwrite (ptr->key, len, 1, stdout);
2954                         val = valptr(bt->cursor, slot);
2955                         fwrite (val->value, val->len, 1, stdout);
2956                         fputc ('\n', stdout);
2957                         cnt++;
2958                   }
2959
2960                 fprintf(stderr, " Total keys read %d\n", cnt);
2961                 break;
2962
2963         case 'c':
2964 #ifdef unix
2965                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2966 #endif
2967                 fprintf(stderr, "started counting\n");
2968                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2969                 page_no = LEAF_page;
2970
2971                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2972                 uid off = page_no << bt->mgr->page_bits;
2973 #ifdef unix
2974                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2975 #else
2976                 DWORD amt[1];
2977
2978                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2979
2980                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2981                         return bt->err = BTERR_map;
2982
2983                   if( *amt <  bt->mgr->page_size )
2984                         return bt->err = BTERR_map;
2985 #endif
2986                         if( !bt->frame->free && !bt->frame->lvl )
2987                                 cnt += bt->frame->act;
2988                         if( page_no > LEAF_page )
2989                                 next = page_no + 1;
2990                         page_no = next;
2991                 }
2992                 
2993                 cnt--;  // remove stopper key
2994                 fprintf(stderr, " Total keys read %d\n", cnt);
2995                 break;
2996         }
2997
2998         bt_close (bt);
2999 #ifdef unix
3000         return NULL;
3001 #else
3002         return 0;
3003 #endif
3004 }
3005
3006 typedef struct timeval timer;
3007
3008 int main (int argc, char **argv)
3009 {
3010 int idx, cnt, len, slot, err;
3011 int segsize, bits = 16;
3012 double start, stop;
3013 #ifdef unix
3014 pthread_t *threads;
3015 #else
3016 HANDLE *threads;
3017 #endif
3018 ThreadArg *args;
3019 uint poolsize = 0;
3020 float elapsed;
3021 int num = 0;
3022 char key[1];
3023 BtMgr *mgr;
3024 BtKey *ptr;
3025 BtDb *bt;
3026
3027         if( argc < 3 ) {
3028                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3029                 fprintf (stderr, "  where page_bits is the page size in bits\n");
3030                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
3031                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
3032                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
3033                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3034                 exit(0);
3035         }
3036
3037         start = getCpuTime(0);
3038
3039         if( argc > 3 )
3040                 bits = atoi(argv[3]);
3041
3042         if( argc > 4 )
3043                 poolsize = atoi(argv[4]);
3044
3045         if( !poolsize )
3046                 fprintf (stderr, "Warning: no mapped_pool\n");
3047
3048         if( poolsize > 65535 )
3049                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
3050
3051         if( argc > 5 )
3052                 segsize = atoi(argv[5]);
3053         else
3054                 segsize = 4;    // 16 pages per mmap segment
3055
3056         if( argc > 6 )
3057                 num = atoi(argv[6]);
3058
3059         cnt = argc - 7;
3060 #ifdef unix
3061         threads = malloc (cnt * sizeof(pthread_t));
3062 #else
3063         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3064 #endif
3065         args = malloc (cnt * sizeof(ThreadArg));
3066
3067         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3068
3069         if( !mgr ) {
3070                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3071                 exit (1);
3072         }
3073
3074         //      fire off threads
3075
3076         for( idx = 0; idx < cnt; idx++ ) {
3077                 args[idx].infile = argv[idx + 7];
3078                 args[idx].type = argv[2];
3079                 args[idx].mgr = mgr;
3080                 args[idx].num = num;
3081                 args[idx].idx = idx;
3082 #ifdef unix
3083                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3084                         fprintf(stderr, "Error creating thread %d\n", err);
3085 #else
3086                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3087 #endif
3088         }
3089
3090         //      wait for termination
3091
3092 #ifdef unix
3093         for( idx = 0; idx < cnt; idx++ )
3094                 pthread_join (threads[idx], NULL);
3095 #else
3096         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3097
3098         for( idx = 0; idx < cnt; idx++ )
3099                 CloseHandle(threads[idx]);
3100
3101 #endif
3102         elapsed = getCpuTime(0) - start;
3103         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3104         elapsed = getCpuTime(1);
3105         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3106         elapsed = getCpuTime(2);
3107         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3108
3109         bt_poolaudit (mgr);
3110         bt_mgrclose (mgr);
3111 }
3112
3113 #endif  //STANDALONE