]> pd.if.org Git - btree/blob - threadskv1.c
Fix bugs found by David Groves
[btree] / threadskv1.c
1 // btree version threadskv1 sched_yield version
2 //      with reworked bt_deletekey code
3 //      and phase-fair reader writer lock
4 // 12 MAR 2014
5
6 // author: karl malbrain, malbrain@cal.berkeley.edu
7
8 /*
9 This work, including the source code, documentation
10 and related data, is placed into the public domain.
11
12 The orginal author is Karl Malbrain.
13
14 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
15 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
16 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
17 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
18 RESULTING FROM THE USE, MODIFICATION, OR
19 REDISTRIBUTION OF THIS SOFTWARE.
20 */
21
22 // Please see the project home page for documentation
23 // code.google.com/p/high-concurrency-btree
24
25 #define _FILE_OFFSET_BITS 64
26 #define _LARGEFILE64_SOURCE
27
28 #ifdef linux
29 #define _GNU_SOURCE
30 #endif
31
32 #ifdef unix
33 #include <unistd.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <fcntl.h>
37 #include <sys/time.h>
38 #include <sys/mman.h>
39 #include <errno.h>
40 #include <pthread.h>
41 #else
42 #define WIN32_LEAN_AND_MEAN
43 #include <windows.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <time.h>
47 #include <fcntl.h>
48 #include <process.h>
49 #include <intrin.h>
50 #endif
51
52 #include <memory.h>
53 #include <string.h>
54 #include <stddef.h>
55
56 typedef unsigned long long      uid;
57
58 #ifndef unix
59 typedef unsigned long long      off64_t;
60 typedef unsigned short          ushort;
61 typedef unsigned int            uint;
62 #endif
63
64 #define BT_latchtable   128                                     // number of latch manager slots
65
66 #define BT_ro 0x6f72    // ro
67 #define BT_rw 0x7772    // rw
68
69 #define BT_maxbits              24                                      // maximum page size in bits
70 #define BT_minbits              9                                       // minimum page size in bits
71 #define BT_minpage              (1 << BT_minbits)       // minimum page size
72 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
73
74 /*
75 There are five lock types for each node in three independent sets: 
76 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
77 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
78 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
79 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
80 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
81 */
82
83 typedef enum{
84         BtLockAccess,
85         BtLockDelete,
86         BtLockRead,
87         BtLockWrite,
88         BtLockParent
89 } BtLock;
90
91 //      definition for phase-fair reader/writer lock implementation
92
93 typedef struct {
94         ushort rin[1];
95         ushort rout[1];
96         ushort ticket[1];
97         ushort serving[1];
98 } RWLock;
99
100 #define PHID 0x1
101 #define PRES 0x2
102 #define MASK 0x3
103 #define RINC 0x4
104
105 //      definition for spin latch implementation
106
107 // exclusive is set for write access
108 // share is count of read accessors
109 // grant write lock when share == 0
110
111 volatile typedef struct {
112         ushort exclusive:1;
113         ushort pending:1;
114         ushort share:14;
115 } BtSpinLatch;
116
117 #define XCL 1
118 #define PEND 2
119 #define BOTH 3
120 #define SHARE 4
121
122 //  hash table entries
123
124 typedef struct {
125         BtSpinLatch latch[1];
126         volatile ushort slot;           // Latch table entry at head of chain
127 } BtHashEntry;
128
129 //      latch manager table structure
130
131 typedef struct {
132         RWLock readwr[1];               // read/write page lock
133         RWLock access[1];               // Access Intent/Page delete
134         RWLock parent[1];               // Posting of fence key in parent
135         BtSpinLatch busy[1];            // slot is being moved between chains
136         volatile ushort next;           // next entry in hash table chain
137         volatile ushort prev;           // prev entry in hash table chain
138         volatile ushort pin;            // number of outstanding locks
139         volatile ushort hash;           // hash slot entry is under
140         volatile uid page_no;           // latch set page number
141 } BtLatchSet;
142
143 //      Define the length of the page and key pointers
144
145 #define BtId 6
146
147 //      Page key slot definition.
148
149 //      If BT_maxbits is 15 or less, you can save 2 bytes
150 //      for each key stored by making the two uints
151 //      into ushorts.
152
153 //      Keys are marked dead, but remain on the page until
154 //      it cleanup is called. The fence key (highest key) for
155 //      the page is always present, even after cleanup.
156
157 typedef struct {
158         uint off:BT_maxbits;            // page offset for key start
159         uint dead:1;                            // set for deleted key
160 } BtSlot;
161
162 //      The key structure occupies space at the upper end of
163 //      each page.  It's a length byte followed by the key
164 //      bytes.
165
166 typedef struct {
167         unsigned char len;
168         unsigned char key[1];
169 } *BtKey;
170
171 //      the value structure also occupies space at the upper
172 //      end of the page.
173
174 typedef struct {
175         unsigned char len;
176         unsigned char value[1];
177 } *BtVal;
178
179 //      The first part of an index page.
180 //      It is immediately followed
181 //      by the BtSlot array of keys.
182
183 typedef struct BtPage_ {
184         uint cnt;                                       // count of keys in page
185         uint act;                                       // count of active keys
186         uint min;                                       // next key offset
187         unsigned char bits:7;           // page size in bits
188         unsigned char free:1;           // page is on free chain
189         unsigned char lvl:6;            // level of page
190         unsigned char kill:1;           // page is being deleted
191         unsigned char dirty:1;          // page has deleted keys
192         unsigned char right[BtId];      // page number to right
193 } *BtPage;
194
195 //      The memory mapping pool table buffer manager entry
196
197 typedef struct {
198         uid  basepage;                          // mapped base page number
199         char *map;                                      // mapped memory pointer
200         ushort slot;                            // slot index in this array
201         ushort pin;                                     // mapped page pin counter
202         void *hashprev;                         // previous pool entry for the same hash idx
203         void *hashnext;                         // next pool entry for the same hash idx
204 #ifndef unix
205         HANDLE hmap;                            // Windows memory mapping handle
206 #endif
207 } BtPool;
208
209 #define CLOCK_bit 0x8000                // bit in pool->pin
210
211 //  The loadpage interface object
212
213 typedef struct {
214         uid page_no;            // current page number
215         BtPage page;            // current page pointer
216         BtPool *pool;           // current page pool
217         BtLatchSet *latch;      // current page latch set
218 } BtPageSet;
219
220 //      structure for latch manager on ALLOC_page
221
222 typedef struct {
223         struct BtPage_ alloc[1];        // next page_no in right ptr
224         unsigned char chain[BtId];      // head of free page_nos chain
225         BtSpinLatch lock[1];            // allocation area lite latch
226         ushort latchdeployed;           // highest number of latch entries deployed
227         ushort nlatchpage;                      // number of latch pages at BT_latch
228         ushort latchtotal;                      // number of page latch entries
229         ushort latchhash;                       // number of latch hash table slots
230         ushort latchvictim;                     // next latch entry to examine
231         BtHashEntry table[0];           // the hash table
232 } BtLatchMgr;
233
234 //      The object structure for Btree access
235
236 typedef struct {
237         uint page_size;                         // page size    
238         uint page_bits;                         // page size in bits    
239         uint seg_bits;                          // seg size in pages in bits
240         uint mode;                                      // read-write mode
241 #ifdef unix
242         int idx;
243 #else
244         HANDLE idx;
245 #endif
246         ushort poolcnt;                         // highest page pool node in use
247         ushort poolmax;                         // highest page pool node allocated
248         ushort poolmask;                        // total number of pages in mmap segment - 1
249         ushort hashsize;                        // size of Hash Table for pool entries
250         volatile uint evicted;          // last evicted hash table slot
251         ushort *hash;                           // pool index for hash entries
252         BtSpinLatch *latch;                     // latches for hash table slots
253         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
254         BtLatchSet *latchsets;          // mapped latch set from latch pages
255         BtPool *pool;                           // memory pool page segments
256 #ifndef unix
257         HANDLE halloc;                          // allocation and latch table handle
258 #endif
259 } BtMgr;
260
261 typedef struct {
262         BtMgr *mgr;                     // buffer manager for thread
263         BtPage cursor;          // cached frame for start/next (never mapped)
264         BtPage frame;           // spare frame for the page split (never mapped)
265         uid cursor_page;        // current cursor page number   
266         unsigned char *mem;     // frame, cursor, page memory buffer
267         int found;                      // last delete or insert was found
268         int err;                        // last error
269 } BtDb;
270
271 typedef enum {
272         BTERR_ok = 0,
273         BTERR_struct,
274         BTERR_ovflw,
275         BTERR_lock,
276         BTERR_map,
277         BTERR_wrt,
278         BTERR_hash
279 } BTERR;
280
281 // B-Tree functions
282 extern void bt_close (BtDb *bt);
283 extern BtDb *bt_open (BtMgr *mgr);
284 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
285 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
286 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
287 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
288 extern uint bt_nextkey   (BtDb *bt, uint slot);
289
290 //      manager functions
291 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
292 void bt_mgrclose (BtMgr *mgr);
293
294 //  Helper functions to return slot values
295 //      from the cursor page.
296
297 extern BtKey bt_key (BtDb *bt, uint slot);
298 extern BtVal bt_val (BtDb *bt, uint slot);
299
300 //  BTree page number constants
301 #define ALLOC_page              0       // allocation & latch manager hash table
302 #define ROOT_page               1       // root of the btree
303 #define LEAF_page               2       // first page of leaves
304 #define LATCH_page              3       // pages for latch manager
305
306 //      Number of levels to create in a new BTree
307
308 #define MIN_lvl                 2
309
310 //  The page is allocated from low and hi ends.
311 //  The key slots are allocated from the bottom,
312 //      while the text and value of the key
313 //  are allocated from the top.  When the two
314 //  areas meet, the page is split into two.
315
316 //  A key consists of a length byte, two bytes of
317 //  index number (0 - 65534), and up to 253 bytes
318 //  of key value.  Duplicate keys are discarded.
319 //  Associated with each key is a value byte string
320 //      containing any value desired.
321
322 //  The b-tree root is always located at page 1.
323 //      The first leaf page of level zero is always
324 //      located on page 2.
325
326 //      The b-tree pages are linked with next
327 //      pointers to facilitate enumerators,
328 //      and provide for concurrency.
329
330 //      When to root page fills, it is split in two and
331 //      the tree height is raised by a new root at page
332 //      one with two keys.
333
334 //      Deleted keys are marked with a dead bit until
335 //      page cleanup. The fence key for a leaf node is
336 //      always present
337
338 //  Groups of pages called segments from the btree are optionally
339 //  cached with a memory mapped pool. A hash table is used to keep
340 //  track of the cached segments.  This behaviour is controlled
341 //  by the cache block size parameter to bt_open.
342
343 //  To achieve maximum concurrency one page is locked at a time
344 //  as the tree is traversed to find leaf key in question. The right
345 //  page numbers are used in cases where the page is being split,
346 //      or consolidated.
347
348 //  Page 0 is dedicated to lock for new page extensions,
349 //      and chains empty pages together for reuse. It also
350 //      contains the latch manager hash table.
351
352 //      The ParentModification lock on a node is obtained to serialize posting
353 //      or changing the fence key for a node.
354
355 //      Empty pages are chained together through the ALLOC page and reused.
356
357 //      Access macros to address slot and key values from the page
358 //      Page slots use 1 based indexing.
359
360 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
361 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
362 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
363
364 void bt_putid(unsigned char *dest, uid id)
365 {
366 int i = BtId;
367
368         while( i-- )
369                 dest[i] = (unsigned char)id, id >>= 8;
370 }
371
372 uid bt_getid(unsigned char *src)
373 {
374 uid id = 0;
375 int i;
376
377         for( i = 0; i < BtId; i++ )
378                 id <<= 8, id |= *src++; 
379
380         return id;
381 }
382
383 //      Phase-Fair reader/writer lock implementation
384
385 void WriteLock (RWLock *lock)
386 {
387 ushort w, r, tix;
388
389 #ifdef unix
390         tix = __sync_fetch_and_add (lock->ticket, 1);
391 #else
392         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
393 #endif
394         // wait for our ticket to come up
395
396         while( tix != lock->serving[0] )
397 #ifdef unix
398                 sched_yield();
399 #else
400                 SwitchToThread ();
401 #endif
402
403         w = PRES | (tix & PHID);
404 #ifdef  unix
405         r = __sync_fetch_and_add (lock->rin, w);
406 #else
407         r = _InterlockedExchangeAdd16 (lock->rin, w);
408 #endif
409         while( r != *lock->rout )
410 #ifdef unix
411                 sched_yield();
412 #else
413                 SwitchToThread();
414 #endif
415 }
416
417 void WriteRelease (RWLock *lock)
418 {
419 #ifdef unix
420         __sync_fetch_and_and (lock->rin, ~MASK);
421 #else
422         _InterlockedAnd16 (lock->rin, ~MASK);
423 #endif
424         lock->serving[0]++;
425 }
426
427 void ReadLock (RWLock *lock)
428 {
429 ushort w;
430 #ifdef unix
431         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
432 #else
433         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
434 #endif
435         if( w )
436           while( w == (*lock->rin & MASK) )
437 #ifdef unix
438                 sched_yield ();
439 #else
440                 SwitchToThread ();
441 #endif
442 }
443
444 void ReadRelease (RWLock *lock)
445 {
446 #ifdef unix
447         __sync_fetch_and_add (lock->rout, RINC);
448 #else
449         _InterlockedExchangeAdd16 (lock->rout, RINC);
450 #endif
451 }
452
453 //      Spin Latch Manager
454
455 //      wait until write lock mode is clear
456 //      and add 1 to the share count
457
458 void bt_spinreadlock(BtSpinLatch *latch)
459 {
460 ushort prev;
461
462   do {
463 #ifdef unix
464         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
465 #else
466         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
467 #endif
468         //  see if exclusive request is granted or pending
469
470         if( !(prev & BOTH) )
471                 return;
472 #ifdef unix
473         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
474 #else
475         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
476 #endif
477 #ifdef  unix
478   } while( sched_yield(), 1 );
479 #else
480   } while( SwitchToThread(), 1 );
481 #endif
482 }
483
484 //      wait for other read and write latches to relinquish
485
486 void bt_spinwritelock(BtSpinLatch *latch)
487 {
488 ushort prev;
489
490   do {
491 #ifdef  unix
492         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
493 #else
494         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
495 #endif
496         if( !(prev & XCL) )
497           if( !(prev & ~BOTH) )
498                 return;
499           else
500 #ifdef unix
501                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
502 #else
503                 _InterlockedAnd16((ushort *)latch, ~XCL);
504 #endif
505 #ifdef  unix
506   } while( sched_yield(), 1 );
507 #else
508   } while( SwitchToThread(), 1 );
509 #endif
510 }
511
512 //      try to obtain write lock
513
514 //      return 1 if obtained,
515 //              0 otherwise
516
517 int bt_spinwritetry(BtSpinLatch *latch)
518 {
519 ushort prev;
520
521 #ifdef  unix
522         prev = __sync_fetch_and_or((ushort *)latch, XCL);
523 #else
524         prev = _InterlockedOr16((ushort *)latch, XCL);
525 #endif
526         //      take write access if all bits are clear
527
528         if( !(prev & XCL) )
529           if( !(prev & ~BOTH) )
530                 return 1;
531           else
532 #ifdef unix
533                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
534 #else
535                 _InterlockedAnd16((ushort *)latch, ~XCL);
536 #endif
537         return 0;
538 }
539
540 //      clear write mode
541
542 void bt_spinreleasewrite(BtSpinLatch *latch)
543 {
544 #ifdef unix
545         __sync_fetch_and_and((ushort *)latch, ~BOTH);
546 #else
547         _InterlockedAnd16((ushort *)latch, ~BOTH);
548 #endif
549 }
550
551 //      decrement reader count
552
553 void bt_spinreleaseread(BtSpinLatch *latch)
554 {
555 #ifdef unix
556         __sync_fetch_and_add((ushort *)latch, -SHARE);
557 #else
558         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
559 #endif
560 }
561
562 //      link latch table entry into latch hash table
563
564 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
565 {
566 BtLatchSet *set = bt->mgr->latchsets + victim;
567
568         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
569                 bt->mgr->latchsets[set->next].prev = victim;
570
571         bt->mgr->latchmgr->table[hashidx].slot = victim;
572         set->page_no = page_no;
573         set->hash = hashidx;
574         set->prev = 0;
575 }
576
577 //      release latch pin
578
579 void bt_unpinlatch (BtLatchSet *set)
580 {
581 #ifdef unix
582         __sync_fetch_and_add(&set->pin, -1);
583 #else
584         _InterlockedDecrement16 (&set->pin);
585 #endif
586 }
587
588 //      find existing latchset or inspire new one
589 //      return with latchset pinned
590
591 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
592 {
593 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
594 ushort slot, avail = 0, victim, idx;
595 BtLatchSet *set;
596
597         //  obtain read lock on hash table entry
598
599         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
600
601         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
602         {
603                 set = bt->mgr->latchsets + slot;
604                 if( page_no == set->page_no )
605                         break;
606         } while( slot = set->next );
607
608         if( slot ) {
609 #ifdef unix
610                 __sync_fetch_and_add(&set->pin, 1);
611 #else
612                 _InterlockedIncrement16 (&set->pin);
613 #endif
614         }
615
616     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
617
618         if( slot )
619                 return set;
620
621   //  try again, this time with write lock
622
623   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
624
625   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
626   {
627         set = bt->mgr->latchsets + slot;
628         if( page_no == set->page_no )
629                 break;
630         if( !set->pin && !avail )
631                 avail = slot;
632   } while( slot = set->next );
633
634   //  found our entry, or take over an unpinned one
635
636   if( slot || (slot = avail) ) {
637         set = bt->mgr->latchsets + slot;
638 #ifdef unix
639         __sync_fetch_and_add(&set->pin, 1);
640 #else
641         _InterlockedIncrement16 (&set->pin);
642 #endif
643         set->page_no = page_no;
644         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
645         return set;
646   }
647
648         //  see if there are any unused entries
649 #ifdef unix
650         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
651 #else
652         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
653 #endif
654
655         if( victim < bt->mgr->latchmgr->latchtotal ) {
656                 set = bt->mgr->latchsets + victim;
657 #ifdef unix
658                 __sync_fetch_and_add(&set->pin, 1);
659 #else
660                 _InterlockedIncrement16 (&set->pin);
661 #endif
662                 bt_latchlink (bt, hashidx, victim, page_no);
663                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
664                 return set;
665         }
666
667 #ifdef unix
668         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
669 #else
670         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
671 #endif
672   //  find and reuse previous lock entry
673
674   while( 1 ) {
675 #ifdef unix
676         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
677 #else
678         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
679 #endif
680         //      we don't use slot zero
681
682         if( victim %= bt->mgr->latchmgr->latchtotal )
683                 set = bt->mgr->latchsets + victim;
684         else
685                 continue;
686
687         //      take control of our slot
688         //      from other threads
689
690         if( set->pin || !bt_spinwritetry (set->busy) )
691                 continue;
692
693         idx = set->hash;
694
695         // try to get write lock on hash chain
696         //      skip entry if not obtained
697         //      or has outstanding locks
698
699         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
700                 bt_spinreleasewrite (set->busy);
701                 continue;
702         }
703
704         if( set->pin ) {
705                 bt_spinreleasewrite (set->busy);
706                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
707                 continue;
708         }
709
710         //  unlink our available victim from its hash chain
711
712         if( set->prev )
713                 bt->mgr->latchsets[set->prev].next = set->next;
714         else
715                 bt->mgr->latchmgr->table[idx].slot = set->next;
716
717         if( set->next )
718                 bt->mgr->latchsets[set->next].prev = set->prev;
719
720         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
721 #ifdef unix
722         __sync_fetch_and_add(&set->pin, 1);
723 #else
724         _InterlockedIncrement16 (&set->pin);
725 #endif
726         bt_latchlink (bt, hashidx, victim, page_no);
727         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
728         bt_spinreleasewrite (set->busy);
729         return set;
730   }
731 }
732
733 void bt_mgrclose (BtMgr *mgr)
734 {
735 BtPool *pool;
736 uint slot;
737
738         // release mapped pages
739         //      note that slot zero is never used
740
741         for( slot = 1; slot < mgr->poolmax; slot++ ) {
742                 pool = mgr->pool + slot;
743                 if( pool->slot )
744 #ifdef unix
745                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
746 #else
747                 {
748                         FlushViewOfFile(pool->map, 0);
749                         UnmapViewOfFile(pool->map);
750                         CloseHandle(pool->hmap);
751                 }
752 #endif
753         }
754
755 #ifdef unix
756         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
757         munmap (mgr->latchmgr, 2 * mgr->page_size);
758 #else
759         FlushViewOfFile(mgr->latchmgr, 0);
760         UnmapViewOfFile(mgr->latchmgr);
761         CloseHandle(mgr->halloc);
762 #endif
763 #ifdef unix
764         close (mgr->idx);
765         free (mgr->pool);
766         free (mgr->hash);
767         free ((void *)mgr->latch);
768         free (mgr);
769 #else
770         FlushFileBuffers(mgr->idx);
771         CloseHandle(mgr->idx);
772         GlobalFree (mgr->pool);
773         GlobalFree (mgr->hash);
774         GlobalFree ((void *)mgr->latch);
775         GlobalFree (mgr);
776 #endif
777 }
778
779 //      close and release memory
780
781 void bt_close (BtDb *bt)
782 {
783 #ifdef unix
784         if( bt->mem )
785                 free (bt->mem);
786 #else
787         if( bt->mem)
788                 VirtualFree (bt->mem, 0, MEM_RELEASE);
789 #endif
790         free (bt);
791 }
792
793 //  open/create new btree buffer manager
794
795 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
796 //              size of mapped page pool (e.g. 8192)
797
798 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
799 {
800 uint lvl, attr, cacheblk, last, slot, idx;
801 uint nlatchpage, latchhash;
802 unsigned char value[BtId];
803 BtLatchMgr *latchmgr;
804 off64_t size;
805 uint amt[1];
806 BtMgr* mgr;
807 BtKey key;
808 BtVal val;
809 int flag;
810
811 #ifndef unix
812 SYSTEM_INFO sysinfo[1];
813 #endif
814
815         // determine sanity of page size and buffer pool
816
817         if( bits > BT_maxbits )
818                 bits = BT_maxbits;
819         else if( bits < BT_minbits )
820                 bits = BT_minbits;
821
822         if( !poolmax )
823                 return NULL;    // must have buffer pool
824
825 #ifdef unix
826         mgr = calloc (1, sizeof(BtMgr));
827
828         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
829
830         if( mgr->idx == -1 )
831                 return free(mgr), NULL;
832         
833         cacheblk = 4096;        // minimum mmap segment size for unix
834
835 #else
836         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
837         attr = FILE_ATTRIBUTE_NORMAL;
838         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
839
840         if( mgr->idx == INVALID_HANDLE_VALUE )
841                 return GlobalFree(mgr), NULL;
842
843         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
844         GetSystemInfo(sysinfo);
845         cacheblk = sysinfo->dwAllocationGranularity;
846 #endif
847
848 #ifdef unix
849         latchmgr = malloc (BT_maxpage);
850         *amt = 0;
851
852         // read minimum page size to get root info
853
854         if( size = lseek (mgr->idx, 0L, 2) ) {
855                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
856                         bits = latchmgr->alloc->bits;
857                 else
858                         return free(mgr), free(latchmgr), NULL;
859         } else if( mode == BT_ro )
860                 return free(latchmgr), bt_mgrclose (mgr), NULL;
861 #else
862         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
863         size = GetFileSize(mgr->idx, amt);
864
865         if( size || *amt ) {
866                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
867                         return bt_mgrclose (mgr), NULL;
868                 bits = latchmgr->alloc->bits;
869         } else if( mode == BT_ro )
870                 return bt_mgrclose (mgr), NULL;
871 #endif
872
873         mgr->page_size = 1 << bits;
874         mgr->page_bits = bits;
875
876         mgr->poolmax = poolmax;
877         mgr->mode = mode;
878
879         if( cacheblk < mgr->page_size )
880                 cacheblk = mgr->page_size;
881
882         //  mask for partial memmaps
883
884         mgr->poolmask = (cacheblk >> bits) - 1;
885
886         //      see if requested size of pages per memmap is greater
887
888         if( (1 << segsize) > mgr->poolmask )
889                 mgr->poolmask = (1 << segsize) - 1;
890
891         mgr->seg_bits = 0;
892
893         while( (1 << mgr->seg_bits) <= mgr->poolmask )
894                 mgr->seg_bits++;
895
896         mgr->hashsize = hashsize;
897
898 #ifdef unix
899         mgr->pool = calloc (poolmax, sizeof(BtPool));
900         mgr->hash = calloc (hashsize, sizeof(ushort));
901         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
902 #else
903         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
904         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
905         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
906 #endif
907
908         if( size || *amt )
909                 goto mgrlatch;
910
911         // initialize an empty b-tree with latch page, root page, page of leaves
912         // and page(s) of latches
913
914         memset (latchmgr, 0, 1 << bits);
915         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
916         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
917         latchmgr->alloc->bits = mgr->page_bits;
918
919         latchmgr->nlatchpage = nlatchpage;
920         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
921
922         //  initialize latch manager
923
924         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
925
926         //      size of hash table = total number of latchsets
927
928         if( latchhash > latchmgr->latchtotal )
929                 latchhash = latchmgr->latchtotal;
930
931         latchmgr->latchhash = latchhash;
932
933 #ifdef unix
934         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
935                 return bt_mgrclose (mgr), NULL;
936 #else
937         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
938                 return bt_mgrclose (mgr), NULL;
939
940         if( *amt < mgr->page_size )
941                 return bt_mgrclose (mgr), NULL;
942 #endif
943
944         memset (latchmgr, 0, 1 << bits);
945         latchmgr->alloc->bits = mgr->page_bits;
946
947         for( lvl=MIN_lvl; lvl--; ) {
948                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
949                 key = keyptr(latchmgr->alloc, 1);
950                 key->len = 2;           // create stopper key
951                 key->key[0] = 0xff;
952                 key->key[1] = 0xff;
953
954                 bt_putid(value, MIN_lvl - lvl + 1);
955                 val = valptr(latchmgr->alloc, 1);
956                 val->len = lvl ? BtId : 0;
957                 memcpy (val->value, value, val->len);
958
959                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
960                 latchmgr->alloc->lvl = lvl;
961                 latchmgr->alloc->cnt = 1;
962                 latchmgr->alloc->act = 1;
963 #ifdef unix
964                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
965                         return bt_mgrclose (mgr), NULL;
966 #else
967                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
968                         return bt_mgrclose (mgr), NULL;
969
970                 if( *amt < mgr->page_size )
971                         return bt_mgrclose (mgr), NULL;
972 #endif
973         }
974
975         // clear out latch manager locks
976         //      and rest of pages to round out segment
977
978         memset(latchmgr, 0, mgr->page_size);
979         last = MIN_lvl + 1;
980
981         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
982 #ifdef unix
983                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
984 #else
985                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
986                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
987                         return bt_mgrclose (mgr), NULL;
988                 if( *amt < mgr->page_size )
989                         return bt_mgrclose (mgr), NULL;
990 #endif
991                 last++;
992         }
993
994 mgrlatch:
995 #ifdef unix
996         // mlock the root page and the latchmgr page
997
998         flag = PROT_READ | PROT_WRITE;
999         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1000         if( mgr->latchmgr == MAP_FAILED )
1001                 return bt_mgrclose (mgr), NULL;
1002         mlock (mgr->latchmgr, 2 * mgr->page_size);
1003
1004         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1005         if( mgr->latchsets == MAP_FAILED )
1006                 return bt_mgrclose (mgr), NULL;
1007         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1008 #else
1009         flag = PAGE_READWRITE;
1010         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1011         if( !mgr->halloc )
1012                 return bt_mgrclose (mgr), NULL;
1013
1014         flag = FILE_MAP_WRITE;
1015         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1016         if( !mgr->latchmgr )
1017                 return GetLastError(), bt_mgrclose (mgr), NULL;
1018
1019         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1020 #endif
1021
1022 #ifdef unix
1023         free (latchmgr);
1024 #else
1025         VirtualFree (latchmgr, 0, MEM_RELEASE);
1026 #endif
1027         return mgr;
1028 }
1029
1030 //      open BTree access method
1031 //      based on buffer manager
1032
1033 BtDb *bt_open (BtMgr *mgr)
1034 {
1035 BtDb *bt = malloc (sizeof(*bt));
1036
1037         memset (bt, 0, sizeof(*bt));
1038         bt->mgr = mgr;
1039 #ifdef unix
1040         bt->mem = malloc (2 *mgr->page_size);
1041 #else
1042         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1043 #endif
1044         bt->frame = (BtPage)bt->mem;
1045         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1046         return bt;
1047 }
1048
1049 //  compare two keys, returning > 0, = 0, or < 0
1050 //  as the comparison value
1051
1052 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1053 {
1054 uint len1 = key1->len;
1055 int ans;
1056
1057         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1058                 return ans;
1059
1060         if( len1 > len2 )
1061                 return 1;
1062         if( len1 < len2 )
1063                 return -1;
1064
1065         return 0;
1066 }
1067
1068 //      Buffer Pool mgr
1069
1070 // find segment in pool
1071 // must be called with hashslot idx locked
1072 //      return NULL if not there
1073 //      otherwise return node
1074
1075 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1076 {
1077 BtPool *pool;
1078 uint slot;
1079
1080         // compute start of hash chain in pool
1081
1082         if( slot = bt->mgr->hash[idx] ) 
1083                 pool = bt->mgr->pool + slot;
1084         else
1085                 return NULL;
1086
1087         page_no &= ~bt->mgr->poolmask;
1088
1089         while( pool->basepage != page_no )
1090           if( pool = pool->hashnext )
1091                 continue;
1092           else
1093                 return NULL;
1094
1095         return pool;
1096 }
1097
1098 // add segment to hash table
1099
1100 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1101 {
1102 BtPool *node;
1103 uint slot;
1104
1105         pool->hashprev = pool->hashnext = NULL;
1106         pool->basepage = page_no & ~bt->mgr->poolmask;
1107         pool->pin = CLOCK_bit + 1;
1108
1109         if( slot = bt->mgr->hash[idx] ) {
1110                 node = bt->mgr->pool + slot;
1111                 pool->hashnext = node;
1112                 node->hashprev = pool;
1113         }
1114
1115         bt->mgr->hash[idx] = pool->slot;
1116 }
1117
1118 //  map new buffer pool segment to virtual memory
1119
1120 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1121 {
1122 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1123 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1124 int flag;
1125
1126 #ifdef unix
1127         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1128         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1129         if( pool->map == MAP_FAILED )
1130                 return bt->err = BTERR_map;
1131
1132 #else
1133         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1134         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1135         if( !pool->hmap )
1136                 return bt->err = BTERR_map;
1137
1138         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1139         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1140         if( !pool->map )
1141                 return bt->err = BTERR_map;
1142 #endif
1143         return bt->err = 0;
1144 }
1145
1146 //      calculate page within pool
1147
1148 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1149 {
1150 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1151 BtPage page;
1152
1153         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1154 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1155         return page;
1156 }
1157
1158 //  release pool pin
1159
1160 void bt_unpinpool (BtPool *pool)
1161 {
1162 #ifdef unix
1163         __sync_fetch_and_add(&pool->pin, -1);
1164 #else
1165         _InterlockedDecrement16 (&pool->pin);
1166 #endif
1167 }
1168
1169 //      find or place requested page in segment-pool
1170 //      return pool table entry, incrementing pin
1171
1172 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1173 {
1174 uint slot, hashidx, idx, victim;
1175 BtPool *pool, *node, *next;
1176
1177         //      lock hash table chain
1178
1179         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1180         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1181
1182         //      look up in hash table
1183
1184         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1185 #ifdef unix
1186                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1187                 __sync_fetch_and_add(&pool->pin, 1);
1188 #else
1189                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1190                 _InterlockedIncrement16 (&pool->pin);
1191 #endif
1192                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1193                 return pool;
1194         }
1195
1196         // allocate a new pool node
1197         // and add to hash table
1198
1199 #ifdef unix
1200         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1201 #else
1202         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1203 #endif
1204
1205         if( ++slot < bt->mgr->poolmax ) {
1206                 pool = bt->mgr->pool + slot;
1207                 pool->slot = slot;
1208
1209                 if( bt_mapsegment(bt, pool, page_no) )
1210                         return NULL;
1211
1212                 bt_linkhash(bt, pool, page_no, hashidx);
1213                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1214                 return pool;
1215         }
1216
1217         // pool table is full
1218         //      find best pool entry to evict
1219
1220 #ifdef unix
1221         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1222 #else
1223         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1224 #endif
1225
1226         while( 1 ) {
1227 #ifdef unix
1228                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1229 #else
1230                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1231 #endif
1232                 victim %= bt->mgr->poolmax;
1233                 pool = bt->mgr->pool + victim;
1234                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1235
1236                 if( !victim )
1237                         continue;
1238
1239                 // try to get write lock
1240                 //      skip entry if not obtained
1241
1242                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1243                         continue;
1244
1245                 //      skip this entry if
1246                 //      page is pinned
1247                 //  or clock bit is set
1248
1249                 if( pool->pin ) {
1250 #ifdef unix
1251                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1252 #else
1253                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1254 #endif
1255                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1256                         continue;
1257                 }
1258
1259                 // unlink victim pool node from hash table
1260
1261                 if( node = pool->hashprev )
1262                         node->hashnext = pool->hashnext;
1263                 else if( node = pool->hashnext )
1264                         bt->mgr->hash[idx] = node->slot;
1265                 else
1266                         bt->mgr->hash[idx] = 0;
1267
1268                 if( node = pool->hashnext )
1269                         node->hashprev = pool->hashprev;
1270
1271                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1272
1273                 //      remove old file mapping
1274 #ifdef unix
1275                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1276 #else
1277 //              FlushViewOfFile(pool->map, 0);
1278                 UnmapViewOfFile(pool->map);
1279                 CloseHandle(pool->hmap);
1280 #endif
1281                 pool->map = NULL;
1282
1283                 //  create new pool mapping
1284                 //  and link into hash table
1285
1286                 if( bt_mapsegment(bt, pool, page_no) )
1287                         return NULL;
1288
1289                 bt_linkhash(bt, pool, page_no, hashidx);
1290                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1291                 return pool;
1292         }
1293 }
1294
1295 // place write, read, or parent lock on requested page_no.
1296
1297 void bt_lockpage(BtLock mode, BtLatchSet *set)
1298 {
1299         switch( mode ) {
1300         case BtLockRead:
1301                 ReadLock (set->readwr);
1302                 break;
1303         case BtLockWrite:
1304                 WriteLock (set->readwr);
1305                 break;
1306         case BtLockAccess:
1307                 ReadLock (set->access);
1308                 break;
1309         case BtLockDelete:
1310                 WriteLock (set->access);
1311                 break;
1312         case BtLockParent:
1313                 WriteLock (set->parent);
1314                 break;
1315         }
1316 }
1317
1318 // remove write, read, or parent lock on requested page
1319
1320 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1321 {
1322         switch( mode ) {
1323         case BtLockRead:
1324                 ReadRelease (set->readwr);
1325                 break;
1326         case BtLockWrite:
1327                 WriteRelease (set->readwr);
1328                 break;
1329         case BtLockAccess:
1330                 ReadRelease (set->access);
1331                 break;
1332         case BtLockDelete:
1333                 WriteRelease (set->access);
1334                 break;
1335         case BtLockParent:
1336                 WriteRelease (set->parent);
1337                 break;
1338         }
1339 }
1340
1341 //      allocate a new page and write page into it
1342
1343 uid bt_newpage(BtDb *bt, BtPage page)
1344 {
1345 BtPageSet set[1];
1346 uid new_page;
1347 int blk;
1348
1349         //      lock allocation page
1350
1351         bt_spinwritelock(bt->mgr->latchmgr->lock);
1352
1353         // use empty chain first
1354         // else allocate empty page
1355
1356         if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1357                 if( set->pool = bt_pinpool (bt, new_page) )
1358                         set->page = bt_page (bt, set->pool, new_page);
1359                 else
1360                         return 0;
1361
1362                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1363                 bt_unpinpool (set->pool);
1364         } else {
1365                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1366                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1367 #ifdef unix
1368                 // if writing first page of pool block, set file length thru last page
1369
1370                 if( (new_page & bt->mgr->poolmask) == 0 )
1371                         ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1372 #endif
1373         }
1374 #ifdef unix
1375         // unlock allocation latch
1376
1377         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1378 #endif
1379
1380         //      bring new page into pool and copy page.
1381         //      this will extend the file into the new pages on WIN32.
1382
1383         if( set->pool = bt_pinpool (bt, new_page) )
1384                 set->page = bt_page (bt, set->pool, new_page);
1385         else
1386                 return 0;
1387
1388         memcpy(set->page, page, bt->mgr->page_size);
1389         bt_unpinpool (set->pool);
1390
1391 #ifndef unix
1392         // unlock allocation latch
1393
1394         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1395 #endif
1396         return new_page;
1397 }
1398
1399 //  find slot in page for given key at a given level
1400
1401 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1402 {
1403 uint diff, higher = set->page->cnt, low = 1, slot;
1404 uint good = 0;
1405
1406         //        make stopper key an infinite fence value
1407
1408         if( bt_getid (set->page->right) )
1409                 higher++;
1410         else
1411                 good++;
1412
1413         //      low is the lowest candidate.
1414         //  loop ends when they meet
1415
1416         //  higher is already
1417         //      tested as .ge. the passed key.
1418
1419         while( diff = higher - low ) {
1420                 slot = low + ( diff >> 1 );
1421                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1422                         low = slot + 1;
1423                 else
1424                         higher = slot, good++;
1425         }
1426
1427         //      return zero if key is on right link page
1428
1429         return good ? higher : 0;
1430 }
1431
1432 //  find and load page at given level for given key
1433 //      leave page rd or wr locked as requested
1434
1435 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1436 {
1437 uid page_no = ROOT_page, prevpage = 0;
1438 uint drill = 0xff, slot;
1439 BtLatchSet *prevlatch;
1440 uint mode, prevmode;
1441 BtPool *prevpool;
1442
1443   //  start at root of btree and drill down
1444
1445   do {
1446         // determine lock mode of drill level
1447         mode = (drill == lvl) ? lock : BtLockRead; 
1448
1449         set->latch = bt_pinlatch (bt, page_no);
1450         set->page_no = page_no;
1451
1452         // pin page contents
1453
1454         if( set->pool = bt_pinpool (bt, page_no) )
1455                 set->page = bt_page (bt, set->pool, page_no);
1456         else
1457                 return 0;
1458
1459         // obtain access lock using lock chaining with Access mode
1460
1461         if( page_no > ROOT_page )
1462           bt_lockpage(BtLockAccess, set->latch);
1463
1464         //      release & unpin parent page
1465
1466         if( prevpage ) {
1467           bt_unlockpage(prevmode, prevlatch);
1468           bt_unpinlatch (prevlatch);
1469           bt_unpinpool (prevpool);
1470           prevpage = 0;
1471         }
1472
1473         // obtain read lock using lock chaining
1474
1475         bt_lockpage(mode, set->latch);
1476
1477         if( set->page->free )
1478                 return bt->err = BTERR_struct, 0;
1479
1480         if( page_no > ROOT_page )
1481           bt_unlockpage(BtLockAccess, set->latch);
1482
1483         // re-read and re-lock root after determining actual level of root
1484
1485         if( set->page->lvl != drill) {
1486                 if( set->page_no != ROOT_page )
1487                         return bt->err = BTERR_struct, 0;
1488                         
1489                 drill = set->page->lvl;
1490
1491                 if( lock != BtLockRead && drill == lvl ) {
1492                   bt_unlockpage(mode, set->latch);
1493                   bt_unpinlatch (set->latch);
1494                   bt_unpinpool (set->pool);
1495                   continue;
1496                 }
1497         }
1498
1499         prevpage = set->page_no;
1500         prevlatch = set->latch;
1501         prevpool = set->pool;
1502         prevmode = mode;
1503
1504         //  find key on page at this level
1505         //  and descend to requested level
1506
1507         if( !set->page->kill )
1508          if( slot = bt_findslot (set, key, len) ) {
1509           if( drill == lvl )
1510                 return slot;
1511
1512           while( slotptr(set->page, slot)->dead )
1513                 if( slot++ < set->page->cnt )
1514                         continue;
1515                 else
1516                         goto slideright;
1517
1518           page_no = bt_getid(valptr(set->page, slot)->value);
1519           drill--;
1520           continue;
1521          }
1522
1523         //  or slide right into next page
1524
1525 slideright:
1526         page_no = bt_getid(set->page->right);
1527
1528   } while( page_no );
1529
1530   // return error on end of right chain
1531
1532   bt->err = BTERR_struct;
1533   return 0;     // return error
1534 }
1535
1536 //      return page to free list
1537 //      page must be delete & write locked
1538
1539 void bt_freepage (BtDb *bt, BtPageSet *set)
1540 {
1541         //      lock allocation page
1542
1543         bt_spinwritelock (bt->mgr->latchmgr->lock);
1544
1545         //      store chain
1546         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1547         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1548         set->page->free = 1;
1549
1550         // unlock released page
1551
1552         bt_unlockpage (BtLockDelete, set->latch);
1553         bt_unlockpage (BtLockWrite, set->latch);
1554         bt_unpinlatch (set->latch);
1555         bt_unpinpool (set->pool);
1556
1557         // unlock allocation page
1558
1559         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1560 }
1561
1562 //      a fence key was deleted from a page
1563 //      push new fence value upwards
1564
1565 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1566 {
1567 unsigned char leftkey[256], rightkey[256];
1568 unsigned char value[BtId];
1569 uid page_no;
1570 BtKey ptr;
1571
1572         //      remove the old fence value
1573
1574         ptr = keyptr(set->page, set->page->cnt);
1575         memcpy (rightkey, ptr, ptr->len + 1);
1576
1577         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1578         set->page->dirty = 1;
1579
1580         ptr = keyptr(set->page, set->page->cnt);
1581         memcpy (leftkey, ptr, ptr->len + 1);
1582         page_no = set->page_no;
1583
1584         bt_lockpage (BtLockParent, set->latch);
1585         bt_unlockpage (BtLockWrite, set->latch);
1586
1587         //      insert new (now smaller) fence key
1588
1589         bt_putid (value, page_no);
1590
1591         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
1592           return bt->err;
1593
1594         //      now delete old fence key
1595
1596         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1597                 return bt->err;
1598
1599         bt_unlockpage (BtLockParent, set->latch);
1600         bt_unpinlatch(set->latch);
1601         bt_unpinpool (set->pool);
1602         return 0;
1603 }
1604
1605 //      root has a single child
1606 //      collapse a level from the tree
1607
1608 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1609 {
1610 BtPageSet child[1];
1611 uint idx;
1612
1613   // find the child entry and promote as new root contents
1614
1615   do {
1616         for( idx = 0; idx++ < root->page->cnt; )
1617           if( !slotptr(root->page, idx)->dead )
1618                 break;
1619
1620         child->page_no = bt_getid (valptr(root->page, idx)->value);
1621
1622         child->latch = bt_pinlatch (bt, child->page_no);
1623         bt_lockpage (BtLockDelete, child->latch);
1624         bt_lockpage (BtLockWrite, child->latch);
1625
1626         if( child->pool = bt_pinpool (bt, child->page_no) )
1627                 child->page = bt_page (bt, child->pool, child->page_no);
1628         else
1629                 return bt->err;
1630
1631         memcpy (root->page, child->page, bt->mgr->page_size);
1632         bt_freepage (bt, child);
1633
1634   } while( root->page->lvl > 1 && root->page->act == 1 );
1635
1636   bt_unlockpage (BtLockWrite, root->latch);
1637   bt_unpinlatch (root->latch);
1638   bt_unpinpool (root->pool);
1639   return 0;
1640 }
1641
1642 //  find and delete key on page by marking delete flag bit
1643 //  if page becomes empty, delete it from the btree
1644
1645 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1646 {
1647 unsigned char lowerfence[256], higherfence[256];
1648 uint slot, idx, dirty = 0, fence, found;
1649 BtPageSet set[1], right[1];
1650 unsigned char value[BtId];
1651 BtKey ptr;
1652
1653         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1654                 ptr = keyptr(set->page, slot);
1655         else
1656                 return bt->err;
1657
1658         //      are we deleting a fence slot?
1659
1660         fence = slot == set->page->cnt;
1661
1662         // if key is found delete it, otherwise ignore request
1663
1664         if( found = !keycmp (ptr, key, len) )
1665           if( found = slotptr(set->page, slot)->dead == 0 ) {
1666                 dirty = slotptr(set->page, slot)->dead = 1;
1667                 set->page->dirty = 1;
1668                 set->page->act--;
1669
1670                 // collapse empty slots
1671
1672                 while( idx = set->page->cnt - 1 )
1673                   if( slotptr(set->page, idx)->dead ) {
1674                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1675                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1676                   } else
1677                         break;
1678           }
1679
1680         //      did we delete a fence key in an upper level?
1681
1682         if( dirty && lvl && set->page->act && fence )
1683           if( bt_fixfence (bt, set, lvl) )
1684                 return bt->err;
1685           else
1686                 return bt->found = found, 0;
1687
1688         //      is this a collapsed root?
1689
1690         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1691           if( bt_collapseroot (bt, set) )
1692                 return bt->err;
1693           else
1694                 return bt->found = found, 0;
1695
1696         //      return if page is not empty
1697
1698         if( set->page->act ) {
1699                 bt_unlockpage(BtLockWrite, set->latch);
1700                 bt_unpinlatch (set->latch);
1701                 bt_unpinpool (set->pool);
1702                 return bt->found = found, 0;
1703         }
1704
1705         //      cache copy of fence key
1706         //      to post in parent
1707
1708         ptr = keyptr(set->page, set->page->cnt);
1709         memcpy (lowerfence, ptr, ptr->len + 1);
1710
1711         //      obtain lock on right page
1712
1713         right->page_no = bt_getid(set->page->right);
1714         right->latch = bt_pinlatch (bt, right->page_no);
1715         bt_lockpage (BtLockWrite, right->latch);
1716
1717         // pin page contents
1718
1719         if( right->pool = bt_pinpool (bt, right->page_no) )
1720                 right->page = bt_page (bt, right->pool, right->page_no);
1721         else
1722                 return 0;
1723
1724         if( right->page->kill )
1725                 return bt->err = BTERR_struct;
1726
1727         // pull contents of right peer into our empty page
1728
1729         memcpy (set->page, right->page, bt->mgr->page_size);
1730
1731         // cache copy of key to update
1732
1733         ptr = keyptr(right->page, right->page->cnt);
1734         memcpy (higherfence, ptr, ptr->len + 1);
1735
1736         // mark right page deleted and point it to left page
1737         //      until we can post parent updates
1738
1739         bt_putid (right->page->right, set->page_no);
1740         right->page->kill = 1;
1741
1742         bt_lockpage (BtLockParent, right->latch);
1743         bt_unlockpage (BtLockWrite, right->latch);
1744
1745         bt_lockpage (BtLockParent, set->latch);
1746         bt_unlockpage (BtLockWrite, set->latch);
1747
1748         // redirect higher key directly to our new node contents
1749
1750         bt_putid (value, set->page_no);
1751
1752         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId) )
1753           return bt->err;
1754
1755         //      delete old lower key to our node
1756
1757         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1758           return bt->err;
1759
1760         //      obtain delete and write locks to right node
1761
1762         bt_unlockpage (BtLockParent, right->latch);
1763         bt_lockpage (BtLockDelete, right->latch);
1764         bt_lockpage (BtLockWrite, right->latch);
1765         bt_freepage (bt, right);
1766
1767         bt_unlockpage (BtLockParent, set->latch);
1768         bt_unpinlatch (set->latch);
1769         bt_unpinpool (set->pool);
1770         bt->found = found;
1771         return 0;
1772 }
1773
1774 //      find key in leaf level and return number of value bytes
1775 //      or (-1) if not found
1776
1777 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1778 {
1779 BtPageSet set[1];
1780 uint  slot;
1781 BtKey ptr;
1782 BtVal val;
1783 int ret;
1784
1785         if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1786                 ptr = keyptr(set->page, slot);
1787         else
1788                 return 0;
1789
1790         // if key exists, return TRUE
1791         //      otherwise return FALSE
1792
1793         if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
1794                 val = valptr (set->page,slot);
1795                 if( valmax > val->len )
1796                         valmax = val->len;
1797                 memcpy (value, val->value, valmax);
1798                 ret = valmax;
1799         } else
1800                 ret = -1;
1801
1802         bt_unlockpage (BtLockRead, set->latch);
1803         bt_unpinlatch (set->latch);
1804         bt_unpinpool (set->pool);
1805         return ret;
1806 }
1807
1808 //      check page for space available,
1809 //      clean if necessary and return
1810 //      0 - page needs splitting
1811 //      >0  new slot value
1812
1813 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1814 {
1815 uint nxt = bt->mgr->page_size;
1816 uint cnt = 0, idx = 0;
1817 uint max = page->cnt;
1818 uint newslot = max;
1819 BtKey key;
1820 BtVal val;
1821
1822         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1823                 return slot;
1824
1825         //      skip cleanup if nothing to reclaim
1826
1827         if( !page->dirty )
1828                 return 0;
1829
1830         memcpy (bt->frame, page, bt->mgr->page_size);
1831
1832         // skip page info and set rest of page to zero
1833
1834         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1835         page->dirty = 0;
1836         page->act = 0;
1837
1838         // try cleaning up page first
1839         // by removing deleted keys
1840
1841         while( cnt++ < max ) {
1842                 if( cnt == slot )
1843                         newslot = idx + 1;
1844                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1845                         continue;
1846
1847                 // copy the key across
1848
1849                 key = keyptr(bt->frame, cnt);
1850                 nxt -= key->len + 1;
1851                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1852
1853                 // copy the value across
1854
1855                 val = valptr(bt->frame, cnt);
1856                 nxt -= val->len + 1;
1857                 ((unsigned char *)page)[nxt] = val->len;
1858                 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1859
1860                 // set up the slot
1861
1862                 slotptr(page, idx)->off = nxt;
1863
1864                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1865                         page->act++;
1866         }
1867
1868         page->min = nxt;
1869         page->cnt = idx;
1870
1871         //      see if page has enough space now, or does it need splitting?
1872
1873         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1874                 return newslot;
1875
1876         return 0;
1877 }
1878
1879 // split the root and raise the height of the btree
1880
1881 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1882 {
1883 uint nxt = bt->mgr->page_size;
1884 unsigned char value[BtId];
1885 uid left;
1886
1887         //  Obtain an empty page to use, and copy the current
1888         //  root contents into it, e.g. lower keys
1889
1890         if( !(left = bt_newpage(bt, root->page)) )
1891                 return bt->err;
1892
1893         // preserve the page info at the bottom
1894         // of higher keys and set rest to zero
1895
1896         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1897
1898         // insert lower keys page fence key on newroot page as first key
1899
1900         nxt -= BtId + 1;
1901         bt_putid (value, left);
1902         ((unsigned char *)root->page)[nxt] = BtId;
1903         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
1904
1905         nxt -= *leftkey + 1;
1906         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1907         slotptr(root->page, 1)->off = nxt;
1908         
1909         // insert stopper key on newroot page
1910         // and increase the root height
1911
1912         nxt -= 3 + BtId + 1;
1913         ((unsigned char *)root->page)[nxt] = 2;
1914         ((unsigned char *)root->page)[nxt+1] = 0xff;
1915         ((unsigned char *)root->page)[nxt+2] = 0xff;
1916
1917         bt_putid (value, page_no2);
1918         ((unsigned char *)root->page)[nxt+3] = BtId;
1919         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
1920         slotptr(root->page, 2)->off = nxt;
1921
1922         bt_putid(root->page->right, 0);
1923         root->page->min = nxt;          // reset lowest used offset and key count
1924         root->page->cnt = 2;
1925         root->page->act = 2;
1926         root->page->lvl++;
1927
1928         // release and unpin root
1929
1930         bt_unlockpage(BtLockWrite, root->latch);
1931         bt_unpinlatch (root->latch);
1932         bt_unpinpool (root->pool);
1933         return 0;
1934 }
1935
1936 //  split already locked full node
1937 //      return unlocked.
1938
1939 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1940 {
1941 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1942 unsigned char fencekey[256], rightkey[256];
1943 unsigned char value[BtId];
1944 uint lvl = set->page->lvl;
1945 BtPageSet right[1];
1946 uint prev;
1947 BtKey key;
1948 BtVal val;
1949
1950         //  split higher half of keys to bt->frame
1951
1952         memset (bt->frame, 0, bt->mgr->page_size);
1953         max = set->page->cnt;
1954         cnt = max / 2;
1955         idx = 0;
1956
1957         while( cnt++ < max ) {
1958                 val = valptr(set->page, cnt);
1959                 nxt -= val->len + 1;
1960                 ((unsigned char *)bt->frame)[nxt] = val->len;
1961                 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
1962
1963                 key = keyptr(set->page, cnt);
1964                 nxt -= key->len + 1;
1965                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1966
1967                 slotptr(bt->frame, ++idx)->off = nxt;
1968
1969                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1970                         bt->frame->act++;
1971         }
1972
1973         // remember existing fence key for new page to the right
1974
1975         memcpy (rightkey, key, key->len + 1);
1976
1977         bt->frame->bits = bt->mgr->page_bits;
1978         bt->frame->min = nxt;
1979         bt->frame->cnt = idx;
1980         bt->frame->lvl = lvl;
1981
1982         // link right node
1983
1984         if( set->page_no > ROOT_page )
1985                 memcpy (bt->frame->right, set->page->right, BtId);
1986
1987         //      get new free page and write higher keys to it.
1988
1989         if( !(right->page_no = bt_newpage(bt, bt->frame)) )
1990                 return bt->err;
1991
1992         //      update lower keys to continue in old page
1993
1994         memcpy (bt->frame, set->page, bt->mgr->page_size);
1995         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1996         nxt = bt->mgr->page_size;
1997         set->page->dirty = 0;
1998         set->page->act = 0;
1999         cnt = 0;
2000         idx = 0;
2001
2002         //  assemble page of smaller keys
2003
2004         while( cnt++ < max / 2 ) {
2005                 val = valptr(bt->frame, cnt);
2006                 nxt -= val->len + 1;
2007                 ((unsigned char *)set->page)[nxt] = val->len;
2008                 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2009
2010                 key = keyptr(bt->frame, cnt);
2011                 nxt -= key->len + 1;
2012                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2013                 slotptr(set->page, ++idx)->off = nxt;
2014                 set->page->act++;
2015         }
2016
2017         // remember fence key for smaller page
2018
2019         memcpy(fencekey, key, key->len + 1);
2020
2021         bt_putid(set->page->right, right->page_no);
2022         set->page->min = nxt;
2023         set->page->cnt = idx;
2024
2025         // if current page is the root page, split it
2026
2027         if( set->page_no == ROOT_page )
2028                 return bt_splitroot (bt, set, fencekey, right->page_no);
2029
2030         // insert new fences in their parent pages
2031
2032         right->latch = bt_pinlatch (bt, right->page_no);
2033         bt_lockpage (BtLockParent, right->latch);
2034
2035         bt_lockpage (BtLockParent, set->latch);
2036         bt_unlockpage (BtLockWrite, set->latch);
2037
2038         // insert new fence for reformulated left block of smaller keys
2039
2040         bt_putid (value, set->page_no);
2041
2042         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId) )
2043                 return bt->err;
2044
2045         // switch fence for right block of larger keys to new right page
2046
2047         bt_putid (value, right->page_no);
2048
2049         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId) )
2050                 return bt->err;
2051
2052         bt_unlockpage (BtLockParent, set->latch);
2053         bt_unpinlatch (set->latch);
2054         bt_unpinpool (set->pool);
2055
2056         bt_unlockpage (BtLockParent, right->latch);
2057         bt_unpinlatch (right->latch);
2058         return 0;
2059 }
2060 //  Insert new key into the btree at given level.
2061
2062 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
2063 {
2064 BtPageSet set[1];
2065 uint slot, idx;
2066 uint reuse;
2067 BtKey ptr;
2068 BtVal val;
2069
2070         while( 1 ) {
2071                 if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
2072                         ptr = keyptr(set->page, slot);
2073                 else
2074                 {
2075                         if( !bt->err )
2076                                 bt->err = BTERR_ovflw;
2077                         return bt->err;
2078                 }
2079
2080                 // if key already exists, update value and return
2081
2082                 if( reuse = !keycmp (ptr, key, keylen) )
2083                   if( val = valptr(set->page, slot), val->len >= vallen ) {
2084                         if( slotptr(set->page, slot)->dead )
2085                                 set->page->act++;
2086                         slotptr(set->page, slot)->dead = 0;
2087                         val->len = vallen;
2088                         memcpy (val->value, value, vallen);
2089                         bt_unlockpage(BtLockWrite, set->latch);
2090                         bt_unpinlatch (set->latch);
2091                         bt_unpinpool (set->pool);
2092                         return 0;
2093                   } else {
2094                         if( !slotptr(set->page, slot)->dead )
2095                                 set->page->act--;
2096                         slotptr(set->page, slot)->dead = 1;
2097                         set->page->dirty = 1;
2098                   }
2099
2100                 // check if page has enough space
2101
2102                 if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
2103                         break;
2104
2105                 if( bt_splitpage (bt, set) )
2106                         return bt->err;
2107         }
2108
2109         // calculate next available slot and copy key into page
2110
2111         set->page->min -= vallen + 1; // reset lowest used offset
2112         ((unsigned char *)set->page)[set->page->min] = vallen;
2113         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2114
2115         set->page->min -= keylen + 1; // reset lowest used offset
2116         ((unsigned char *)set->page)[set->page->min] = keylen;
2117         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2118
2119         for( idx = slot; idx < set->page->cnt; idx++ )
2120           if( slotptr(set->page, idx)->dead )
2121                 break;
2122
2123         // now insert key into array before slot
2124
2125         if( !reuse && idx == set->page->cnt )
2126                 idx++, set->page->cnt++;
2127
2128         set->page->act++;
2129
2130         while( idx > slot )
2131                 *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2132
2133         slotptr(set->page, slot)->off = set->page->min;
2134         slotptr(set->page, slot)->dead = 0;
2135
2136         bt_unlockpage (BtLockWrite, set->latch);
2137         bt_unpinlatch (set->latch);
2138         bt_unpinpool (set->pool);
2139         return 0;
2140 }
2141
2142 //  cache page of keys into cursor and return starting slot for given key
2143
2144 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2145 {
2146 BtPageSet set[1];
2147 uint slot;
2148
2149         // cache page for retrieval
2150
2151         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2152           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2153         else
2154           return 0;
2155
2156         bt->cursor_page = set->page_no;
2157
2158         bt_unlockpage(BtLockRead, set->latch);
2159         bt_unpinlatch (set->latch);
2160         bt_unpinpool (set->pool);
2161         return slot;
2162 }
2163
2164 //  return next slot for cursor page
2165 //  or slide cursor right into next page
2166
2167 uint bt_nextkey (BtDb *bt, uint slot)
2168 {
2169 BtPageSet set[1];
2170 uid right;
2171
2172   do {
2173         right = bt_getid(bt->cursor->right);
2174
2175         while( slot++ < bt->cursor->cnt )
2176           if( slotptr(bt->cursor,slot)->dead )
2177                 continue;
2178           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2179                 return slot;
2180           else
2181                 break;
2182
2183         if( !right )
2184                 break;
2185
2186         bt->cursor_page = right;
2187
2188         if( set->pool = bt_pinpool (bt, right) )
2189                 set->page = bt_page (bt, set->pool, right);
2190         else
2191                 return 0;
2192
2193         set->latch = bt_pinlatch (bt, right);
2194     bt_lockpage(BtLockRead, set->latch);
2195
2196         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2197
2198         bt_unlockpage(BtLockRead, set->latch);
2199         bt_unpinlatch (set->latch);
2200         bt_unpinpool (set->pool);
2201         slot = 0;
2202
2203   } while( 1 );
2204
2205   return bt->err = 0;
2206 }
2207
2208 BtKey bt_key(BtDb *bt, uint slot)
2209 {
2210         return keyptr(bt->cursor, slot);
2211 }
2212
2213 BtVal bt_val(BtDb *bt, uint slot)
2214 {
2215         return valptr(bt->cursor,slot);
2216 }
2217
2218 #ifdef STANDALONE
2219
2220 #ifndef unix
2221 double getCpuTime(int type)
2222 {
2223 FILETIME crtime[1];
2224 FILETIME xittime[1];
2225 FILETIME systime[1];
2226 FILETIME usrtime[1];
2227 SYSTEMTIME timeconv[1];
2228 double ans = 0;
2229
2230         memset (timeconv, 0, sizeof(SYSTEMTIME));
2231
2232         switch( type ) {
2233         case 0:
2234                 GetSystemTimeAsFileTime (xittime);
2235                 FileTimeToSystemTime (xittime, timeconv);
2236                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2237                 break;
2238         case 1:
2239                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2240                 FileTimeToSystemTime (usrtime, timeconv);
2241                 break;
2242         case 2:
2243                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2244                 FileTimeToSystemTime (systime, timeconv);
2245                 break;
2246         }
2247
2248         ans += (double)timeconv->wHour * 3600;
2249         ans += (double)timeconv->wMinute * 60;
2250         ans += (double)timeconv->wSecond;
2251         ans += (double)timeconv->wMilliseconds / 1000;
2252         return ans;
2253 }
2254 #else
2255 #include <time.h>
2256 #include <sys/resource.h>
2257
2258 double getCpuTime(int type)
2259 {
2260 struct rusage used[1];
2261 struct timeval tv[1];
2262
2263         switch( type ) {
2264         case 0:
2265                 gettimeofday(tv, NULL);
2266                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2267
2268         case 1:
2269                 getrusage(RUSAGE_SELF, used);
2270                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2271
2272         case 2:
2273                 getrusage(RUSAGE_SELF, used);
2274                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2275         }
2276
2277         return 0;
2278 }
2279 #endif
2280
2281 uint bt_latchaudit (BtDb *bt)
2282 {
2283 ushort idx, hashidx;
2284 uid next, page_no;
2285 BtLatchSet *latch;
2286 uint cnt = 0;
2287 BtKey ptr;
2288
2289 #ifdef unix
2290         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2291 #endif
2292         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2293                 fprintf(stderr, "Alloc page locked\n");
2294         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2295
2296         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2297                 latch = bt->mgr->latchsets + idx;
2298                 if( *latch->readwr->rin & MASK )
2299                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2300                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2301
2302                 if( *latch->access->rin & MASK )
2303                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2304                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2305
2306                 if( *latch->parent->rin & MASK )
2307                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2308                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2309
2310                 if( latch->pin ) {
2311                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2312                         latch->pin = 0;
2313                 }
2314         }
2315
2316         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2317           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2318                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2319
2320           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2321
2322           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2323                 latch = bt->mgr->latchsets + idx;
2324                 if( *(ushort *)latch->busy )
2325                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2326                 *(ushort *)latch->busy = 0;
2327                 if( latch->pin )
2328                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2329           } while( idx = latch->next );
2330         }
2331
2332         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2333         page_no = LEAF_page;
2334
2335         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2336         off64_t off = page_no << bt->mgr->page_bits;
2337 #ifdef unix
2338                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2339 #else
2340                 DWORD amt[1];
2341
2342                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2343
2344                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2345                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2346
2347                   if( *amt <  bt->mgr->page_size )
2348                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2349 #endif
2350                 if( !bt->frame->free ) {
2351                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2352                   ptr = keyptr(bt->frame, idx+1);
2353                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2354                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2355                  }
2356                  if( !bt->frame->lvl )
2357                         cnt += bt->frame->act;
2358                 }
2359                 if( page_no > LEAF_page )
2360                         next = page_no + 1;
2361                 page_no = next;
2362         }
2363         return cnt - 1;
2364 }
2365
2366 typedef struct {
2367         char type, idx;
2368         char *infile;
2369         BtMgr *mgr;
2370         int num;
2371 } ThreadArg;
2372
2373 //  standalone program to index file of keys
2374 //  then list them onto std-out
2375
2376 #ifdef unix
2377 void *index_file (void *arg)
2378 #else
2379 uint __stdcall index_file (void *arg)
2380 #endif
2381 {
2382 int line = 0, found = 0, cnt = 0;
2383 uid next, page_no = LEAF_page;  // start on first page of leaves
2384 unsigned char key[256];
2385 ThreadArg *args = arg;
2386 int ch, len = 0, slot;
2387 BtPageSet set[1];
2388 BtKey ptr;
2389 BtDb *bt;
2390 FILE *in;
2391
2392         bt = bt_open (args->mgr);
2393
2394         switch(args->type | 0x20)
2395         {
2396         case 'a':
2397                 fprintf(stderr, "started latch mgr audit\n");
2398                 cnt = bt_latchaudit (bt);
2399                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2400                 break;
2401
2402         case 'p':
2403                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2404                 if( in = fopen (args->infile, "rb") )
2405                   while( ch = getc(in), ch != EOF )
2406                         if( ch == '\n' )
2407                         {
2408                           line++;
2409
2410                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10) )
2411                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2412                           len = 0;
2413                         }
2414                         else if( len < 255 )
2415                                 key[len++] = ch;
2416                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2417                 break;
2418
2419         case 'w':
2420                 fprintf(stderr, "started indexing for %s\n", args->infile);
2421                 if( in = fopen (args->infile, "rb") )
2422                   while( ch = getc(in), ch != EOF )
2423                         if( ch == '\n' )
2424                         {
2425                           line++;
2426
2427                           if( args->num == 1 )
2428                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2429
2430                           else if( args->num )
2431                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2432
2433                           if( bt_insertkey (bt, key, len, 0, NULL, 0) )
2434                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2435                           len = 0;
2436                         }
2437                         else if( len < 255 )
2438                                 key[len++] = ch;
2439                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2440                 break;
2441
2442         case 'd':
2443                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2444                 if( in = fopen (args->infile, "rb") )
2445                   while( ch = getc(in), ch != EOF )
2446                         if( ch == '\n' )
2447                         {
2448                           line++;
2449                           if( args->num == 1 )
2450                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2451
2452                           else if( args->num )
2453                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2454
2455                           if( bt_deletekey (bt, key, len, 0) )
2456                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2457                           len = 0;
2458                         }
2459                         else if( len < 255 )
2460                                 key[len++] = ch;
2461                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2462                 break;
2463
2464         case 'f':
2465                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2466                 if( in = fopen (args->infile, "rb") )
2467                   while( ch = getc(in), ch != EOF )
2468                         if( ch == '\n' )
2469                         {
2470                           line++;
2471                           if( args->num == 1 )
2472                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2473
2474                           else if( args->num )
2475                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2476
2477                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2478                                 found++;
2479                           else if( bt->err )
2480                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2481                           len = 0;
2482                         }
2483                         else if( len < 255 )
2484                                 key[len++] = ch;
2485                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2486                 break;
2487
2488         case 's':
2489                 fprintf(stderr, "started scanning\n");
2490                 do {
2491                         if( set->pool = bt_pinpool (bt, page_no) )
2492                                 set->page = bt_page (bt, set->pool, page_no);
2493                         else
2494                                 break;
2495                         set->latch = bt_pinlatch (bt, page_no);
2496                         bt_lockpage (BtLockRead, set->latch);
2497                         next = bt_getid (set->page->right);
2498                         cnt += set->page->act;
2499
2500                         for( slot = 0; slot++ < set->page->cnt; )
2501                          if( next || slot < set->page->cnt )
2502                           if( !slotptr(set->page, slot)->dead ) {
2503                                 ptr = keyptr(set->page, slot);
2504                                 fwrite (ptr->key, ptr->len, 1, stdout);
2505                                 fputc ('\n', stdout);
2506                           }
2507
2508                         bt_unlockpage (BtLockRead, set->latch);
2509                         bt_unpinlatch (set->latch);
2510                         bt_unpinpool (set->pool);
2511                 } while( page_no = next );
2512
2513                 cnt--;  // remove stopper key
2514                 fprintf(stderr, " Total keys read %d\n", cnt);
2515                 break;
2516
2517         case 'c':
2518 #ifdef unix
2519                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2520 #endif
2521                 fprintf(stderr, "started counting\n");
2522                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2523                 page_no = LEAF_page;
2524
2525                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2526                 uid off = page_no << bt->mgr->page_bits;
2527 #ifdef unix
2528                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2529 #else
2530                 DWORD amt[1];
2531
2532                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2533
2534                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2535                         return bt->err = BTERR_map;
2536
2537                   if( *amt <  bt->mgr->page_size )
2538                         return bt->err = BTERR_map;
2539 #endif
2540                         if( !bt->frame->free && !bt->frame->lvl )
2541                                 cnt += bt->frame->act;
2542                         if( page_no > LEAF_page )
2543                                 next = page_no + 1;
2544                         page_no = next;
2545                 }
2546                 
2547                 cnt--;  // remove stopper key
2548                 fprintf(stderr, " Total keys read %d\n", cnt);
2549                 break;
2550         }
2551
2552         bt_close (bt);
2553 #ifdef unix
2554         return NULL;
2555 #else
2556         return 0;
2557 #endif
2558 }
2559
2560 typedef struct timeval timer;
2561
2562 int main (int argc, char **argv)
2563 {
2564 int idx, cnt, len, slot, err;
2565 int segsize, bits = 16;
2566 double start, stop;
2567 #ifdef unix
2568 pthread_t *threads;
2569 #else
2570 HANDLE *threads;
2571 #endif
2572 ThreadArg *args;
2573 uint poolsize = 0;
2574 float elapsed;
2575 int num = 0;
2576 char key[1];
2577 BtMgr *mgr;
2578 BtKey ptr;
2579 BtDb *bt;
2580
2581         if( argc < 3 ) {
2582                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2583                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2584                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2585                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2586                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2587                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2588                 exit(0);
2589         }
2590
2591         start = getCpuTime(0);
2592
2593         if( argc > 3 )
2594                 bits = atoi(argv[3]);
2595
2596         if( argc > 4 )
2597                 poolsize = atoi(argv[4]);
2598
2599         if( !poolsize )
2600                 fprintf (stderr, "Warning: no mapped_pool\n");
2601
2602         if( poolsize > 65535 )
2603                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2604
2605         if( argc > 5 )
2606                 segsize = atoi(argv[5]);
2607         else
2608                 segsize = 4;    // 16 pages per mmap segment
2609
2610         if( argc > 6 )
2611                 num = atoi(argv[6]);
2612
2613         cnt = argc - 7;
2614 #ifdef unix
2615         threads = malloc (cnt * sizeof(pthread_t));
2616 #else
2617         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2618 #endif
2619         args = malloc (cnt * sizeof(ThreadArg));
2620
2621         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2622
2623         if( !mgr ) {
2624                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2625                 exit (1);
2626         }
2627
2628         //      fire off threads
2629
2630         for( idx = 0; idx < cnt; idx++ ) {
2631                 args[idx].infile = argv[idx + 7];
2632                 args[idx].type = argv[2][0];
2633                 args[idx].mgr = mgr;
2634                 args[idx].num = num;
2635                 args[idx].idx = idx;
2636 #ifdef unix
2637                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2638                         fprintf(stderr, "Error creating thread %d\n", err);
2639 #else
2640                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2641 #endif
2642         }
2643
2644         //      wait for termination
2645
2646 #ifdef unix
2647         for( idx = 0; idx < cnt; idx++ )
2648                 pthread_join (threads[idx], NULL);
2649 #else
2650         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2651
2652         for( idx = 0; idx < cnt; idx++ )
2653                 CloseHandle(threads[idx]);
2654
2655 #endif
2656         elapsed = getCpuTime(0) - start;
2657         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2658         elapsed = getCpuTime(1);
2659         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2660         elapsed = getCpuTime(2);
2661         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2662
2663         bt_mgrclose (mgr);
2664 }
2665
2666 #endif  //STANDALONE