]> pd.if.org Git - btree/blob - threadskv1.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv1.c
1 // btree version threadskv1 sched_yield version
2 //      with reworked bt_deletekey code
3 //      and phase-fair reader writer lock
4 // 08 SEP 2014
5
6 // author: karl malbrain, malbrain@cal.berkeley.edu
7
8 /*
9 This work, including the source code, documentation
10 and related data, is placed into the public domain.
11
12 The orginal author is Karl Malbrain.
13
14 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
15 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
16 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
17 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
18 RESULTING FROM THE USE, MODIFICATION, OR
19 REDISTRIBUTION OF THIS SOFTWARE.
20 */
21
22 // Please see the project home page for documentation
23 // code.google.com/p/high-concurrency-btree
24
25 #define _FILE_OFFSET_BITS 64
26 #define _LARGEFILE64_SOURCE
27
28 #ifdef linux
29 #define _GNU_SOURCE
30 #endif
31
32 #ifdef unix
33 #include <unistd.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <fcntl.h>
37 #include <sys/time.h>
38 #include <sys/mman.h>
39 #include <errno.h>
40 #include <pthread.h>
41 #else
42 #define WIN32_LEAN_AND_MEAN
43 #include <windows.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <time.h>
47 #include <fcntl.h>
48 #include <process.h>
49 #include <intrin.h>
50 #endif
51
52 #include <memory.h>
53 #include <string.h>
54 #include <stddef.h>
55
56 typedef unsigned long long      uid;
57
58 #ifndef unix
59 typedef unsigned long long      off64_t;
60 typedef unsigned short          ushort;
61 typedef unsigned int            uint;
62 #endif
63
64 #define BT_latchtable   128                                     // number of latch manager slots
65
66 #define BT_ro 0x6f72    // ro
67 #define BT_rw 0x7772    // rw
68
69 #define BT_maxbits              24                                      // maximum page size in bits
70 #define BT_minbits              9                                       // minimum page size in bits
71 #define BT_minpage              (1 << BT_minbits)       // minimum page size
72 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
73
74 /*
75 There are five lock types for each node in three independent sets: 
76 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
77 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
78 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
79 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
80 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
81 */
82
83 typedef enum{
84         BtLockAccess,
85         BtLockDelete,
86         BtLockRead,
87         BtLockWrite,
88         BtLockParent
89 } BtLock;
90
91 //      definition for phase-fair reader/writer lock implementation
92
93 typedef struct {
94         ushort rin[1];
95         ushort rout[1];
96         ushort ticket[1];
97         ushort serving[1];
98 } RWLock;
99
100 #define PHID 0x1
101 #define PRES 0x2
102 #define MASK 0x3
103 #define RINC 0x4
104
105 //      definition for spin latch implementation
106
107 // exclusive is set for write access
108 // share is count of read accessors
109 // grant write lock when share == 0
110
111 volatile typedef struct {
112         ushort exclusive:1;
113         ushort pending:1;
114         ushort share:14;
115 } BtSpinLatch;
116
117 #define XCL 1
118 #define PEND 2
119 #define BOTH 3
120 #define SHARE 4
121
122 //  hash table entries
123
124 typedef struct {
125         BtSpinLatch latch[1];
126         volatile ushort slot;           // Latch table entry at head of chain
127 } BtHashEntry;
128
129 //      latch manager table structure
130
131 typedef struct {
132         RWLock readwr[1];               // read/write page lock
133         RWLock access[1];               // Access Intent/Page delete
134         RWLock parent[1];               // Posting of fence key in parent
135         BtSpinLatch busy[1];            // slot is being moved between chains
136         volatile ushort next;           // next entry in hash table chain
137         volatile ushort prev;           // prev entry in hash table chain
138         volatile ushort pin;            // number of outstanding locks
139         volatile ushort hash;           // hash slot entry is under
140         volatile uid page_no;           // latch set page number
141 } BtLatchSet;
142
143 //      Define the length of the page and key pointers
144
145 #define BtId 6
146
147 //      Page key slot definition.
148
149 //      If BT_maxbits is 15 or less, you can save 2 bytes
150 //      for each key stored by making the two uints
151 //      into ushorts.
152
153 //      Keys are marked dead, but remain on the page until
154 //      it cleanup is called. The fence key (highest key) for
155 //      the page is always present, even after cleanup.
156
157 typedef struct {
158         uint off:BT_maxbits;            // page offset for key start
159         uint dead:1;                            // set for deleted key
160 } BtSlot;
161
162 //      The key structure occupies space at the upper end of
163 //      each page.  It's a length byte followed by the key
164 //      bytes.
165
166 typedef struct {
167         unsigned char len;
168         unsigned char key[1];
169 } *BtKey;
170
171 //      the value structure also occupies space at the upper
172 //      end of the page.
173
174 typedef struct {
175         unsigned char len;
176         unsigned char value[1];
177 } *BtVal;
178
179 //      The first part of an index page.
180 //      It is immediately followed
181 //      by the BtSlot array of keys.
182
183 typedef struct BtPage_ {
184         uint cnt;                                       // count of keys in page
185         uint act;                                       // count of active keys
186         uint min;                                       // next key offset
187         unsigned char bits:7;           // page size in bits
188         unsigned char free:1;           // page is on free chain
189         unsigned char lvl:6;            // level of page
190         unsigned char kill:1;           // page is being deleted
191         unsigned char dirty:1;          // page has deleted keys
192         unsigned char right[BtId];      // page number to right
193 } *BtPage;
194
195 //      The memory mapping pool table buffer manager entry
196
197 typedef struct {
198         uid  basepage;                          // mapped base page number
199         char *map;                                      // mapped memory pointer
200         ushort slot;                            // slot index in this array
201         ushort pin;                                     // mapped page pin counter
202         void *hashprev;                         // previous pool entry for the same hash idx
203         void *hashnext;                         // next pool entry for the same hash idx
204 #ifndef unix
205         HANDLE hmap;                            // Windows memory mapping handle
206 #endif
207 } BtPool;
208
209 #define CLOCK_bit 0x8000                // bit in pool->pin
210
211 //  The loadpage interface object
212
213 typedef struct {
214         uid page_no;            // current page number
215         BtPage page;            // current page pointer
216         BtPool *pool;           // current page pool
217         BtLatchSet *latch;      // current page latch set
218 } BtPageSet;
219
220 //      structure for latch manager on ALLOC_page
221
222 typedef struct {
223         struct BtPage_ alloc[1];        // next page_no in right ptr
224         unsigned char chain[BtId];      // head of free page_nos chain
225         BtSpinLatch lock[1];            // allocation area lite latch
226         ushort latchdeployed;           // highest number of latch entries deployed
227         ushort nlatchpage;                      // number of latch pages at BT_latch
228         ushort latchtotal;                      // number of page latch entries
229         ushort latchhash;                       // number of latch hash table slots
230         ushort latchvictim;                     // next latch entry to examine
231         BtHashEntry table[0];           // the hash table
232 } BtLatchMgr;
233
234 //      The object structure for Btree access
235
236 typedef struct {
237         uint page_size;                         // page size    
238         uint page_bits;                         // page size in bits    
239         uint seg_bits;                          // seg size in pages in bits
240         uint mode;                                      // read-write mode
241 #ifdef unix
242         int idx;
243 #else
244         HANDLE idx;
245 #endif
246         ushort poolcnt;                         // highest page pool node in use
247         ushort poolmax;                         // highest page pool node allocated
248         ushort poolmask;                        // total number of pages in mmap segment - 1
249         ushort hashsize;                        // size of Hash Table for pool entries
250         volatile uint evicted;          // last evicted hash table slot
251         ushort *hash;                           // pool index for hash entries
252         BtSpinLatch *latch;                     // latches for hash table slots
253         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
254         BtLatchSet *latchsets;          // mapped latch set from latch pages
255         BtPool *pool;                           // memory pool page segments
256 #ifndef unix
257         HANDLE halloc;                          // allocation and latch table handle
258 #endif
259 } BtMgr;
260
261 typedef struct {
262         BtMgr *mgr;                     // buffer manager for thread
263         BtPage cursor;          // cached frame for start/next (never mapped)
264         BtPage frame;           // spare frame for the page split (never mapped)
265         uid cursor_page;        // current cursor page number   
266         unsigned char *mem;     // frame, cursor, page memory buffer
267         int found;                      // last delete or insert was found
268         int err;                        // last error
269 } BtDb;
270
271 typedef enum {
272         BTERR_ok = 0,
273         BTERR_struct,
274         BTERR_ovflw,
275         BTERR_lock,
276         BTERR_map,
277         BTERR_wrt,
278         BTERR_hash
279 } BTERR;
280
281 // B-Tree functions
282 extern void bt_close (BtDb *bt);
283 extern BtDb *bt_open (BtMgr *mgr);
284 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
285 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
286 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
287 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
288 extern uint bt_nextkey   (BtDb *bt, uint slot);
289
290 //      manager functions
291 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
292 void bt_mgrclose (BtMgr *mgr);
293
294 //  Helper functions to return slot values
295 //      from the cursor page.
296
297 extern BtKey bt_key (BtDb *bt, uint slot);
298 extern BtVal bt_val (BtDb *bt, uint slot);
299
300 //  BTree page number constants
301 #define ALLOC_page              0       // allocation & latch manager hash table
302 #define ROOT_page               1       // root of the btree
303 #define LEAF_page               2       // first page of leaves
304 #define LATCH_page              3       // pages for latch manager
305
306 //      Number of levels to create in a new BTree
307
308 #define MIN_lvl                 2
309
310 //  The page is allocated from low and hi ends.
311 //  The key slots are allocated from the bottom,
312 //      while the text and value of the key
313 //  are allocated from the top.  When the two
314 //  areas meet, the page is split into two.
315
316 //  A key consists of a length byte, two bytes of
317 //  index number (0 - 65534), and up to 253 bytes
318 //  of key value.  Duplicate keys are discarded.
319 //  Associated with each key is a value byte string
320 //      containing any value desired.
321
322 //  The b-tree root is always located at page 1.
323 //      The first leaf page of level zero is always
324 //      located on page 2.
325
326 //      The b-tree pages are linked with next
327 //      pointers to facilitate enumerators,
328 //      and provide for concurrency.
329
330 //      When to root page fills, it is split in two and
331 //      the tree height is raised by a new root at page
332 //      one with two keys.
333
334 //      Deleted keys are marked with a dead bit until
335 //      page cleanup. The fence key for a leaf node is
336 //      always present
337
338 //  Groups of pages called segments from the btree are optionally
339 //  cached with a memory mapped pool. A hash table is used to keep
340 //  track of the cached segments.  This behaviour is controlled
341 //  by the cache block size parameter to bt_open.
342
343 //  To achieve maximum concurrency one page is locked at a time
344 //  as the tree is traversed to find leaf key in question. The right
345 //  page numbers are used in cases where the page is being split,
346 //      or consolidated.
347
348 //  Page 0 is dedicated to lock for new page extensions,
349 //      and chains empty pages together for reuse. It also
350 //      contains the latch manager hash table.
351
352 //      The ParentModification lock on a node is obtained to serialize posting
353 //      or changing the fence key for a node.
354
355 //      Empty pages are chained together through the ALLOC page and reused.
356
357 //      Access macros to address slot and key values from the page
358 //      Page slots use 1 based indexing.
359
360 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
361 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
362 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
363
364 void bt_putid(unsigned char *dest, uid id)
365 {
366 int i = BtId;
367
368         while( i-- )
369                 dest[i] = (unsigned char)id, id >>= 8;
370 }
371
372 uid bt_getid(unsigned char *src)
373 {
374 uid id = 0;
375 int i;
376
377         for( i = 0; i < BtId; i++ )
378                 id <<= 8, id |= *src++; 
379
380         return id;
381 }
382
383 //      Phase-Fair reader/writer lock implementation
384
385 void WriteLock (RWLock *lock)
386 {
387 ushort w, r, tix;
388
389 #ifdef unix
390         tix = __sync_fetch_and_add (lock->ticket, 1);
391 #else
392         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
393 #endif
394         // wait for our ticket to come up
395
396         while( tix != lock->serving[0] )
397 #ifdef unix
398                 sched_yield();
399 #else
400                 SwitchToThread ();
401 #endif
402
403         w = PRES | (tix & PHID);
404 #ifdef  unix
405         r = __sync_fetch_and_add (lock->rin, w);
406 #else
407         r = _InterlockedExchangeAdd16 (lock->rin, w);
408 #endif
409         while( r != *lock->rout )
410 #ifdef unix
411                 sched_yield();
412 #else
413                 SwitchToThread();
414 #endif
415 }
416
417 void WriteRelease (RWLock *lock)
418 {
419 #ifdef unix
420         __sync_fetch_and_and (lock->rin, ~MASK);
421 #else
422         _InterlockedAnd16 (lock->rin, ~MASK);
423 #endif
424         lock->serving[0]++;
425 }
426
427 void ReadLock (RWLock *lock)
428 {
429 ushort w;
430 #ifdef unix
431         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
432 #else
433         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
434 #endif
435         if( w )
436           while( w == (*lock->rin & MASK) )
437 #ifdef unix
438                 sched_yield ();
439 #else
440                 SwitchToThread ();
441 #endif
442 }
443
444 void ReadRelease (RWLock *lock)
445 {
446 #ifdef unix
447         __sync_fetch_and_add (lock->rout, RINC);
448 #else
449         _InterlockedExchangeAdd16 (lock->rout, RINC);
450 #endif
451 }
452
453 //      Spin Latch Manager
454
455 //      wait until write lock mode is clear
456 //      and add 1 to the share count
457
458 void bt_spinreadlock(BtSpinLatch *latch)
459 {
460 ushort prev;
461
462   do {
463 #ifdef unix
464         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
465 #else
466         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
467 #endif
468         //  see if exclusive request is granted or pending
469
470         if( !(prev & BOTH) )
471                 return;
472 #ifdef unix
473         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
474 #else
475         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
476 #endif
477 #ifdef  unix
478   } while( sched_yield(), 1 );
479 #else
480   } while( SwitchToThread(), 1 );
481 #endif
482 }
483
484 //      wait for other read and write latches to relinquish
485
486 void bt_spinwritelock(BtSpinLatch *latch)
487 {
488 ushort prev;
489
490   do {
491 #ifdef  unix
492         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
493 #else
494         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
495 #endif
496         if( !(prev & XCL) )
497           if( !(prev & ~BOTH) )
498                 return;
499           else
500 #ifdef unix
501                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
502 #else
503                 _InterlockedAnd16((ushort *)latch, ~XCL);
504 #endif
505 #ifdef  unix
506   } while( sched_yield(), 1 );
507 #else
508   } while( SwitchToThread(), 1 );
509 #endif
510 }
511
512 //      try to obtain write lock
513
514 //      return 1 if obtained,
515 //              0 otherwise
516
517 int bt_spinwritetry(BtSpinLatch *latch)
518 {
519 ushort prev;
520
521 #ifdef  unix
522         prev = __sync_fetch_and_or((ushort *)latch, XCL);
523 #else
524         prev = _InterlockedOr16((ushort *)latch, XCL);
525 #endif
526         //      take write access if all bits are clear
527
528         if( !(prev & XCL) )
529           if( !(prev & ~BOTH) )
530                 return 1;
531           else
532 #ifdef unix
533                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
534 #else
535                 _InterlockedAnd16((ushort *)latch, ~XCL);
536 #endif
537         return 0;
538 }
539
540 //      clear write mode
541
542 void bt_spinreleasewrite(BtSpinLatch *latch)
543 {
544 #ifdef unix
545         __sync_fetch_and_and((ushort *)latch, ~BOTH);
546 #else
547         _InterlockedAnd16((ushort *)latch, ~BOTH);
548 #endif
549 }
550
551 //      decrement reader count
552
553 void bt_spinreleaseread(BtSpinLatch *latch)
554 {
555 #ifdef unix
556         __sync_fetch_and_add((ushort *)latch, -SHARE);
557 #else
558         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
559 #endif
560 }
561
562 //      link latch table entry into latch hash table
563
564 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
565 {
566 BtLatchSet *set = bt->mgr->latchsets + victim;
567
568         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
569                 bt->mgr->latchsets[set->next].prev = victim;
570
571         bt->mgr->latchmgr->table[hashidx].slot = victim;
572         set->page_no = page_no;
573         set->hash = hashidx;
574         set->prev = 0;
575 }
576
577 //      release latch pin
578
579 void bt_unpinlatch (BtLatchSet *set)
580 {
581 #ifdef unix
582         __sync_fetch_and_add(&set->pin, -1);
583 #else
584         _InterlockedDecrement16 (&set->pin);
585 #endif
586 }
587
588 //      find existing latchset or inspire new one
589 //      return with latchset pinned
590
591 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
592 {
593 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
594 ushort slot, avail = 0, victim, idx;
595 BtLatchSet *set;
596
597         //  obtain read lock on hash table entry
598
599         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
600
601         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
602         {
603                 set = bt->mgr->latchsets + slot;
604                 if( page_no == set->page_no )
605                         break;
606         } while( slot = set->next );
607
608         if( slot ) {
609 #ifdef unix
610                 __sync_fetch_and_add(&set->pin, 1);
611 #else
612                 _InterlockedIncrement16 (&set->pin);
613 #endif
614         }
615
616     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
617
618         if( slot )
619                 return set;
620
621   //  try again, this time with write lock
622
623   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
624
625   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
626   {
627         set = bt->mgr->latchsets + slot;
628         if( page_no == set->page_no )
629                 break;
630         if( !set->pin && !avail )
631                 avail = slot;
632   } while( slot = set->next );
633
634   //  found our entry, or take over an unpinned one
635
636   if( slot || (slot = avail) ) {
637         set = bt->mgr->latchsets + slot;
638 #ifdef unix
639         __sync_fetch_and_add(&set->pin, 1);
640 #else
641         _InterlockedIncrement16 (&set->pin);
642 #endif
643         set->page_no = page_no;
644         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
645         return set;
646   }
647
648         //  see if there are any unused entries
649 #ifdef unix
650         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
651 #else
652         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
653 #endif
654
655         if( victim < bt->mgr->latchmgr->latchtotal ) {
656                 set = bt->mgr->latchsets + victim;
657 #ifdef unix
658                 __sync_fetch_and_add(&set->pin, 1);
659 #else
660                 _InterlockedIncrement16 (&set->pin);
661 #endif
662                 bt_latchlink (bt, hashidx, victim, page_no);
663                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
664                 return set;
665         }
666
667 #ifdef unix
668         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
669 #else
670         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
671 #endif
672   //  find and reuse previous lock entry
673
674   while( 1 ) {
675 #ifdef unix
676         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
677 #else
678         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
679 #endif
680         //      we don't use slot zero
681
682         if( victim %= bt->mgr->latchmgr->latchtotal )
683                 set = bt->mgr->latchsets + victim;
684         else
685                 continue;
686
687         //      take control of our slot
688         //      from other threads
689
690         if( set->pin || !bt_spinwritetry (set->busy) )
691                 continue;
692
693         idx = set->hash;
694
695         // try to get write lock on hash chain
696         //      skip entry if not obtained
697         //      or has outstanding locks
698
699         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
700                 bt_spinreleasewrite (set->busy);
701                 continue;
702         }
703
704         if( set->pin ) {
705                 bt_spinreleasewrite (set->busy);
706                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
707                 continue;
708         }
709
710         //  unlink our available victim from its hash chain
711
712         if( set->prev )
713                 bt->mgr->latchsets[set->prev].next = set->next;
714         else
715                 bt->mgr->latchmgr->table[idx].slot = set->next;
716
717         if( set->next )
718                 bt->mgr->latchsets[set->next].prev = set->prev;
719
720         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
721 #ifdef unix
722         __sync_fetch_and_add(&set->pin, 1);
723 #else
724         _InterlockedIncrement16 (&set->pin);
725 #endif
726         bt_latchlink (bt, hashidx, victim, page_no);
727         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
728         bt_spinreleasewrite (set->busy);
729         return set;
730   }
731 }
732
733 void bt_mgrclose (BtMgr *mgr)
734 {
735 BtPool *pool;
736 uint slot;
737
738         // release mapped pages
739         //      note that slot zero is never used
740
741         for( slot = 1; slot < mgr->poolmax; slot++ ) {
742                 pool = mgr->pool + slot;
743                 if( pool->slot )
744 #ifdef unix
745                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
746 #else
747                 {
748                         FlushViewOfFile(pool->map, 0);
749                         UnmapViewOfFile(pool->map);
750                         CloseHandle(pool->hmap);
751                 }
752 #endif
753         }
754
755 #ifdef unix
756         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
757         munmap (mgr->latchmgr, 2 * mgr->page_size);
758 #else
759         FlushViewOfFile(mgr->latchmgr, 0);
760         UnmapViewOfFile(mgr->latchmgr);
761         CloseHandle(mgr->halloc);
762 #endif
763 #ifdef unix
764         close (mgr->idx);
765         free (mgr->pool);
766         free (mgr->hash);
767         free ((void *)mgr->latch);
768         free (mgr);
769 #else
770         FlushFileBuffers(mgr->idx);
771         CloseHandle(mgr->idx);
772         GlobalFree (mgr->pool);
773         GlobalFree (mgr->hash);
774         GlobalFree ((void *)mgr->latch);
775         GlobalFree (mgr);
776 #endif
777 }
778
779 //      close and release memory
780
781 void bt_close (BtDb *bt)
782 {
783 #ifdef unix
784         if( bt->mem )
785                 free (bt->mem);
786 #else
787         if( bt->mem)
788                 VirtualFree (bt->mem, 0, MEM_RELEASE);
789 #endif
790         free (bt);
791 }
792
793 //  open/create new btree buffer manager
794
795 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
796 //              size of mapped page pool (e.g. 8192)
797
798 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
799 {
800 uint lvl, attr, cacheblk, last, slot, idx;
801 uint nlatchpage, latchhash;
802 unsigned char value[BtId];
803 BtLatchMgr *latchmgr;
804 off64_t size;
805 uint amt[1];
806 BtMgr* mgr;
807 BtKey key;
808 BtVal val;
809 int flag;
810
811 #ifndef unix
812 SYSTEM_INFO sysinfo[1];
813 #endif
814
815         // determine sanity of page size and buffer pool
816
817         if( bits > BT_maxbits )
818                 bits = BT_maxbits;
819         else if( bits < BT_minbits )
820                 bits = BT_minbits;
821
822         if( !poolmax )
823                 return NULL;    // must have buffer pool
824
825 #ifdef unix
826         mgr = calloc (1, sizeof(BtMgr));
827
828         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
829
830         if( mgr->idx == -1 )
831                 return free(mgr), NULL;
832         
833         cacheblk = 4096;        // minimum mmap segment size for unix
834
835 #else
836         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
837         attr = FILE_ATTRIBUTE_NORMAL;
838         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
839
840         if( mgr->idx == INVALID_HANDLE_VALUE )
841                 return GlobalFree(mgr), NULL;
842
843         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
844         GetSystemInfo(sysinfo);
845         cacheblk = sysinfo->dwAllocationGranularity;
846 #endif
847
848 #ifdef unix
849         latchmgr = malloc (BT_maxpage);
850         *amt = 0;
851
852         // read minimum page size to get root info
853
854         if( size = lseek (mgr->idx, 0L, 2) ) {
855                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
856                         bits = latchmgr->alloc->bits;
857                 else
858                         return free(mgr), free(latchmgr), NULL;
859         } else if( mode == BT_ro )
860                 return free(latchmgr), bt_mgrclose (mgr), NULL;
861 #else
862         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
863         size = GetFileSize(mgr->idx, amt);
864
865         if( size || *amt ) {
866                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
867                         return bt_mgrclose (mgr), NULL;
868                 bits = latchmgr->alloc->bits;
869         } else if( mode == BT_ro )
870                 return bt_mgrclose (mgr), NULL;
871 #endif
872
873         mgr->page_size = 1 << bits;
874         mgr->page_bits = bits;
875
876         mgr->poolmax = poolmax;
877         mgr->mode = mode;
878
879         if( cacheblk < mgr->page_size )
880                 cacheblk = mgr->page_size;
881
882         //  mask for partial memmaps
883
884         mgr->poolmask = (cacheblk >> bits) - 1;
885
886         //      see if requested size of pages per memmap is greater
887
888         if( (1 << segsize) > mgr->poolmask )
889                 mgr->poolmask = (1 << segsize) - 1;
890
891         mgr->seg_bits = 0;
892
893         while( (1 << mgr->seg_bits) <= mgr->poolmask )
894                 mgr->seg_bits++;
895
896         mgr->hashsize = hashsize;
897
898 #ifdef unix
899         mgr->pool = calloc (poolmax, sizeof(BtPool));
900         mgr->hash = calloc (hashsize, sizeof(ushort));
901         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
902 #else
903         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
904         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
905         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
906 #endif
907
908         if( size || *amt )
909                 goto mgrlatch;
910
911         // initialize an empty b-tree with latch page, root page, page of leaves
912         // and page(s) of latches
913
914         memset (latchmgr, 0, 1 << bits);
915         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
916         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
917         latchmgr->alloc->bits = mgr->page_bits;
918
919         latchmgr->nlatchpage = nlatchpage;
920         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
921
922         //  initialize latch manager
923
924         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
925
926         //      size of hash table = total number of latchsets
927
928         if( latchhash > latchmgr->latchtotal )
929                 latchhash = latchmgr->latchtotal;
930
931         latchmgr->latchhash = latchhash;
932
933 #ifdef unix
934         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
935                 return bt_mgrclose (mgr), NULL;
936 #else
937         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
938                 return bt_mgrclose (mgr), NULL;
939
940         if( *amt < mgr->page_size )
941                 return bt_mgrclose (mgr), NULL;
942 #endif
943
944         memset (latchmgr, 0, 1 << bits);
945         latchmgr->alloc->bits = mgr->page_bits;
946
947         for( lvl=MIN_lvl; lvl--; ) {
948                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
949                 key = keyptr(latchmgr->alloc, 1);
950                 key->len = 2;           // create stopper key
951                 key->key[0] = 0xff;
952                 key->key[1] = 0xff;
953
954                 bt_putid(value, MIN_lvl - lvl + 1);
955                 val = valptr(latchmgr->alloc, 1);
956                 val->len = lvl ? BtId : 0;
957                 memcpy (val->value, value, val->len);
958
959                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
960                 latchmgr->alloc->lvl = lvl;
961                 latchmgr->alloc->cnt = 1;
962                 latchmgr->alloc->act = 1;
963 #ifdef unix
964                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
965                         return bt_mgrclose (mgr), NULL;
966 #else
967                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
968                         return bt_mgrclose (mgr), NULL;
969
970                 if( *amt < mgr->page_size )
971                         return bt_mgrclose (mgr), NULL;
972 #endif
973         }
974
975         // clear out latch manager locks
976         //      and rest of pages to round out segment
977
978         memset(latchmgr, 0, mgr->page_size);
979         last = MIN_lvl + 1;
980
981         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
982 #ifdef unix
983                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
984 #else
985                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
986                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
987                         return bt_mgrclose (mgr), NULL;
988                 if( *amt < mgr->page_size )
989                         return bt_mgrclose (mgr), NULL;
990 #endif
991                 last++;
992         }
993
994 mgrlatch:
995 #ifdef unix
996         // mlock the root page and the latchmgr page
997
998         flag = PROT_READ | PROT_WRITE;
999         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1000         if( mgr->latchmgr == MAP_FAILED )
1001                 return bt_mgrclose (mgr), NULL;
1002         mlock (mgr->latchmgr, 2 * mgr->page_size);
1003
1004         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1005         if( mgr->latchsets == MAP_FAILED )
1006                 return bt_mgrclose (mgr), NULL;
1007         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1008 #else
1009         flag = PAGE_READWRITE;
1010         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1011         if( !mgr->halloc )
1012                 return bt_mgrclose (mgr), NULL;
1013
1014         flag = FILE_MAP_WRITE;
1015         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1016         if( !mgr->latchmgr )
1017                 return GetLastError(), bt_mgrclose (mgr), NULL;
1018
1019         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1020 #endif
1021
1022 #ifdef unix
1023         free (latchmgr);
1024 #else
1025         VirtualFree (latchmgr, 0, MEM_RELEASE);
1026 #endif
1027         return mgr;
1028 }
1029
1030 //      open BTree access method
1031 //      based on buffer manager
1032
1033 BtDb *bt_open (BtMgr *mgr)
1034 {
1035 BtDb *bt = malloc (sizeof(*bt));
1036
1037         memset (bt, 0, sizeof(*bt));
1038         bt->mgr = mgr;
1039 #ifdef unix
1040         bt->mem = malloc (2 *mgr->page_size);
1041 #else
1042         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1043 #endif
1044         bt->frame = (BtPage)bt->mem;
1045         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1046         return bt;
1047 }
1048
1049 //  compare two keys, returning > 0, = 0, or < 0
1050 //  as the comparison value
1051
1052 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1053 {
1054 uint len1 = key1->len;
1055 int ans;
1056
1057         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1058                 return ans;
1059
1060         if( len1 > len2 )
1061                 return 1;
1062         if( len1 < len2 )
1063                 return -1;
1064
1065         return 0;
1066 }
1067
1068 //      Buffer Pool mgr
1069
1070 // find segment in pool
1071 // must be called with hashslot idx locked
1072 //      return NULL if not there
1073 //      otherwise return node
1074
1075 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1076 {
1077 BtPool *pool;
1078 uint slot;
1079
1080         // compute start of hash chain in pool
1081
1082         if( slot = bt->mgr->hash[idx] ) 
1083                 pool = bt->mgr->pool + slot;
1084         else
1085                 return NULL;
1086
1087         page_no &= ~bt->mgr->poolmask;
1088
1089         while( pool->basepage != page_no )
1090           if( pool = pool->hashnext )
1091                 continue;
1092           else
1093                 return NULL;
1094
1095         return pool;
1096 }
1097
1098 // add segment to hash table
1099
1100 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1101 {
1102 BtPool *node;
1103 uint slot;
1104
1105         pool->hashprev = pool->hashnext = NULL;
1106         pool->basepage = page_no & ~bt->mgr->poolmask;
1107         pool->pin = CLOCK_bit + 1;
1108
1109         if( slot = bt->mgr->hash[idx] ) {
1110                 node = bt->mgr->pool + slot;
1111                 pool->hashnext = node;
1112                 node->hashprev = pool;
1113         }
1114
1115         bt->mgr->hash[idx] = pool->slot;
1116 }
1117
1118 //  map new buffer pool segment to virtual memory
1119
1120 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1121 {
1122 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1123 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1124 int flag;
1125
1126 #ifdef unix
1127         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1128         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1129         if( pool->map == MAP_FAILED )
1130                 return bt->err = BTERR_map;
1131
1132 #else
1133         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1134         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1135         if( !pool->hmap )
1136                 return bt->err = BTERR_map;
1137
1138         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1139         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1140         if( !pool->map )
1141                 return bt->err = BTERR_map;
1142 #endif
1143         return bt->err = 0;
1144 }
1145
1146 //      calculate page within pool
1147
1148 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1149 {
1150 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1151 BtPage page;
1152
1153         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1154 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1155         return page;
1156 }
1157
1158 //  release pool pin
1159
1160 void bt_unpinpool (BtPool *pool)
1161 {
1162 #ifdef unix
1163         __sync_fetch_and_add(&pool->pin, -1);
1164 #else
1165         _InterlockedDecrement16 (&pool->pin);
1166 #endif
1167 }
1168
1169 //      find or place requested page in segment-pool
1170 //      return pool table entry, incrementing pin
1171
1172 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1173 {
1174 uint slot, hashidx, idx, victim;
1175 BtPool *pool, *node, *next;
1176
1177         //      lock hash table chain
1178
1179         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1180         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1181
1182         //      look up in hash table
1183
1184         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1185 #ifdef unix
1186                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1187                 __sync_fetch_and_add(&pool->pin, 1);
1188 #else
1189                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1190                 _InterlockedIncrement16 (&pool->pin);
1191 #endif
1192                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1193                 return pool;
1194         }
1195
1196         // allocate a new pool node
1197         // and add to hash table
1198
1199 #ifdef unix
1200         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1201 #else
1202         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1203 #endif
1204
1205         if( ++slot < bt->mgr->poolmax ) {
1206                 pool = bt->mgr->pool + slot;
1207                 pool->slot = slot;
1208
1209                 if( bt_mapsegment(bt, pool, page_no) )
1210                         return NULL;
1211
1212                 bt_linkhash(bt, pool, page_no, hashidx);
1213                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1214                 return pool;
1215         }
1216
1217         // pool table is full
1218         //      find best pool entry to evict
1219
1220 #ifdef unix
1221         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1222 #else
1223         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1224 #endif
1225
1226         while( 1 ) {
1227 #ifdef unix
1228                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1229 #else
1230                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1231 #endif
1232                 victim %= bt->mgr->poolmax;
1233                 pool = bt->mgr->pool + victim;
1234                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1235
1236                 if( !victim )
1237                         continue;
1238
1239                 // try to get write lock
1240                 //      skip entry if not obtained
1241
1242                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1243                         continue;
1244
1245                 //      skip this entry if
1246                 //      page is pinned
1247                 //  or clock bit is set
1248
1249                 if( pool->pin ) {
1250 #ifdef unix
1251                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1252 #else
1253                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1254 #endif
1255                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1256                         continue;
1257                 }
1258
1259                 // unlink victim pool node from hash table
1260
1261                 if( node = pool->hashprev )
1262                         node->hashnext = pool->hashnext;
1263                 else if( node = pool->hashnext )
1264                         bt->mgr->hash[idx] = node->slot;
1265                 else
1266                         bt->mgr->hash[idx] = 0;
1267
1268                 if( node = pool->hashnext )
1269                         node->hashprev = pool->hashprev;
1270
1271                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1272
1273                 //      remove old file mapping
1274 #ifdef unix
1275                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1276 #else
1277 //              FlushViewOfFile(pool->map, 0);
1278                 UnmapViewOfFile(pool->map);
1279                 CloseHandle(pool->hmap);
1280 #endif
1281                 pool->map = NULL;
1282
1283                 //  create new pool mapping
1284                 //  and link into hash table
1285
1286                 if( bt_mapsegment(bt, pool, page_no) )
1287                         return NULL;
1288
1289                 bt_linkhash(bt, pool, page_no, hashidx);
1290                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1291                 return pool;
1292         }
1293 }
1294
1295 // place write, read, or parent lock on requested page_no.
1296
1297 void bt_lockpage(BtLock mode, BtLatchSet *set)
1298 {
1299         switch( mode ) {
1300         case BtLockRead:
1301                 ReadLock (set->readwr);
1302                 break;
1303         case BtLockWrite:
1304                 WriteLock (set->readwr);
1305                 break;
1306         case BtLockAccess:
1307                 ReadLock (set->access);
1308                 break;
1309         case BtLockDelete:
1310                 WriteLock (set->access);
1311                 break;
1312         case BtLockParent:
1313                 WriteLock (set->parent);
1314                 break;
1315         }
1316 }
1317
1318 // remove write, read, or parent lock on requested page
1319
1320 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1321 {
1322         switch( mode ) {
1323         case BtLockRead:
1324                 ReadRelease (set->readwr);
1325                 break;
1326         case BtLockWrite:
1327                 WriteRelease (set->readwr);
1328                 break;
1329         case BtLockAccess:
1330                 ReadRelease (set->access);
1331                 break;
1332         case BtLockDelete:
1333                 WriteRelease (set->access);
1334                 break;
1335         case BtLockParent:
1336                 WriteRelease (set->parent);
1337                 break;
1338         }
1339 }
1340
1341 //      allocate a new page and write page into it
1342
1343 uid bt_newpage(BtDb *bt, BtPage page)
1344 {
1345 BtPageSet set[1];
1346 uid new_page;
1347 int blk;
1348
1349         //      lock allocation page
1350
1351         bt_spinwritelock(bt->mgr->latchmgr->lock);
1352
1353         // use empty chain first
1354         // else allocate empty page
1355
1356         if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1357                 if( set->pool = bt_pinpool (bt, new_page) )
1358                         set->page = bt_page (bt, set->pool, new_page);
1359                 else
1360                         return 0;
1361
1362                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1363                 bt_unpinpool (set->pool);
1364         } else {
1365                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1366                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1367 #ifdef unix
1368                 // if writing first page of pool block, set file length thru last page
1369
1370                 if( (new_page & bt->mgr->poolmask) == 0 )
1371                         ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1372 #endif
1373         }
1374 #ifdef unix
1375         // unlock allocation latch
1376
1377         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1378 #endif
1379
1380         //      bring new page into pool and copy page.
1381         //      this will extend the file into the new pages on WIN32.
1382
1383         if( set->pool = bt_pinpool (bt, new_page) )
1384                 set->page = bt_page (bt, set->pool, new_page);
1385         else
1386                 return 0;
1387
1388         memcpy(set->page, page, bt->mgr->page_size);
1389         bt_unpinpool (set->pool);
1390
1391 #ifndef unix
1392         // unlock allocation latch
1393
1394         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1395 #endif
1396         return new_page;
1397 }
1398
1399 //  find slot in page for given key at a given level
1400
1401 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1402 {
1403 uint diff, higher = set->page->cnt, low = 1, slot;
1404 uint good = 0;
1405
1406         //        make stopper key an infinite fence value
1407
1408         if( bt_getid (set->page->right) )
1409                 higher++;
1410         else
1411                 good++;
1412
1413         //      low is the lowest candidate.
1414         //  loop ends when they meet
1415
1416         //  higher is already
1417         //      tested as .ge. the passed key.
1418
1419         while( diff = higher - low ) {
1420                 slot = low + ( diff >> 1 );
1421                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1422                         low = slot + 1;
1423                 else
1424                         higher = slot, good++;
1425         }
1426
1427         //      return zero if key is on right link page
1428
1429         return good ? higher : 0;
1430 }
1431
1432 //  find and load page at given level for given key
1433 //      leave page rd or wr locked as requested
1434
1435 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1436 {
1437 uid page_no = ROOT_page, prevpage = 0;
1438 uint drill = 0xff, slot;
1439 BtLatchSet *prevlatch;
1440 uint mode, prevmode;
1441 BtPool *prevpool;
1442
1443   //  start at root of btree and drill down
1444
1445   do {
1446         // determine lock mode of drill level
1447         mode = (drill == lvl) ? lock : BtLockRead; 
1448
1449         set->latch = bt_pinlatch (bt, page_no);
1450         set->page_no = page_no;
1451
1452         // pin page contents
1453
1454         if( set->pool = bt_pinpool (bt, page_no) )
1455                 set->page = bt_page (bt, set->pool, page_no);
1456         else
1457                 return 0;
1458
1459         // obtain access lock using lock chaining with Access mode
1460
1461         if( page_no > ROOT_page )
1462           bt_lockpage(BtLockAccess, set->latch);
1463
1464         //      release & unpin parent page
1465
1466         if( prevpage ) {
1467           bt_unlockpage(prevmode, prevlatch);
1468           bt_unpinlatch (prevlatch);
1469           bt_unpinpool (prevpool);
1470           prevpage = 0;
1471         }
1472
1473         // obtain read lock using lock chaining
1474
1475         bt_lockpage(mode, set->latch);
1476
1477         if( set->page->free )
1478                 return bt->err = BTERR_struct, 0;
1479
1480         if( page_no > ROOT_page )
1481           bt_unlockpage(BtLockAccess, set->latch);
1482
1483         // re-read and re-lock root after determining actual level of root
1484
1485         if( set->page->lvl != drill) {
1486                 if( set->page_no != ROOT_page )
1487                         return bt->err = BTERR_struct, 0;
1488                         
1489                 drill = set->page->lvl;
1490
1491                 if( lock != BtLockRead && drill == lvl ) {
1492                   bt_unlockpage(mode, set->latch);
1493                   bt_unpinlatch (set->latch);
1494                   bt_unpinpool (set->pool);
1495                   continue;
1496                 }
1497         }
1498
1499         prevpage = set->page_no;
1500         prevlatch = set->latch;
1501         prevpool = set->pool;
1502         prevmode = mode;
1503
1504         //  find key on page at this level
1505         //  and descend to requested level
1506
1507         if( set->page->kill )
1508           goto slideright;
1509
1510         if( slot = bt_findslot (set, key, len) ) {
1511           if( drill == lvl )
1512                 return slot;
1513
1514           while( slotptr(set->page, slot)->dead )
1515                 if( slot++ < set->page->cnt )
1516                         continue;
1517                 else
1518                         goto slideright;
1519
1520           page_no = bt_getid(valptr(set->page, slot)->value);
1521           drill--;
1522           continue;
1523         }
1524
1525         //  or slide right into next page
1526
1527 slideright:
1528         page_no = bt_getid(set->page->right);
1529
1530   } while( page_no );
1531
1532   // return error on end of right chain
1533
1534   bt->err = BTERR_struct;
1535   return 0;     // return error
1536 }
1537
1538 //      return page to free list
1539 //      page must be delete & write locked
1540
1541 void bt_freepage (BtDb *bt, BtPageSet *set)
1542 {
1543         //      lock allocation page
1544
1545         bt_spinwritelock (bt->mgr->latchmgr->lock);
1546
1547         //      store chain
1548         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1549         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1550         set->page->free = 1;
1551
1552         // unlock released page
1553
1554         bt_unlockpage (BtLockDelete, set->latch);
1555         bt_unlockpage (BtLockWrite, set->latch);
1556         bt_unpinlatch (set->latch);
1557         bt_unpinpool (set->pool);
1558
1559         // unlock allocation page
1560
1561         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1562 }
1563
1564 //      a fence key was deleted from a page
1565 //      push new fence value upwards
1566
1567 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1568 {
1569 unsigned char leftkey[256], rightkey[256];
1570 unsigned char value[BtId];
1571 BtKey ptr;
1572
1573         //      remove the old fence value
1574
1575         ptr = keyptr(set->page, set->page->cnt);
1576         memcpy (rightkey, ptr, ptr->len + 1);
1577
1578         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1579         set->page->dirty = 1;
1580
1581         ptr = keyptr(set->page, set->page->cnt);
1582         memcpy (leftkey, ptr, ptr->len + 1);
1583
1584         bt_lockpage (BtLockParent, set->latch);
1585         bt_unlockpage (BtLockWrite, set->latch);
1586
1587         //      insert new (now smaller) fence key
1588
1589         bt_putid (value, set->page_no);
1590
1591         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
1592           return bt->err;
1593
1594         //      now delete old fence key
1595
1596         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1597                 return bt->err;
1598
1599         bt_unlockpage (BtLockParent, set->latch);
1600         bt_unpinlatch(set->latch);
1601         bt_unpinpool (set->pool);
1602         return 0;
1603 }
1604
1605 //      root has a single child
1606 //      collapse a level from the tree
1607
1608 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1609 {
1610 BtPageSet child[1];
1611 uint idx;
1612
1613   // find the child entry and promote as new root contents
1614
1615   do {
1616         for( idx = 0; idx++ < root->page->cnt; )
1617           if( !slotptr(root->page, idx)->dead )
1618                 break;
1619
1620         child->page_no = bt_getid (valptr(root->page, idx)->value);
1621
1622         child->latch = bt_pinlatch (bt, child->page_no);
1623         bt_lockpage (BtLockDelete, child->latch);
1624         bt_lockpage (BtLockWrite, child->latch);
1625
1626         if( child->pool = bt_pinpool (bt, child->page_no) )
1627                 child->page = bt_page (bt, child->pool, child->page_no);
1628         else
1629                 return bt->err;
1630
1631         memcpy (root->page, child->page, bt->mgr->page_size);
1632         bt_freepage (bt, child);
1633
1634   } while( root->page->lvl > 1 && root->page->act == 1 );
1635
1636   bt_unlockpage (BtLockWrite, root->latch);
1637   bt_unpinlatch (root->latch);
1638   bt_unpinpool (root->pool);
1639   return 0;
1640 }
1641
1642 //  find and delete key on page by marking delete flag bit
1643 //  if page becomes empty, delete it from the btree
1644
1645 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1646 {
1647 unsigned char lowerfence[256], higherfence[256];
1648 uint slot, idx, found, fence;
1649 BtPageSet set[1], right[1];
1650 unsigned char value[BtId];
1651 BtKey ptr;
1652
1653         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1654                 ptr = keyptr(set->page, slot);
1655         else
1656                 return bt->err;
1657
1658         //      are we deleting a fence key?
1659
1660         fence = slot == set->page->cnt;
1661
1662         // if key is found delete it, otherwise ignore request
1663
1664         if( found = !keycmp (ptr, key, len) )
1665           if( found = slotptr(set->page, slot)->dead == 0 ) {
1666                 slotptr(set->page, slot)->dead = 1;
1667                 set->page->dirty = 1;
1668                 set->page->act--;
1669
1670                 // collapse empty slots beneath our fence
1671
1672                 while( idx = set->page->cnt - 1 )
1673                   if( slotptr(set->page, idx)->dead ) {
1674                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1675                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1676                   } else
1677                         break;
1678           }
1679
1680         //      did we delete a fence key in an upper level?
1681
1682         if( found && fence && lvl && set->page->act )
1683           if( bt_fixfence (bt, set, lvl) )
1684                 return bt->err;
1685           else
1686                 return bt->found = found, 0;
1687
1688         //      is this a collapsed root?
1689
1690         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1691           if( bt_collapseroot (bt, set) )
1692                 return bt->err;
1693           else
1694                 return bt->found = found, 0;
1695
1696         //      return if page is not empty
1697
1698         if( set->page->act ) {
1699                 bt_unlockpage(BtLockWrite, set->latch);
1700                 bt_unpinlatch (set->latch);
1701                 bt_unpinpool (set->pool);
1702                 return bt->found = found, 0;
1703         }
1704
1705         //      cache copy of fence key
1706         //      to post in parent
1707
1708         ptr = keyptr(set->page, set->page->cnt);
1709         memcpy (lowerfence, ptr, ptr->len + 1);
1710
1711         //      obtain lock on right page
1712
1713         right->page_no = bt_getid(set->page->right);
1714         right->latch = bt_pinlatch (bt, right->page_no);
1715         bt_lockpage (BtLockWrite, right->latch);
1716
1717         // pin page contents
1718
1719         if( right->pool = bt_pinpool (bt, right->page_no) )
1720                 right->page = bt_page (bt, right->pool, right->page_no);
1721         else
1722                 return 0;
1723
1724         if( right->page->kill )
1725                 return bt->err = BTERR_struct;
1726
1727         // pull contents of right peer into our empty page
1728
1729         memcpy (set->page, right->page, bt->mgr->page_size);
1730
1731         // cache copy of key to update
1732
1733         ptr = keyptr(right->page, right->page->cnt);
1734         memcpy (higherfence, ptr, ptr->len + 1);
1735
1736         // mark right page deleted and point it to left page
1737         //      until we can post parent updates
1738
1739         bt_putid (right->page->right, set->page_no);
1740         right->page->kill = 1;
1741
1742         bt_lockpage (BtLockParent, right->latch);
1743         bt_unlockpage (BtLockWrite, right->latch);
1744
1745         bt_lockpage (BtLockParent, set->latch);
1746         bt_unlockpage (BtLockWrite, set->latch);
1747
1748         // redirect higher key directly to our new node contents
1749
1750         bt_putid (value, set->page_no);
1751
1752         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId) )
1753           return bt->err;
1754
1755         //      delete old lower key to our node
1756
1757         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1758           return bt->err;
1759
1760         //      obtain delete and write locks to right node
1761
1762         bt_unlockpage (BtLockParent, right->latch);
1763         bt_lockpage (BtLockDelete, right->latch);
1764         bt_lockpage (BtLockWrite, right->latch);
1765         bt_freepage (bt, right);
1766
1767         bt_unlockpage (BtLockParent, set->latch);
1768         bt_unpinlatch (set->latch);
1769         bt_unpinpool (set->pool);
1770         bt->found = found;
1771         return 0;
1772 }
1773
1774 //      find key in leaf level and return number of value bytes
1775 //      or (-1) if not found
1776
1777 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1778 {
1779 BtPageSet set[1];
1780 uint  slot;
1781 BtKey ptr;
1782 BtVal val;
1783 int ret;
1784
1785         if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1786                 ptr = keyptr(set->page, slot);
1787         else
1788                 return 0;
1789
1790         // if key exists, return >= 0 value bytes copied
1791         //      otherwise return (-1)
1792
1793         if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
1794                 val = valptr (set->page,slot);
1795                 if( valmax > val->len )
1796                         valmax = val->len;
1797                 memcpy (value, val->value, valmax);
1798                 ret = valmax;
1799         } else
1800                 ret = -1;
1801
1802         bt_unlockpage (BtLockRead, set->latch);
1803         bt_unpinlatch (set->latch);
1804         bt_unpinpool (set->pool);
1805         return ret;
1806 }
1807
1808 //      check page for space available,
1809 //      clean if necessary and return
1810 //      0 - page needs splitting
1811 //      >0  new slot value
1812
1813 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1814 {
1815 uint nxt = bt->mgr->page_size;
1816 uint cnt = 0, idx = 0;
1817 uint max = page->cnt;
1818 uint newslot = max;
1819 BtKey key;
1820 BtVal val;
1821
1822         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1823                 return slot;
1824
1825         //      skip cleanup if nothing to reclaim
1826
1827         if( !page->dirty )
1828                 return 0;
1829
1830         memcpy (bt->frame, page, bt->mgr->page_size);
1831
1832         // skip page info and set rest of page to zero
1833
1834         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1835         page->dirty = 0;
1836         page->act = 0;
1837
1838         // try cleaning up page first
1839         // by removing deleted keys
1840
1841         while( cnt++ < max ) {
1842                 if( cnt == slot )
1843                         newslot = idx + 1;
1844                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1845                         continue;
1846
1847                 // copy the value across
1848
1849                 val = valptr(bt->frame, cnt);
1850                 nxt -= val->len + 1;
1851                 ((unsigned char *)page)[nxt] = val->len;
1852                 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1853
1854                 // copy the key across
1855
1856                 key = keyptr(bt->frame, cnt);
1857                 nxt -= key->len + 1;
1858                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1859
1860                 // set up the slot
1861
1862                 slotptr(page, ++idx)->off = nxt;
1863
1864                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1865                         page->act++;
1866         }
1867
1868         page->min = nxt;
1869         page->cnt = idx;
1870
1871         //      see if page has enough space now, or does it need splitting?
1872
1873         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1874                 return newslot;
1875
1876         return 0;
1877 }
1878
1879 // split the root and raise the height of the btree
1880
1881 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1882 {
1883 uint nxt = bt->mgr->page_size;
1884 unsigned char value[BtId];
1885 uid left;
1886
1887         //  Obtain an empty page to use, and copy the current
1888         //  root contents into it, e.g. lower keys
1889
1890         if( !(left = bt_newpage(bt, root->page)) )
1891                 return bt->err;
1892
1893         // preserve the page info at the bottom
1894         // of higher keys and set rest to zero
1895
1896         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1897
1898         // insert lower keys page fence key on newroot page as first key
1899
1900         nxt -= BtId + 1;
1901         bt_putid (value, left);
1902         ((unsigned char *)root->page)[nxt] = BtId;
1903         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
1904
1905         nxt -= *leftkey + 1;
1906         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1907         slotptr(root->page, 1)->off = nxt;
1908         
1909         // insert stopper key on newroot page
1910         // and increase the root height
1911
1912         nxt -= 3 + BtId + 1;
1913         ((unsigned char *)root->page)[nxt] = 2;
1914         ((unsigned char *)root->page)[nxt+1] = 0xff;
1915         ((unsigned char *)root->page)[nxt+2] = 0xff;
1916
1917         bt_putid (value, page_no2);
1918         ((unsigned char *)root->page)[nxt+3] = BtId;
1919         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
1920         slotptr(root->page, 2)->off = nxt;
1921
1922         bt_putid(root->page->right, 0);
1923         root->page->min = nxt;          // reset lowest used offset and key count
1924         root->page->cnt = 2;
1925         root->page->act = 2;
1926         root->page->lvl++;
1927
1928         // release and unpin root
1929
1930         bt_unlockpage(BtLockWrite, root->latch);
1931         bt_unpinlatch (root->latch);
1932         bt_unpinpool (root->pool);
1933         return 0;
1934 }
1935
1936 //  split already locked full node
1937 //      return unlocked.
1938
1939 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1940 {
1941 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1942 unsigned char fencekey[256], rightkey[256];
1943 unsigned char value[BtId];
1944 uint lvl = set->page->lvl;
1945 BtPageSet right[1];
1946 uint prev;
1947 BtKey key;
1948 BtVal val;
1949
1950         //  split higher half of keys to bt->frame
1951
1952         memset (bt->frame, 0, bt->mgr->page_size);
1953         max = set->page->cnt;
1954         cnt = max / 2;
1955         idx = 0;
1956
1957         while( cnt++ < max ) {
1958                 val = valptr(set->page, cnt);
1959                 nxt -= val->len + 1;
1960                 ((unsigned char *)bt->frame)[nxt] = val->len;
1961                 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
1962
1963                 key = keyptr(set->page, cnt);
1964                 nxt -= key->len + 1;
1965                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1966
1967                 slotptr(bt->frame, ++idx)->off = nxt;
1968
1969                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1970                         bt->frame->act++;
1971         }
1972
1973         // remember existing fence key for new page to the right
1974
1975         memcpy (rightkey, key, key->len + 1);
1976
1977         bt->frame->bits = bt->mgr->page_bits;
1978         bt->frame->min = nxt;
1979         bt->frame->cnt = idx;
1980         bt->frame->lvl = lvl;
1981
1982         // link right node
1983
1984         if( set->page_no > ROOT_page )
1985                 memcpy (bt->frame->right, set->page->right, BtId);
1986
1987         //      get new free page and write higher keys to it.
1988
1989         if( !(right->page_no = bt_newpage(bt, bt->frame)) )
1990                 return bt->err;
1991
1992         //      update lower keys to continue in old page
1993
1994         memcpy (bt->frame, set->page, bt->mgr->page_size);
1995         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1996         nxt = bt->mgr->page_size;
1997         set->page->dirty = 0;
1998         set->page->act = 0;
1999         cnt = 0;
2000         idx = 0;
2001
2002         //  assemble page of smaller keys
2003
2004         while( cnt++ < max / 2 ) {
2005                 val = valptr(bt->frame, cnt);
2006                 nxt -= val->len + 1;
2007                 ((unsigned char *)set->page)[nxt] = val->len;
2008                 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2009
2010                 key = keyptr(bt->frame, cnt);
2011                 nxt -= key->len + 1;
2012                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2013                 slotptr(set->page, ++idx)->off = nxt;
2014                 set->page->act++;
2015         }
2016
2017         // remember fence key for smaller page
2018
2019         memcpy(fencekey, key, key->len + 1);
2020
2021         bt_putid(set->page->right, right->page_no);
2022         set->page->min = nxt;
2023         set->page->cnt = idx;
2024
2025         // if current page is the root page, split it
2026
2027         if( set->page_no == ROOT_page )
2028                 return bt_splitroot (bt, set, fencekey, right->page_no);
2029
2030         // insert new fences in their parent pages
2031
2032         right->latch = bt_pinlatch (bt, right->page_no);
2033         bt_lockpage (BtLockParent, right->latch);
2034
2035         bt_lockpage (BtLockParent, set->latch);
2036         bt_unlockpage (BtLockWrite, set->latch);
2037
2038         // insert new fence for reformulated left block of smaller keys
2039
2040         bt_putid (value, set->page_no);
2041
2042         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId) )
2043                 return bt->err;
2044
2045         // switch fence for right block of larger keys to new right page
2046
2047         bt_putid (value, right->page_no);
2048
2049         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId) )
2050                 return bt->err;
2051
2052         bt_unlockpage (BtLockParent, set->latch);
2053         bt_unpinlatch (set->latch);
2054         bt_unpinpool (set->pool);
2055
2056         bt_unlockpage (BtLockParent, right->latch);
2057         bt_unpinlatch (right->latch);
2058         return 0;
2059 }
2060 //      install new key and value onto page
2061 //      page must already be checked for
2062 //      adequate space
2063
2064 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen)
2065 {
2066 BtSlot *node;
2067 uint idx;
2068
2069         // copy value onto page
2070
2071         set->page->min -= vallen + 1; // reset lowest used offset
2072         ((unsigned char *)set->page)[set->page->min] = vallen;
2073         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2074
2075         // copy key onto page
2076
2077         set->page->min -= keylen + 1; // reset lowest used offset
2078         ((unsigned char *)set->page)[set->page->min] = keylen;
2079         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2080
2081         //      find first empty slot
2082
2083         for( idx = slot; idx < set->page->cnt; idx++ )
2084           if( slotptr(set->page, idx)->dead )
2085                 break;
2086
2087         // now insert key into array before slot
2088
2089         if( idx == set->page->cnt )
2090                 idx += 1, set->page->cnt += 1;
2091
2092         set->page->act++;
2093
2094         while( idx > slot )
2095                 *slotptr(set->page, idx) = *slotptr(set->page, idx - 1), idx--;
2096
2097         //      fill in new slot
2098
2099         node = slotptr(set->page, slot);
2100         node->off = set->page->min;
2101         node->dead = 0;
2102
2103         bt_unlockpage (BtLockWrite, set->latch);
2104         bt_unpinlatch (set->latch);
2105         bt_unpinpool (set->pool);
2106         return 0;
2107 }
2108
2109 //  Insert new key into the btree at given level.
2110 //      either add a new key or update/add an existing one
2111
2112 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
2113 {
2114 BtPageSet set[1];
2115 uint slot, idx;
2116 uid sequence;
2117 BtKey ptr;
2118 BtVal val;
2119
2120   while( 1 ) { // find the page and slot for the current key
2121         if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
2122                 ptr = keyptr(set->page, slot);
2123         else {
2124                 if( !bt->err )
2125                         bt->err = BTERR_ovflw;
2126                 return bt->err;
2127         }
2128
2129         //      check for adequate space on the page
2130         //      and insert the new key before slot.
2131
2132         if( keycmp (ptr, key, keylen) ) {
2133           if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2134                 if( bt_splitpage (bt, set) )
2135                   return bt->err;
2136                 else
2137                   continue;
2138
2139           return bt_insertslot (bt, set, slot, key, keylen, value, vallen);
2140         }
2141
2142         // if key already exists, update value and return
2143
2144         if( val = valptr(set->page, slot), val->len >= vallen ) {
2145                 if( slotptr(set->page, slot)->dead )
2146                         set->page->act++;
2147                 slotptr(set->page, slot)->dead = 0;
2148                 set->page->dirty = 1;
2149                 val->len = vallen;
2150                 memcpy (val->value, value, vallen);
2151                 bt_unlockpage(BtLockWrite, set->latch);
2152                 bt_unpinlatch (set->latch);
2153                 bt_unpinpool (set->pool);
2154                 return 0;
2155         }
2156
2157         //      new update value doesn't fit in existing value area
2158
2159         if( !slotptr(set->page, slot)->dead )
2160                 set->page->dirty = 1;
2161         else {
2162                 slotptr(set->page, slot)->dead = 0;
2163                 set->page->act++;
2164         }
2165
2166         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2167           if( bt_splitpage (bt, set) )
2168                 return bt->err;
2169           else
2170                 continue;
2171
2172         set->page->min -= vallen + 1;
2173         ((unsigned char *)set->page)[set->page->min] = vallen;
2174         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
2175
2176         set->page->min -= keylen + 1;
2177         ((unsigned char *)set->page)[set->page->min] = keylen;
2178         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
2179         
2180         slotptr(set->page, slot)->off = set->page->min;
2181         bt_unlockpage(BtLockWrite, set->latch);
2182         bt_unpinlatch (set->latch);
2183         bt_unpinpool (set->pool);
2184         return 0;
2185   }
2186   return 0;
2187 }
2188
2189 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2190 {
2191 BtPageSet set[1];
2192 uint slot;
2193
2194         // cache page for retrieval
2195
2196         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2197           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2198         else
2199           return 0;
2200
2201         bt->cursor_page = set->page_no;
2202
2203         bt_unlockpage(BtLockRead, set->latch);
2204         bt_unpinlatch (set->latch);
2205         bt_unpinpool (set->pool);
2206         return slot;
2207 }
2208
2209 //  return next slot for cursor page
2210 //  or slide cursor right into next page
2211
2212 uint bt_nextkey (BtDb *bt, uint slot)
2213 {
2214 BtPageSet set[1];
2215 uid right;
2216
2217   do {
2218         right = bt_getid(bt->cursor->right);
2219
2220         while( slot++ < bt->cursor->cnt )
2221           if( slotptr(bt->cursor,slot)->dead )
2222                 continue;
2223           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2224                 return slot;
2225           else
2226                 break;
2227
2228         if( !right )
2229                 break;
2230
2231         bt->cursor_page = right;
2232
2233         if( set->pool = bt_pinpool (bt, right) )
2234                 set->page = bt_page (bt, set->pool, right);
2235         else
2236                 return 0;
2237
2238         set->latch = bt_pinlatch (bt, right);
2239     bt_lockpage(BtLockRead, set->latch);
2240
2241         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2242
2243         bt_unlockpage(BtLockRead, set->latch);
2244         bt_unpinlatch (set->latch);
2245         bt_unpinpool (set->pool);
2246         slot = 0;
2247
2248   } while( 1 );
2249
2250   return bt->err = 0;
2251 }
2252
2253 BtKey bt_key(BtDb *bt, uint slot)
2254 {
2255         return keyptr(bt->cursor, slot);
2256 }
2257
2258 BtVal bt_val(BtDb *bt, uint slot)
2259 {
2260         return valptr(bt->cursor,slot);
2261 }
2262
2263 #ifdef STANDALONE
2264
2265 #ifndef unix
2266 double getCpuTime(int type)
2267 {
2268 FILETIME crtime[1];
2269 FILETIME xittime[1];
2270 FILETIME systime[1];
2271 FILETIME usrtime[1];
2272 SYSTEMTIME timeconv[1];
2273 double ans = 0;
2274
2275         memset (timeconv, 0, sizeof(SYSTEMTIME));
2276
2277         switch( type ) {
2278         case 0:
2279                 GetSystemTimeAsFileTime (xittime);
2280                 FileTimeToSystemTime (xittime, timeconv);
2281                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2282                 break;
2283         case 1:
2284                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2285                 FileTimeToSystemTime (usrtime, timeconv);
2286                 break;
2287         case 2:
2288                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2289                 FileTimeToSystemTime (systime, timeconv);
2290                 break;
2291         }
2292
2293         ans += (double)timeconv->wHour * 3600;
2294         ans += (double)timeconv->wMinute * 60;
2295         ans += (double)timeconv->wSecond;
2296         ans += (double)timeconv->wMilliseconds / 1000;
2297         return ans;
2298 }
2299 #else
2300 #include <time.h>
2301 #include <sys/resource.h>
2302
2303 double getCpuTime(int type)
2304 {
2305 struct rusage used[1];
2306 struct timeval tv[1];
2307
2308         switch( type ) {
2309         case 0:
2310                 gettimeofday(tv, NULL);
2311                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2312
2313         case 1:
2314                 getrusage(RUSAGE_SELF, used);
2315                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2316
2317         case 2:
2318                 getrusage(RUSAGE_SELF, used);
2319                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2320         }
2321
2322         return 0;
2323 }
2324 #endif
2325
2326 uint bt_latchaudit (BtDb *bt)
2327 {
2328 ushort idx, hashidx;
2329 uid next, page_no;
2330 BtLatchSet *latch;
2331 uint cnt = 0;
2332 BtKey ptr;
2333
2334 #ifdef unix
2335         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2336 #endif
2337         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2338                 fprintf(stderr, "Alloc page locked\n");
2339         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2340
2341         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2342                 latch = bt->mgr->latchsets + idx;
2343                 if( *latch->readwr->rin & MASK )
2344                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2345                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2346
2347                 if( *latch->access->rin & MASK )
2348                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2349                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2350
2351                 if( *latch->parent->rin & MASK )
2352                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2353                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2354
2355                 if( latch->pin ) {
2356                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2357                         latch->pin = 0;
2358                 }
2359         }
2360
2361         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2362           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2363                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2364
2365           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2366
2367           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2368                 latch = bt->mgr->latchsets + idx;
2369                 if( *(ushort *)latch->busy )
2370                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2371                 *(ushort *)latch->busy = 0;
2372                 if( latch->pin )
2373                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2374           } while( idx = latch->next );
2375         }
2376
2377         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2378         page_no = LEAF_page;
2379
2380         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2381         off64_t off = page_no << bt->mgr->page_bits;
2382 #ifdef unix
2383                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2384 #else
2385                 DWORD amt[1];
2386
2387                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2388
2389                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2390                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2391
2392                   if( *amt <  bt->mgr->page_size )
2393                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2394 #endif
2395                 if( !bt->frame->free ) {
2396                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2397                   ptr = keyptr(bt->frame, idx+1);
2398                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2399                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2400                  }
2401                  if( !bt->frame->lvl )
2402                         cnt += bt->frame->act;
2403                 }
2404                 if( page_no > LEAF_page )
2405                         next = page_no + 1;
2406                 page_no = next;
2407         }
2408         return cnt - 1;
2409 }
2410
2411 typedef struct {
2412         char idx;
2413         char *type;
2414         char *infile;
2415         BtMgr *mgr;
2416         int num;
2417 } ThreadArg;
2418
2419 //  standalone program to index file of keys
2420 //  then list them onto std-out
2421
2422 #ifdef unix
2423 void *index_file (void *arg)
2424 #else
2425 uint __stdcall index_file (void *arg)
2426 #endif
2427 {
2428 int line = 0, found = 0, cnt = 0;
2429 uid next, page_no = LEAF_page;  // start on first page of leaves
2430 unsigned char key[256];
2431 ThreadArg *args = arg;
2432 int ch, len = 0, slot;
2433 BtPageSet set[1];
2434 BtKey ptr;
2435 BtDb *bt;
2436 FILE *in;
2437
2438         bt = bt_open (args->mgr);
2439
2440         switch(args->type[0] | 0x20)
2441         {
2442         case 'a':
2443                 fprintf(stderr, "started latch mgr audit\n");
2444                 cnt = bt_latchaudit (bt);
2445                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2446                 break;
2447
2448         case 'p':
2449                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2450                 if( in = fopen (args->infile, "rb") )
2451                   while( ch = getc(in), ch != EOF )
2452                         if( ch == '\n' )
2453                         {
2454                           line++;
2455
2456                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10) )
2457                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2458                           len = 0;
2459                         }
2460                         else if( len < 255 )
2461                                 key[len++] = ch;
2462                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2463                 break;
2464
2465         case 'w':
2466                 fprintf(stderr, "started indexing for %s\n", args->infile);
2467                 if( in = fopen (args->infile, "rb") )
2468                   while( ch = getc(in), ch != EOF )
2469                         if( ch == '\n' )
2470                         {
2471                           line++;
2472
2473                           if( args->num == 1 )
2474                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2475
2476                           else if( args->num )
2477                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2478
2479                           if( bt_insertkey (bt, key, len, 0, NULL, 0) )
2480                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2481                           len = 0;
2482                         }
2483                         else if( len < 255 )
2484                                 key[len++] = ch;
2485                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2486                 break;
2487
2488         case 'd':
2489                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2490                 if( in = fopen (args->infile, "rb") )
2491                   while( ch = getc(in), ch != EOF )
2492                         if( ch == '\n' )
2493                         {
2494                           line++;
2495                           if( args->num == 1 )
2496                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2497
2498                           else if( args->num )
2499                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2500
2501                           if( (args->type[1] | 0x20) == 'p' )
2502                                 len = 10;
2503                           if( bt_deletekey (bt, key, len, 0) )
2504                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2505                           len = 0;
2506                         }
2507                         else if( len < 255 )
2508                                 key[len++] = ch;
2509                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2510                 break;
2511
2512         case 'f':
2513                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2514                 if( in = fopen (args->infile, "rb") )
2515                   while( ch = getc(in), ch != EOF )
2516                         if( ch == '\n' )
2517                         {
2518                           line++;
2519                           if( args->num == 1 )
2520                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2521
2522                           else if( args->num )
2523                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2524
2525                           if( (args->type[1] | 0x20) == 'p' )
2526                                 len = 10;
2527                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2528                                 found++;
2529                           else if( bt->err )
2530                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2531                           len = 0;
2532                         }
2533                         else if( len < 255 )
2534                                 key[len++] = ch;
2535                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2536                 break;
2537
2538         case 's':
2539                 fprintf(stderr, "started scanning\n");
2540                 do {
2541                         if( set->pool = bt_pinpool (bt, page_no) )
2542                                 set->page = bt_page (bt, set->pool, page_no);
2543                         else
2544                                 break;
2545                         set->latch = bt_pinlatch (bt, page_no);
2546                         bt_lockpage (BtLockRead, set->latch);
2547                         next = bt_getid (set->page->right);
2548                         cnt += set->page->act;
2549
2550                         for( slot = 0; slot++ < set->page->cnt; )
2551                          if( next || slot < set->page->cnt )
2552                           if( !slotptr(set->page, slot)->dead ) {
2553                                 ptr = keyptr(set->page, slot);
2554                                 fwrite (ptr->key, ptr->len, 1, stdout);
2555                                 fputc ('\n', stdout);
2556                           }
2557
2558                         bt_unlockpage (BtLockRead, set->latch);
2559                         bt_unpinlatch (set->latch);
2560                         bt_unpinpool (set->pool);
2561                 } while( page_no = next );
2562
2563                 cnt--;  // remove stopper key
2564                 fprintf(stderr, " Total keys read %d\n", cnt);
2565                 break;
2566
2567         case 'c':
2568 #ifdef unix
2569                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2570 #endif
2571                 fprintf(stderr, "started counting\n");
2572                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2573                 page_no = LEAF_page;
2574
2575                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2576                 uid off = page_no << bt->mgr->page_bits;
2577 #ifdef unix
2578                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2579 #else
2580                 DWORD amt[1];
2581
2582                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2583
2584                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2585                         return bt->err = BTERR_map;
2586
2587                   if( *amt <  bt->mgr->page_size )
2588                         return bt->err = BTERR_map;
2589 #endif
2590                         if( !bt->frame->free && !bt->frame->lvl )
2591                                 cnt += bt->frame->act;
2592                         if( page_no > LEAF_page )
2593                                 next = page_no + 1;
2594                         page_no = next;
2595                 }
2596                 
2597                 cnt--;  // remove stopper key
2598                 fprintf(stderr, " Total keys read %d\n", cnt);
2599                 break;
2600         }
2601
2602         bt_close (bt);
2603 #ifdef unix
2604         return NULL;
2605 #else
2606         return 0;
2607 #endif
2608 }
2609
2610 typedef struct timeval timer;
2611
2612 int main (int argc, char **argv)
2613 {
2614 int idx, cnt, len, slot, err;
2615 int segsize, bits = 16;
2616 double start, stop;
2617 #ifdef unix
2618 pthread_t *threads;
2619 #else
2620 HANDLE *threads;
2621 #endif
2622 ThreadArg *args;
2623 uint poolsize = 0;
2624 float elapsed;
2625 int num = 0;
2626 char key[1];
2627 BtMgr *mgr;
2628 BtKey ptr;
2629 BtDb *bt;
2630
2631         if( argc < 3 ) {
2632                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2633                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2634                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2635                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2636                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2637                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2638                 exit(0);
2639         }
2640
2641         start = getCpuTime(0);
2642
2643         if( argc > 3 )
2644                 bits = atoi(argv[3]);
2645
2646         if( argc > 4 )
2647                 poolsize = atoi(argv[4]);
2648
2649         if( !poolsize )
2650                 fprintf (stderr, "Warning: no mapped_pool\n");
2651
2652         if( poolsize > 65535 )
2653                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2654
2655         if( argc > 5 )
2656                 segsize = atoi(argv[5]);
2657         else
2658                 segsize = 4;    // 16 pages per mmap segment
2659
2660         if( argc > 6 )
2661                 num = atoi(argv[6]);
2662
2663         cnt = argc - 7;
2664 #ifdef unix
2665         threads = malloc (cnt * sizeof(pthread_t));
2666 #else
2667         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2668 #endif
2669         args = malloc (cnt * sizeof(ThreadArg));
2670
2671 //      mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2672         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, 8191);
2673
2674         if( !mgr ) {
2675                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2676                 exit (1);
2677         }
2678
2679         //      fire off threads
2680
2681         for( idx = 0; idx < cnt; idx++ ) {
2682                 args[idx].infile = argv[idx + 7];
2683                 args[idx].type = argv[2];
2684                 args[idx].mgr = mgr;
2685                 args[idx].num = num;
2686                 args[idx].idx = idx;
2687 #ifdef unix
2688                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2689                         fprintf(stderr, "Error creating thread %d\n", err);
2690 #else
2691                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2692 #endif
2693         }
2694
2695         //      wait for termination
2696
2697 #ifdef unix
2698         for( idx = 0; idx < cnt; idx++ )
2699                 pthread_join (threads[idx], NULL);
2700 #else
2701         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2702
2703         for( idx = 0; idx < cnt; idx++ )
2704                 CloseHandle(threads[idx]);
2705
2706 #endif
2707         elapsed = getCpuTime(0) - start;
2708         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2709         elapsed = getCpuTime(1);
2710         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2711         elapsed = getCpuTime(2);
2712         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2713
2714         bt_mgrclose (mgr);
2715 }
2716
2717 #endif  //STANDALONE