]> pd.if.org Git - btree/blob - threadskv1.c
Initial release of generalized key/value version
[btree] / threadskv1.c
1 // btree version threadskv1 sched_yield version
2 //      with reworked bt_deletekey code
3 //      and phase-fair reader writer lock
4 // 12 MAR 2014
5
6 // author: karl malbrain, malbrain@cal.berkeley.edu
7
8 /*
9 This work, including the source code, documentation
10 and related data, is placed into the public domain.
11
12 The orginal author is Karl Malbrain.
13
14 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
15 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
16 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
17 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
18 RESULTING FROM THE USE, MODIFICATION, OR
19 REDISTRIBUTION OF THIS SOFTWARE.
20 */
21
22 // Please see the project home page for documentation
23 // code.google.com/p/high-concurrency-btree
24
25 #define _FILE_OFFSET_BITS 64
26 #define _LARGEFILE64_SOURCE
27
28 #ifdef linux
29 #define _GNU_SOURCE
30 #endif
31
32 #ifdef unix
33 #include <unistd.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <fcntl.h>
37 #include <sys/time.h>
38 #include <sys/mman.h>
39 #include <errno.h>
40 #include <pthread.h>
41 #else
42 #define WIN32_LEAN_AND_MEAN
43 #include <windows.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <time.h>
47 #include <fcntl.h>
48 #include <process.h>
49 #include <intrin.h>
50 #endif
51
52 #include <memory.h>
53 #include <string.h>
54 #include <stddef.h>
55
56 typedef unsigned long long      uid;
57
58 #ifndef unix
59 typedef unsigned long long      off64_t;
60 typedef unsigned short          ushort;
61 typedef unsigned int            uint;
62 #endif
63
64 #define BT_latchtable   128                                     // number of latch manager slots
65
66 #define BT_ro 0x6f72    // ro
67 #define BT_rw 0x7772    // rw
68
69 #define BT_maxbits              24                                      // maximum page size in bits
70 #define BT_minbits              9                                       // minimum page size in bits
71 #define BT_minpage              (1 << BT_minbits)       // minimum page size
72 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
73
74 /*
75 There are five lock types for each node in three independent sets: 
76 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
77 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
78 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
79 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
80 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
81 */
82
83 typedef enum{
84         BtLockAccess,
85         BtLockDelete,
86         BtLockRead,
87         BtLockWrite,
88         BtLockParent
89 } BtLock;
90
91 //      definition for phase-fair reader/writer lock implementation
92
93 typedef struct {
94         ushort rin[1];
95         ushort rout[1];
96         ushort ticket[1];
97         ushort serving[1];
98 } RWLock;
99
100 #define PHID 0x1
101 #define PRES 0x2
102 #define MASK 0x3
103 #define RINC 0x4
104
105 //      definition for spin latch implementation
106
107 // exclusive is set for write access
108 // share is count of read accessors
109 // grant write lock when share == 0
110
111 volatile typedef struct {
112         ushort exclusive:1;
113         ushort pending:1;
114         ushort share:14;
115 } BtSpinLatch;
116
117 #define XCL 1
118 #define PEND 2
119 #define BOTH 3
120 #define SHARE 4
121
122 //  hash table entries
123
124 typedef struct {
125         BtSpinLatch latch[1];
126         volatile ushort slot;           // Latch table entry at head of chain
127 } BtHashEntry;
128
129 //      latch manager table structure
130
131 typedef struct {
132         RWLock readwr[1];               // read/write page lock
133         RWLock access[1];               // Access Intent/Page delete
134         RWLock parent[1];               // Posting of fence key in parent
135         BtSpinLatch busy[1];            // slot is being moved between chains
136         volatile ushort next;           // next entry in hash table chain
137         volatile ushort prev;           // prev entry in hash table chain
138         volatile ushort pin;            // number of outstanding locks
139         volatile ushort hash;           // hash slot entry is under
140         volatile uid page_no;           // latch set page number
141 } BtLatchSet;
142
143 //      Define the length of the page and key pointers
144
145 #define BtId 6
146
147 //      Page key slot definition.
148
149 //      If BT_maxbits is 15 or less, you can save 2 bytes
150 //      for each key stored by making the two uints
151 //      into ushorts.
152
153 //      Keys are marked dead, but remain on the page until
154 //      it cleanup is called. The fence key (highest key) for
155 //      the page is always present, even after cleanup.
156
157 typedef struct {
158         uint off:BT_maxbits;            // page offset for key start
159         uint dead:1;                            // set for deleted key
160 } BtSlot;
161
162 //      The key structure occupies space at the upper end of
163 //      each page.  It's a length byte followed by the key
164 //      bytes.
165
166 typedef struct {
167         unsigned char len;
168         unsigned char key[1];
169 } *BtKey;
170
171 //      the value structure also occupies space at the upper
172 //      end of the page.
173
174 typedef struct {
175         unsigned char len;
176         unsigned char value[1];
177 } *BtVal;
178
179 //      The first part of an index page.
180 //      It is immediately followed
181 //      by the BtSlot array of keys.
182
183 typedef struct BtPage_ {
184         uint cnt;                                       // count of keys in page
185         uint act;                                       // count of active keys
186         uint min;                                       // next key offset
187         unsigned char bits:7;           // page size in bits
188         unsigned char free:1;           // page is on free chain
189         unsigned char lvl:6;            // level of page
190         unsigned char kill:1;           // page is being deleted
191         unsigned char dirty:1;          // page has deleted keys
192         unsigned char right[BtId];      // page number to right
193 } *BtPage;
194
195 //      The memory mapping pool table buffer manager entry
196
197 typedef struct {
198         uid  basepage;                          // mapped base page number
199         char *map;                                      // mapped memory pointer
200         ushort slot;                            // slot index in this array
201         ushort pin;                                     // mapped page pin counter
202         void *hashprev;                         // previous pool entry for the same hash idx
203         void *hashnext;                         // next pool entry for the same hash idx
204 #ifndef unix
205         HANDLE hmap;                            // Windows memory mapping handle
206 #endif
207 } BtPool;
208
209 #define CLOCK_bit 0x8000                // bit in pool->pin
210
211 //  The loadpage interface object
212
213 typedef struct {
214         uid page_no;            // current page number
215         BtPage page;            // current page pointer
216         BtPool *pool;           // current page pool
217         BtLatchSet *latch;      // current page latch set
218 } BtPageSet;
219
220 //      structure for latch manager on ALLOC_page
221
222 typedef struct {
223         struct BtPage_ alloc[1];        // next page_no in right ptr
224         unsigned char chain[BtId];      // head of free page_nos chain
225         BtSpinLatch lock[1];            // allocation area lite latch
226         ushort latchdeployed;           // highest number of latch entries deployed
227         ushort nlatchpage;                      // number of latch pages at BT_latch
228         ushort latchtotal;                      // number of page latch entries
229         ushort latchhash;                       // number of latch hash table slots
230         ushort latchvictim;                     // next latch entry to examine
231         BtHashEntry table[0];           // the hash table
232 } BtLatchMgr;
233
234 //      The object structure for Btree access
235
236 typedef struct {
237         uint page_size;                         // page size    
238         uint page_bits;                         // page size in bits    
239         uint seg_bits;                          // seg size in pages in bits
240         uint mode;                                      // read-write mode
241 #ifdef unix
242         int idx;
243 #else
244         HANDLE idx;
245 #endif
246         ushort poolcnt;                         // highest page pool node in use
247         ushort poolmax;                         // highest page pool node allocated
248         ushort poolmask;                        // total number of pages in mmap segment - 1
249         ushort hashsize;                        // size of Hash Table for pool entries
250         volatile uint evicted;          // last evicted hash table slot
251         ushort *hash;                           // pool index for hash entries
252         BtSpinLatch *latch;                     // latches for hash table slots
253         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
254         BtLatchSet *latchsets;          // mapped latch set from latch pages
255         BtPool *pool;                           // memory pool page segments
256 #ifndef unix
257         HANDLE halloc;                          // allocation and latch table handle
258 #endif
259 } BtMgr;
260
261 typedef struct {
262         BtMgr *mgr;                     // buffer manager for thread
263         BtPage cursor;          // cached frame for start/next (never mapped)
264         BtPage frame;           // spare frame for the page split (never mapped)
265         uid cursor_page;        // current cursor page number   
266         unsigned char *mem;     // frame, cursor, page memory buffer
267         int found;                      // last delete or insert was found
268         int err;                        // last error
269 } BtDb;
270
271 typedef enum {
272         BTERR_ok = 0,
273         BTERR_struct,
274         BTERR_ovflw,
275         BTERR_lock,
276         BTERR_map,
277         BTERR_wrt,
278         BTERR_hash
279 } BTERR;
280
281 // B-Tree functions
282 extern void bt_close (BtDb *bt);
283 extern BtDb *bt_open (BtMgr *mgr);
284 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, unsigned char *value, uint vallen);
285 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
286 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
287 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
288 extern uint bt_nextkey   (BtDb *bt, uint slot);
289
290 //      manager functions
291 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
292 void bt_mgrclose (BtMgr *mgr);
293
294 //  Helper functions to return slot values
295
296 extern BtKey bt_key (BtDb *bt, uint slot);
297 extern BtVal bt_val (BtDb *bt, uint slot);
298
299 //  BTree page number constants
300 #define ALLOC_page              0       // allocation & lock manager hash table
301 #define ROOT_page               1       // root of the btree
302 #define LEAF_page               2       // first page of leaves
303 #define LATCH_page              3       // pages for lock manager
304
305 //      Number of levels to create in a new BTree
306
307 #define MIN_lvl                 2
308
309 //  The page is allocated from low and hi ends.
310 //  The key slots are allocated from the bottom,
311 //      while the text and value of the key
312 //  is allocated from the top.  When the two
313 //  areas meet, the page is split into two.
314
315 //  A key consists of a length byte, two bytes of
316 //  index number (0 - 65534), and up to 253 bytes
317 //  of key value.  Duplicate keys are discarded.
318 //  Associated with each key is a value byte string
319 //      containing any value desired.
320
321 //  The b-tree root is always located at page 1.
322 //      The first leaf page of level zero is always
323 //      located on page 2.
324
325 //      The b-tree pages are linked with next
326 //      pointers to facilitate enumerators,
327 //      and provide for concurrency.
328
329 //      When to root page fills, it is split in two and
330 //      the tree height is raised by a new root at page
331 //      one with two keys.
332
333 //      Deleted keys are marked with a dead bit until
334 //      page cleanup. The fence key for a leaf node is
335 //      always present
336
337 //  Groups of pages called segments from the btree are optionally
338 //  cached with a memory mapped pool. A hash table is used to keep
339 //  track of the cached segments.  This behaviour is controlled
340 //  by the cache block size parameter to bt_open.
341
342 //  To achieve maximum concurrency one page is locked at a time
343 //  as the tree is traversed to find leaf key in question. The right
344 //  page numbers are used in cases where the page is being split,
345 //      or consolidated.
346
347 //  Page 0 is dedicated to lock for new page extensions,
348 //      and chains empty pages together for reuse. It also
349 //      contains the latch manager hash table.
350
351 //      The ParentModification lock on a node is obtained to serialize posting
352 //      or changing the fence key for a node.
353
354 //      Empty pages are chained together through the ALLOC page and reused.
355
356 //      Access macros to address slot and key values from the page
357 //      Page slots use 1 based indexing.
358
359 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
360 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
361 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
362
363 void bt_putid(unsigned char *dest, uid id)
364 {
365 int i = BtId;
366
367         while( i-- )
368                 dest[i] = (unsigned char)id, id >>= 8;
369 }
370
371 uid bt_getid(unsigned char *src)
372 {
373 uid id = 0;
374 int i;
375
376         for( i = 0; i < BtId; i++ )
377                 id <<= 8, id |= *src++; 
378
379         return id;
380 }
381
382 //      Phase-Fair reader/writer lock implementation
383
384 void WriteLock (RWLock *lock)
385 {
386 ushort w, r, tix;
387
388 #ifdef unix
389         tix = __sync_fetch_and_add (lock->ticket, 1);
390 #else
391         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
392 #endif
393         // wait for our ticket to come up
394
395         while( tix != lock->serving[0] )
396 #ifdef unix
397                 sched_yield();
398 #else
399                 SwitchToThread ();
400 #endif
401
402         w = PRES | (tix & PHID);
403 #ifdef  unix
404         r = __sync_fetch_and_add (lock->rin, w);
405 #else
406         r = _InterlockedExchangeAdd16 (lock->rin, w);
407 #endif
408         while( r != *lock->rout )
409 #ifdef unix
410                 sched_yield();
411 #else
412                 SwitchToThread();
413 #endif
414 }
415
416 void WriteRelease (RWLock *lock)
417 {
418 #ifdef unix
419         __sync_fetch_and_and (lock->rin, ~MASK);
420 #else
421         _InterlockedAnd16 (lock->rin, ~MASK);
422 #endif
423         lock->serving[0]++;
424 }
425
426 void ReadLock (RWLock *lock)
427 {
428 ushort w;
429 #ifdef unix
430         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
431 #else
432         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
433 #endif
434         if( w )
435           while( w == (*lock->rin & MASK) )
436 #ifdef unix
437                 sched_yield ();
438 #else
439                 SwitchToThread ();
440 #endif
441 }
442
443 void ReadRelease (RWLock *lock)
444 {
445 #ifdef unix
446         __sync_fetch_and_add (lock->rout, RINC);
447 #else
448         _InterlockedExchangeAdd16 (lock->rout, RINC);
449 #endif
450 }
451
452 //      Spin Latch Manager
453
454 //      wait until write lock mode is clear
455 //      and add 1 to the share count
456
457 void bt_spinreadlock(BtSpinLatch *latch)
458 {
459 ushort prev;
460
461   do {
462 #ifdef unix
463         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
464 #else
465         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
466 #endif
467         //  see if exclusive request is granted or pending
468
469         if( !(prev & BOTH) )
470                 return;
471 #ifdef unix
472         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
473 #else
474         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
475 #endif
476 #ifdef  unix
477   } while( sched_yield(), 1 );
478 #else
479   } while( SwitchToThread(), 1 );
480 #endif
481 }
482
483 //      wait for other read and write latches to relinquish
484
485 void bt_spinwritelock(BtSpinLatch *latch)
486 {
487 ushort prev;
488
489   do {
490 #ifdef  unix
491         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
492 #else
493         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
494 #endif
495         if( !(prev & XCL) )
496           if( !(prev & ~BOTH) )
497                 return;
498           else
499 #ifdef unix
500                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
501 #else
502                 _InterlockedAnd16((ushort *)latch, ~XCL);
503 #endif
504 #ifdef  unix
505   } while( sched_yield(), 1 );
506 #else
507   } while( SwitchToThread(), 1 );
508 #endif
509 }
510
511 //      try to obtain write lock
512
513 //      return 1 if obtained,
514 //              0 otherwise
515
516 int bt_spinwritetry(BtSpinLatch *latch)
517 {
518 ushort prev;
519
520 #ifdef  unix
521         prev = __sync_fetch_and_or((ushort *)latch, XCL);
522 #else
523         prev = _InterlockedOr16((ushort *)latch, XCL);
524 #endif
525         //      take write access if all bits are clear
526
527         if( !(prev & XCL) )
528           if( !(prev & ~BOTH) )
529                 return 1;
530           else
531 #ifdef unix
532                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
533 #else
534                 _InterlockedAnd16((ushort *)latch, ~XCL);
535 #endif
536         return 0;
537 }
538
539 //      clear write mode
540
541 void bt_spinreleasewrite(BtSpinLatch *latch)
542 {
543 #ifdef unix
544         __sync_fetch_and_and((ushort *)latch, ~BOTH);
545 #else
546         _InterlockedAnd16((ushort *)latch, ~BOTH);
547 #endif
548 }
549
550 //      decrement reader count
551
552 void bt_spinreleaseread(BtSpinLatch *latch)
553 {
554 #ifdef unix
555         __sync_fetch_and_add((ushort *)latch, -SHARE);
556 #else
557         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
558 #endif
559 }
560
561 //      link latch table entry into latch hash table
562
563 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
564 {
565 BtLatchSet *set = bt->mgr->latchsets + victim;
566
567         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
568                 bt->mgr->latchsets[set->next].prev = victim;
569
570         bt->mgr->latchmgr->table[hashidx].slot = victim;
571         set->page_no = page_no;
572         set->hash = hashidx;
573         set->prev = 0;
574 }
575
576 //      release latch pin
577
578 void bt_unpinlatch (BtLatchSet *set)
579 {
580 #ifdef unix
581         __sync_fetch_and_add(&set->pin, -1);
582 #else
583         _InterlockedDecrement16 (&set->pin);
584 #endif
585 }
586
587 //      find existing latchset or inspire new one
588 //      return with latchset pinned
589
590 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
591 {
592 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
593 ushort slot, avail = 0, victim, idx;
594 BtLatchSet *set;
595
596         //  obtain read lock on hash table entry
597
598         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
599
600         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
601         {
602                 set = bt->mgr->latchsets + slot;
603                 if( page_no == set->page_no )
604                         break;
605         } while( slot = set->next );
606
607         if( slot ) {
608 #ifdef unix
609                 __sync_fetch_and_add(&set->pin, 1);
610 #else
611                 _InterlockedIncrement16 (&set->pin);
612 #endif
613         }
614
615     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
616
617         if( slot )
618                 return set;
619
620   //  try again, this time with write lock
621
622   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
623
624   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
625   {
626         set = bt->mgr->latchsets + slot;
627         if( page_no == set->page_no )
628                 break;
629         if( !set->pin && !avail )
630                 avail = slot;
631   } while( slot = set->next );
632
633   //  found our entry, or take over an unpinned one
634
635   if( slot || (slot = avail) ) {
636         set = bt->mgr->latchsets + slot;
637 #ifdef unix
638         __sync_fetch_and_add(&set->pin, 1);
639 #else
640         _InterlockedIncrement16 (&set->pin);
641 #endif
642         set->page_no = page_no;
643         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
644         return set;
645   }
646
647         //  see if there are any unused entries
648 #ifdef unix
649         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
650 #else
651         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
652 #endif
653
654         if( victim < bt->mgr->latchmgr->latchtotal ) {
655                 set = bt->mgr->latchsets + victim;
656 #ifdef unix
657                 __sync_fetch_and_add(&set->pin, 1);
658 #else
659                 _InterlockedIncrement16 (&set->pin);
660 #endif
661                 bt_latchlink (bt, hashidx, victim, page_no);
662                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
663                 return set;
664         }
665
666 #ifdef unix
667         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
668 #else
669         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
670 #endif
671   //  find and reuse previous lock entry
672
673   while( 1 ) {
674 #ifdef unix
675         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
676 #else
677         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
678 #endif
679         //      we don't use slot zero
680
681         if( victim %= bt->mgr->latchmgr->latchtotal )
682                 set = bt->mgr->latchsets + victim;
683         else
684                 continue;
685
686         //      take control of our slot
687         //      from other threads
688
689         if( set->pin || !bt_spinwritetry (set->busy) )
690                 continue;
691
692         idx = set->hash;
693
694         // try to get write lock on hash chain
695         //      skip entry if not obtained
696         //      or has outstanding locks
697
698         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
699                 bt_spinreleasewrite (set->busy);
700                 continue;
701         }
702
703         if( set->pin ) {
704                 bt_spinreleasewrite (set->busy);
705                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
706                 continue;
707         }
708
709         //  unlink our available victim from its hash chain
710
711         if( set->prev )
712                 bt->mgr->latchsets[set->prev].next = set->next;
713         else
714                 bt->mgr->latchmgr->table[idx].slot = set->next;
715
716         if( set->next )
717                 bt->mgr->latchsets[set->next].prev = set->prev;
718
719         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
720 #ifdef unix
721         __sync_fetch_and_add(&set->pin, 1);
722 #else
723         _InterlockedIncrement16 (&set->pin);
724 #endif
725         bt_latchlink (bt, hashidx, victim, page_no);
726         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
727         bt_spinreleasewrite (set->busy);
728         return set;
729   }
730 }
731
732 void bt_mgrclose (BtMgr *mgr)
733 {
734 BtPool *pool;
735 uint slot;
736
737         // release mapped pages
738         //      note that slot zero is never used
739
740         for( slot = 1; slot < mgr->poolmax; slot++ ) {
741                 pool = mgr->pool + slot;
742                 if( pool->slot )
743 #ifdef unix
744                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
745 #else
746                 {
747                         FlushViewOfFile(pool->map, 0);
748                         UnmapViewOfFile(pool->map);
749                         CloseHandle(pool->hmap);
750                 }
751 #endif
752         }
753
754 #ifdef unix
755         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
756         munmap (mgr->latchmgr, mgr->page_size);
757 #else
758         FlushViewOfFile(mgr->latchmgr, 0);
759         UnmapViewOfFile(mgr->latchmgr);
760         CloseHandle(mgr->halloc);
761 #endif
762 #ifdef unix
763         close (mgr->idx);
764         free (mgr->pool);
765         free (mgr->hash);
766         free ((void *)mgr->latch);
767         free (mgr);
768 #else
769         FlushFileBuffers(mgr->idx);
770         CloseHandle(mgr->idx);
771         GlobalFree (mgr->pool);
772         GlobalFree (mgr->hash);
773         GlobalFree ((void *)mgr->latch);
774         GlobalFree (mgr);
775 #endif
776 }
777
778 //      close and release memory
779
780 void bt_close (BtDb *bt)
781 {
782 #ifdef unix
783         if( bt->mem )
784                 free (bt->mem);
785 #else
786         if( bt->mem)
787                 VirtualFree (bt->mem, 0, MEM_RELEASE);
788 #endif
789         free (bt);
790 }
791
792 //  open/create new btree buffer manager
793
794 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
795 //              size of mapped page pool (e.g. 8192)
796
797 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
798 {
799 uint lvl, attr, cacheblk, last, slot, idx;
800 uint nlatchpage, latchhash;
801 unsigned char value[BtId];
802 BtLatchMgr *latchmgr;
803 off64_t size;
804 uint amt[1];
805 BtMgr* mgr;
806 BtKey key;
807 BtVal val;
808 int flag;
809
810 #ifndef unix
811 SYSTEM_INFO sysinfo[1];
812 #endif
813
814         // determine sanity of page size and buffer pool
815
816         if( bits > BT_maxbits )
817                 bits = BT_maxbits;
818         else if( bits < BT_minbits )
819                 bits = BT_minbits;
820
821         if( !poolmax )
822                 return NULL;    // must have buffer pool
823
824 #ifdef unix
825         mgr = calloc (1, sizeof(BtMgr));
826
827         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
828
829         if( mgr->idx == -1 )
830                 return free(mgr), NULL;
831         
832         cacheblk = 4096;        // minimum mmap segment size for unix
833
834 #else
835         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
836         attr = FILE_ATTRIBUTE_NORMAL;
837         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
838
839         if( mgr->idx == INVALID_HANDLE_VALUE )
840                 return GlobalFree(mgr), NULL;
841
842         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
843         GetSystemInfo(sysinfo);
844         cacheblk = sysinfo->dwAllocationGranularity;
845 #endif
846
847 #ifdef unix
848         latchmgr = malloc (BT_maxpage);
849         *amt = 0;
850
851         // read minimum page size to get root info
852
853         if( size = lseek (mgr->idx, 0L, 2) ) {
854                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
855                         bits = latchmgr->alloc->bits;
856                 else
857                         return free(mgr), free(latchmgr), NULL;
858         } else if( mode == BT_ro )
859                 return free(latchmgr), bt_mgrclose (mgr), NULL;
860 #else
861         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
862         size = GetFileSize(mgr->idx, amt);
863
864         if( size || *amt ) {
865                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
866                         return bt_mgrclose (mgr), NULL;
867                 bits = latchmgr->alloc->bits;
868         } else if( mode == BT_ro )
869                 return bt_mgrclose (mgr), NULL;
870 #endif
871
872         mgr->page_size = 1 << bits;
873         mgr->page_bits = bits;
874
875         mgr->poolmax = poolmax;
876         mgr->mode = mode;
877
878         if( cacheblk < mgr->page_size )
879                 cacheblk = mgr->page_size;
880
881         //  mask for partial memmaps
882
883         mgr->poolmask = (cacheblk >> bits) - 1;
884
885         //      see if requested size of pages per memmap is greater
886
887         if( (1 << segsize) > mgr->poolmask )
888                 mgr->poolmask = (1 << segsize) - 1;
889
890         mgr->seg_bits = 0;
891
892         while( (1 << mgr->seg_bits) <= mgr->poolmask )
893                 mgr->seg_bits++;
894
895         mgr->hashsize = hashsize;
896
897 #ifdef unix
898         mgr->pool = calloc (poolmax, sizeof(BtPool));
899         mgr->hash = calloc (hashsize, sizeof(ushort));
900         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
901 #else
902         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
903         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
904         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
905 #endif
906
907         if( size || *amt )
908                 goto mgrlatch;
909
910         // initialize an empty b-tree with latch page, root page, page of leaves
911         // and page(s) of latches
912
913         memset (latchmgr, 0, 1 << bits);
914         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
915         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
916         latchmgr->alloc->bits = mgr->page_bits;
917
918         latchmgr->nlatchpage = nlatchpage;
919         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
920
921         //  initialize latch manager
922
923         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
924
925         //      size of hash table = total number of latchsets
926
927         if( latchhash > latchmgr->latchtotal )
928                 latchhash = latchmgr->latchtotal;
929
930         latchmgr->latchhash = latchhash;
931
932 #ifdef unix
933         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
934                 return bt_mgrclose (mgr), NULL;
935 #else
936         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
937                 return bt_mgrclose (mgr), NULL;
938
939         if( *amt < mgr->page_size )
940                 return bt_mgrclose (mgr), NULL;
941 #endif
942
943         memset (latchmgr, 0, 1 << bits);
944         latchmgr->alloc->bits = mgr->page_bits;
945
946         for( lvl=MIN_lvl; lvl--; ) {
947                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
948                 key = keyptr(latchmgr->alloc, 1);
949                 key->len = 2;           // create stopper key
950                 key->key[0] = 0xff;
951                 key->key[1] = 0xff;
952
953                 bt_putid(value, MIN_lvl - lvl + 1);
954                 val = valptr(latchmgr->alloc, 1);
955                 val->len = lvl ? BtId : 0;
956                 memcpy (val->value, value, val->len);
957
958                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
959                 latchmgr->alloc->lvl = lvl;
960                 latchmgr->alloc->cnt = 1;
961                 latchmgr->alloc->act = 1;
962 #ifdef unix
963                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
964                         return bt_mgrclose (mgr), NULL;
965 #else
966                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
967                         return bt_mgrclose (mgr), NULL;
968
969                 if( *amt < mgr->page_size )
970                         return bt_mgrclose (mgr), NULL;
971 #endif
972         }
973
974         // clear out latch manager locks
975         //      and rest of pages to round out segment
976
977         memset(latchmgr, 0, mgr->page_size);
978         last = MIN_lvl + 1;
979
980         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
981 #ifdef unix
982                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
983 #else
984                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
985                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
986                         return bt_mgrclose (mgr), NULL;
987                 if( *amt < mgr->page_size )
988                         return bt_mgrclose (mgr), NULL;
989 #endif
990                 last++;
991         }
992
993 mgrlatch:
994 #ifdef unix
995         flag = PROT_READ | PROT_WRITE;
996         mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
997         if( mgr->latchmgr == MAP_FAILED )
998                 return bt_mgrclose (mgr), NULL;
999         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1000         if( mgr->latchsets == MAP_FAILED )
1001                 return bt_mgrclose (mgr), NULL;
1002 #else
1003         flag = PAGE_READWRITE;
1004         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1005         if( !mgr->halloc )
1006                 return bt_mgrclose (mgr), NULL;
1007
1008         flag = FILE_MAP_WRITE;
1009         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1010         if( !mgr->latchmgr )
1011                 return GetLastError(), bt_mgrclose (mgr), NULL;
1012
1013         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1014 #endif
1015
1016 #ifdef unix
1017         free (latchmgr);
1018 #else
1019         VirtualFree (latchmgr, 0, MEM_RELEASE);
1020 #endif
1021         return mgr;
1022 }
1023
1024 //      open BTree access method
1025 //      based on buffer manager
1026
1027 BtDb *bt_open (BtMgr *mgr)
1028 {
1029 BtDb *bt = malloc (sizeof(*bt));
1030
1031         memset (bt, 0, sizeof(*bt));
1032         bt->mgr = mgr;
1033 #ifdef unix
1034         bt->mem = malloc (2 *mgr->page_size);
1035 #else
1036         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1037 #endif
1038         bt->frame = (BtPage)bt->mem;
1039         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1040         return bt;
1041 }
1042
1043 //  compare two keys, returning > 0, = 0, or < 0
1044 //  as the comparison value
1045
1046 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1047 {
1048 uint len1 = key1->len;
1049 int ans;
1050
1051         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1052                 return ans;
1053
1054         if( len1 > len2 )
1055                 return 1;
1056         if( len1 < len2 )
1057                 return -1;
1058
1059         return 0;
1060 }
1061
1062 //      Buffer Pool mgr
1063
1064 // find segment in pool
1065 // must be called with hashslot idx locked
1066 //      return NULL if not there
1067 //      otherwise return node
1068
1069 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1070 {
1071 BtPool *pool;
1072 uint slot;
1073
1074         // compute start of hash chain in pool
1075
1076         if( slot = bt->mgr->hash[idx] ) 
1077                 pool = bt->mgr->pool + slot;
1078         else
1079                 return NULL;
1080
1081         page_no &= ~bt->mgr->poolmask;
1082
1083         while( pool->basepage != page_no )
1084           if( pool = pool->hashnext )
1085                 continue;
1086           else
1087                 return NULL;
1088
1089         return pool;
1090 }
1091
1092 // add segment to hash table
1093
1094 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1095 {
1096 BtPool *node;
1097 uint slot;
1098
1099         pool->hashprev = pool->hashnext = NULL;
1100         pool->basepage = page_no & ~bt->mgr->poolmask;
1101         pool->pin = CLOCK_bit + 1;
1102
1103         if( slot = bt->mgr->hash[idx] ) {
1104                 node = bt->mgr->pool + slot;
1105                 pool->hashnext = node;
1106                 node->hashprev = pool;
1107         }
1108
1109         bt->mgr->hash[idx] = pool->slot;
1110 }
1111
1112 //  map new buffer pool segment to virtual memory
1113
1114 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1115 {
1116 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1117 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1118 int flag;
1119
1120 #ifdef unix
1121         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1122         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1123         if( pool->map == MAP_FAILED )
1124                 return bt->err = BTERR_map;
1125
1126 #else
1127         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1128         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1129         if( !pool->hmap )
1130                 return bt->err = BTERR_map;
1131
1132         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1133         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1134         if( !pool->map )
1135                 return bt->err = BTERR_map;
1136 #endif
1137         return bt->err = 0;
1138 }
1139
1140 //      calculate page within pool
1141
1142 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1143 {
1144 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1145 BtPage page;
1146
1147         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1148         return page;
1149 }
1150
1151 //  release pool pin
1152
1153 void bt_unpinpool (BtPool *pool)
1154 {
1155 #ifdef unix
1156         __sync_fetch_and_add(&pool->pin, -1);
1157 #else
1158         _InterlockedDecrement16 (&pool->pin);
1159 #endif
1160 }
1161
1162 //      find or place requested page in segment-pool
1163 //      return pool table entry, incrementing pin
1164
1165 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1166 {
1167 uint slot, hashidx, idx, victim;
1168 BtPool *pool, *node, *next;
1169
1170         //      lock hash table chain
1171
1172         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1173         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1174
1175         //      look up in hash table
1176
1177         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1178 #ifdef unix
1179                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1180                 __sync_fetch_and_add(&pool->pin, 1);
1181 #else
1182                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1183                 _InterlockedIncrement16 (&pool->pin);
1184 #endif
1185                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1186                 return pool;
1187         }
1188
1189         // allocate a new pool node
1190         // and add to hash table
1191
1192 #ifdef unix
1193         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1194 #else
1195         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1196 #endif
1197
1198         if( ++slot < bt->mgr->poolmax ) {
1199                 pool = bt->mgr->pool + slot;
1200                 pool->slot = slot;
1201
1202                 if( bt_mapsegment(bt, pool, page_no) )
1203                         return NULL;
1204
1205                 bt_linkhash(bt, pool, page_no, hashidx);
1206                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1207                 return pool;
1208         }
1209
1210         // pool table is full
1211         //      find best pool entry to evict
1212
1213 #ifdef unix
1214         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1215 #else
1216         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1217 #endif
1218
1219         while( 1 ) {
1220 #ifdef unix
1221                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1222 #else
1223                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1224 #endif
1225                 victim %= bt->mgr->poolmax;
1226                 pool = bt->mgr->pool + victim;
1227                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1228
1229                 if( !victim )
1230                         continue;
1231
1232                 // try to get write lock
1233                 //      skip entry if not obtained
1234
1235                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1236                         continue;
1237
1238                 //      skip this entry if
1239                 //      page is pinned
1240                 //  or clock bit is set
1241
1242                 if( pool->pin ) {
1243 #ifdef unix
1244                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1245 #else
1246                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1247 #endif
1248                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1249                         continue;
1250                 }
1251
1252                 // unlink victim pool node from hash table
1253
1254                 if( node = pool->hashprev )
1255                         node->hashnext = pool->hashnext;
1256                 else if( node = pool->hashnext )
1257                         bt->mgr->hash[idx] = node->slot;
1258                 else
1259                         bt->mgr->hash[idx] = 0;
1260
1261                 if( node = pool->hashnext )
1262                         node->hashprev = pool->hashprev;
1263
1264                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1265
1266                 //      remove old file mapping
1267 #ifdef unix
1268                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1269 #else
1270 //              FlushViewOfFile(pool->map, 0);
1271                 UnmapViewOfFile(pool->map);
1272                 CloseHandle(pool->hmap);
1273 #endif
1274                 pool->map = NULL;
1275
1276                 //  create new pool mapping
1277                 //  and link into hash table
1278
1279                 if( bt_mapsegment(bt, pool, page_no) )
1280                         return NULL;
1281
1282                 bt_linkhash(bt, pool, page_no, hashidx);
1283                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1284                 return pool;
1285         }
1286 }
1287
1288 // place write, read, or parent lock on requested page_no.
1289
1290 void bt_lockpage(BtLock mode, BtLatchSet *set)
1291 {
1292         switch( mode ) {
1293         case BtLockRead:
1294                 ReadLock (set->readwr);
1295                 break;
1296         case BtLockWrite:
1297                 WriteLock (set->readwr);
1298                 break;
1299         case BtLockAccess:
1300                 ReadLock (set->access);
1301                 break;
1302         case BtLockDelete:
1303                 WriteLock (set->access);
1304                 break;
1305         case BtLockParent:
1306                 WriteLock (set->parent);
1307                 break;
1308         }
1309 }
1310
1311 // remove write, read, or parent lock on requested page
1312
1313 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1314 {
1315         switch( mode ) {
1316         case BtLockRead:
1317                 ReadRelease (set->readwr);
1318                 break;
1319         case BtLockWrite:
1320                 WriteRelease (set->readwr);
1321                 break;
1322         case BtLockAccess:
1323                 ReadRelease (set->access);
1324                 break;
1325         case BtLockDelete:
1326                 WriteRelease (set->access);
1327                 break;
1328         case BtLockParent:
1329                 WriteRelease (set->parent);
1330                 break;
1331         }
1332 }
1333
1334 //      allocate a new page and write page into it
1335
1336 uid bt_newpage(BtDb *bt, BtPage page)
1337 {
1338 BtPageSet set[1];
1339 uid new_page;
1340 int blk;
1341
1342         //      lock allocation page
1343
1344         bt_spinwritelock(bt->mgr->latchmgr->lock);
1345
1346         // use empty chain first
1347         // else allocate empty page
1348
1349         if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1350                 if( set->pool = bt_pinpool (bt, new_page) )
1351                         set->page = bt_page (bt, set->pool, new_page);
1352                 else
1353                         return 0;
1354
1355                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1356                 bt_unpinpool (set->pool);
1357         } else {
1358                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1359                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1360 #ifdef unix
1361                 // if writing first page of pool block, set file length thru last page
1362
1363                 if( (new_page & bt->mgr->poolmask) == 0 )
1364                         ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1365 #endif
1366         }
1367 #ifdef unix
1368         // unlock allocation latch
1369
1370         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1371 #endif
1372
1373         //      bring new page into pool and copy page.
1374         //      this will extend the file into the new pages on WIN32.
1375
1376         if( set->pool = bt_pinpool (bt, new_page) )
1377                 set->page = bt_page (bt, set->pool, new_page);
1378         else
1379                 return 0;
1380
1381         memcpy(set->page, page, bt->mgr->page_size);
1382         bt_unpinpool (set->pool);
1383
1384 #ifndef unix
1385         // unlock allocation latch
1386
1387         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1388 #endif
1389         return new_page;
1390 }
1391
1392 //  find slot in page for given key at a given level
1393
1394 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1395 {
1396 uint diff, higher = set->page->cnt, low = 1, slot;
1397 uint good = 0;
1398
1399         //        make stopper key an infinite fence value
1400
1401         if( bt_getid (set->page->right) )
1402                 higher++;
1403         else
1404                 good++;
1405
1406         //      low is the lowest candidate.
1407         //  loop ends when they meet
1408
1409         //  higher is already
1410         //      tested as .ge. the passed key.
1411
1412         while( diff = higher - low ) {
1413                 slot = low + ( diff >> 1 );
1414                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1415                         low = slot + 1;
1416                 else
1417                         higher = slot, good++;
1418         }
1419
1420         //      return zero if key is on right link page
1421
1422         return good ? higher : 0;
1423 }
1424
1425 //  find and load page at given level for given key
1426 //      leave page rd or wr locked as requested
1427
1428 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1429 {
1430 uid page_no = ROOT_page, prevpage = 0;
1431 uint drill = 0xff, slot;
1432 BtLatchSet *prevlatch;
1433 uint mode, prevmode;
1434 BtPool *prevpool;
1435
1436   //  start at root of btree and drill down
1437
1438   do {
1439         // determine lock mode of drill level
1440         mode = (drill == lvl) ? lock : BtLockRead; 
1441
1442         set->latch = bt_pinlatch (bt, page_no);
1443         set->page_no = page_no;
1444
1445         // pin page contents
1446
1447         if( set->pool = bt_pinpool (bt, page_no) )
1448                 set->page = bt_page (bt, set->pool, page_no);
1449         else
1450                 return 0;
1451
1452         // obtain access lock using lock chaining with Access mode
1453
1454         if( page_no > ROOT_page )
1455           bt_lockpage(BtLockAccess, set->latch);
1456
1457         //      release & unpin parent page
1458
1459         if( prevpage ) {
1460           bt_unlockpage(prevmode, prevlatch);
1461           bt_unpinlatch (prevlatch);
1462           bt_unpinpool (prevpool);
1463           prevpage = 0;
1464         }
1465
1466         // obtain read lock using lock chaining
1467
1468         bt_lockpage(mode, set->latch);
1469
1470         if( set->page->free )
1471                 return bt->err = BTERR_struct, 0;
1472
1473         if( page_no > ROOT_page )
1474           bt_unlockpage(BtLockAccess, set->latch);
1475
1476         // re-read and re-lock root after determining actual level of root
1477
1478         if( set->page->lvl != drill) {
1479                 if( set->page_no != ROOT_page )
1480                         return bt->err = BTERR_struct, 0;
1481                         
1482                 drill = set->page->lvl;
1483
1484                 if( lock != BtLockRead && drill == lvl ) {
1485                   bt_unlockpage(mode, set->latch);
1486                   bt_unpinlatch (set->latch);
1487                   bt_unpinpool (set->pool);
1488                   continue;
1489                 }
1490         }
1491
1492         prevpage = set->page_no;
1493         prevlatch = set->latch;
1494         prevpool = set->pool;
1495         prevmode = mode;
1496
1497         //  find key on page at this level
1498         //  and descend to requested level
1499
1500         if( !set->page->kill )
1501          if( slot = bt_findslot (set, key, len) ) {
1502           if( drill == lvl )
1503                 return slot;
1504
1505           while( slotptr(set->page, slot)->dead )
1506                 if( slot++ < set->page->cnt )
1507                         continue;
1508                 else
1509                         goto slideright;
1510
1511           page_no = bt_getid(valptr(set->page, slot)->value);
1512           drill--;
1513           continue;
1514          }
1515
1516         //  or slide right into next page
1517
1518 slideright:
1519         page_no = bt_getid(set->page->right);
1520
1521   } while( page_no );
1522
1523   // return error on end of right chain
1524
1525   bt->err = BTERR_struct;
1526   return 0;     // return error
1527 }
1528
1529 //      return page to free list
1530 //      page must be delete & write locked
1531
1532 void bt_freepage (BtDb *bt, BtPageSet *set)
1533 {
1534         //      lock allocation page
1535
1536         bt_spinwritelock (bt->mgr->latchmgr->lock);
1537
1538         //      store chain
1539         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1540         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1541         set->page->free = 1;
1542
1543         // unlock released page
1544
1545         bt_unlockpage (BtLockDelete, set->latch);
1546         bt_unlockpage (BtLockWrite, set->latch);
1547         bt_unpinlatch (set->latch);
1548         bt_unpinpool (set->pool);
1549
1550         // unlock allocation page
1551
1552         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1553 }
1554
1555 //      a fence key was deleted from a page
1556 //      push new fence value upwards
1557
1558 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1559 {
1560 unsigned char leftkey[256], rightkey[256];
1561 unsigned char value[BtId];
1562 uid page_no;
1563 BtKey ptr;
1564
1565         //      remove the old fence value
1566
1567         ptr = keyptr(set->page, set->page->cnt);
1568         memcpy (rightkey, ptr, ptr->len + 1);
1569
1570         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1571         set->page->dirty = 1;
1572
1573         ptr = keyptr(set->page, set->page->cnt);
1574         memcpy (leftkey, ptr, ptr->len + 1);
1575         page_no = set->page_no;
1576
1577         bt_lockpage (BtLockParent, set->latch);
1578         bt_unlockpage (BtLockWrite, set->latch);
1579
1580         //      insert new (now smaller) fence key
1581
1582         bt_putid (value, page_no);
1583
1584         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
1585           return bt->err;
1586
1587         //      now delete old fence key
1588
1589         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1590                 return bt->err;
1591
1592         bt_unlockpage (BtLockParent, set->latch);
1593         bt_unpinlatch(set->latch);
1594         bt_unpinpool (set->pool);
1595         return 0;
1596 }
1597
1598 //      root has a single child
1599 //      collapse a level from the tree
1600
1601 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1602 {
1603 BtPageSet child[1];
1604 uint idx;
1605
1606   // find the child entry and promote as new root contents
1607
1608   do {
1609         for( idx = 0; idx++ < root->page->cnt; )
1610           if( !slotptr(root->page, idx)->dead )
1611                 break;
1612
1613         child->page_no = bt_getid (valptr(root->page, idx)->value);
1614
1615         child->latch = bt_pinlatch (bt, child->page_no);
1616         bt_lockpage (BtLockDelete, child->latch);
1617         bt_lockpage (BtLockWrite, child->latch);
1618
1619         if( child->pool = bt_pinpool (bt, child->page_no) )
1620                 child->page = bt_page (bt, child->pool, child->page_no);
1621         else
1622                 return bt->err;
1623
1624         memcpy (root->page, child->page, bt->mgr->page_size);
1625         bt_freepage (bt, child);
1626
1627   } while( root->page->lvl > 1 && root->page->act == 1 );
1628
1629   bt_unlockpage (BtLockWrite, root->latch);
1630   bt_unpinlatch (root->latch);
1631   bt_unpinpool (root->pool);
1632   return 0;
1633 }
1634
1635 //  find and delete key on page by marking delete flag bit
1636 //  if page becomes empty, delete it from the btree
1637
1638 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1639 {
1640 unsigned char lowerfence[256], higherfence[256];
1641 uint slot, idx, dirty = 0, fence, found;
1642 BtPageSet set[1], right[1];
1643 unsigned char value[BtId];
1644 BtKey ptr;
1645
1646         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1647                 ptr = keyptr(set->page, slot);
1648         else
1649                 return bt->err;
1650
1651         //      are we deleting a fence slot?
1652
1653         fence = slot == set->page->cnt;
1654
1655         // if key is found delete it, otherwise ignore request
1656
1657         if( found = !keycmp (ptr, key, len) )
1658           if( found = slotptr(set->page, slot)->dead == 0 ) {
1659                 dirty = slotptr(set->page, slot)->dead = 1;
1660                 set->page->dirty = 1;
1661                 set->page->act--;
1662
1663                 // collapse empty slots
1664
1665                 while( idx = set->page->cnt - 1 )
1666                   if( slotptr(set->page, idx)->dead ) {
1667                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1668                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1669                   } else
1670                         break;
1671           }
1672
1673         //      did we delete a fence key in an upper level?
1674
1675         if( dirty && lvl && set->page->act && fence )
1676           if( bt_fixfence (bt, set, lvl) )
1677                 return bt->err;
1678           else
1679                 return bt->found = found, 0;
1680
1681         //      is this a collapsed root?
1682
1683         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1684           if( bt_collapseroot (bt, set) )
1685                 return bt->err;
1686           else
1687                 return bt->found = found, 0;
1688
1689         //      return if page is not empty
1690
1691         if( set->page->act ) {
1692                 bt_unlockpage(BtLockWrite, set->latch);
1693                 bt_unpinlatch (set->latch);
1694                 bt_unpinpool (set->pool);
1695                 return bt->found = found, 0;
1696         }
1697
1698         //      cache copy of fence key
1699         //      to post in parent
1700
1701         ptr = keyptr(set->page, set->page->cnt);
1702         memcpy (lowerfence, ptr, ptr->len + 1);
1703
1704         //      obtain lock on right page
1705
1706         right->page_no = bt_getid(set->page->right);
1707         right->latch = bt_pinlatch (bt, right->page_no);
1708         bt_lockpage (BtLockWrite, right->latch);
1709
1710         // pin page contents
1711
1712         if( right->pool = bt_pinpool (bt, right->page_no) )
1713                 right->page = bt_page (bt, right->pool, right->page_no);
1714         else
1715                 return 0;
1716
1717         if( right->page->kill )
1718                 return bt->err = BTERR_struct;
1719
1720         // pull contents of right peer into our empty page
1721
1722         memcpy (set->page, right->page, bt->mgr->page_size);
1723
1724         // cache copy of key to update
1725
1726         ptr = keyptr(right->page, right->page->cnt);
1727         memcpy (higherfence, ptr, ptr->len + 1);
1728
1729         // mark right page deleted and point it to left page
1730         //      until we can post parent updates
1731
1732         bt_putid (right->page->right, set->page_no);
1733         right->page->kill = 1;
1734
1735         bt_lockpage (BtLockParent, right->latch);
1736         bt_unlockpage (BtLockWrite, right->latch);
1737
1738         bt_lockpage (BtLockParent, set->latch);
1739         bt_unlockpage (BtLockWrite, set->latch);
1740
1741         // redirect higher key directly to our new node contents
1742
1743         bt_putid (value, set->page_no);
1744
1745         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId) )
1746           return bt->err;
1747
1748         //      delete old lower key to our node
1749
1750         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1751           return bt->err;
1752
1753         //      obtain delete and write locks to right node
1754
1755         bt_unlockpage (BtLockParent, right->latch);
1756         bt_lockpage (BtLockDelete, right->latch);
1757         bt_lockpage (BtLockWrite, right->latch);
1758         bt_freepage (bt, right);
1759
1760         bt_unlockpage (BtLockParent, set->latch);
1761         bt_unpinlatch (set->latch);
1762         bt_unpinpool (set->pool);
1763         bt->found = found;
1764         return 0;
1765 }
1766
1767 //      find key in leaf level and return number of value bytes
1768 //      or (-1) if not found
1769
1770 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1771 {
1772 BtPageSet set[1];
1773 uint  slot;
1774 BtKey ptr;
1775 BtVal val;
1776 int ret;
1777
1778         if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1779                 ptr = keyptr(set->page, slot);
1780         else
1781                 return 0;
1782
1783         // if key exists, return TRUE
1784         //      otherwise return FALSE
1785
1786         if( !keycmp (ptr, key, keylen) ) {
1787                 val = valptr (set->page,slot);
1788                 if( valmax > val->len )
1789                         valmax = val->len;
1790                 memcpy (value, val->value, valmax);
1791                 ret = valmax;
1792         } else
1793                 ret = -1;
1794
1795         bt_unlockpage (BtLockRead, set->latch);
1796         bt_unpinlatch (set->latch);
1797         bt_unpinpool (set->pool);
1798         return ret;
1799 }
1800
1801 //      check page for space available,
1802 //      clean if necessary and return
1803 //      0 - page needs splitting
1804 //      >0  new slot value
1805
1806 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1807 {
1808 uint nxt = bt->mgr->page_size;
1809 uint cnt = 0, idx = 0;
1810 uint max = page->cnt;
1811 uint newslot = max;
1812 BtKey key;
1813 BtVal val;
1814
1815         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1816                 return slot;
1817
1818         //      skip cleanup if nothing to reclaim
1819
1820         if( !page->dirty )
1821                 return 0;
1822
1823         memcpy (bt->frame, page, bt->mgr->page_size);
1824
1825         // skip page info and set rest of page to zero
1826
1827         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1828         page->dirty = 0;
1829         page->act = 0;
1830
1831         // try cleaning up page first
1832         // by removing deleted keys
1833
1834         while( cnt++ < max ) {
1835                 if( cnt == slot )
1836                         newslot = idx + 1;
1837                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1838                         continue;
1839
1840                 // copy the key across
1841
1842                 key = keyptr(bt->frame, cnt);
1843                 nxt -= key->len + 1;
1844                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1845
1846                 // copy the value across
1847
1848                 val = valptr(bt->frame, cnt);
1849                 nxt -= val->len + 1;
1850                 ((unsigned char *)page)[nxt] = val->len;
1851                 memcpy ((unsigned char *)page + nxt + 1, val, val->len);
1852
1853                 // set up the slot
1854
1855                 slotptr(page, idx)->off = nxt;
1856
1857                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1858                         page->act++;
1859         }
1860
1861         page->min = nxt;
1862         page->cnt = idx;
1863
1864         //      see if page has enough space now, or does it need splitting?
1865
1866         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1867                 return newslot;
1868
1869         return 0;
1870 }
1871
1872 // split the root and raise the height of the btree
1873
1874 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1875 {
1876 uint nxt = bt->mgr->page_size;
1877 unsigned char value[BtId];
1878 uid left;
1879
1880         //  Obtain an empty page to use, and copy the current
1881         //  root contents into it, e.g. lower keys
1882
1883         if( !(left = bt_newpage(bt, root->page)) )
1884                 return bt->err;
1885
1886         // preserve the page info at the bottom
1887         // of higher keys and set rest to zero
1888
1889         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1890
1891         // insert lower keys page fence key on newroot page as first key
1892
1893         nxt -= BtId + 1;
1894         bt_putid (value, left);
1895         ((unsigned char *)root->page)[nxt] = BtId;
1896         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
1897
1898         nxt -= *leftkey + 1;
1899         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1900         slotptr(root->page, 1)->off = nxt;
1901         
1902         // insert stopper key on newroot page
1903         // and increase the root height
1904
1905         nxt -= 3 + BtId + 1;
1906         ((unsigned char *)root->page)[nxt] = 2;
1907         ((unsigned char *)root->page)[nxt+1] = 0xff;
1908         ((unsigned char *)root->page)[nxt+2] = 0xff;
1909
1910         bt_putid (value, page_no2);
1911         ((unsigned char *)root->page)[nxt+3] = BtId;
1912         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
1913         slotptr(root->page, 2)->off = nxt;
1914
1915         bt_putid(root->page->right, 0);
1916         root->page->min = nxt;          // reset lowest used offset and key count
1917         root->page->cnt = 2;
1918         root->page->act = 2;
1919         root->page->lvl++;
1920
1921         // release and unpin root
1922
1923         bt_unlockpage(BtLockWrite, root->latch);
1924         bt_unpinlatch (root->latch);
1925         bt_unpinpool (root->pool);
1926         return 0;
1927 }
1928
1929 //  split already locked full node
1930 //      return unlocked.
1931
1932 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1933 {
1934 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1935 unsigned char fencekey[256], rightkey[256];
1936 unsigned char value[BtId];
1937 uint lvl = set->page->lvl;
1938 BtPageSet right[1];
1939 uint prev;
1940 BtKey key;
1941 BtVal val;
1942
1943         //  split higher half of keys to bt->frame
1944
1945         memset (bt->frame, 0, bt->mgr->page_size);
1946         max = set->page->cnt;
1947         cnt = max / 2;
1948         idx = 0;
1949
1950         while( cnt++ < max ) {
1951                 val = valptr(set->page, cnt);
1952                 nxt -= val->len + 1;
1953                 ((unsigned char *)bt->frame)[nxt] = val->len;
1954                 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
1955
1956                 key = keyptr(set->page, cnt);
1957                 nxt -= key->len + 1;
1958                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1959
1960                 slotptr(bt->frame, ++idx)->off = nxt;
1961
1962                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1963                         bt->frame->act++;
1964         }
1965
1966         // remember existing fence key for new page to the right
1967
1968         memcpy (rightkey, key, key->len + 1);
1969
1970         bt->frame->bits = bt->mgr->page_bits;
1971         bt->frame->min = nxt;
1972         bt->frame->cnt = idx;
1973         bt->frame->lvl = lvl;
1974
1975         // link right node
1976
1977         if( set->page_no > ROOT_page )
1978                 memcpy (bt->frame->right, set->page->right, BtId);
1979
1980         //      get new free page and write higher keys to it.
1981
1982         if( !(right->page_no = bt_newpage(bt, bt->frame)) )
1983                 return bt->err;
1984
1985         //      update lower keys to continue in old page
1986
1987         memcpy (bt->frame, set->page, bt->mgr->page_size);
1988         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1989         nxt = bt->mgr->page_size;
1990         set->page->dirty = 0;
1991         set->page->act = 0;
1992         cnt = 0;
1993         idx = 0;
1994
1995         //  assemble page of smaller keys
1996
1997         while( cnt++ < max / 2 ) {
1998                 val = valptr(bt->frame, cnt);
1999                 nxt -= val->len + 1;
2000                 ((unsigned char *)set->page)[nxt] = val->len;
2001                 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2002
2003                 key = keyptr(bt->frame, cnt);
2004                 nxt -= key->len + 1;
2005                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2006                 slotptr(set->page, ++idx)->off = nxt;
2007                 set->page->act++;
2008         }
2009
2010         // remember fence key for smaller page
2011
2012         memcpy(fencekey, key, key->len + 1);
2013
2014         bt_putid(set->page->right, right->page_no);
2015         set->page->min = nxt;
2016         set->page->cnt = idx;
2017
2018         // if current page is the root page, split it
2019
2020         if( set->page_no == ROOT_page )
2021                 return bt_splitroot (bt, set, fencekey, right->page_no);
2022
2023         // insert new fences in their parent pages
2024
2025         right->latch = bt_pinlatch (bt, right->page_no);
2026         bt_lockpage (BtLockParent, right->latch);
2027
2028         bt_lockpage (BtLockParent, set->latch);
2029         bt_unlockpage (BtLockWrite, set->latch);
2030
2031         // insert new fence for reformulated left block of smaller keys
2032
2033         bt_putid (value, set->page_no);
2034
2035         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId) )
2036                 return bt->err;
2037
2038         // switch fence for right block of larger keys to new right page
2039
2040         bt_putid (value, right->page_no);
2041
2042         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId) )
2043                 return bt->err;
2044
2045         bt_unlockpage (BtLockParent, set->latch);
2046         bt_unpinlatch (set->latch);
2047         bt_unpinpool (set->pool);
2048
2049         bt_unlockpage (BtLockParent, right->latch);
2050         bt_unpinlatch (right->latch);
2051         return 0;
2052 }
2053 //  Insert new key into the btree at given level.
2054
2055 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, unsigned char *value, uint vallen)
2056 {
2057 BtPageSet set[1];
2058 uint slot, idx;
2059 uint reuse;
2060 BtKey ptr;
2061 BtVal val;
2062
2063         while( 1 ) {
2064                 if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
2065                         ptr = keyptr(set->page, slot);
2066                 else
2067                 {
2068                         if( !bt->err )
2069                                 bt->err = BTERR_ovflw;
2070                         return bt->err;
2071                 }
2072
2073                 // if key already exists, update id and return
2074
2075                 if( reuse = !keycmp (ptr, key, keylen) )
2076                   if( val = valptr(set->page, slot), val->len >= vallen ) {
2077                         if( slotptr(set->page, slot)->dead )
2078                                 set->page->act++;
2079                         slotptr(set->page, slot)->dead = 0;
2080                         val->len = vallen;
2081                         memcpy (val->value, value, vallen);
2082                         bt_unlockpage(BtLockWrite, set->latch);
2083                         bt_unpinlatch (set->latch);
2084                         bt_unpinpool (set->pool);
2085                         return 0;
2086                   } else {
2087                         if( !slotptr(set->page, slot)->dead )
2088                                 set->page->act--;
2089                         slotptr(set->page, slot)->dead = 1;
2090                         set->page->dirty = 1;
2091                   }
2092
2093                 // check if page has enough space
2094
2095                 if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
2096                         break;
2097
2098                 if( bt_splitpage (bt, set) )
2099                         return bt->err;
2100         }
2101
2102         // calculate next available slot and copy key into page
2103
2104         set->page->min -= vallen + 1; // reset lowest used offset
2105         ((unsigned char *)set->page)[set->page->min] = vallen;
2106         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2107
2108         set->page->min -= keylen + 1; // reset lowest used offset
2109         ((unsigned char *)set->page)[set->page->min] = keylen;
2110         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2111
2112         for( idx = slot; idx < set->page->cnt; idx++ )
2113           if( slotptr(set->page, idx)->dead )
2114                 break;
2115
2116         // now insert key into array before slot
2117
2118         if( !reuse && idx == set->page->cnt )
2119                 idx++, set->page->cnt++;
2120
2121         set->page->act++;
2122
2123         while( idx > slot )
2124                 *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2125
2126         slotptr(set->page, slot)->off = set->page->min;
2127         slotptr(set->page, slot)->dead = 0;
2128
2129         bt_unlockpage (BtLockWrite, set->latch);
2130         bt_unpinlatch (set->latch);
2131         bt_unpinpool (set->pool);
2132         return 0;
2133 }
2134
2135 //  cache page of keys into cursor and return starting slot for given key
2136
2137 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2138 {
2139 BtPageSet set[1];
2140 uint slot;
2141
2142         // cache page for retrieval
2143
2144         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2145           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2146         else
2147           return 0;
2148
2149         bt->cursor_page = set->page_no;
2150
2151         bt_unlockpage(BtLockRead, set->latch);
2152         bt_unpinlatch (set->latch);
2153         bt_unpinpool (set->pool);
2154         return slot;
2155 }
2156
2157 //  return next slot for cursor page
2158 //  or slide cursor right into next page
2159
2160 uint bt_nextkey (BtDb *bt, uint slot)
2161 {
2162 BtPageSet set[1];
2163 uid right;
2164
2165   do {
2166         right = bt_getid(bt->cursor->right);
2167
2168         while( slot++ < bt->cursor->cnt )
2169           if( slotptr(bt->cursor,slot)->dead )
2170                 continue;
2171           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2172                 return slot;
2173           else
2174                 break;
2175
2176         if( !right )
2177                 break;
2178
2179         bt->cursor_page = right;
2180
2181         if( set->pool = bt_pinpool (bt, right) )
2182                 set->page = bt_page (bt, set->pool, right);
2183         else
2184                 return 0;
2185
2186         set->latch = bt_pinlatch (bt, right);
2187     bt_lockpage(BtLockRead, set->latch);
2188
2189         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2190
2191         bt_unlockpage(BtLockRead, set->latch);
2192         bt_unpinlatch (set->latch);
2193         bt_unpinpool (set->pool);
2194         slot = 0;
2195
2196   } while( 1 );
2197
2198   return bt->err = 0;
2199 }
2200
2201 BtKey bt_key(BtDb *bt, uint slot)
2202 {
2203         return keyptr(bt->cursor, slot);
2204 }
2205
2206 BtVal bt_val(BtDb *bt, uint slot)
2207 {
2208         return valptr(bt->cursor,slot);
2209 }
2210
2211 #ifdef STANDALONE
2212
2213 #ifndef unix
2214 double getCpuTime(int type)
2215 {
2216 FILETIME crtime[1];
2217 FILETIME xittime[1];
2218 FILETIME systime[1];
2219 FILETIME usrtime[1];
2220 SYSTEMTIME timeconv[1];
2221 double ans = 0;
2222
2223         memset (timeconv, 0, sizeof(SYSTEMTIME));
2224
2225         switch( type ) {
2226         case 0:
2227                 GetSystemTimeAsFileTime (xittime);
2228                 FileTimeToSystemTime (xittime, timeconv);
2229                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2230                 break;
2231         case 1:
2232                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2233                 FileTimeToSystemTime (usrtime, timeconv);
2234                 break;
2235         case 2:
2236                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2237                 FileTimeToSystemTime (systime, timeconv);
2238                 break;
2239         }
2240
2241         ans += (double)timeconv->wHour * 3600;
2242         ans += (double)timeconv->wMinute * 60;
2243         ans += (double)timeconv->wSecond;
2244         ans += (double)timeconv->wMilliseconds / 1000;
2245         return ans;
2246 }
2247 #else
2248 #include <time.h>
2249 #include <sys/resource.h>
2250
2251 double getCpuTime(int type)
2252 {
2253 struct rusage used[1];
2254 struct timeval tv[1];
2255
2256         switch( type ) {
2257         case 0:
2258                 gettimeofday(tv, NULL);
2259                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2260
2261         case 1:
2262                 getrusage(RUSAGE_SELF, used);
2263                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2264
2265         case 2:
2266                 getrusage(RUSAGE_SELF, used);
2267                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2268         }
2269
2270         return 0;
2271 }
2272 #endif
2273
2274 uint bt_latchaudit (BtDb *bt)
2275 {
2276 ushort idx, hashidx;
2277 uid next, page_no;
2278 BtLatchSet *latch;
2279 uint cnt = 0;
2280 BtKey ptr;
2281
2282 #ifdef unix
2283         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2284 #endif
2285         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2286                 fprintf(stderr, "Alloc page locked\n");
2287         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2288
2289         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2290                 latch = bt->mgr->latchsets + idx;
2291                 if( *latch->readwr->rin & MASK )
2292                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2293                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2294
2295                 if( *latch->access->rin & MASK )
2296                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2297                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2298
2299                 if( *latch->parent->rin & MASK )
2300                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2301                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2302
2303                 if( latch->pin ) {
2304                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2305                         latch->pin = 0;
2306                 }
2307         }
2308
2309         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2310           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2311                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2312
2313           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2314
2315           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2316                 latch = bt->mgr->latchsets + idx;
2317                 if( *(ushort *)latch->busy )
2318                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2319                 *(ushort *)latch->busy = 0;
2320                 if( latch->pin )
2321                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2322           } while( idx = latch->next );
2323         }
2324
2325         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2326         page_no = LEAF_page;
2327
2328         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2329         off64_t off = page_no << bt->mgr->page_bits;
2330 #ifdef unix
2331                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2332 #else
2333                 DWORD amt[1];
2334
2335                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2336
2337                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2338                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2339
2340                   if( *amt <  bt->mgr->page_size )
2341                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2342 #endif
2343                 if( !bt->frame->free ) {
2344                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2345                   ptr = keyptr(bt->frame, idx+1);
2346                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2347                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2348                  }
2349                  if( !bt->frame->lvl )
2350                         cnt += bt->frame->act;
2351                 }
2352                 if( page_no > LEAF_page )
2353                         next = page_no + 1;
2354                 page_no = next;
2355         }
2356         return cnt - 1;
2357 }
2358
2359 typedef struct {
2360         char type, idx;
2361         char *infile;
2362         BtMgr *mgr;
2363         int num;
2364 } ThreadArg;
2365
2366 //  standalone program to index file of keys
2367 //  then list them onto std-out
2368
2369 #ifdef unix
2370 void *index_file (void *arg)
2371 #else
2372 uint __stdcall index_file (void *arg)
2373 #endif
2374 {
2375 int line = 0, found = 0, cnt = 0;
2376 uid next, page_no = LEAF_page;  // start on first page of leaves
2377 unsigned char key[256];
2378 ThreadArg *args = arg;
2379 int ch, len = 0, slot;
2380 BtPageSet set[1];
2381 BtKey ptr;
2382 BtDb *bt;
2383 FILE *in;
2384
2385         bt = bt_open (args->mgr);
2386
2387         switch(args->type | 0x20)
2388         {
2389         case 'a':
2390                 fprintf(stderr, "started latch mgr audit\n");
2391                 cnt = bt_latchaudit (bt);
2392                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2393                 break;
2394
2395         case 'p':
2396                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2397                 if( in = fopen (args->infile, "rb") )
2398                   while( ch = getc(in), ch != EOF )
2399                         if( ch == '\n' )
2400                         {
2401                           line++;
2402
2403                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10) )
2404                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2405                           len = 0;
2406                         }
2407                         else if( len < 255 )
2408                                 key[len++] = ch;
2409                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2410                 break;
2411
2412         case 'w':
2413                 fprintf(stderr, "started indexing for %s\n", args->infile);
2414                 if( in = fopen (args->infile, "rb") )
2415                   while( ch = getc(in), ch != EOF )
2416                         if( ch == '\n' )
2417                         {
2418                           line++;
2419
2420                           if( args->num == 1 )
2421                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2422
2423                           else if( args->num )
2424                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2425
2426                           if( bt_insertkey (bt, key, len, 0, NULL, 0) )
2427                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2428                           len = 0;
2429                         }
2430                         else if( len < 255 )
2431                                 key[len++] = ch;
2432                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2433                 break;
2434
2435         case 'd':
2436                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2437                 if( in = fopen (args->infile, "rb") )
2438                   while( ch = getc(in), ch != EOF )
2439                         if( ch == '\n' )
2440                         {
2441                           line++;
2442                           if( args->num == 1 )
2443                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2444
2445                           else if( args->num )
2446                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2447
2448                           if( bt_deletekey (bt, key, len, 0) )
2449                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2450                           len = 0;
2451                         }
2452                         else if( len < 255 )
2453                                 key[len++] = ch;
2454                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2455                 break;
2456
2457         case 'f':
2458                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2459                 if( in = fopen (args->infile, "rb") )
2460                   while( ch = getc(in), ch != EOF )
2461                         if( ch == '\n' )
2462                         {
2463                           line++;
2464                           if( args->num == 1 )
2465                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2466
2467                           else if( args->num )
2468                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2469
2470                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2471                                 found++;
2472                           else if( bt->err )
2473                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2474                           len = 0;
2475                         }
2476                         else if( len < 255 )
2477                                 key[len++] = ch;
2478                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2479                 break;
2480
2481         case 's':
2482                 fprintf(stderr, "started scanning\n");
2483                 do {
2484                         if( set->pool = bt_pinpool (bt, page_no) )
2485                                 set->page = bt_page (bt, set->pool, page_no);
2486                         else
2487                                 break;
2488                         set->latch = bt_pinlatch (bt, page_no);
2489                         bt_lockpage (BtLockRead, set->latch);
2490                         next = bt_getid (set->page->right);
2491                         cnt += set->page->act;
2492
2493                         for( slot = 0; slot++ < set->page->cnt; )
2494                          if( next || slot < set->page->cnt )
2495                           if( !slotptr(set->page, slot)->dead ) {
2496                                 ptr = keyptr(set->page, slot);
2497                                 fwrite (ptr->key, ptr->len, 1, stdout);
2498                                 fputc ('\n', stdout);
2499                           }
2500
2501                         bt_unlockpage (BtLockRead, set->latch);
2502                         bt_unpinlatch (set->latch);
2503                         bt_unpinpool (set->pool);
2504                 } while( page_no = next );
2505
2506                 cnt--;  // remove stopper key
2507                 fprintf(stderr, " Total keys read %d\n", cnt);
2508                 break;
2509
2510         case 'c':
2511 #ifdef unix
2512                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2513 #endif
2514                 fprintf(stderr, "started counting\n");
2515                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2516                 page_no = LEAF_page;
2517
2518                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2519                 uid off = page_no << bt->mgr->page_bits;
2520 #ifdef unix
2521                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2522 #else
2523                 DWORD amt[1];
2524
2525                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2526
2527                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2528                         return bt->err = BTERR_map;
2529
2530                   if( *amt <  bt->mgr->page_size )
2531                         return bt->err = BTERR_map;
2532 #endif
2533                         if( !bt->frame->free && !bt->frame->lvl )
2534                                 cnt += bt->frame->act;
2535                         if( page_no > LEAF_page )
2536                                 next = page_no + 1;
2537                         page_no = next;
2538                 }
2539                 
2540                 cnt--;  // remove stopper key
2541                 fprintf(stderr, " Total keys read %d\n", cnt);
2542                 break;
2543         }
2544
2545         bt_close (bt);
2546 #ifdef unix
2547         return NULL;
2548 #else
2549         return 0;
2550 #endif
2551 }
2552
2553 typedef struct timeval timer;
2554
2555 int main (int argc, char **argv)
2556 {
2557 int idx, cnt, len, slot, err;
2558 int segsize, bits = 16;
2559 double start, stop;
2560 #ifdef unix
2561 pthread_t *threads;
2562 #else
2563 HANDLE *threads;
2564 #endif
2565 ThreadArg *args;
2566 uint poolsize = 0;
2567 float elapsed;
2568 int num = 0;
2569 char key[1];
2570 BtMgr *mgr;
2571 BtKey ptr;
2572 BtDb *bt;
2573
2574         if( argc < 3 ) {
2575                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2576                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2577                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2578                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2579                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2580                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2581                 exit(0);
2582         }
2583
2584         start = getCpuTime(0);
2585
2586         if( argc > 3 )
2587                 bits = atoi(argv[3]);
2588
2589         if( argc > 4 )
2590                 poolsize = atoi(argv[4]);
2591
2592         if( !poolsize )
2593                 fprintf (stderr, "Warning: no mapped_pool\n");
2594
2595         if( poolsize > 65535 )
2596                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2597
2598         if( argc > 5 )
2599                 segsize = atoi(argv[5]);
2600         else
2601                 segsize = 4;    // 16 pages per mmap segment
2602
2603         if( argc > 6 )
2604                 num = atoi(argv[6]);
2605
2606         cnt = argc - 7;
2607 #ifdef unix
2608         threads = malloc (cnt * sizeof(pthread_t));
2609 #else
2610         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2611 #endif
2612         args = malloc (cnt * sizeof(ThreadArg));
2613
2614         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2615
2616         if( !mgr ) {
2617                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2618                 exit (1);
2619         }
2620
2621         //      fire off threads
2622
2623         for( idx = 0; idx < cnt; idx++ ) {
2624                 args[idx].infile = argv[idx + 7];
2625                 args[idx].type = argv[2][0];
2626                 args[idx].mgr = mgr;
2627                 args[idx].num = num;
2628                 args[idx].idx = idx;
2629 #ifdef unix
2630                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2631                         fprintf(stderr, "Error creating thread %d\n", err);
2632 #else
2633                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2634 #endif
2635         }
2636
2637         //      wait for termination
2638
2639 #ifdef unix
2640         for( idx = 0; idx < cnt; idx++ )
2641                 pthread_join (threads[idx], NULL);
2642 #else
2643         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2644
2645         for( idx = 0; idx < cnt; idx++ )
2646                 CloseHandle(threads[idx]);
2647
2648 #endif
2649         elapsed = getCpuTime(0) - start;
2650         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2651         elapsed = getCpuTime(1);
2652         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2653         elapsed = getCpuTime(2);
2654         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2655
2656         bt_mgrclose (mgr);
2657 }
2658
2659 #endif  //STANDALONE