]> pd.if.org Git - btree/blob - fosterbtreef.c
Make bt_splitpage more conservative wrt posting fence keys
[btree] / fosterbtreef.c
1 // foster btree version f
2 // 29 JAN 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <fcntl.h>
35 #include <sys/time.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #include <pthread.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <process.h>
47 #include <intrin.h>
48 #endif
49
50 #include <memory.h>
51 #include <string.h>
52
53 typedef unsigned long long      uid;
54
55 #ifndef unix
56 typedef unsigned long long      off64_t;
57 typedef unsigned short          ushort;
58 typedef unsigned int            uint;
59 #endif
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63
64 #define BT_latchtable   128                                     // number of latch manager slots
65
66 #define BT_maxbits              24                                      // maximum page size in bits
67 #define BT_minbits              9                                       // minimum page size in bits
68 #define BT_minpage              (1 << BT_minbits)       // minimum page size
69 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
70
71 /*
72 There are five lock types for each node in three independent sets: 
73 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
74 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
75 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
76 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
77 5. (set 3) ParentLock: Exclusive. Have parent adopt/delete maximum foster child from the node.
78 */
79
80 typedef enum{
81         BtLockAccess,
82         BtLockDelete,
83         BtLockRead,
84         BtLockWrite,
85         BtLockParent
86 }BtLock;
87
88 //      Define the length of the page and key pointers
89
90 #define BtId 6
91
92 //      Page key slot definition.
93
94 //      If BT_maxbits is 15 or less, you can save 4 bytes
95 //      for each key stored by making the first two uints
96 //      into ushorts.  You can also save 4 bytes by removing
97 //      the tod field from the key.
98
99 //      Keys are marked dead, but remain on the page until
100 //      cleanup is called. The fence key (highest key) for
101 //      the page is always present, even after cleanup.
102
103 typedef struct {
104         uint off:BT_maxbits;            // page offset for key start
105         uint dead:1;                            // set for deleted key
106         uint tod;                                       // time-stamp for key
107         unsigned char id[BtId];         // id associated with key
108 } BtSlot;
109
110 //      The key structure occupies space at the upper end of
111 //      each page.  It's a length byte followed by the value
112 //      bytes.
113
114 typedef struct {
115         unsigned char len;
116         unsigned char key[1];
117 } *BtKey;
118
119 //      The first part of an index page.
120 //      It is immediately followed
121 //      by the BtSlot array of keys.
122
123 typedef struct Page {
124         volatile uint cnt;                      // count of keys in page
125         volatile uint act;                      // count of active keys
126         volatile uint min;                      // next key offset
127         volatile uint foster;           // count of foster children
128         unsigned char bits;                     // page size in bits
129         unsigned char lvl:7;            // level of page
130         unsigned char dirty:1;          // page needs to be cleaned
131         unsigned char right[BtId];      // page number to right
132 } *BtPage;
133
134 //      mode & definition for hash latch implementation
135
136 enum {
137         Mutex = 1,
138         Write = 2,
139         Pending = 4,
140         Share = 8
141 } LockMode;
142
143 // mutex locks the other fields
144 // exclusive is set for write access
145 // share is count of read accessors
146
147 typedef struct {
148         volatile ushort mutex:1;
149         volatile ushort exclusive:1;
150         volatile ushort pending:1;
151         volatile ushort share:13;
152 } BtSpinLatch;
153
154 //  hash table entries
155
156 typedef struct {
157         BtSpinLatch latch[1];
158         volatile ushort slot;           // Latch table entry at head of chain
159 } BtHashEntry;
160
161 //      latch manager table structure
162
163 typedef struct {
164         BtSpinLatch readwr[1];          // read/write page lock
165         BtSpinLatch access[1];          // Access Intent/Page delete
166         BtSpinLatch parent[1];          // adoption of foster children
167         BtSpinLatch busy[1];            // slot is being moved between chains
168         volatile ushort next;           // next entry in hash table chain
169         volatile ushort prev;           // prev entry in hash table chain
170         volatile ushort pin;            // number of outstanding locks
171         volatile ushort hash;           // hash slot entry is under
172         volatile uid page_no;           // latch set page number
173 } BtLatchSet;
174
175 //      The memory mapping pool table buffer manager entry
176
177 typedef struct {
178         unsigned long long int lru;     // number of times accessed
179         uid  basepage;                          // mapped base page number
180         char *map;                                      // mapped memory pointer
181         ushort pin;                                     // mapped page pin counter
182         ushort slot;                            // slot index in this array
183         void *hashprev;                         // previous pool entry for the same hash idx
184         void *hashnext;                         // next pool entry for the same hash idx
185 #ifndef unix
186         HANDLE hmap;                            // Windows memory mapping handle
187 #endif
188 } BtPool;
189
190 //      structure for latch manager on ALLOC_page
191
192 typedef struct {
193         struct Page alloc[2];           // next & free page_nos in right ptr
194         BtSpinLatch lock[1];            // allocation area lite latch
195         ushort latchdeployed;           // highest number of latch entries deployed
196         ushort nlatchpage;                      // number of latch pages at BT_latch
197         ushort latchtotal;                      // number of page latch entries
198         ushort latchhash;                       // number of latch hash table slots
199         ushort latchvictim;                     // next latch entry to examine
200         BtHashEntry table[0];           // the hash table
201 } BtLatchMgr;
202
203 //      The object structure for Btree access
204
205 typedef struct {
206         uint page_size;                         // page size    
207         uint page_bits;                         // page size in bits    
208         uint seg_bits;                          // seg size in pages in bits
209         uint mode;                                      // read-write mode
210 #ifdef unix
211         int idx;
212 #else
213         HANDLE idx;
214 #endif
215         ushort poolcnt;                         // highest page pool node in use
216         ushort poolmax;                         // highest page pool node allocated
217         ushort poolmask;                        // total number of pages in mmap segment - 1
218         ushort hashsize;                        // size of Hash Table for pool entries
219         ushort evicted;                         // last evicted hash table slot
220         ushort *hash;                           // hash table of pool entries
221         BtPool *pool;                           // memory pool page segments
222         BtSpinLatch *latch;                     // latches for pool hash slots
223         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
224         BtLatchSet *latchsets;          // mapped latch set from latch pages
225 #ifndef unix
226         HANDLE halloc;                          // allocation and latch table handle
227 #endif
228 } BtMgr;
229
230 typedef struct {
231         BtMgr *mgr;                     // buffer manager for thread
232         BtPage cursor;          // cached frame for start/next (never mapped)
233         BtPage frame;           // spare frame for the page split (never mapped)
234         BtPage zero;            // page frame for zeroes at end of file
235         BtPage page;            // current page
236         uid page_no;            // current page number  
237         uid cursor_page;        // current cursor page number   
238         BtLatchSet *set;        // current page latch set
239         BtPool *pool;           // current page pool
240         unsigned char *mem;     // frame, cursor, page memory buffer
241         int foster;                     // last search was to foster child
242         int found;                      // last delete was found
243         int err;                        // last error
244 } BtDb;
245
246 typedef enum {
247         BTERR_ok = 0,
248         BTERR_struct,
249         BTERR_ovflw,
250         BTERR_lock,
251         BTERR_map,
252         BTERR_wrt,
253         BTERR_hash,
254         BTERR_latch
255 } BTERR;
256
257 // B-Tree functions
258 extern void bt_close (BtDb *bt);
259 extern BtDb *bt_open (BtMgr *mgr);
260 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
261 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
262 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
263 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
264 extern uint bt_nextkey   (BtDb *bt, uint slot);
265
266 //      internal functions
267 BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
268 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
269 BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
270
271 //      manager functions
272 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
273 void bt_mgrclose (BtMgr *mgr);
274
275 //  Helper functions to return cursor slot values
276
277 extern BtKey bt_key (BtDb *bt, uint slot);
278 extern uid bt_uid (BtDb *bt, uint slot);
279 extern uint bt_tod (BtDb *bt, uint slot);
280
281 //  BTree page number constants
282 #define ALLOC_page              0       // allocation & lock manager hash table
283 #define ROOT_page               1       // root of the btree
284 #define LEAF_page               2       // first page of leaves
285 #define LATCH_page              3       // pages for lock manager
286
287 //      Number of levels to create in a new BTree
288
289 #define MIN_lvl                 2
290
291 //  The page is allocated from low and hi ends.
292 //  The key offsets and row-id's are allocated
293 //  from the bottom, while the text of the key
294 //  is allocated from the top.  When the two
295 //  areas meet, the page is split into two.
296
297 //  A key consists of a length byte, two bytes of
298 //  index number (0 - 65534), and up to 253 bytes
299 //  of key value.  Duplicate keys are discarded.
300 //  Associated with each key is a 48 bit row-id.
301
302 //  The b-tree root is always located at page 1.
303 //      The first leaf page of level zero is always
304 //      located on page 2.
305
306 //      When to root page fills, it is split in two and
307 //      the tree height is raised by a new root at page
308 //      one with two keys.
309
310 //      Deleted keys are marked with a dead bit until
311 //      page cleanup The fence key for a node is always
312 //      present, even after deletion and cleanup.
313
314 //  Groups of pages called segments from the btree are
315 //  cached with memory mapping. A hash table is used to keep
316 //  track of the cached segments.  This behaviour is controlled
317 //  by the cache block size parameter to bt_open.
318
319 //  To achieve maximum concurrency one page is locked at a time
320 //  as the tree is traversed to find leaf key in question.
321
322 //      An adoption traversal leaves the parent node locked as the
323 //      tree is traversed to the level in quesiton.
324
325 //  Page 0 is dedicated to lock for new page extensions,
326 //      and chains empty pages together for reuse.
327
328 //      Empty pages are chained together through the ALLOC page and reused.
329
330 //      Access macros to address slot and key values from the page
331
332 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
333 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
334
335 void bt_putid(unsigned char *dest, uid id)
336 {
337 int i = BtId;
338
339         while( i-- )
340                 dest[i] = (unsigned char)id, id >>= 8;
341 }
342
343 uid bt_getid(unsigned char *src)
344 {
345 uid id = 0;
346 int i;
347
348         for( i = 0; i < BtId; i++ )
349                 id <<= 8, id |= *src++; 
350
351         return id;
352 }
353
354 //      wait until write lock mode is clear
355 //      and add 1 to the share count
356
357 void bt_spinreadlock(BtSpinLatch *latch)
358 {
359 ushort prev;
360
361   do {
362 #ifdef unix
363         while( __sync_fetch_and_or((ushort *)latch, Mutex) & Mutex )
364                 sched_yield();
365 #else
366         while( _InterlockedOr16((ushort *)latch, Mutex) & Mutex )
367                 SwitchToThread();
368 #endif
369
370         //  see if exclusive request is granted or pending
371
372         if( prev = !(latch->exclusive | latch->pending) )
373 #ifdef unix
374                 __sync_fetch_and_add((ushort *)latch, Share);
375 #else
376                 _InterlockedExchangeAdd16 ((ushort *)latch, Share);
377 #endif
378
379 #ifdef unix
380         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
381 #else
382         _InterlockedAnd16((ushort *)latch, ~Mutex);
383 #endif
384         if( prev )
385                 return;
386 #ifdef  unix
387   } while( sched_yield(), 1 );
388 #else
389   } while( SwitchToThread(), 1 );
390 #endif
391 }
392
393 //      wait for other read and write latches to relinquish
394
395 void bt_spinwritelock(BtSpinLatch *latch)
396 {
397   do {
398 #ifdef  unix
399         while( __sync_fetch_and_or((ushort *)latch, Mutex | Pending) & Mutex )
400                 sched_yield();
401 #else
402         while( _InterlockedOr16((ushort *)latch, Mutex | Pending) & Mutex )
403                 SwitchToThread();
404 #endif
405         if( !(latch->share | latch->exclusive) ) {
406 #ifdef unix
407                 __sync_fetch_and_or((ushort *)latch, Write);
408                 __sync_fetch_and_and ((ushort *)latch, ~(Mutex | Pending));
409 #else
410                 _InterlockedOr16((ushort *)latch, Write);
411                 _InterlockedAnd16((ushort *)latch, ~(Mutex | Pending));
412 #endif
413                 return;
414         }
415
416 #ifdef unix
417         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
418         sched_yield();
419 #else
420         _InterlockedAnd16((ushort *)latch, ~Mutex);
421         SwitchToThread();
422 #endif
423   } while( 1 );
424 }
425
426 //      try to obtain write lock
427
428 //      return 1 if obtained,
429 //              0 otherwise
430
431 int bt_spinwritetry(BtSpinLatch *latch)
432 {
433 ushort prev;
434
435 #ifdef unix
436         if( prev = __sync_fetch_and_or((ushort *)latch, Mutex), prev & Mutex )
437                 return 0;
438 #else
439         if( prev = _InterlockedOr16((ushort *)latch, Mutex), prev & Mutex )
440                 return 0;
441 #endif
442         //      take write access if all bits are clear
443
444         if( !prev )
445 #ifdef unix
446                 __sync_fetch_and_or ((ushort *)latch, Write);
447 #else
448                 _InterlockedOr16((ushort *)latch, Write);
449 #endif
450
451 #ifdef unix
452         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
453 #else
454         _InterlockedAnd16((ushort *)latch, ~Mutex);
455 #endif
456         return !prev;
457 }
458
459 //      clear write mode
460
461 void bt_spinreleasewrite(BtSpinLatch *latch)
462 {
463 #ifdef unix
464         __sync_fetch_and_and ((ushort *)latch, ~Write);
465 #else
466         _InterlockedAnd16((ushort *)latch, ~Write);
467 #endif
468 }
469
470 //      decrement reader count
471
472 void bt_spinreleaseread(BtSpinLatch *latch)
473 {
474 #ifdef unix
475         __sync_fetch_and_add((ushort *)latch, -Share);
476 #else
477         _InterlockedExchangeAdd16 ((ushort *)latch, -Share);
478 #endif
479 }
480
481 //      link latch table entry into latch hash table
482
483 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
484 {
485 BtLatchSet *set = bt->mgr->latchsets + victim;
486
487         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
488                 bt->mgr->latchsets[set->next].prev = victim;
489
490         bt->mgr->latchmgr->table[hashidx].slot = victim;
491         set->page_no = page_no;
492         set->hash = hashidx;
493         set->prev = 0;
494 }
495
496 //      release latch pin
497
498 void bt_unpinlatch (BtLatchSet *set)
499 {
500 #ifdef unix
501         __sync_fetch_and_add(&set->pin, -1);
502 #else
503         _InterlockedDecrement16 (&set->pin);
504 #endif
505 }
506
507 //      find existing latchset or inspire new one
508 //      return with latchset pinned
509
510 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
511 {
512 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
513 ushort slot, avail = 0, victim, idx;
514 BtLatchSet *set;
515
516         //  obtain read lock on hash table entry
517
518         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
519
520         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
521         {
522                 set = bt->mgr->latchsets + slot;
523                 if( page_no == set->page_no )
524                         break;
525         } while( slot = set->next );
526
527         if( slot ) {
528 #ifdef unix
529                 __sync_fetch_and_add(&set->pin, 1);
530 #else
531                 _InterlockedIncrement16 (&set->pin);
532 #endif
533         }
534
535     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
536
537         if( slot )
538                 return set;
539
540   //  try again, this time with write lock
541
542   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
543
544   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
545   {
546         set = bt->mgr->latchsets + slot;
547         if( page_no == set->page_no )
548                 break;
549         if( !set->pin && !avail )
550                 avail = slot;
551   } while( slot = set->next );
552
553   //  found our entry, or take over an unpinned one
554
555   if( slot || (slot = avail) ) {
556         set = bt->mgr->latchsets + slot;
557 #ifdef unix
558         __sync_fetch_and_add(&set->pin, 1);
559 #else
560         _InterlockedIncrement16 (&set->pin);
561 #endif
562         set->page_no = page_no;
563         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
564         return set;
565   }
566
567         //  see if there are any unused entries
568 #ifdef unix
569         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
570 #else
571         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
572 #endif
573
574         if( victim < bt->mgr->latchmgr->latchtotal ) {
575                 set = bt->mgr->latchsets + victim;
576 #ifdef unix
577                 __sync_fetch_and_add(&set->pin, 1);
578 #else
579                 _InterlockedIncrement16 (&set->pin);
580 #endif
581                 bt_latchlink (bt, hashidx, victim, page_no);
582                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
583                 return set;
584         }
585
586 #ifdef unix
587         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
588 #else
589         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
590 #endif
591   //  find and reuse previous lock entry
592
593   while( 1 ) {
594 #ifdef unix
595         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
596 #else
597         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
598 #endif
599         //      we don't use slot zero
600
601         if( victim %= bt->mgr->latchmgr->latchtotal )
602                 set = bt->mgr->latchsets + victim;
603         else
604                 continue;
605
606         //      take control of our slot
607         //      from other threads
608
609         if( set->pin || !bt_spinwritetry (set->busy) )
610                 continue;
611
612         idx = set->hash;
613
614         // try to get write lock on hash chain
615         //      skip entry if not obtained
616         //      or has outstanding locks
617
618         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
619                 bt_spinreleasewrite (set->busy);
620                 continue;
621         }
622
623         if( set->pin ) {
624                 bt_spinreleasewrite (set->busy);
625                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
626                 continue;
627         }
628
629         //  unlink our available victim from its hash chain
630
631         if( set->prev )
632                 bt->mgr->latchsets[set->prev].next = set->next;
633         else
634                 bt->mgr->latchmgr->table[idx].slot = set->next;
635
636         if( set->next )
637                 bt->mgr->latchsets[set->next].prev = set->prev;
638
639         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
640 #ifdef unix
641         __sync_fetch_and_add(&set->pin, 1);
642 #else
643         _InterlockedIncrement16 (&set->pin);
644 #endif
645         bt_latchlink (bt, hashidx, victim, page_no);
646         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
647         bt_spinreleasewrite (set->busy);
648         return set;
649   }
650 }
651
652 void bt_mgrclose (BtMgr *mgr)
653 {
654 BtPool *pool;
655 uint slot;
656
657         // release mapped pages
658         //      note that slot zero is never used
659
660         for( slot = 1; slot < mgr->poolmax; slot++ ) {
661                 pool = mgr->pool + slot;
662                 if( pool->slot )
663 #ifdef unix
664                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
665 #else
666                 {
667                         FlushViewOfFile(pool->map, 0);
668                         UnmapViewOfFile(pool->map);
669                         CloseHandle(pool->hmap);
670                 }
671 #endif
672         }
673
674 #ifdef unix
675         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
676         munmap (mgr->latchmgr, mgr->page_size);
677 #else
678         FlushViewOfFile(mgr->latchmgr, 0);
679         UnmapViewOfFile(mgr->latchmgr);
680         CloseHandle(mgr->halloc);
681 #endif
682 #ifdef unix
683         close (mgr->idx);
684         free (mgr->pool);
685         free (mgr->hash);
686         free (mgr->latch);
687         free (mgr);
688 #else
689         FlushFileBuffers(mgr->idx);
690         CloseHandle(mgr->idx);
691         GlobalFree (mgr->pool);
692         GlobalFree (mgr->hash);
693         GlobalFree (mgr->latch);
694         GlobalFree (mgr);
695 #endif
696 }
697
698 //      close and release memory
699
700 void bt_close (BtDb *bt)
701 {
702 #ifdef unix
703         if ( bt->mem )
704                 free (bt->mem);
705 #else
706         if ( bt->mem)
707                 VirtualFree (bt->mem, 0, MEM_RELEASE);
708 #endif
709         free (bt);
710 }
711
712 //  open/create new btree buffer manager
713
714 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
715 //              size of mapped page pool (e.g. 8192)
716
717 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
718 {
719 uint lvl, attr, cacheblk, last, slot, idx;
720 uint nlatchpage, latchhash;
721 BtLatchMgr *latchmgr;
722 off64_t size;
723 uint amt[1];
724 BtMgr* mgr;
725 BtKey key;
726 int flag;
727
728 #ifndef unix
729 SYSTEM_INFO sysinfo[1];
730 #endif
731
732         // determine sanity of page size and buffer pool
733
734         if( bits > BT_maxbits )
735                 bits = BT_maxbits;
736         else if( bits < BT_minbits )
737                 bits = BT_minbits;
738
739         if( !poolmax )
740                 return NULL;    // must have buffer pool
741
742 #ifdef unix
743         mgr = calloc (1, sizeof(BtMgr));
744
745         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
746
747         if( mgr->idx == -1 )
748                 return free(mgr), NULL;
749         
750         cacheblk = 4096;        // minimum mmap segment size for unix
751
752 #else
753         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
754         attr = FILE_ATTRIBUTE_NORMAL;
755         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
756
757         if( mgr->idx == INVALID_HANDLE_VALUE )
758                 return GlobalFree(mgr), NULL;
759
760         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
761         GetSystemInfo(sysinfo);
762         cacheblk = sysinfo->dwAllocationGranularity;
763 #endif
764
765 #ifdef unix
766         latchmgr = malloc (BT_maxpage);
767         *amt = 0;
768
769         // read minimum page size to get root info
770
771         if( size = lseek (mgr->idx, 0L, 2) ) {
772                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
773                         bits = latchmgr->alloc->bits;
774                 else
775                         return free(mgr), free(latchmgr), NULL;
776         } else if( mode == BT_ro )
777                 return free(latchmgr), bt_mgrclose (mgr), NULL;
778 #else
779         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
780         size = GetFileSize(mgr->idx, amt);
781
782         if( size || *amt ) {
783                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
784                         return bt_mgrclose (mgr), NULL;
785                 bits = latchmgr->alloc->bits;
786         } else if( mode == BT_ro )
787                 return bt_mgrclose (mgr), NULL;
788 #endif
789
790         mgr->page_size = 1 << bits;
791         mgr->page_bits = bits;
792
793         mgr->poolmax = poolmax;
794         mgr->mode = mode;
795
796         if( cacheblk < mgr->page_size )
797                 cacheblk = mgr->page_size;
798
799         //  mask for partial memmaps
800
801         mgr->poolmask = (cacheblk >> bits) - 1;
802
803         //      see if requested size of pages per memmap is greater
804
805         if( (1 << segsize) > mgr->poolmask )
806                 mgr->poolmask = (1 << segsize) - 1;
807
808         mgr->seg_bits = 0;
809
810         while( (1 << mgr->seg_bits) <= mgr->poolmask )
811                 mgr->seg_bits++;
812
813         mgr->hashsize = hashsize;
814
815 #ifdef unix
816         mgr->pool = calloc (poolmax, sizeof(BtPool));
817         mgr->hash = calloc (hashsize, sizeof(ushort));
818         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
819 #else
820         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
821         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
822         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
823 #endif
824
825         if( size || *amt )
826                 goto mgrlatch;
827
828         // initialize an empty b-tree with latch page, root page, page of leaves
829         // and page(s) of latches
830
831         memset (latchmgr, 0, 1 << bits);
832         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
833         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
834         latchmgr->alloc->bits = mgr->page_bits;
835
836         latchmgr->nlatchpage = nlatchpage;
837         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
838
839         //  initialize latch manager
840
841         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
842
843         //      size of hash table = total number of latchsets
844
845         if( latchhash > latchmgr->latchtotal )
846                 latchhash = latchmgr->latchtotal;
847
848         latchmgr->latchhash = latchhash;
849
850 #ifdef unix
851         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
852                 return bt_mgrclose (mgr), NULL;
853 #else
854         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
855                 return bt_mgrclose (mgr), NULL;
856
857         if( *amt < mgr->page_size )
858                 return bt_mgrclose (mgr), NULL;
859 #endif
860
861         memset (latchmgr, 0, 1 << bits);
862         latchmgr->alloc->bits = mgr->page_bits;
863
864         for( lvl=MIN_lvl; lvl--; ) {
865                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
866                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
867                 key = keyptr(latchmgr->alloc, 1);
868                 key->len = 2;                   // create stopper key
869                 key->key[0] = 0xff;
870                 key->key[1] = 0xff;
871                 latchmgr->alloc->min = mgr->page_size - 3;
872                 latchmgr->alloc->lvl = lvl;
873                 latchmgr->alloc->cnt = 1;
874                 latchmgr->alloc->act = 1;
875 #ifdef unix
876                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
877                         return bt_mgrclose (mgr), NULL;
878 #else
879                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
880                         return bt_mgrclose (mgr), NULL;
881
882                 if( *amt < mgr->page_size )
883                         return bt_mgrclose (mgr), NULL;
884 #endif
885         }
886
887         // clear out latch manager locks
888         //      and rest of pages to round out segment
889
890         memset(latchmgr, 0, mgr->page_size);
891         last = MIN_lvl + 1;
892
893         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
894 #ifdef unix
895                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
896 #else
897                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
898                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
899                         return bt_mgrclose (mgr), NULL;
900                 if( *amt < mgr->page_size )
901                         return bt_mgrclose (mgr), NULL;
902 #endif
903                 last++;
904         }
905
906 mgrlatch:
907 #ifdef unix
908         flag = PROT_READ | PROT_WRITE;
909         mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
910         if( mgr->latchmgr == MAP_FAILED )
911                 return bt_mgrclose (mgr), NULL;
912         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
913         if( mgr->latchsets == MAP_FAILED )
914                 return bt_mgrclose (mgr), NULL;
915 #else
916         flag = PAGE_READWRITE;
917         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
918         if( !mgr->halloc )
919                 return bt_mgrclose (mgr), NULL;
920
921         flag = FILE_MAP_WRITE;
922         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
923         if( !mgr->latchmgr )
924                 return GetLastError(), bt_mgrclose (mgr), NULL;
925
926         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
927 #endif
928
929 #ifdef unix
930         free (latchmgr);
931 #else
932         VirtualFree (latchmgr, 0, MEM_RELEASE);
933 #endif
934         return mgr;
935 }
936
937 //      open BTree access method
938 //      based on buffer manager
939
940 BtDb *bt_open (BtMgr *mgr)
941 {
942 BtDb *bt = malloc (sizeof(*bt));
943
944         memset (bt, 0, sizeof(*bt));
945         bt->mgr = mgr;
946 #ifdef unix
947         bt->mem = malloc (3 *mgr->page_size);
948 #else
949         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
950 #endif
951         bt->frame = (BtPage)bt->mem;
952         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
953         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
954
955         memset(bt->zero, 0, mgr->page_size);
956         return bt;
957 }
958
959 //  compare two keys, returning > 0, = 0, or < 0
960 //  as the comparison value
961
962 int keycmp (BtKey key1, unsigned char *key2, uint len2)
963 {
964 uint len1 = key1->len;
965 int ans;
966
967         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
968                 return ans;
969
970         if( len1 > len2 )
971                 return 1;
972         if( len1 < len2 )
973                 return -1;
974
975         return 0;
976 }
977
978 //      Buffer Pool mgr
979
980 // find segment in pool
981 // must be called with hashslot idx locked
982 //      return NULL if not there
983 //      otherwise return node
984
985 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
986 {
987 BtPool *pool;
988 uint slot;
989
990         // compute start of hash chain in pool
991
992         if( slot = bt->mgr->hash[idx] ) 
993                 pool = bt->mgr->pool + slot;
994         else
995                 return NULL;
996
997         page_no &= ~bt->mgr->poolmask;
998
999         while( pool->basepage != page_no )
1000           if( pool = pool->hashnext )
1001                 continue;
1002           else
1003                 return NULL;
1004
1005         return pool;
1006 }
1007
1008 // add segment to hash table
1009
1010 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1011 {
1012 BtPool *node;
1013 uint slot;
1014
1015         pool->hashprev = pool->hashnext = NULL;
1016         pool->basepage = page_no & ~bt->mgr->poolmask;
1017         pool->lru = 1;
1018
1019         if( slot = bt->mgr->hash[idx] ) {
1020                 node = bt->mgr->pool + slot;
1021                 pool->hashnext = node;
1022                 node->hashprev = pool;
1023         }
1024
1025         bt->mgr->hash[idx] = pool->slot;
1026 }
1027
1028 //      find best segment to evict from buffer pool
1029
1030 BtPool *bt_findlru (BtDb *bt, uint hashslot)
1031 {
1032 unsigned long long int target = ~0LL;
1033 BtPool *pool = NULL, *node;
1034
1035         if( !hashslot )
1036                 return NULL;
1037
1038         node = bt->mgr->pool + hashslot;
1039
1040         //      scan pool entries under hash table slot
1041
1042         do {
1043           if( node->pin )
1044                 continue;
1045           if( node->lru > target )
1046                 continue;
1047           target = node->lru;
1048           pool = node;
1049         } while( node = node->hashnext );
1050
1051         return pool;
1052 }
1053
1054 //  map new buffer pool segment to virtual memory
1055
1056 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1057 {
1058 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1059 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1060 int flag;
1061
1062 #ifdef unix
1063         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1064         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1065         if( pool->map == MAP_FAILED )
1066                 return bt->err = BTERR_map;
1067 #else
1068         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1069         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1070         if( !pool->hmap )
1071                 return bt->err = BTERR_map;
1072
1073         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1074         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1075         if( !pool->map )
1076                 return bt->err = BTERR_map;
1077 #endif
1078         return bt->err = 0;
1079 }
1080
1081 //      calculate page within pool
1082
1083 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1084 {
1085 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1086 BtPage page;
1087
1088         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1089         return page;
1090 }
1091
1092 //  release pool pin
1093
1094 void bt_unpinpool (BtPool *pool)
1095 {
1096 #ifdef unix
1097         __sync_fetch_and_add(&pool->pin, -1);
1098 #else
1099         _InterlockedDecrement16 (&pool->pin);
1100 #endif
1101 }
1102
1103 //      find or place requested page in segment-pool
1104 //      return pool table entry, incrementing pin
1105
1106 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1107 {
1108 BtPool *pool, *node, *next;
1109 uint slot, idx, victim;
1110 BtLatchSet *set;
1111
1112         //      lock hash table chain
1113
1114         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1115         bt_spinreadlock (&bt->mgr->latch[idx]);
1116
1117         //      look up in hash table
1118
1119         if( pool = bt_findpool(bt, page_no, idx) ) {
1120 #ifdef unix
1121                 __sync_fetch_and_add(&pool->pin, 1);
1122 #else
1123                 _InterlockedIncrement16 (&pool->pin);
1124 #endif
1125                 bt_spinreleaseread (&bt->mgr->latch[idx]);
1126                 pool->lru++;
1127                 return pool;
1128         }
1129
1130         //      upgrade to write lock
1131
1132         bt_spinreleaseread (&bt->mgr->latch[idx]);
1133         bt_spinwritelock (&bt->mgr->latch[idx]);
1134
1135         // try to find page in pool with write lock
1136
1137         if( pool = bt_findpool(bt, page_no, idx) ) {
1138 #ifdef unix
1139                 __sync_fetch_and_add(&pool->pin, 1);
1140 #else
1141                 _InterlockedIncrement16 (&pool->pin);
1142 #endif
1143                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1144                 pool->lru++;
1145                 return pool;
1146         }
1147
1148         // allocate a new pool node
1149         // and add to hash table
1150
1151 #ifdef unix
1152         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1153 #else
1154         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1155 #endif
1156
1157         if( ++slot < bt->mgr->poolmax ) {
1158                 pool = bt->mgr->pool + slot;
1159                 pool->slot = slot;
1160
1161                 if( bt_mapsegment(bt, pool, page_no) )
1162                         return NULL;
1163
1164                 bt_linkhash(bt, pool, page_no, idx);
1165 #ifdef unix
1166                 __sync_fetch_and_add(&pool->pin, 1);
1167 #else
1168                 _InterlockedIncrement16 (&pool->pin);
1169 #endif
1170                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1171                 return pool;
1172         }
1173
1174         // pool table is full
1175         //      find best pool entry to evict
1176
1177 #ifdef unix
1178         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1179 #else
1180         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1181 #endif
1182
1183         while( 1 ) {
1184 #ifdef unix
1185                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1186 #else
1187                 victim = _InterlockedIncrement16 (&bt->mgr->evicted) - 1;
1188 #endif
1189                 victim %= bt->mgr->hashsize;
1190
1191                 // try to get write lock
1192                 //      skip entry if not obtained
1193
1194                 if( !bt_spinwritetry (&bt->mgr->latch[victim]) )
1195                         continue;
1196
1197                 //  if cache entry is empty
1198                 //      or no slots are unpinned
1199                 //      skip this entry
1200
1201                 if( !(pool = bt_findlru(bt, bt->mgr->hash[victim])) ) {
1202                         bt_spinreleasewrite (&bt->mgr->latch[victim]);
1203                         continue;
1204                 }
1205
1206                 // unlink victim pool node from hash table
1207
1208                 if( node = pool->hashprev )
1209                         node->hashnext = pool->hashnext;
1210                 else if( node = pool->hashnext )
1211                         bt->mgr->hash[victim] = node->slot;
1212                 else
1213                         bt->mgr->hash[victim] = 0;
1214
1215                 if( node = pool->hashnext )
1216                         node->hashprev = pool->hashprev;
1217
1218                 bt_spinreleasewrite (&bt->mgr->latch[victim]);
1219
1220                 //      remove old file mapping
1221 #ifdef unix
1222                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1223 #else
1224                 FlushViewOfFile(pool->map, 0);
1225                 UnmapViewOfFile(pool->map);
1226                 CloseHandle(pool->hmap);
1227 #endif
1228                 pool->map = NULL;
1229
1230                 //  create new pool mapping
1231                 //  and link into hash table
1232
1233                 if( bt_mapsegment(bt, pool, page_no) )
1234                         return NULL;
1235
1236                 bt_linkhash(bt, pool, page_no, idx);
1237 #ifdef unix
1238                 __sync_fetch_and_add(&pool->pin, 1);
1239 #else
1240                 _InterlockedIncrement16 (&pool->pin);
1241 #endif
1242                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1243                 return pool;
1244         }
1245 }
1246
1247 // place write, read, or parent lock on requested page_no.
1248 //      pin to buffer pool and return latchset pointer
1249
1250 void bt_lockpage(BtLock mode, BtLatchSet *set)
1251 {
1252         switch( mode ) {
1253         case BtLockRead:
1254                 bt_spinreadlock (set->readwr);
1255                 break;
1256         case BtLockWrite:
1257                 bt_spinwritelock (set->readwr);
1258                 break;
1259         case BtLockAccess:
1260                 bt_spinreadlock (set->access);
1261                 break;
1262         case BtLockDelete:
1263                 bt_spinwritelock (set->access);
1264                 break;
1265         case BtLockParent:
1266                 bt_spinwritelock (set->parent);
1267                 break;
1268         }
1269 }
1270
1271 // remove write, read, or parent lock on requested page_no.
1272
1273 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1274 {
1275         switch( mode ) {
1276         case BtLockRead:
1277                 bt_spinreleaseread (set->readwr);
1278                 break;
1279         case BtLockWrite:
1280                 bt_spinreleasewrite (set->readwr);
1281                 break;
1282         case BtLockAccess:
1283                 bt_spinreleaseread (set->access);
1284                 break;
1285         case BtLockDelete:
1286                 bt_spinreleasewrite (set->access);
1287                 break;
1288         case BtLockParent:
1289                 bt_spinreleasewrite (set->parent);
1290                 break;
1291         }
1292 }
1293
1294 //      allocate a new page and write page into it
1295
1296 uid bt_newpage(BtDb *bt, BtPage page)
1297 {
1298 BtLatchSet *set;
1299 BtPool *pool;
1300 uid new_page;
1301 BtPage pmap;
1302 int reuse;
1303
1304         //      lock allocation page
1305
1306         bt_spinwritelock(bt->mgr->latchmgr->lock);
1307
1308         // use empty chain first
1309         // else allocate empty page
1310
1311         if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
1312                 if( pool = bt_pinpool (bt, new_page) )
1313                         pmap = bt_page (bt, pool, new_page);
1314                 else
1315                         return 0;
1316                 bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
1317                 bt_unpinpool (pool);
1318                 reuse = 1;
1319         } else {
1320                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1321                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1322                 reuse = 0;
1323         }
1324 #ifdef unix
1325         // if writing first page of pool block, zero last page in the block
1326
1327         if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
1328         {
1329                 // use zero buffer to write zeros
1330                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1331                         return bt->err = BTERR_wrt, 0;
1332         }
1333
1334         // unlock allocation latch
1335
1336         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1337
1338         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1339                 return bt->err = BTERR_wrt, 0;
1340
1341 #else
1342         // unlock allocation latch
1343
1344         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1345
1346         //      bring new page into pool and copy page.
1347         //      this will extend the file into the new pages.
1348         //      NB -- no latch required
1349
1350         if( pool = bt_pinpool (bt, new_page) )
1351                 pmap = bt_page (bt, pool, new_page);
1352         else
1353                 return 0;
1354
1355         memcpy(pmap, page, bt->mgr->page_size);
1356         bt_unpinpool (pool);
1357 #endif
1358         return new_page;
1359 }
1360
1361 //  find slot in page for given key at a given level
1362
1363 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1364 {
1365 uint diff, higher = bt->page->cnt, low = 1, slot;
1366 uint good = 0;
1367
1368         //      if no right link
1369         //        make stopper key an infinite fence value
1370         //        by setting the good flag
1371
1372         if( bt_getid (bt->page->right) )
1373                 higher++;
1374         else
1375                 good++;
1376
1377         //      low is the next candidate.
1378         //  loop ends when they meet
1379
1380         //  if good, higher is already
1381         //      tested as .ge. the given key.
1382
1383         while( diff = higher - low ) {
1384                 slot = low + ( diff >> 1 );
1385                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1386                         low = slot + 1;
1387                 else
1388                         higher = slot, good++;
1389         }
1390
1391         //      return zero if key is on right link page
1392
1393         return good ? higher : 0;
1394 }
1395
1396 //  find and load page at given level for given key
1397 //      leave page rd or wr locked as requested
1398
1399 uint bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
1400 {
1401 uid page_no = ROOT_page, prevpage = 0;
1402 BtLatchSet *set, *prevset;
1403 uint drill = 0xff, slot;
1404 uint mode, prevmode;
1405 BtPool *prevpool;
1406 int foster = 0;
1407
1408   //  start at root of btree and drill down
1409
1410   do {
1411         // determine lock mode of drill level
1412         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1413
1414         //      obtain latch set for this page
1415
1416         bt->set = bt_pinlatch (bt, page_no);
1417         bt->page_no = page_no;
1418
1419         // pin page contents
1420
1421         if( bt->pool = bt_pinpool (bt, page_no) )
1422                 bt->page = bt_page (bt, bt->pool, page_no);
1423         else
1424                 return 0;
1425
1426         // obtain access lock using lock chaining with Access mode
1427
1428         if( page_no > ROOT_page )
1429           bt_lockpage(BtLockAccess, bt->set);
1430
1431         //  now unlock and unpin our (possibly foster) parent
1432
1433         if( prevpage ) {
1434           bt_unlockpage(prevmode, prevset);
1435           bt_unpinlatch (prevset);
1436           bt_unpinpool (prevpool);
1437           prevpage = 0;
1438         }
1439
1440         // obtain read lock using lock chaining
1441
1442         bt_lockpage(mode, bt->set);
1443
1444         if( page_no > ROOT_page )
1445           bt_unlockpage(BtLockAccess, bt->set);
1446
1447         // re-read and re-lock root after determining actual level of root
1448
1449         if( page_no == ROOT_page )
1450           if( bt->page->lvl != drill) {
1451                 drill = bt->page->lvl;
1452
1453             if( lock == BtLockWrite && drill == lvl ) {
1454                   bt_unlockpage(mode, bt->set);
1455                   bt_unpinlatch (bt->set);
1456                   bt_unpinpool (bt->pool);
1457                   continue;
1458                 }
1459           }
1460
1461         //  find key on page at this level
1462         //  and either descend to requested level
1463         //      or return key slot
1464
1465         if( slot = bt_findslot (bt, key, len) ) {
1466           //    is this slot < foster child area
1467           //    on the requested level?
1468
1469           //    if so, return actual slot even if dead
1470
1471           if( slot <= bt->page->cnt - bt->page->foster )
1472            if( drill == lvl )
1473                 return bt->foster = foster, slot;
1474
1475           //    find next active slot
1476           //    note: foster children are never dead
1477
1478           while( slotptr(bt->page, slot)->dead )
1479            if( slot++ < bt->page->cnt )
1480                 continue;
1481            else {
1482                 //  we are waiting for fence key posting
1483                 page_no = bt_getid(bt->page->right);
1484                 goto slideright;
1485           }
1486
1487          //     is this slot < foster child area
1488          //     if so, drill to next level
1489
1490          if( slot <= bt->page->cnt - bt->page->foster )
1491                 foster = 0, drill--;
1492          else
1493                 foster = 1;
1494
1495           //  continue right onto foster child
1496           //    or down to next level.
1497
1498           page_no = bt_getid(slotptr(bt->page, slot)->id);
1499
1500         //  or slide right into next page
1501
1502         } else {
1503                 page_no = bt_getid(bt->page->right);
1504                 foster = 1;
1505         }
1506
1507 slideright:
1508         prevpage = bt->page_no;
1509         prevpool = bt->pool;
1510         prevset = bt->set;
1511         prevmode = mode;
1512
1513   } while( page_no );
1514
1515   // return error on end of chain
1516
1517   bt->err = BTERR_struct;
1518   return 0;     // return error
1519 }
1520
1521 //  remove empty page from the B-tree
1522 //      by pulling our right node left over ourselves
1523
1524 //  call with bt->page, etc, set to page's locked parent
1525 //      returns with page locked.
1526
1527 BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
1528 {
1529 BtLatchSet *rset, *pset, *rpset;
1530 BtPool *rpool, *ppool, *rppool;
1531 BtPage rpage, ppage, rppage;
1532 uid right, parent, rparent;
1533 BtKey ptr;
1534 uint idx;
1535
1536         //      cache node's parent page
1537
1538         parent = bt->page_no;
1539         ppage = bt->page;
1540         ppool = bt->pool;
1541         pset = bt->set;
1542
1543         // lock and map our right page
1544         // note that it cannot be our foster child
1545         // since the our node is empty
1546         //      and it cannot be NULL because of the stopper
1547         //      in the last right page
1548
1549         bt_lockpage (BtLockWrite, set);
1550
1551         // if we aren't dead yet
1552
1553         if( page->act )
1554                 goto rmergexit;
1555
1556         if( right = bt_getid (page->right) )
1557           if( rpool = bt_pinpool (bt, right) )
1558                 rpage = bt_page (bt, rpool, right);
1559           else
1560                 return bt->err;
1561         else
1562                 return bt->err = BTERR_struct;
1563
1564         rset = bt_pinlatch (bt, right);
1565
1566         //      find our right neighbor
1567
1568         if( ppage->act > 1 ) {
1569          for( idx = slot; idx++ < ppage->cnt; )
1570           if( !slotptr(ppage, idx)->dead )
1571                 break;
1572
1573          if( idx > ppage->cnt )
1574                 return bt->err = BTERR_struct;
1575
1576          //  redirect right neighbor in parent to left node
1577
1578          bt_putid(slotptr(ppage,idx)->id, page_no);
1579         }
1580
1581         //      if parent has only our deleted page, e.g. no right neighbor
1582         //      prepare to merge parent itself
1583
1584         if( ppage->act == 1 ) {
1585           if( rparent = bt_getid (ppage->right) )
1586            if( rppool = bt_pinpool (bt, rparent) )
1587                 rppage = bt_page (bt, rppool, rparent);
1588            else
1589                 return bt->err;
1590           else
1591                 return bt->err = BTERR_struct;
1592
1593           rpset = bt_pinlatch (bt, rparent);
1594           bt_lockpage (BtLockWrite, rpset);
1595
1596           // find our right neighbor on right parent page
1597
1598           for( idx = 0; idx++ < rppage->cnt; )
1599                 if( !slotptr(rppage, idx)->dead ) {
1600                   bt_putid (slotptr(rppage, idx)->id, page_no);
1601                   break;
1602                 }
1603
1604           if( idx > rppage->cnt )
1605                 return bt->err = BTERR_struct;
1606         }
1607
1608         //      now that there are no more pointers to our right node
1609         //      we can wait for delete lock on it
1610
1611         bt_lockpage(BtLockDelete, rset);
1612         bt_lockpage(BtLockWrite, rset);
1613
1614         // pull contents of right page into our empty page 
1615
1616         memcpy (page, rpage, bt->mgr->page_size);
1617
1618         // ready to release right parent lock
1619         //      now that we have a new page in place
1620
1621         if( ppage->act == 1 ) {
1622           bt_unlockpage (BtLockWrite, rpset);
1623           bt_unpinlatch (rpset);
1624           bt_unpinpool (rppool);
1625         }
1626
1627         //      add killed right block to free chain
1628         //      lock latch mgr
1629
1630         bt_spinwritelock(bt->mgr->latchmgr->lock);
1631
1632         //      store free chain in allocation page second right
1633
1634         bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1635         bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
1636
1637         // unlock latch mgr and right page
1638
1639         bt_unlockpage(BtLockDelete, rset);
1640         bt_unlockpage(BtLockWrite, rset);
1641         bt_unpinlatch (rset);
1642         bt_unpinpool (rpool);
1643
1644         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1645
1646         // delete our obsolete fence key from our parent
1647
1648         slotptr(ppage, slot)->dead = 1;
1649         ppage->dirty = 1;
1650
1651         //      if our parent now empty
1652         //      remove it from the tree
1653
1654         if( ppage->act-- == 1 )
1655           if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
1656                 return bt->err;
1657
1658 rmergexit:
1659         bt_unlockpage (BtLockWrite, pset);
1660         bt_unpinlatch (pset);
1661         bt_unpinpool (ppool);
1662
1663         bt->found = 1;
1664         return bt->err = 0;
1665 }
1666
1667 //  remove empty page from the B-tree
1668 //      try merging left first.  If no left
1669 //      sibling, then merge right.
1670
1671 //      call with page loaded and locked,
1672 //  return with page locked.
1673
1674 BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
1675 {
1676 unsigned char fencekey[256], postkey[256];
1677 uint slot, idx, postfence = 0;
1678 BtLatchSet *lset, *pset;
1679 BtPool *lpool, *ppool;
1680 BtPage lpage, ppage;
1681 uid left, parent;
1682 BtKey ptr;
1683
1684         ptr = keyptr(page, page->cnt);
1685         memcpy(fencekey, ptr, ptr->len + 1);
1686         bt_unlockpage (BtLockWrite, set);
1687
1688         //      load and lock our parent
1689
1690 retry:
1691         if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
1692                 return bt->err;
1693
1694         parent = bt->page_no;
1695         ppage = bt->page;
1696         ppool = bt->pool;
1697         pset = bt->set;
1698
1699         //      wait until we are not a foster child
1700
1701         if( bt->foster ) {
1702                 bt_unlockpage (BtLockWrite, pset);
1703                 bt_unpinlatch (pset);
1704                 bt_unpinpool (ppool);
1705 #ifdef unix
1706                 sched_yield();
1707 #else
1708                 SwitchToThread();
1709 #endif
1710                 goto retry;
1711         }
1712
1713         //  find our left neighbor in our parent page
1714
1715         for( idx = slot; --idx; )
1716           if( !slotptr(ppage, idx)->dead )
1717                 break;
1718
1719         //      if no left neighbor, do right merge
1720
1721         if( !idx )
1722                 return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
1723
1724         // lock and map our left neighbor's page
1725
1726         left = bt_getid (slotptr(ppage, idx)->id);
1727
1728         if( lpool = bt_pinpool (bt, left) )
1729                 lpage = bt_page (bt, lpool, left);
1730         else
1731                 return bt->err;
1732
1733         lset = bt_pinlatch (bt, left);
1734         bt_lockpage(BtLockWrite, lset);
1735
1736         //  wait until foster sibling is in our parent
1737
1738         if( bt_getid (lpage->right) != page_no ) {
1739                 bt_unlockpage (BtLockWrite, pset);
1740                 bt_unpinlatch (pset);
1741                 bt_unpinpool (ppool);
1742                 bt_unlockpage (BtLockWrite, lset);
1743                 bt_unpinlatch (lset);
1744                 bt_unpinpool (lpool);
1745 #ifdef linux
1746                 sched_yield();
1747 #else
1748                 SwitchToThread();
1749 #endif
1750                 goto retry;
1751         }
1752
1753         //  since our page will have no more pointers to it,
1754         //      obtain Delete lock and wait for write locks to clear
1755
1756         bt_lockpage(BtLockDelete, set);
1757         bt_lockpage(BtLockWrite, set);
1758
1759         //      if we aren't dead yet,
1760         //      get ready for exit
1761
1762         if( page->act ) {
1763                 bt_unlockpage(BtLockDelete, set);
1764                 bt_unlockpage(BtLockWrite, lset);
1765                 bt_unpinlatch (lset);
1766                 bt_unpinpool (lpool);
1767                 goto lmergexit;
1768         }
1769
1770         //      are we are the fence key for our parent?
1771         //      if so, grab our old fence key
1772
1773         if( postfence = slot == ppage->cnt ) {
1774                 ptr = keyptr (ppage, ppage->cnt);
1775                 memcpy(fencekey, ptr, ptr->len + 1);
1776                 memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
1777
1778                 // clear out other dead slots
1779
1780                 while( --ppage->cnt )
1781                   if( slotptr(ppage, ppage->cnt)->dead )
1782                         memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
1783                   else
1784                         break;
1785
1786                 ptr = keyptr (ppage, ppage->cnt);
1787                 memcpy(postkey, ptr, ptr->len + 1);
1788         } else
1789                 slotptr(ppage,slot)->dead = 1;
1790
1791         ppage->dirty = 1;
1792         ppage->act--;
1793
1794         //      push our right neighbor pointer to our left
1795
1796         memcpy (lpage->right, page->right, BtId);
1797
1798         //      add ourselves to free chain
1799         //      lock latch mgr
1800
1801         bt_spinwritelock(bt->mgr->latchmgr->lock);
1802
1803         //      store free chain in allocation page second right
1804         bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1805         bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
1806
1807         // unlock latch mgr and pages
1808
1809         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1810         bt_unlockpage(BtLockWrite, lset);
1811         bt_unpinlatch (lset);
1812         bt_unpinpool (lpool);
1813
1814         // release our node's delete lock
1815
1816         bt_unlockpage(BtLockDelete, set);
1817
1818 lmergexit:
1819         bt_unlockpage (BtLockWrite, pset);
1820         bt_unpinpool (ppool);
1821
1822         //  do we need to post parent's fence key in its parent?
1823
1824         if( !postfence || parent == ROOT_page ) {
1825                 bt_unpinlatch (pset);
1826                 bt->found = 1;
1827                 return bt->err = 0;
1828         }
1829
1830         //      interlock parent fence post
1831
1832         bt_lockpage (BtLockParent, pset);
1833
1834         //      load parent's parent page
1835 posttry:
1836         if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
1837                 return bt->err;
1838
1839         if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
1840           if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
1841                 return bt->err;
1842           else
1843                 goto posttry;
1844
1845         page = bt->page;
1846
1847         page->min -= *postkey + 1;
1848         ((unsigned char *)page)[page->min] = *postkey;
1849         memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
1850         slotptr(page, slot)->off = page->min;
1851
1852         bt_unlockpage (BtLockParent, pset);
1853         bt_unpinlatch (pset);
1854
1855         bt_unlockpage (BtLockWrite, bt->set);
1856         bt_unpinlatch (bt->set);
1857         bt_unpinpool (bt->pool);
1858
1859         bt->found = 1;
1860         return bt->err = 0;
1861 }
1862
1863 //  find and delete key on page by marking delete flag bit
1864 //  if page becomes empty, delete it from the btree
1865
1866 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
1867 {
1868 BtLatchSet *set;
1869 BtPool *pool;
1870 BtPage page;
1871 uid page_no;
1872 BtKey ptr;
1873 uint slot;
1874
1875         if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
1876                 return bt->err;
1877
1878         page_no = bt->page_no;
1879         page = bt->page;
1880         pool = bt->pool;
1881         set = bt->set;
1882
1883         // if key is found delete it, otherwise ignore request
1884
1885         ptr = keyptr(page, slot);
1886
1887         if( bt->found = !keycmp (ptr, key, len) )
1888           if( bt->found = slotptr(page, slot)->dead == 0 ) {
1889                 slotptr(page,slot)->dead = 1;
1890                   if( slot < page->cnt )
1891                         page->dirty = 1;
1892                   if( !--page->act )
1893                         if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
1894                           return bt->err;
1895                 }
1896
1897         bt_unlockpage(BtLockWrite, set);
1898         bt_unpinlatch (set);
1899         bt_unpinpool (pool);
1900         return bt->err = 0;
1901 }
1902
1903 //      find key in leaf level and return row-id
1904
1905 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1906 {
1907 uint  slot;
1908 BtKey ptr;
1909 uid id;
1910
1911         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1912                 ptr = keyptr(bt->page, slot);
1913         else
1914                 return 0;
1915
1916         // if key exists, return row-id
1917         //      otherwise return 0
1918
1919         if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
1920                 id = bt_getid(slotptr(bt->page,slot)->id);
1921         else
1922                 id = 0;
1923
1924         bt_unlockpage (BtLockRead, bt->set);
1925         bt_unpinlatch (bt->set);
1926         bt_unpinpool (bt->pool);
1927         return id;
1928 }
1929
1930 //      check page for space available,
1931 //      clean if necessary and return
1932 //      0 - page needs splitting
1933 //      >0  new slot value
1934
1935 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
1936 {
1937 uint nxt = bt->mgr->page_size;
1938 uint cnt = 0, idx = 0;
1939 uint max = page->cnt;
1940 uint newslot = max;
1941 BtKey key;
1942
1943         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1944                 return slot;
1945
1946         //      skip cleanup if nothing to reclaim
1947
1948         if( !page->dirty )
1949                 return 0;
1950
1951         memcpy (bt->frame, page, bt->mgr->page_size);
1952
1953         // skip page info and set rest of page to zero
1954
1955         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1956         page->dirty = 0;
1957         page->act = 0;
1958
1959         // try cleaning up page first
1960
1961         // always leave fence key in the array
1962         // otherwise, remove deleted key
1963
1964         // note: foster children are never dead
1965
1966         while( cnt++ < max ) {
1967                 if( cnt == slot )
1968                         newslot = idx + 1;
1969                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1970                         continue;
1971
1972                 // copy key
1973
1974                 key = keyptr(bt->frame, cnt);
1975                 nxt -= key->len + 1;
1976                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1977
1978                 // copy slot
1979                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1980                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1981                         page->act++;
1982                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1983                 slotptr(page, idx)->off = nxt;
1984         }
1985
1986         page->min = nxt;
1987         page->cnt = idx;
1988
1989         //      see if page has enough space now, or does it need splitting?
1990
1991         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1992                 return newslot;
1993
1994         return 0;
1995 }
1996
1997 //      add key to current page
1998 //      page must already be writelocked
1999
2000 void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
2001 {
2002 uint idx;
2003
2004         // find next available dead slot and copy key onto page
2005         // note that foster children on the page are never dead
2006
2007         // look for next hole, but stay back from the fence key
2008
2009         for( idx = slot; idx < page->cnt; idx++ )
2010           if( slotptr(page, idx)->dead )
2011                 break;
2012
2013         if( idx == page->cnt )
2014                 idx++, page->cnt++;
2015
2016         page->act++;
2017
2018         // now insert key into array before slot
2019
2020         while( idx > slot )
2021                 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
2022
2023         page->min -= len + 1;
2024         ((unsigned char *)page)[page->min] = len;
2025         memcpy ((unsigned char *)page + page->min +1, key, len );
2026
2027         bt_putid(slotptr(page,slot)->id, id);
2028         slotptr(page, slot)->off = page->min;
2029         slotptr(page, slot)->tod = tod;
2030         slotptr(page, slot)->dead = 0;
2031 }
2032
2033 // split the root and raise the height of the btree
2034 //      call with current page locked and page no of foster child
2035 //      return with current page (root) unlocked
2036
2037 BTERR bt_splitroot(BtDb *bt, uid right)
2038 {
2039 uint nxt = bt->mgr->page_size;
2040 unsigned char fencekey[256];
2041 BtPage root = bt->page;
2042 uid new_page;
2043 BtKey key;
2044
2045         //  Obtain an empty page to use, and copy the left page
2046         //  contents into it from the root.  Strip foster child key.
2047         //      (it's the stopper key)
2048
2049         memset (slotptr(root, root->cnt), 0, sizeof(BtSlot));
2050         root->dirty = 1;
2051         root->foster--;
2052         root->act--;
2053         root->cnt--;
2054
2055         //      Save left fence key.
2056
2057         key = keyptr(root, root->cnt);
2058         memcpy (fencekey, key, key->len + 1);
2059
2060         //  copy the lower keys into a new left page
2061
2062         if( !(new_page = bt_newpage(bt, root)) )
2063                 return bt->err;
2064
2065         // preserve the page info at the bottom
2066         // and set rest of the root to zero
2067
2068         memset (root+1, 0, bt->mgr->page_size - sizeof(*root));
2069
2070         // insert left fence key on empty newroot page
2071
2072         nxt -= *fencekey + 1;
2073         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
2074         bt_putid(slotptr(root, 1)->id, new_page);
2075         slotptr(root, 1)->off = nxt;
2076         
2077         // insert stopper key on newroot page
2078         // and increase the root height
2079
2080         nxt -= 3;
2081         fencekey[0] = 2;
2082         fencekey[1] = 0xff;
2083         fencekey[2] = 0xff;
2084         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
2085         bt_putid(slotptr(root, 2)->id, right);
2086         slotptr(root, 2)->off = nxt;
2087
2088         bt_putid(root->right, 0);
2089         root->min = nxt;                // reset lowest used offset and key count
2090         root->cnt = 2;
2091         root->act = 2;
2092         root->lvl++;
2093
2094         // release and unpin root (bt->page)
2095
2096         bt_unlockpage(BtLockWrite, bt->set);
2097         bt_unpinlatch (bt->set);
2098         bt_unpinpool (bt->pool);
2099         return 0;
2100 }
2101
2102 //  split already locked full node
2103 //      return unlocked and unpinned.
2104
2105 BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
2106 {
2107 uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
2108 unsigned char fencekey[256];
2109 uint tod = time(NULL);
2110 uint lvl = page->lvl;
2111 uid new_page;
2112 BtKey key;
2113
2114         //      initialize frame buffer for right node
2115
2116         memset (bt->frame, 0, bt->mgr->page_size);
2117         max = page->cnt - page->foster;
2118         cnt = max / 2;
2119         idx = 0;
2120
2121         //  split higher half of keys to bt->frame
2122         //      leaving old foster children in the left node,
2123         //      and adding a new foster child there.
2124
2125         while( cnt++ < max ) {
2126                 key = keyptr(page, cnt);
2127                 nxt -= key->len + 1;
2128                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2129                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
2130                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
2131                         bt->frame->act++;
2132                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
2133                 slotptr(bt->frame, idx)->off = nxt;
2134         }
2135
2136         // transfer right link node to new right node
2137
2138         if( page_no > ROOT_page )
2139                 memcpy (bt->frame->right, page->right, BtId);
2140
2141         bt->frame->bits = bt->mgr->page_bits;
2142         bt->frame->min = nxt;
2143         bt->frame->cnt = idx;
2144         bt->frame->lvl = lvl;
2145
2146         //      get new free page and write right frame to it.
2147
2148         if( !(new_page = bt_newpage(bt, bt->frame)) )
2149                 return bt->err;
2150
2151         //      remember fence key for new right page to add
2152         //      as foster child to the left node
2153
2154         key = keyptr(bt->frame, idx);
2155         memcpy (fencekey, key, key->len + 1);
2156
2157         //      update lower keys and foster children to continue in old page
2158
2159         memcpy (bt->frame, page, bt->mgr->page_size);
2160         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2161         nxt = bt->mgr->page_size;
2162         page->dirty = 0;
2163         page->act = 0;
2164         cnt = 0;
2165         idx = 0;
2166
2167         //  assemble page of smaller keys
2168         //      to remain in the old page
2169
2170         while( cnt++ < max / 2 ) {
2171                 key = keyptr(bt->frame, cnt);
2172                 nxt -= key->len + 1;
2173                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2174                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2175                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2176                         page->act++;
2177                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2178                 slotptr(page, idx)->off = nxt;
2179         }
2180
2181         //      insert new foster child for right page in queue
2182         //      before any of the current foster children
2183
2184         nxt -= *fencekey + 1;
2185         memcpy ((unsigned char *)page + nxt, fencekey, *fencekey + 1);
2186
2187         bt_putid (slotptr(page,++idx)->id, new_page);
2188         slotptr(page, idx)->tod = tod;
2189         slotptr(page, idx)->off = nxt;
2190         page->foster++;
2191         page->act++;
2192
2193         //  continue with old foster child keys
2194         //      note that none will be dead
2195
2196         cnt = bt->frame->cnt - bt->frame->foster;
2197
2198         while( cnt++ < bt->frame->cnt ) {
2199                 key = keyptr(bt->frame, cnt);
2200                 nxt -= key->len + 1;
2201                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2202                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2203                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2204                 slotptr(page, idx)->off = nxt;
2205                 page->act++;
2206         }
2207
2208         page->min = nxt;
2209         page->cnt = idx;
2210
2211         //      link new right page
2212
2213         bt_putid (page->right, new_page);
2214
2215         // if current page is the root page, split it
2216
2217         if( page_no == ROOT_page )
2218                 return bt_splitroot (bt, new_page);
2219
2220         //  release wr lock on our page
2221
2222         bt_unlockpage (BtLockWrite, set);
2223
2224         //  obtain ParentModification lock for current page
2225         //      to fix new fence key and oldest foster child on page
2226
2227         bt_lockpage (BtLockParent, set);
2228
2229         //  get our new fence key to insert in parent node
2230
2231         bt_lockpage (BtLockRead, set);
2232
2233         key = keyptr(page, page->cnt-1);
2234         memcpy (fencekey, key, key->len+1);
2235
2236         bt_unlockpage (BtLockRead, set);
2237
2238         if( bt_insertkey (bt, fencekey + 1, *fencekey, page_no, tod, lvl + 1) )
2239                 return bt->err;
2240
2241         //      lock our page for writing
2242
2243         bt_lockpage (BtLockRead, set);
2244
2245         //      switch old parent key from us to our oldest foster child
2246
2247         key = keyptr(page, page->cnt);
2248         memcpy (fencekey, key, key->len+1);
2249
2250         new_page = bt_getid (slotptr(page, page->cnt)->id);
2251         bt_unlockpage (BtLockRead, set);
2252
2253         if( bt_insertkey (bt, fencekey + 1, *fencekey, new_page, tod, lvl + 1) )
2254                 return bt->err;
2255
2256         //      now that it has its own parent pointer,
2257         //      remove oldest foster child from our page
2258
2259         bt_lockpage (BtLockWrite, set);
2260         memset (slotptr(page, page->cnt), 0, sizeof(BtSlot));
2261         page->dirty = 1;
2262         page->foster--;
2263         page->cnt--;
2264         page->act--;
2265
2266         bt_unlockpage (BtLockParent, set);
2267
2268         //  if this emptied page,
2269         //      undo the foster child
2270
2271         if( !page->act )
2272           if( bt_mergeleft (bt, page, pool, set, page_no, lvl) )
2273                 return bt->err;
2274
2275         //      unlock and unpin
2276
2277         bt_unlockpage (BtLockWrite, set);
2278         bt_unpinlatch (set);
2279         bt_unpinpool (pool);
2280         return 0;
2281 }
2282
2283 //  Insert new key into the btree at leaf level.
2284
2285 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
2286 {
2287 uint slot, idx;
2288 BtPage page;
2289 BtKey ptr;
2290
2291         while( 1 ) {
2292                 if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
2293                         ptr = keyptr(bt->page, slot);
2294                 else
2295                 {
2296                         if ( !bt->err )
2297                                 bt->err = BTERR_ovflw;
2298                         return bt->err;
2299                 }
2300
2301                 // if key already exists, update id and return
2302
2303                 page = bt->page;
2304
2305                 if( !keycmp (ptr, key, len) ) {
2306                         if( slotptr(page, slot)->dead )
2307                                 page->act++;
2308                         slotptr(page, slot)->dead = 0;
2309                         slotptr(page, slot)->tod = tod;
2310                         bt_putid(slotptr(page,slot)->id, id);
2311                         bt_unlockpage(BtLockWrite, bt->set);
2312                         bt_unpinlatch (bt->set);
2313                         bt_unpinpool (bt->pool);
2314                         return bt->err;
2315                 }
2316
2317                 // check if page has enough space
2318
2319                 if( slot = bt_cleanpage (bt, bt->page, len, slot) )
2320                         break;
2321
2322                 if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
2323                         return bt->err;
2324         }
2325
2326         bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
2327
2328         bt_unlockpage (BtLockWrite, bt->set);
2329         bt_unpinlatch (bt->set);
2330         bt_unpinpool (bt->pool);
2331         return 0;
2332 }
2333
2334 //  cache page of keys into cursor and return starting slot for given key
2335
2336 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2337 {
2338 uint slot;
2339
2340         // cache page for retrieval
2341         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2342                 memcpy (bt->cursor, bt->page, bt->mgr->page_size);
2343
2344         bt->cursor_page = bt->page_no;
2345
2346         bt_unlockpage(BtLockRead, bt->set);
2347         bt_unpinlatch (bt->set);
2348         bt_unpinpool (bt->pool);
2349         return slot;
2350 }
2351
2352 //  return next slot for cursor page
2353 //  or slide cursor right into next page
2354
2355 uint bt_nextkey (BtDb *bt, uint slot)
2356 {
2357 BtLatchSet *set;
2358 BtPool *pool;
2359 BtPage page;
2360 uid right;
2361
2362   do {
2363         right = bt_getid(bt->cursor->right);
2364         while( slot++ < bt->cursor->cnt - bt->cursor->foster )
2365           if( slotptr(bt->cursor,slot)->dead )
2366                 continue;
2367           else if( right || (slot < bt->cursor->cnt - bt->cursor->foster) )
2368                 return slot;
2369           else
2370                 break;
2371
2372         if( !right )
2373                 break;
2374
2375         bt->cursor_page = right;
2376         if( pool = bt_pinpool (bt, right) )
2377                 page = bt_page (bt, pool, right);
2378         else
2379                 return 0;
2380
2381         set = bt_pinlatch (bt, right);
2382     bt_lockpage(BtLockRead, set);
2383
2384         memcpy (bt->cursor, page, bt->mgr->page_size);
2385
2386         bt_unlockpage(BtLockRead, set);
2387         bt_unpinlatch (set);
2388         bt_unpinpool (pool);
2389         slot = 0;
2390   } while( 1 );
2391
2392   return bt->err = 0;
2393 }
2394
2395 BtKey bt_key(BtDb *bt, uint slot)
2396 {
2397         return keyptr(bt->cursor, slot);
2398 }
2399
2400 uid bt_uid(BtDb *bt, uint slot)
2401 {
2402         return bt_getid(slotptr(bt->cursor,slot)->id);
2403 }
2404
2405 uint bt_tod(BtDb *bt, uint slot)
2406 {
2407         return slotptr(bt->cursor,slot)->tod;
2408 }
2409
2410
2411 #ifdef STANDALONE
2412
2413 void bt_latchaudit (BtDb *bt)
2414 {
2415 ushort idx, hashidx;
2416 BtLatchSet *set;
2417 BtPool *pool;
2418 BtPage page;
2419 uid page_no;
2420
2421 #ifdef unix
2422         for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
2423                 set = bt->mgr->latchsets + idx;
2424                 if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) {
2425                         fprintf(stderr, "latchset %d locked for page %6x\n", idx, set->page_no);
2426                         *(ushort *)set->readwr = 0;
2427                         *(ushort *)set->access = 0;
2428                         *(ushort *)set->parent = 0;
2429                 }
2430                 if( set->pin ) {
2431                         fprintf(stderr, "latchset %d pinned\n", idx);
2432                         set->pin = 0;
2433                 }
2434         }
2435
2436         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2437           if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
2438                 fprintf(stderr, "latchmgr locked\n");
2439           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2440                 set = bt->mgr->latchsets + idx;
2441                 if( *(uint *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
2442                         fprintf(stderr, "latchset %d locked\n", idx);
2443                 if( set->hash != hashidx )
2444                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
2445                 if( set->pin )
2446                         fprintf(stderr, "latchset %d pinned\n", idx);
2447           } while( idx = set->next );
2448         }
2449         page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
2450
2451         while( page_no ) {
2452                 fprintf(stderr, "free: %.6x\n", (uint)page_no);
2453                 pool = bt_pinpool (bt, page_no);
2454                 page = bt_page (bt, pool, page_no);
2455             page_no = bt_getid(page->right);
2456                 bt_unpinpool (pool);
2457         }
2458 #endif
2459 }
2460
2461 typedef struct {
2462         char type, idx;
2463         char *infile;
2464         BtMgr *mgr;
2465         int num;
2466 } ThreadArg;
2467
2468 //  standalone program to index file of keys
2469 //  then list them onto std-out
2470
2471 #ifdef unix
2472 void *index_file (void *arg)
2473 #else
2474 uint __stdcall index_file (void *arg)
2475 #endif
2476 {
2477 int line = 0, found = 0, cnt = 0;
2478 uid next, page_no = LEAF_page;  // start on first page of leaves
2479 unsigned char key[256];
2480 ThreadArg *args = arg;
2481 int ch, len = 0, slot;
2482 BtLatchSet *set;
2483 time_t tod[1];
2484 BtPool *pool;
2485 BtPage page;
2486 BtKey ptr;
2487 BtDb *bt;
2488 FILE *in;
2489
2490         bt = bt_open (args->mgr);
2491         time (tod);
2492
2493         switch(args->type | 0x20)
2494         {
2495         case 'a':
2496                 fprintf(stderr, "started latch mgr audit\n");
2497                 bt_latchaudit (bt);
2498                 fprintf(stderr, "finished latch mgr audit\n");
2499                 break;
2500
2501         case 'w':
2502                 fprintf(stderr, "started indexing for %s\n", args->infile);
2503                 if( in = fopen (args->infile, "rb") )
2504                   while( ch = getc(in), ch != EOF )
2505                         if( ch == '\n' )
2506                         {
2507                           line++;
2508
2509                           if( args->num == 1 )
2510                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2511
2512                           else if( args->num )
2513                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2514
2515                           if( bt_insertkey (bt, key, len, line, *tod, 0) )
2516                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2517                           len = 0;
2518                         }
2519                         else if( len < 255 )
2520                                 key[len++] = ch;
2521                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2522                 break;
2523
2524         case 'd':
2525                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2526                 if( in = fopen (args->infile, "rb") )
2527                   while( ch = getc(in), ch != EOF )
2528                         if( ch == '\n' )
2529                         {
2530                           line++;
2531                           if( args->num == 1 )
2532                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2533
2534                           else if( args->num )
2535                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2536
2537                           if( bt_deletekey (bt, key, len) )
2538                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2539                           len = 0;
2540                         }
2541                         else if( len < 255 )
2542                                 key[len++] = ch;
2543                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2544                 break;
2545
2546         case 'f':
2547                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2548                 if( in = fopen (args->infile, "rb") )
2549                   while( ch = getc(in), ch != EOF )
2550                         if( ch == '\n' )
2551                         {
2552                           line++;
2553                           if( args->num == 1 )
2554                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2555
2556                           else if( args->num )
2557                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2558
2559                           if( bt_findkey (bt, key, len) )
2560                                 found++;
2561                           else if( bt->err )
2562                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2563                           len = 0;
2564                         }
2565                         else if( len < 255 )
2566                                 key[len++] = ch;
2567                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2568                 break;
2569
2570         case 's':
2571                 len = key[0] = 0;
2572
2573                 fprintf(stderr, "started reading\n");
2574
2575                 if( slot = bt_startkey (bt, key, len) )
2576                   slot--;
2577                 else
2578                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2579
2580                 while( slot = bt_nextkey (bt, slot) ) {
2581                         ptr = bt_key(bt, slot);
2582                         fwrite (ptr->key, ptr->len, 1, stdout);
2583                         fputc ('\n', stdout);
2584                 }
2585
2586                 break;
2587
2588         case 'c':
2589                 fprintf(stderr, "started reading\n");
2590
2591                 do {
2592                         if( pool = bt_pinpool (bt, page_no) )
2593                                 page = bt_page (bt, pool, page_no);
2594                         else
2595                                 break;
2596                         set = bt_pinlatch (bt, page_no);
2597                         bt_lockpage (BtLockRead, set);
2598                         cnt += page->act;
2599                         next = bt_getid (page->right);
2600                         bt_unlockpage (BtLockRead, set);
2601                         bt_unpinlatch (set);
2602                         bt_unpinpool (pool);
2603                 } while( page_no = next );
2604
2605                 cnt--;  // remove stopper key
2606                 fprintf(stderr, " Total keys read %d\n", cnt);
2607                 break;
2608         }
2609
2610         bt_close (bt);
2611 #ifdef unix
2612         return NULL;
2613 #else
2614         return 0;
2615 #endif
2616 }
2617
2618 typedef struct timeval timer;
2619
2620 int main (int argc, char **argv)
2621 {
2622 int idx, cnt, len, slot, err;
2623 int segsize, bits = 16;
2624 #ifdef unix
2625 pthread_t *threads;
2626 timer start, stop;
2627 #else
2628 time_t start[1], stop[1];
2629 HANDLE *threads;
2630 #endif
2631 double real_time;
2632 ThreadArg *args;
2633 uint poolsize = 0;
2634 int num = 0;
2635 char key[1];
2636 BtMgr *mgr;
2637 BtKey ptr;
2638 BtDb *bt;
2639
2640         if( argc < 3 ) {
2641                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2642                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2643                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2644                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2645                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2646                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2647                 exit(0);
2648         }
2649
2650 #ifdef unix
2651         gettimeofday(&start, NULL);
2652 #else
2653         time(start);
2654 #endif
2655
2656         if( argc > 3 )
2657                 bits = atoi(argv[3]);
2658
2659         if( argc > 4 )
2660                 poolsize = atoi(argv[4]);
2661
2662         if( !poolsize )
2663                 fprintf (stderr, "Warning: no mapped_pool\n");
2664
2665         if( poolsize > 65535 )
2666                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2667
2668         if( argc > 5 )
2669                 segsize = atoi(argv[5]);
2670         else
2671                 segsize = 4;    // 16 pages per mmap segment
2672
2673         if( argc > 6 )
2674                 num = atoi(argv[6]);
2675
2676         cnt = argc - 7;
2677 #ifdef unix
2678         threads = malloc (cnt * sizeof(pthread_t));
2679 #else
2680         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2681 #endif
2682         args = malloc (cnt * sizeof(ThreadArg));
2683
2684         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2685
2686         if( !mgr ) {
2687                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2688                 exit (1);
2689         }
2690
2691         //      fire off threads
2692
2693         for( idx = 0; idx < cnt; idx++ ) {
2694                 args[idx].infile = argv[idx + 7];
2695                 args[idx].type = argv[2][0];
2696                 args[idx].mgr = mgr;
2697                 args[idx].num = num;
2698                 args[idx].idx = idx;
2699 #ifdef unix
2700                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2701                         fprintf(stderr, "Error creating thread %d\n", err);
2702 #else
2703                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2704 #endif
2705         }
2706
2707         //      wait for termination
2708
2709 #ifdef unix
2710         for( idx = 0; idx < cnt; idx++ )
2711                 pthread_join (threads[idx], NULL);
2712         gettimeofday(&stop, NULL);
2713         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2714 #else
2715         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2716
2717         for( idx = 0; idx < cnt; idx++ )
2718                 CloseHandle(threads[idx]);
2719
2720         time (stop);
2721         real_time = 1000 * (*stop - *start);
2722 #endif
2723         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2724         bt_mgrclose (mgr);
2725 }
2726
2727 #endif  //STANDALONE