]> pd.if.org Git - btree/blob - fosterbtreee.c
rework bt_deletekey
[btree] / fosterbtreee.c
1 // foster btree version e2
2 // 18 JAN 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <fcntl.h>
35 #include <sys/time.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #include <pthread.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <process.h>
47 #include <intrin.h>
48 #endif
49
50 #include <memory.h>
51 #include <string.h>
52
53 typedef unsigned long long      uid;
54
55 #ifndef unix
56 typedef unsigned long long      off64_t;
57 typedef unsigned short          ushort;
58 typedef unsigned int            uint;
59 #endif
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63
64 #define BT_latchtable   128                                     // number of latch manager slots
65
66 #define BT_maxbits              24                                      // maximum page size in bits
67 #define BT_minbits              9                                       // minimum page size in bits
68 #define BT_minpage              (1 << BT_minbits)       // minimum page size
69 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
70
71 /*
72 There are five lock types for each node in three independent sets: 
73 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
74 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
75 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
76 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
77 5. (set 3) ParentLock: Exclusive. Have parent adopt/delete maximum foster child from the node.
78 */
79
80 typedef enum{
81         BtLockAccess,
82         BtLockDelete,
83         BtLockRead,
84         BtLockWrite,
85         BtLockParent
86 }BtLock;
87
88 //      Define the length of the page and key pointers
89
90 #define BtId 6
91
92 //      Page key slot definition.
93
94 //      If BT_maxbits is 15 or less, you can save 4 bytes
95 //      for each key stored by making the first two uints
96 //      into ushorts.  You can also save 4 bytes by removing
97 //      the tod field from the key.
98
99 //      Keys are marked dead, but remain on the page until
100 //      cleanup is called. The fence key (highest key) for
101 //      the page is always present, even after cleanup.
102
103 typedef struct {
104         uint off:BT_maxbits;            // page offset for key start
105         uint dead:1;                            // set for deleted key
106         uint tod;                                       // time-stamp for key
107         unsigned char id[BtId];         // id associated with key
108 } BtSlot;
109
110 //      The key structure occupies space at the upper end of
111 //      each page.  It's a length byte followed by the value
112 //      bytes.
113
114 typedef struct {
115         unsigned char len;
116         unsigned char key[1];
117 } *BtKey;
118
119 //      The first part of an index page.
120 //      It is immediately followed
121 //      by the BtSlot array of keys.
122
123 typedef struct Page {
124         volatile uint cnt;                      // count of keys in page
125         volatile uint act;                      // count of active keys
126         volatile uint min;                      // next key offset
127         volatile uint foster;           // count of foster children
128         unsigned char bits;                     // page size in bits
129         unsigned char lvl:7;            // level of page
130         unsigned char dirty:1;          // page needs to be cleaned
131         unsigned char right[BtId];      // page number to right
132         BtSlot table[0];                        // the key slots
133 } *BtPage;
134
135 //      mode & definition for hash latch implementation
136
137 enum {
138         Mutex = 1,
139         Write = 2,
140         Pending = 4,
141         Share = 8
142 } LockMode;
143
144 // mutex locks the other fields
145 // exclusive is set for write access
146 // share is count of read accessors
147
148 typedef struct {
149         volatile ushort mutex:1;
150         volatile ushort exclusive:1;
151         volatile ushort pending:1;
152         volatile ushort share:13;
153 } BtSpinLatch;
154
155 //  hash table entries
156
157 typedef struct {
158         BtSpinLatch latch[1];
159         volatile ushort slot;           // Latch table entry at head of chain
160 } BtHashEntry;
161
162 //      latch manager table structure
163
164 typedef struct {
165 #ifdef unix
166         pthread_rwlock_t lock[1];
167 #else
168         SRWLOCK srw[1];
169 #endif
170 } BtLatch;
171
172 typedef struct {
173         BtLatch readwr[1];                      // read/write page lock
174         BtLatch access[1];                      // Access Intent/Page delete
175         BtLatch parent[1];                      // adoption of foster children
176         BtSpinLatch busy[1];            // slot is being moved between chains
177         volatile ushort next;           // next entry in hash table chain
178         volatile ushort prev;           // prev entry in hash table chain
179         volatile ushort pin;            // number of outstanding locks
180         volatile ushort hash;           // hash slot entry is under
181         volatile uid page_no;           // latch set page number
182 } BtLatchSet;
183
184 //      The memory mapping pool table buffer manager entry
185
186 typedef struct {
187         unsigned long long int lru;     // number of times accessed
188         uid  basepage;                          // mapped base page number
189         char *map;                                      // mapped memory pointer
190         ushort pin;                                     // mapped page pin counter
191         ushort slot;                            // slot index in this array
192         void *hashprev;                         // previous pool entry for the same hash idx
193         void *hashnext;                         // next pool entry for the same hash idx
194 #ifndef unix
195         HANDLE hmap;                            // Windows memory mapping handle
196 #endif
197 } BtPool;
198
199 //      structure for latch manager on ALLOC_page
200
201 typedef struct {
202         struct Page alloc[2];           // next & free page_nos in right ptr
203         BtSpinLatch lock[1];            // allocation area lite latch
204         ushort latchdeployed;           // highest number of latch entries deployed
205         ushort nlatchpage;                      // number of latch pages at BT_latch
206         ushort latchtotal;                      // number of page latch entries
207         ushort latchhash;                       // number of latch hash table slots
208         ushort latchvictim;                     // next latch entry to examine
209         BtHashEntry table[0];           // the hash table
210 } BtLatchMgr;
211
212 //      The object structure for Btree access
213
214 typedef struct {
215         uint page_size;                         // page size    
216         uint page_bits;                         // page size in bits    
217         uint seg_bits;                          // seg size in pages in bits
218         uint mode;                                      // read-write mode
219 #ifdef unix
220         int idx;
221         char *pooladvise;                       // bit maps for pool page advisements
222 #else
223         HANDLE idx;
224 #endif
225         ushort poolcnt;                         // highest page pool node in use
226         ushort poolmax;                         // highest page pool node allocated
227         ushort poolmask;                        // total number of pages in mmap segment - 1
228         ushort hashsize;                        // size of Hash Table for pool entries
229         ushort evicted;                         // last evicted hash table slot
230         ushort *hash;                           // hash table of pool entries
231         BtPool *pool;                           // memory pool page segments
232         BtSpinLatch *latch;                     // latches for pool hash slots
233         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
234         BtLatchSet *latchsets;          // mapped latch set from latch pages
235 #ifndef unix
236         HANDLE halloc;                          // allocation and latch table handle
237 #endif
238 } BtMgr;
239
240 typedef struct {
241         BtMgr *mgr;                     // buffer manager for thread
242         BtPage cursor;          // cached frame for start/next (never mapped)
243         BtPage frame;           // spare frame for the page split (never mapped)
244         BtPage zero;            // page frame for zeroes at end of file
245         BtPage page;            // current page
246         uid page_no;            // current page number  
247         uid cursor_page;        // current cursor page number   
248         BtLatchSet *set;        // current page latch set
249         BtPool *pool;           // current page pool
250         unsigned char *mem;     // frame, cursor, page memory buffer
251         int foster;                     // last search was to foster child
252         int found;                      // last delete was found
253         int err;                        // last error
254 } BtDb;
255
256 typedef enum {
257         BTERR_ok = 0,
258         BTERR_struct,
259         BTERR_ovflw,
260         BTERR_lock,
261         BTERR_map,
262         BTERR_wrt,
263         BTERR_hash,
264         BTERR_latch
265 } BTERR;
266
267 // B-Tree functions
268 extern void bt_close (BtDb *bt);
269 extern BtDb *bt_open (BtMgr *mgr);
270 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
271 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
272 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
273 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
274 extern uint bt_nextkey   (BtDb *bt, uint slot);
275
276 //      manager functions
277 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
278 void bt_mgrclose (BtMgr *mgr);
279
280 //      internal functions
281 BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
282 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
283 BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
284
285 //  Helper functions to return cursor slot values
286
287 extern BtKey bt_key (BtDb *bt, uint slot);
288 extern uid bt_uid (BtDb *bt, uint slot);
289 extern uint bt_tod (BtDb *bt, uint slot);
290
291 //  BTree page number constants
292 #define ALLOC_page              0       // allocation & lock manager hash table
293 #define ROOT_page               1       // root of the btree
294 #define LEAF_page               2       // first page of leaves
295 #define LATCH_page              3       // pages for lock manager
296
297 //      Number of levels to create in a new BTree
298
299 #define MIN_lvl                 2
300
301 //  The page is allocated from low and hi ends.
302 //  The key offsets and row-id's are allocated
303 //  from the bottom, while the text of the key
304 //  is allocated from the top.  When the two
305 //  areas meet, the page is split into two.
306
307 //  A key consists of a length byte, two bytes of
308 //  index number (0 - 65534), and up to 253 bytes
309 //  of key value.  Duplicate keys are discarded.
310 //  Associated with each key is a 48 bit row-id.
311
312 //  The b-tree root is always located at page 1.
313 //      The first leaf page of level zero is always
314 //      located on page 2.
315
316 //      When to root page fills, it is split in two and
317 //      the tree height is raised by a new root at page
318 //      one with two keys.
319
320 //      Deleted keys are marked with a dead bit until
321 //      page cleanup The fence key for a node is always
322 //      present, even after deletion and cleanup.
323
324 //  Groups of pages called segments from the btree are
325 //  cached with memory mapping. A hash table is used to keep
326 //  track of the cached segments.  This behaviour is controlled
327 //  by the cache block size parameter to bt_open.
328
329 //  To achieve maximum concurrency one page is locked at a time
330 //  as the tree is traversed to find leaf key in question.
331
332 //      An adoption traversal leaves the parent node locked as the
333 //      tree is traversed to the level in quesiton.
334
335 //  Page 0 is dedicated to lock for new page extensions,
336 //      and chains empty pages together for reuse.
337
338 //      Empty pages are chained together through the ALLOC page and reused.
339
340 //      Access macros to address slot and key values from the page
341
342 #define slotptr(page, slot) (page->table + slot-1)
343 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
344
345 void bt_putid(unsigned char *dest, uid id)
346 {
347 int i = BtId;
348
349         while( i-- )
350                 dest[i] = (unsigned char)id, id >>= 8;
351 }
352
353 uid bt_getid(unsigned char *src)
354 {
355 uid id = 0;
356 int i;
357
358         for( i = 0; i < BtId; i++ )
359                 id <<= 8, id |= *src++; 
360
361         return id;
362 }
363
364 //      wait until write lock mode is clear
365 //      and add 1 to the share count
366
367 void bt_spinreadlock(BtSpinLatch *latch)
368 {
369 ushort prev;
370
371   do {
372 #ifdef unix
373         while( __sync_fetch_and_or((ushort *)latch, Mutex) & Mutex )
374                 sched_yield();
375 #else
376         while( _InterlockedOr16((ushort *)latch, Mutex) & Mutex )
377                 SwitchToThread();
378 #endif
379
380         //  see if exclusive request is granted or pending
381
382         if( prev = !(latch->exclusive | latch->pending) )
383 #ifdef unix
384                 __sync_fetch_and_add((ushort *)latch, Share);
385 #else
386                 _InterlockedExchangeAdd16 ((ushort *)latch, Share);
387 #endif
388
389 #ifdef unix
390         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
391 #else
392         _InterlockedAnd16((ushort *)latch, ~Mutex);
393 #endif
394         if( prev )
395                 return;
396 #ifdef  unix
397   } while( sched_yield(), 1 );
398 #else
399   } while( SwitchToThread(), 1 );
400 #endif
401 }
402
403 //      wait for other read and write latches to relinquish
404
405 void bt_spinwritelock(BtSpinLatch *latch)
406 {
407   do {
408 #ifdef unix
409         while( __sync_fetch_and_or((ushort *)latch, Mutex | Pending) & Mutex )
410                 sched_yield();
411 #else
412         while( _InterlockedOr16((ushort *)latch, Mutex | Pending) & Mutex )
413                 SwitchToThread();
414 #endif
415         if( !(latch->share | latch->exclusive) ) {
416 #ifdef unix
417                 __sync_fetch_and_or((ushort *)latch, Write);
418                 __sync_fetch_and_and ((ushort *)latch, ~(Mutex | Pending));
419 #else
420                 _InterlockedOr16((ushort *)latch, Write);
421                 _InterlockedAnd16((ushort *)latch, ~(Mutex | Pending));
422 #endif
423                 return;
424         }
425
426 #ifdef unix
427         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
428 #else
429         _InterlockedAnd16((ushort *)latch, ~Mutex);
430 #endif
431 #ifdef  unix
432         sched_yield();
433 #else
434         SwitchToThread();
435 #endif
436   } while( 1 );
437 }
438
439 //      try to obtain write lock
440
441 //      return 1 if obtained,
442 //              0 otherwise
443
444 int bt_spinwritetry(BtSpinLatch *latch)
445 {
446 ushort prev;
447
448 #ifdef unix
449         if( prev = __sync_fetch_and_or((ushort *)latch, Mutex), prev & Mutex )
450                 return 0;
451 #else
452         if( prev = _InterlockedOr16((ushort *)latch, Mutex), prev & Mutex )
453                 return 0;
454 #endif
455         //      take write access if all bits are clear
456
457         if( !prev )
458 #ifdef unix
459                 __sync_fetch_and_or ((ushort *)latch, Write);
460 #else
461                 _InterlockedOr16((ushort *)latch, Write);
462 #endif
463
464 #ifdef unix
465         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
466 #else
467         _InterlockedAnd16((ushort *)latch, ~Mutex);
468 #endif
469         return !prev;
470 }
471
472 //      clear write mode
473
474 void bt_spinreleasewrite(BtSpinLatch *latch)
475 {
476 #ifdef unix
477         __sync_fetch_and_and ((ushort *)latch, ~Write);
478 #else
479         _InterlockedAnd16((ushort *)latch, ~Write);
480 #endif
481 }
482
483 //      decrement reader count
484
485 void bt_spinreleaseread(BtSpinLatch *latch)
486 {
487 #ifdef unix
488         __sync_fetch_and_add((ushort *)latch, -Share);
489 #else
490         _InterlockedExchangeAdd16 ((ushort *)latch, -Share);
491 #endif
492 }
493
494 void bt_initlockset (BtLatchSet *set, int reuse)
495 {
496 #ifdef unix
497 pthread_rwlockattr_t rwattr[1];
498
499         if( reuse ) {
500                 pthread_rwlock_destroy (set->readwr->lock);
501                 pthread_rwlock_destroy (set->access->lock);
502                 pthread_rwlock_destroy (set->parent->lock);
503         }
504
505         pthread_rwlockattr_init (rwattr);
506         pthread_rwlockattr_setkind_np (rwattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
507         pthread_rwlockattr_setpshared (rwattr, PTHREAD_PROCESS_SHARED);
508
509         pthread_rwlock_init (set->readwr->lock, rwattr);
510         pthread_rwlock_init (set->access->lock, rwattr);
511         pthread_rwlock_init (set->parent->lock, rwattr);
512         pthread_rwlockattr_destroy (rwattr);
513 #else
514         InitializeSRWLock (set->readwr->srw);
515         InitializeSRWLock (set->access->srw);
516         InitializeSRWLock (set->parent->srw);
517 #endif
518 }
519
520 //      link latch table entry into latch hash table
521
522 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
523 {
524 BtLatchSet *set = bt->mgr->latchsets + victim;
525
526         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
527                 bt->mgr->latchsets[set->next].prev = victim;
528
529         bt->mgr->latchmgr->table[hashidx].slot = victim;
530         set->page_no = page_no;
531         set->hash = hashidx;
532         set->prev = 0;
533 }
534
535 void bt_unpinlatch (BtLatchSet *set)
536 {
537 #ifdef unix
538         __sync_fetch_and_add(&set->pin, -1);
539 #else
540         _InterlockedDecrement16 (&set->pin);
541 #endif
542 }
543
544 //      find existing latchset or inspire new one
545 //      return with latchset pinned
546
547 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
548 {
549 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
550 ushort slot, avail = 0, victim, idx;
551 BtLatchSet *set;
552
553         //  obtain read lock on hash table entry
554
555         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
556
557         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
558         {
559                 set = bt->mgr->latchsets + slot;
560                 if( page_no == set->page_no )
561                         break;
562         } while( slot = set->next );
563
564         if( slot ) {
565 #ifdef unix
566                 __sync_fetch_and_add(&set->pin, 1);
567 #else
568                 _InterlockedIncrement16 (&set->pin);
569 #endif
570         }
571
572     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
573
574         if( slot )
575                 return set;
576
577   //  try again, this time with write lock
578
579   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
580
581   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
582   {
583         set = bt->mgr->latchsets + slot;
584         if( page_no == set->page_no )
585                 break;
586         if( !set->pin && !avail )
587                 avail = slot;
588   } while( slot = set->next );
589
590   //  found our entry, or take over an unpinned one
591
592   if( slot || (slot = avail) ) {
593         set = bt->mgr->latchsets + slot;
594 #ifdef unix
595         __sync_fetch_and_add(&set->pin, 1);
596 #else
597         _InterlockedIncrement16 (&set->pin);
598 #endif
599         set->page_no = page_no;
600         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
601         return set;
602   }
603
604         //  see if there are any unused entries
605 #ifdef unix
606         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
607 #else
608         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
609 #endif
610
611         if( victim < bt->mgr->latchmgr->latchtotal ) {
612                 set = bt->mgr->latchsets + victim;
613 #ifdef unix
614                 __sync_fetch_and_add(&set->pin, 1);
615 #else
616                 _InterlockedIncrement16 (&set->pin);
617 #endif
618                 bt_initlockset (set, 0);
619                 bt_latchlink (bt, hashidx, victim, page_no);
620                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
621                 return set;
622         }
623
624 #ifdef unix
625         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
626 #else
627         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
628 #endif
629   //  find and reuse previous lock entry
630
631   while( 1 ) {
632 #ifdef unix
633         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
634 #else
635         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
636 #endif
637         //      we don't use slot zero
638
639         if( victim %= bt->mgr->latchmgr->latchtotal )
640                 set = bt->mgr->latchsets + victim;
641         else
642                 continue;
643
644         //      take control of our slot
645         //      from other threads
646
647         if( set->pin || !bt_spinwritetry (set->busy) )
648                 continue;
649
650         idx = set->hash;
651
652         // try to get write lock on hash chain
653         //      skip entry if not obtained
654         //      or has outstanding locks
655
656         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
657                 bt_spinreleasewrite (set->busy);
658                 continue;
659         }
660
661         if( set->pin ) {
662                 bt_spinreleasewrite (set->busy);
663                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
664                 continue;
665         }
666
667         //  unlink our available victim from its hash chain
668
669         if( set->prev )
670                 bt->mgr->latchsets[set->prev].next = set->next;
671         else
672                 bt->mgr->latchmgr->table[idx].slot = set->next;
673
674         if( set->next )
675                 bt->mgr->latchsets[set->next].prev = set->prev;
676
677         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
678 #ifdef unix
679         __sync_fetch_and_add(&set->pin, 1);
680 #else
681         _InterlockedIncrement16 (&set->pin);
682 #endif
683         bt_initlockset (set, 1);
684         bt_latchlink (bt, hashidx, victim, page_no);
685         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
686         bt_spinreleasewrite (set->busy);
687         return set;
688   }
689 }
690
691 void bt_mgrclose (BtMgr *mgr)
692 {
693 BtPool *pool;
694 uint slot;
695
696         // release mapped pages
697         //      note that slot zero is never used
698
699         for( slot = 1; slot < mgr->poolmax; slot++ ) {
700                 pool = mgr->pool + slot;
701                 if( pool->slot )
702 #ifdef unix
703                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
704 #else
705                 {
706                         FlushViewOfFile(pool->map, 0);
707                         UnmapViewOfFile(pool->map);
708                         CloseHandle(pool->hmap);
709                 }
710 #endif
711         }
712
713 #ifdef unix
714         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
715         munmap (mgr->latchmgr, mgr->page_size);
716 #else
717         FlushViewOfFile(mgr->latchmgr, 0);
718         UnmapViewOfFile(mgr->latchmgr);
719         CloseHandle(mgr->halloc);
720 #endif
721 #ifdef unix
722         close (mgr->idx);
723         free (mgr->pool);
724         free (mgr->hash);
725         free (mgr->latch);
726         free (mgr->pooladvise);
727         free (mgr);
728 #else
729         FlushFileBuffers(mgr->idx);
730         CloseHandle(mgr->idx);
731         GlobalFree (mgr->pool);
732         GlobalFree (mgr->hash);
733         GlobalFree (mgr->latch);
734         GlobalFree (mgr);
735 #endif
736 }
737
738 //      close and release memory
739
740 void bt_close (BtDb *bt)
741 {
742 #ifdef unix
743         if ( bt->mem )
744                 free (bt->mem);
745 #else
746         if ( bt->mem)
747                 VirtualFree (bt->mem, 0, MEM_RELEASE);
748 #endif
749         free (bt);
750 }
751
752 //  open/create new btree buffer manager
753
754 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
755 //              size of mapped page pool (e.g. 8192)
756
757 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
758 {
759 uint lvl, attr, cacheblk, last, slot, idx;
760 uint nlatchpage, latchhash;
761 BtLatchMgr *latchmgr;
762 off64_t size;
763 uint amt[1];
764 BtMgr* mgr;
765 BtKey key;
766 int flag;
767
768 #ifndef unix
769 SYSTEM_INFO sysinfo[1];
770 #endif
771
772         // determine sanity of page size and buffer pool
773
774         if( bits > BT_maxbits )
775                 bits = BT_maxbits;
776         else if( bits < BT_minbits )
777                 bits = BT_minbits;
778
779         if( !poolmax )
780                 return NULL;    // must have buffer pool
781
782 #ifdef unix
783         mgr = calloc (1, sizeof(BtMgr));
784
785         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
786
787         if( mgr->idx == -1 )
788                 return free(mgr), NULL;
789         
790         cacheblk = 4096;        // minimum mmap segment size for unix
791
792 #else
793         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
794         attr = FILE_ATTRIBUTE_NORMAL;
795         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
796
797         if( mgr->idx == INVALID_HANDLE_VALUE )
798                 return GlobalFree(mgr), NULL;
799
800         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
801         GetSystemInfo(sysinfo);
802         cacheblk = sysinfo->dwAllocationGranularity;
803 #endif
804
805 #ifdef unix
806         latchmgr = malloc (BT_maxpage);
807         *amt = 0;
808
809         // read minimum page size to get root info
810
811         if( size = lseek (mgr->idx, 0L, 2) ) {
812                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
813                         bits = latchmgr->alloc->bits;
814                 else
815                         return free(mgr), free(latchmgr), NULL;
816         } else if( mode == BT_ro )
817                 return free(latchmgr), free (mgr), NULL;
818 #else
819         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
820         size = GetFileSize(mgr->idx, amt);
821
822         if( size || *amt ) {
823                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
824                         return bt_mgrclose (mgr), NULL;
825                 bits = latchmgr->alloc->bits;
826         } else if( mode == BT_ro )
827                 return bt_mgrclose (mgr), NULL;
828 #endif
829
830         mgr->page_size = 1 << bits;
831         mgr->page_bits = bits;
832
833         mgr->poolmax = poolmax;
834         mgr->mode = mode;
835
836         if( cacheblk < mgr->page_size )
837                 cacheblk = mgr->page_size;
838
839         //  mask for partial memmaps
840
841         mgr->poolmask = (cacheblk >> bits) - 1;
842
843         //      see if requested size of pages per memmap is greater
844
845         if( (1 << segsize) > mgr->poolmask )
846                 mgr->poolmask = (1 << segsize) - 1;
847
848         mgr->seg_bits = 0;
849
850         while( (1 << mgr->seg_bits) <= mgr->poolmask )
851                 mgr->seg_bits++;
852
853         mgr->hashsize = hashsize;
854
855 #ifdef unix
856         mgr->pool = calloc (poolmax, sizeof(BtPool));
857         mgr->hash = calloc (hashsize, sizeof(ushort));
858         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
859         mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
860 #else
861         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
862         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
863         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
864 #endif
865
866         if( size || *amt )
867                 goto mgrlatch;
868
869         // initialize an empty b-tree with latch page, root page, page of leaves
870         // and page(s) of latches
871
872         memset (latchmgr, 0, 1 << bits);
873         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
874         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
875         latchmgr->alloc->bits = mgr->page_bits;
876
877         latchmgr->nlatchpage = nlatchpage;
878         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
879
880         //  initialize latch manager
881
882         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
883
884         //      size of hash table = total number of latchsets
885
886         if( latchhash > latchmgr->latchtotal )
887                 latchhash = latchmgr->latchtotal;
888
889         latchmgr->latchhash = latchhash;
890
891 #ifdef unix
892         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
893                 return free(latchmgr), bt_mgrclose (mgr), NULL;
894 #else
895         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
896                 return bt_mgrclose (mgr), NULL;
897
898         if( *amt < mgr->page_size )
899                 return bt_mgrclose (mgr), NULL;
900 #endif
901
902         memset (latchmgr, 0, 1 << bits);
903         latchmgr->alloc->bits = mgr->page_bits;
904
905         for( lvl=MIN_lvl; lvl--; ) {
906                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
907                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
908                 key = keyptr(latchmgr->alloc, 1);
909                 key->len = 2;                   // create stopper key
910                 key->key[0] = 0xff;
911                 key->key[1] = 0xff;
912                 latchmgr->alloc->min = mgr->page_size - 3;
913                 latchmgr->alloc->lvl = lvl;
914                 latchmgr->alloc->cnt = 1;
915                 latchmgr->alloc->act = 1;
916 #ifdef unix
917                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
918                         return bt_mgrclose (mgr), NULL;
919 #else
920                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
921                         return bt_mgrclose (mgr), NULL;
922
923                 if( *amt < mgr->page_size )
924                         return bt_mgrclose (mgr), NULL;
925 #endif
926         }
927
928         // clear out latch manager locks
929         //      and rest of pages to round out segment
930
931         memset(latchmgr, 0, mgr->page_size);
932         last = MIN_lvl + 1;
933
934         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
935 #ifdef unix
936                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
937 #else
938                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
939                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
940                         return bt_mgrclose (mgr), NULL;
941                 if( *amt < mgr->page_size )
942                         return bt_mgrclose (mgr), NULL;
943 #endif
944                 last++;
945         }
946
947 mgrlatch:
948 #ifdef unix
949         flag = PROT_READ | PROT_WRITE;
950         mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
951         if( mgr->latchmgr == MAP_FAILED )
952                 return bt_mgrclose (mgr), NULL;
953         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
954         if( mgr->latchsets == MAP_FAILED )
955                 return bt_mgrclose (mgr), NULL;
956 #else
957         flag = PAGE_READWRITE;
958         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
959         if( !mgr->halloc )
960                 return bt_mgrclose (mgr), NULL;
961
962         flag = FILE_MAP_WRITE;
963         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
964         if( !mgr->latchmgr )
965                 return GetLastError(), bt_mgrclose (mgr), NULL;
966
967         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
968 #endif
969
970 #ifdef unix
971         free (latchmgr);
972 #else
973         VirtualFree (latchmgr, 0, MEM_RELEASE);
974 #endif
975         return mgr;
976 }
977
978 //      open BTree access method
979 //      based on buffer manager
980
981 BtDb *bt_open (BtMgr *mgr)
982 {
983 BtDb *bt = malloc (sizeof(*bt));
984
985         memset (bt, 0, sizeof(*bt));
986         bt->mgr = mgr;
987 #ifdef unix
988         bt->mem = malloc (3 *mgr->page_size);
989 #else
990         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
991 #endif
992         bt->frame = (BtPage)bt->mem;
993         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
994         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
995
996         memset(bt->zero, 0, mgr->page_size);
997         return bt;
998 }
999
1000 //  compare two keys, returning > 0, = 0, or < 0
1001 //  as the comparison value
1002
1003 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1004 {
1005 uint len1 = key1->len;
1006 int ans;
1007
1008         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1009                 return ans;
1010
1011         if( len1 > len2 )
1012                 return 1;
1013         if( len1 < len2 )
1014                 return -1;
1015
1016         return 0;
1017 }
1018
1019 //      Latch Manager
1020
1021 void bt_readlock(BtLatch *latch)
1022 {
1023 #ifdef unix
1024         pthread_rwlock_rdlock (latch->lock);
1025 #else
1026         AcquireSRWLockShared (latch->srw);
1027 #endif
1028 }
1029
1030 //      wait for other read and write latches to relinquish
1031
1032 void bt_writelock(BtLatch *latch)
1033 {
1034 #ifdef unix
1035         pthread_rwlock_wrlock (latch->lock);
1036 #else
1037         AcquireSRWLockExclusive (latch->srw);
1038 #endif
1039 }
1040
1041 //      try to obtain write lock
1042
1043 //      return 1 if obtained,
1044 //              0 if already write or read locked
1045
1046 int bt_writetry(BtLatch *latch)
1047 {
1048 int result = 0;
1049
1050 #ifdef unix
1051         result = !pthread_rwlock_trywrlock (latch->lock);
1052 #else
1053         result = TryAcquireSRWLockExclusive (latch->srw);
1054 #endif
1055         return result;
1056 }
1057
1058 //      clear write mode
1059
1060 void bt_releasewrite(BtLatch *latch)
1061 {
1062 #ifdef unix
1063         pthread_rwlock_unlock (latch->lock);
1064 #else
1065         ReleaseSRWLockExclusive (latch->srw);
1066 #endif
1067 }
1068
1069 //      decrement reader count
1070
1071 void bt_releaseread(BtLatch *latch)
1072 {
1073 #ifdef unix
1074         pthread_rwlock_unlock (latch->lock);
1075 #else
1076         ReleaseSRWLockShared (latch->srw);
1077 #endif
1078 }
1079
1080 //      Buffer Pool mgr
1081
1082 // find segment in pool
1083 // must be called with hashslot idx locked
1084 //      return NULL if not there
1085 //      otherwise return node
1086
1087 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1088 {
1089 BtPool *pool;
1090 uint slot;
1091
1092         // compute start of hash chain in pool
1093
1094         if( slot = bt->mgr->hash[idx] ) 
1095                 pool = bt->mgr->pool + slot;
1096         else
1097                 return NULL;
1098
1099         page_no &= ~bt->mgr->poolmask;
1100
1101         while( pool->basepage != page_no )
1102           if( pool = pool->hashnext )
1103                 continue;
1104           else
1105                 return NULL;
1106
1107         return pool;
1108 }
1109
1110 // add segment to hash table
1111
1112 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1113 {
1114 BtPool *node;
1115 uint slot;
1116
1117         pool->hashprev = pool->hashnext = NULL;
1118         pool->basepage = page_no & ~bt->mgr->poolmask;
1119         pool->lru = 1;
1120
1121         if( slot = bt->mgr->hash[idx] ) {
1122                 node = bt->mgr->pool + slot;
1123                 pool->hashnext = node;
1124                 node->hashprev = pool;
1125         }
1126
1127         bt->mgr->hash[idx] = pool->slot;
1128 }
1129
1130 //      find best segment to evict from buffer pool
1131
1132 BtPool *bt_findlru (BtDb *bt, uint hashslot)
1133 {
1134 unsigned long long int target = ~0LL;
1135 BtPool *pool = NULL, *node;
1136
1137         if( !hashslot )
1138                 return NULL;
1139
1140         node = bt->mgr->pool + hashslot;
1141
1142         //      scan pool entries under hash table slot
1143
1144         do {
1145           if( node->pin )
1146                 continue;
1147           if( node->lru > target )
1148                 continue;
1149           target = node->lru;
1150           pool = node;
1151         } while( node = node->hashnext );
1152
1153         return pool;
1154 }
1155
1156 //  map new buffer pool segment to virtual memory
1157
1158 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1159 {
1160 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1161 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1162 int flag;
1163
1164 #ifdef unix
1165         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1166         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1167         if( pool->map == MAP_FAILED )
1168                 return bt->err = BTERR_map;
1169         // clear out madvise issued bits
1170         memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
1171 #else
1172         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1173         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1174         if( !pool->hmap )
1175                 return bt->err = BTERR_map;
1176
1177         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1178         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1179         if( !pool->map )
1180                 return bt->err = BTERR_map;
1181 #endif
1182         return bt->err = 0;
1183 }
1184
1185 //      calculate page within pool
1186
1187 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1188 {
1189 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1190 BtPage page;
1191
1192         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1193 #ifdef unix
1194         {
1195         uint idx = subpage / 8;
1196         uint bit = subpage % 8;
1197
1198                 if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
1199                   madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1200                   (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
1201                 }
1202         }
1203 #endif
1204         return page;
1205 }
1206
1207 //  release pool pin
1208
1209 void bt_unpinpool (BtPool *pool)
1210 {
1211 #ifdef unix
1212         __sync_fetch_and_add(&pool->pin, -1);
1213 #else
1214         _InterlockedDecrement16 (&pool->pin);
1215 #endif
1216 }
1217
1218 //      find or place requested page in segment-pool
1219 //      return pool table entry, incrementing pin
1220
1221 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1222 {
1223 BtPool *pool, *node, *next;
1224 uint slot, idx, victim;
1225 BtLatchSet *set;
1226
1227         //      lock hash table chain
1228
1229         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1230         bt_spinreadlock (&bt->mgr->latch[idx]);
1231
1232         //      look up in hash table
1233
1234         if( pool = bt_findpool(bt, page_no, idx) ) {
1235 #ifdef unix
1236                 __sync_fetch_and_add(&pool->pin, 1);
1237 #else
1238                 _InterlockedIncrement16 (&pool->pin);
1239 #endif
1240                 bt_spinreleaseread (&bt->mgr->latch[idx]);
1241                 pool->lru++;
1242                 return pool;
1243         }
1244
1245         //      upgrade to write lock
1246
1247         bt_spinreleaseread (&bt->mgr->latch[idx]);
1248         bt_spinwritelock (&bt->mgr->latch[idx]);
1249
1250         // try to find page in pool with write lock
1251
1252         if( pool = bt_findpool(bt, page_no, idx) ) {
1253 #ifdef unix
1254                 __sync_fetch_and_add(&pool->pin, 1);
1255 #else
1256                 _InterlockedIncrement16 (&pool->pin);
1257 #endif
1258                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1259                 pool->lru++;
1260                 return pool;
1261         }
1262
1263         // allocate a new pool node
1264         // and add to hash table
1265
1266 #ifdef unix
1267         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1268 #else
1269         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1270 #endif
1271
1272         if( ++slot < bt->mgr->poolmax ) {
1273                 pool = bt->mgr->pool + slot;
1274                 pool->slot = slot;
1275
1276                 if( bt_mapsegment(bt, pool, page_no) )
1277                         return NULL;
1278
1279                 bt_linkhash(bt, pool, page_no, idx);
1280 #ifdef unix
1281                 __sync_fetch_and_add(&pool->pin, 1);
1282 #else
1283                 _InterlockedIncrement16 (&pool->pin);
1284 #endif
1285                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1286                 return pool;
1287         }
1288
1289         // pool table is full
1290         //      find best pool entry to evict
1291
1292 #ifdef unix
1293         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1294 #else
1295         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1296 #endif
1297
1298         while( 1 ) {
1299 #ifdef unix
1300                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1301 #else
1302                 victim = _InterlockedIncrement16 (&bt->mgr->evicted) - 1;
1303 #endif
1304                 victim %= bt->mgr->hashsize;
1305
1306                 // try to get write lock
1307                 //      skip entry if not obtained
1308
1309                 if( !bt_spinwritetry (&bt->mgr->latch[victim]) )
1310                         continue;
1311
1312                 //  if cache entry is empty
1313                 //      or no slots are unpinned
1314                 //      skip this entry
1315
1316                 if( !(pool = bt_findlru(bt, bt->mgr->hash[victim])) ) {
1317                         bt_spinreleasewrite (&bt->mgr->latch[victim]);
1318                         continue;
1319                 }
1320
1321                 // unlink victim pool node from hash table
1322
1323                 if( node = pool->hashprev )
1324                         node->hashnext = pool->hashnext;
1325                 else if( node = pool->hashnext )
1326                         bt->mgr->hash[victim] = node->slot;
1327                 else
1328                         bt->mgr->hash[victim] = 0;
1329
1330                 if( node = pool->hashnext )
1331                         node->hashprev = pool->hashprev;
1332
1333                 bt_spinreleasewrite (&bt->mgr->latch[victim]);
1334
1335                 //      remove old file mapping
1336 #ifdef unix
1337                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1338 #else
1339                 FlushViewOfFile(pool->map, 0);
1340                 UnmapViewOfFile(pool->map);
1341                 CloseHandle(pool->hmap);
1342 #endif
1343                 pool->map = NULL;
1344
1345                 //  create new pool mapping
1346                 //  and link into hash table
1347
1348                 if( bt_mapsegment(bt, pool, page_no) )
1349                         return NULL;
1350
1351                 bt_linkhash(bt, pool, page_no, idx);
1352 #ifdef unix
1353                 __sync_fetch_and_add(&pool->pin, 1);
1354 #else
1355                 _InterlockedIncrement16 (&pool->pin);
1356 #endif
1357                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1358                 return pool;
1359         }
1360 }
1361
1362 // place write, read, or parent lock on requested page_no.
1363 //      pin to buffer pool and return latchset pointer
1364
1365 void bt_lockpage(BtLock mode, BtLatchSet *set)
1366 {
1367         switch( mode ) {
1368         case BtLockRead:
1369                 bt_readlock (set->readwr);
1370                 break;
1371         case BtLockWrite:
1372                 bt_writelock (set->readwr);
1373                 break;
1374         case BtLockAccess:
1375                 bt_readlock (set->access);
1376                 break;
1377         case BtLockDelete:
1378                 bt_writelock (set->access);
1379                 break;
1380         case BtLockParent:
1381                 bt_writelock (set->parent);
1382                 break;
1383         }
1384 }
1385
1386 // remove write, read, or parent lock on requested page_no.
1387
1388 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1389 {
1390         switch( mode ) {
1391         case BtLockRead:
1392                 bt_releaseread (set->readwr);
1393                 break;
1394         case BtLockWrite:
1395                 bt_releasewrite (set->readwr);
1396                 break;
1397         case BtLockAccess:
1398                 bt_releaseread (set->access);
1399                 break;
1400         case BtLockDelete:
1401                 bt_releasewrite (set->access);
1402                 break;
1403         case BtLockParent:
1404                 bt_releasewrite (set->parent);
1405                 break;
1406         }
1407 }
1408
1409 //      allocate a new page and write page into it
1410
1411 uid bt_newpage(BtDb *bt, BtPage page)
1412 {
1413 BtLatchSet *set;
1414 BtPool *pool;
1415 uid new_page;
1416 BtPage pmap;
1417 int reuse;
1418
1419         //      lock allocation page
1420
1421         bt_spinwritelock(bt->mgr->latchmgr->lock);
1422
1423         // use empty chain first
1424         // else allocate empty page
1425
1426         if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
1427                 if( pool = bt_pinpool (bt, new_page) )
1428                         pmap = bt_page (bt, pool, new_page);
1429                 else
1430                         return 0;
1431                 bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
1432                 bt_unpinpool (pool);
1433                 reuse = 1;
1434         } else {
1435                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1436                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1437                 reuse = 0;
1438         }
1439 #ifdef unix
1440         // if writing first page of pool block, zero last page in the block
1441
1442         if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
1443         {
1444                 // use zero buffer to write zeros
1445                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1446                         return bt->err = BTERR_wrt, 0;
1447         }
1448
1449         // unlock allocation latch
1450
1451         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1452
1453         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1454                 return bt->err = BTERR_wrt, 0;
1455
1456 #else
1457         // unlock allocation latch
1458
1459         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1460
1461         //      bring new page into pool and copy page.
1462         //      this will extend the file into the new pages.
1463         //      NB -- no latch required
1464
1465         if( pool = bt_pinpool (bt, new_page) )
1466                 pmap = bt_page (bt, pool, new_page);
1467         else
1468                 return 0;
1469
1470         memcpy(pmap, page, bt->mgr->page_size);
1471         bt_unpinpool (pool);
1472 #endif
1473         return new_page;
1474 }
1475
1476 //  find slot in page for given key at a given level
1477
1478 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1479 {
1480 uint diff, higher = bt->page->cnt, low = 1, slot;
1481
1482         //      low is the lowest candidate, higher is already
1483         //      tested as .ge. the given key, loop ends when they meet
1484
1485         while( diff = higher - low ) {
1486                 slot = low + ( diff >> 1 );
1487                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1488                         low = slot + 1;
1489                 else
1490                         higher = slot;
1491         }
1492
1493         return higher;
1494 }
1495
1496 //  find and load page at given level for given key
1497 //      leave page rd or wr locked as requested
1498
1499 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
1500 {
1501 uid page_no = ROOT_page, prevpage = 0;
1502 BtLatchSet *set, *prevset;
1503 uint drill = 0xff, slot;
1504 uint mode, prevmode;
1505 BtPool *prevpool;
1506 int foster = 0;
1507
1508   //  start at root of btree and drill down
1509
1510   do {
1511         // determine lock mode of drill level
1512         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1513
1514         //      obtain latch set for this page
1515
1516         bt->set = bt_pinlatch (bt, page_no);
1517         bt->page_no = page_no;
1518
1519         // pin page contents
1520
1521         if( bt->pool = bt_pinpool (bt, page_no) )
1522                 bt->page = bt_page (bt, bt->pool, page_no);
1523         else
1524                 return 0;
1525
1526         // obtain access lock using lock chaining with Access mode
1527
1528         if( page_no > ROOT_page )
1529           bt_lockpage(BtLockAccess, bt->set);
1530
1531         //  now unlock and unpin our (possibly foster) parent
1532
1533         if( prevpage ) {
1534           bt_unlockpage(prevmode, prevset);
1535           bt_unpinlatch (prevset);
1536           bt_unpinpool (prevpool);
1537           prevpage = 0;
1538         }
1539
1540         // obtain read lock using lock chaining
1541
1542         bt_lockpage(mode, bt->set);
1543
1544         if( page_no > ROOT_page )
1545           bt_unlockpage(BtLockAccess, bt->set);
1546
1547         // re-read and re-lock root after determining actual level of root
1548
1549         if( page_no == ROOT_page )
1550           if( bt->page->lvl != drill) {
1551                 drill = bt->page->lvl;
1552
1553             if( lock == BtLockWrite && drill == lvl ) {
1554                   bt_unlockpage(mode, bt->set);
1555                   bt_unpinlatch (bt->set);
1556                   bt_unpinpool (bt->pool);
1557                   continue;
1558                 }
1559           }
1560
1561         prevpage = bt->page_no;
1562         prevpool = bt->pool;
1563         prevset = bt->set;
1564         prevmode = mode;
1565
1566         //      were we supposed to find a foster child?
1567         //      if so, slide right onto it
1568
1569         if( keycmp (keyptr(bt->page,bt->page->cnt), key, len) < 0 ) {
1570                 page_no = bt_getid(bt->page->right);
1571                 continue;
1572         }
1573
1574         //  find key on page at this level
1575         //  and either descend to requested level
1576         //      or return key slot
1577
1578         slot = bt_findslot (bt, key, len);
1579
1580         //      is this slot < foster child area
1581         //      on the requested level?
1582
1583         //      if so, return actual slot even if dead
1584
1585         if( slot <= bt->page->cnt - bt->page->foster )
1586           if( drill == lvl )
1587                 return bt->foster = foster, slot;
1588
1589         //      find next active slot
1590
1591         //      note: foster children are never dead
1592
1593         while( slotptr(bt->page, slot)->dead )
1594           if( slot++ < bt->page->cnt )
1595                 continue;
1596           else {
1597                 //  we are waiting for fence key posting
1598                 page_no = bt_getid(bt->page->right);
1599                 continue;
1600           }
1601
1602         //      is this slot < foster child area
1603         //      if so, drill to next level
1604
1605         if( slot <= bt->page->cnt - bt->page->foster )
1606                 foster = 0, drill--;
1607         else
1608                 foster = 1;
1609
1610         //  continue right onto foster child
1611         //      or down to next level.
1612
1613         page_no = bt_getid(slotptr(bt->page, slot)->id);
1614
1615   } while( page_no );
1616
1617   // return error on end of chain
1618
1619   bt->err = BTERR_struct;
1620   return 0;     // return error
1621 }
1622
1623 //  remove empty page from the B-tree
1624 //      by pulling our right node left over ourselves
1625
1626 //  call with bt->page, etc, set to page's locked parent
1627 //      returns with page locked.
1628
1629 BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
1630 {
1631 BtLatchSet *rset, *pset, *rpset;
1632 BtPool *rpool, *ppool, *rppool;
1633 BtPage rpage, ppage, rppage;
1634 uid right, parent, rparent;
1635 BtKey ptr;
1636 uint idx;
1637
1638         //      cache node's parent page
1639
1640         parent = bt->page_no;
1641         ppage = bt->page;
1642         ppool = bt->pool;
1643         pset = bt->set;
1644
1645         // lock and map our right page
1646         // note that it cannot be our foster child
1647         // since the our node is empty
1648         //      and it cannot be NULL because of the stopper
1649         //      in the last right page
1650
1651         bt_lockpage (BtLockWrite, set);
1652
1653         // if we aren't dead yet
1654
1655         if( page->act )
1656                 goto rmergexit;
1657
1658         if( right = bt_getid (page->right) )
1659           if( rpool = bt_pinpool (bt, right) )
1660                 rpage = bt_page (bt, rpool, right);
1661           else
1662                 return bt->err;
1663         else
1664                 return bt->err = BTERR_struct;
1665
1666         rset = bt_pinlatch (bt, right);
1667
1668         //      find our right neighbor
1669
1670         if( ppage->act > 1 ) {
1671          for( idx = slot; idx++ < ppage->cnt; )
1672           if( !slotptr(ppage, idx)->dead )
1673                 break;
1674
1675          if( idx > ppage->cnt )
1676                 return bt->err = BTERR_struct;
1677
1678          //  redirect right neighbor in parent to left node
1679
1680          bt_putid(slotptr(ppage,idx)->id, page_no);
1681         }
1682
1683         //      if parent has only our deleted page, e.g. no right neighbor
1684         //      prepare to merge parent itself
1685
1686         if( ppage->act == 1 ) {
1687           if( rparent = bt_getid (ppage->right) )
1688            if( rppool = bt_pinpool (bt, rparent) )
1689                 rppage = bt_page (bt, rppool, rparent);
1690            else
1691                 return bt->err;
1692           else
1693                 return bt->err = BTERR_struct;
1694
1695           rpset = bt_pinlatch (bt, rparent);
1696           bt_lockpage (BtLockWrite, rpset);
1697
1698           // find our right neighbor on right parent page
1699
1700           for( idx = 0; idx++ < rppage->cnt; )
1701                 if( !slotptr(rppage, idx)->dead ) {
1702                   bt_putid (slotptr(rppage, idx)->id, page_no);
1703                   break;
1704                 }
1705
1706           if( idx > rppage->cnt )
1707                 return bt->err = BTERR_struct;
1708         }
1709
1710         //      now that there are no more pointers to our right node
1711         //      we can wait for delete lock on it
1712
1713         bt_lockpage(BtLockDelete, rset);
1714         bt_lockpage(BtLockWrite, rset);
1715
1716         // pull contents of right page into our empty page 
1717
1718         memcpy (page, rpage, bt->mgr->page_size);
1719
1720         // ready to release right parent lock
1721         //      now that we have a new page in place
1722
1723         if( ppage->act == 1 ) {
1724           bt_unlockpage (BtLockWrite, rpset);
1725           bt_unpinlatch (rpset);
1726           bt_unpinpool (rppool);
1727         }
1728
1729         //      add killed right block to free chain
1730         //      lock latch mgr
1731
1732         bt_spinwritelock(bt->mgr->latchmgr->lock);
1733
1734         //      store free chain in allocation page second right
1735
1736         bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1737         bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
1738
1739         // unlock latch mgr and right page
1740
1741         bt_unlockpage(BtLockDelete, rset);
1742         bt_unlockpage(BtLockWrite, rset);
1743         bt_unpinlatch (rset);
1744         bt_unpinpool (rpool);
1745
1746         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1747
1748         // delete our obsolete fence key from our parent
1749
1750         slotptr(ppage, slot)->dead = 1;
1751         ppage->dirty = 1;
1752
1753         //      if our parent now empty
1754         //      remove it from the tree
1755
1756         if( ppage->act-- == 1 )
1757           if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
1758                 return bt->err;
1759
1760 rmergexit:
1761         bt_unlockpage (BtLockWrite, pset);
1762         bt_unpinlatch (pset);
1763         bt_unpinpool (ppool);
1764
1765         bt->found = 1;
1766         return bt->err = 0;
1767 }
1768
1769 //  remove empty page from the B-tree
1770 //      try merging left first.  If no left
1771 //      sibling, then merge right.
1772
1773 //      call with page loaded and locked,
1774 //  return with page locked.
1775
1776 BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
1777 {
1778 unsigned char fencekey[256], postkey[256];
1779 uint slot, idx, postfence = 0;
1780 BtLatchSet *lset, *pset;
1781 BtPool *lpool, *ppool;
1782 BtPage lpage, ppage;
1783 uid left, parent;
1784 BtKey ptr;
1785
1786         ptr = keyptr(page, page->cnt);
1787         memcpy(fencekey, ptr, ptr->len + 1);
1788         bt_unlockpage (BtLockWrite, set);
1789
1790         //      load and lock our parent
1791
1792 retry:
1793         if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
1794                 return bt->err;
1795
1796         parent = bt->page_no;
1797         ppage = bt->page;
1798         ppool = bt->pool;
1799         pset = bt->set;
1800
1801         //      wait until we are not a foster child
1802
1803         if( bt->foster ) {
1804                 bt_unlockpage (BtLockWrite, pset);
1805                 bt_unpinlatch (pset);
1806                 bt_unpinpool (ppool);
1807 #ifdef unix
1808                 sched_yield();
1809 #else
1810                 SwitchToThread();
1811 #endif
1812                 goto retry;
1813         }
1814
1815         //  find our left neighbor in our parent page
1816
1817         for( idx = slot; --idx; )
1818           if( !slotptr(ppage, idx)->dead )
1819                 break;
1820
1821         //      if no left neighbor, do right merge
1822
1823         if( !idx )
1824                 return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
1825
1826         // lock and map our left neighbor's page
1827
1828         left = bt_getid (slotptr(ppage, idx)->id);
1829
1830         if( lpool = bt_pinpool (bt, left) )
1831                 lpage = bt_page (bt, lpool, left);
1832         else
1833                 return bt->err;
1834
1835         lset = bt_pinlatch (bt, left);
1836         bt_lockpage(BtLockWrite, lset);
1837
1838         //  wait until foster sibling is in our parent
1839
1840         if( bt_getid (lpage->right) != page_no ) {
1841                 bt_unlockpage (BtLockWrite, pset);
1842                 bt_unpinlatch (pset);
1843                 bt_unpinpool (ppool);
1844                 bt_unlockpage (BtLockWrite, lset);
1845                 bt_unpinlatch (lset);
1846                 bt_unpinpool (lpool);
1847 #ifdef linux
1848                 sched_yield();
1849 #else
1850                 SwitchToThread();
1851 #endif
1852                 goto retry;
1853         }
1854
1855         //  since our page will have no more pointers to it,
1856         //      obtain Delete lock and wait for write locks to clear
1857
1858         bt_lockpage(BtLockDelete, set);
1859         bt_lockpage(BtLockWrite, set);
1860
1861         //      if we aren't dead yet,
1862         //      get ready for exit
1863
1864         if( page->act ) {
1865                 bt_unlockpage(BtLockDelete, set);
1866                 bt_unlockpage(BtLockWrite, lset);
1867                 bt_unpinlatch (lset);
1868                 bt_unpinpool (lpool);
1869                 goto lmergexit;
1870         }
1871
1872         //      are we are the fence key for our parent?
1873         //      if so, grab our old fence key
1874
1875         if( postfence = slot == ppage->cnt ) {
1876                 ptr = keyptr (ppage, ppage->cnt);
1877                 memcpy(fencekey, ptr, ptr->len + 1);
1878                 memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
1879
1880                 // clear out other dead slots
1881
1882                 while( --ppage->cnt )
1883                   if( slotptr(ppage, ppage->cnt)->dead )
1884                         memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
1885                   else
1886                         break;
1887
1888                 ptr = keyptr (ppage, ppage->cnt);
1889                 memcpy(postkey, ptr, ptr->len + 1);
1890         } else
1891                 slotptr(ppage,slot)->dead = 1;
1892
1893         ppage->dirty = 1;
1894         ppage->act--;
1895
1896         //      push our right neighbor pointer to our left
1897
1898         memcpy (lpage->right, page->right, BtId);
1899
1900         //      add ourselves to free chain
1901         //      lock latch mgr
1902
1903         bt_spinwritelock(bt->mgr->latchmgr->lock);
1904
1905         //      store free chain in allocation page second right
1906         bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1907         bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
1908
1909         // unlock latch mgr and pages
1910
1911         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1912         bt_unlockpage(BtLockWrite, lset);
1913         bt_unpinlatch (lset);
1914         bt_unpinpool (lpool);
1915
1916         // release our node's delete lock
1917
1918         bt_unlockpage(BtLockDelete, set);
1919
1920 lmergexit:
1921         bt_unlockpage (BtLockWrite, pset);
1922         bt_unpinpool (ppool);
1923
1924         //  do we need to post parent's fence key in its parent?
1925
1926         if( !postfence || parent == ROOT_page ) {
1927                 bt_unpinlatch (pset);
1928                 bt->found = 1;
1929                 return bt->err = 0;
1930         }
1931
1932         //      interlock parent fence post
1933
1934         bt_lockpage (BtLockParent, pset);
1935
1936         //      load parent's parent page
1937 posttry:
1938         if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
1939                 return bt->err;
1940
1941         if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
1942           if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
1943                 return bt->err;
1944           else
1945                 goto posttry;
1946
1947         page = bt->page;
1948
1949         page->min -= *postkey + 1;
1950         ((unsigned char *)page)[page->min] = *postkey;
1951         memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
1952         slotptr(page, slot)->off = page->min;
1953
1954         bt_unlockpage (BtLockParent, pset);
1955         bt_unpinlatch (pset);
1956
1957         bt_unlockpage (BtLockWrite, bt->set);
1958         bt_unpinlatch (bt->set);
1959         bt_unpinpool (bt->pool);
1960
1961         bt->found = 1;
1962         return bt->err = 0;
1963 }
1964
1965 //  find and delete key on page by marking delete flag bit
1966 //  if page becomes empty, delete it from the btree
1967
1968 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
1969 {
1970 BtLatchSet *set;
1971 BtPool *pool;
1972 BtPage page;
1973 uid page_no;
1974 BtKey ptr;
1975 uint slot;
1976
1977         if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
1978                 return bt->err;
1979
1980         page_no = bt->page_no;
1981         page = bt->page;
1982         pool = bt->pool;
1983         set = bt->set;
1984
1985         // if key is found delete it, otherwise ignore request
1986
1987         ptr = keyptr(page, slot);
1988
1989         if( bt->found = !keycmp (ptr, key, len) )
1990           if( bt->found = slotptr(page, slot)->dead == 0 ) {
1991                 slotptr(page,slot)->dead = 1;
1992                   if( slot < page->cnt )
1993                         page->dirty = 1;
1994                   if( !--page->act )
1995                         if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
1996                           return bt->err;
1997                 }
1998
1999         bt_unlockpage(BtLockWrite, set);
2000         bt_unpinlatch (set);
2001         bt_unpinpool (pool);
2002         return bt->err = 0;
2003 }
2004
2005 //      find key in leaf level and return row-id
2006
2007 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
2008 {
2009 uint  slot;
2010 BtKey ptr;
2011 uid id;
2012
2013         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2014                 ptr = keyptr(bt->page, slot);
2015         else
2016                 return 0;
2017
2018         // if key exists, return row-id
2019         //      otherwise return 0
2020
2021         if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
2022                 id = bt_getid(slotptr(bt->page,slot)->id);
2023         else
2024                 id = 0;
2025
2026         bt_unlockpage (BtLockRead, bt->set);
2027         bt_unpinlatch (bt->set);
2028         bt_unpinpool (bt->pool);
2029         return id;
2030 }
2031
2032 //      check page for space available,
2033 //      clean if necessary and return
2034 //      0 - page needs splitting
2035 //      >0  new slot value
2036
2037 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
2038 {
2039 uint nxt = bt->mgr->page_size;
2040 uint cnt = 0, idx = 0;
2041 uint max = page->cnt;
2042 uint newslot = max;
2043 BtKey key;
2044
2045         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2046                 return slot;
2047
2048         //      skip cleanup if nothing to reclaim
2049
2050         if( !page->dirty )
2051                 return 0;
2052
2053         memcpy (bt->frame, page, bt->mgr->page_size);
2054
2055         // skip page info and set rest of page to zero
2056
2057         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2058         page->dirty = 0;
2059         page->act = 0;
2060
2061         // try cleaning up page first
2062
2063         // always leave fence key in the array
2064         // otherwise, remove deleted key
2065
2066         // note: foster children are never dead
2067
2068         while( cnt++ < max ) {
2069                 if( cnt == slot )
2070                         newslot = idx + 1;
2071                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
2072                         continue;
2073
2074                 // copy key
2075
2076                 key = keyptr(bt->frame, cnt);
2077                 nxt -= key->len + 1;
2078                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2079
2080                 // copy slot
2081                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
2082                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2083                         page->act++;
2084                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2085                 slotptr(page, idx)->off = nxt;
2086         }
2087
2088         page->min = nxt;
2089         page->cnt = idx;
2090
2091         //      see if page has enough space now, or does it need splitting?
2092
2093         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2094                 return newslot;
2095
2096         return 0;
2097 }
2098
2099 //      add key to current page
2100 //      page must already be writelocked
2101
2102 void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
2103 {
2104 uint idx;
2105
2106         // find next available dead slot and copy key onto page
2107         // note that foster children on the page are never dead
2108
2109         // look for next hole, but stay back from the fence key
2110
2111         for( idx = slot; idx < page->cnt; idx++ )
2112           if( slotptr(page, idx)->dead )
2113                 break;
2114
2115         if( idx == page->cnt )
2116                 idx++, page->cnt++;
2117
2118         page->act++;
2119
2120         // now insert key into array before slot
2121
2122         while( idx > slot )
2123                 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
2124
2125         page->min -= len + 1;
2126         ((unsigned char *)page)[page->min] = len;
2127         memcpy ((unsigned char *)page + page->min +1, key, len );
2128
2129         bt_putid(slotptr(page,slot)->id, id);
2130         slotptr(page, slot)->off = page->min;
2131         slotptr(page, slot)->tod = tod;
2132         slotptr(page, slot)->dead = 0;
2133 }
2134
2135 // split the root and raise the height of the btree
2136 //      call with current page locked and page no of foster child
2137 //      return with current page (root) unlocked
2138
2139 BTERR bt_splitroot(BtDb *bt, uid right)
2140 {
2141 uint nxt = bt->mgr->page_size;
2142 unsigned char fencekey[256];
2143 BtPage root = bt->page;
2144 uid new_page;
2145 BtKey key;
2146
2147         //  Obtain an empty page to use, and copy the left page
2148         //  contents into it from the root.  Strip foster child key.
2149         //      (it's the stopper key)
2150
2151         memset (slotptr(root, root->cnt), 0, sizeof(BtSlot));
2152         root->dirty = 1;
2153         root->foster--;
2154         root->act--;
2155         root->cnt--;
2156
2157         //      Save left fence key.
2158
2159         key = keyptr(root, root->cnt);
2160         memcpy (fencekey, key, key->len + 1);
2161
2162         //  copy the lower keys into a new left page
2163
2164         if( !(new_page = bt_newpage(bt, root)) )
2165                 return bt->err;
2166
2167         // preserve the page info at the bottom
2168         // and set rest of the root to zero
2169
2170         memset (root+1, 0, bt->mgr->page_size - sizeof(*root));
2171
2172         // insert left fence key on empty newroot page
2173
2174         nxt -= *fencekey + 1;
2175         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
2176         bt_putid(slotptr(root, 1)->id, new_page);
2177         slotptr(root, 1)->off = nxt;
2178         
2179         // insert stopper key on newroot page
2180         // and increase the root height
2181
2182         nxt -= 3;
2183         fencekey[0] = 2;
2184         fencekey[1] = 0xff;
2185         fencekey[2] = 0xff;
2186         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
2187         bt_putid(slotptr(root, 2)->id, right);
2188         slotptr(root, 2)->off = nxt;
2189
2190         bt_putid(root->right, 0);
2191         root->min = nxt;                // reset lowest used offset and key count
2192         root->cnt = 2;
2193         root->act = 2;
2194         root->lvl++;
2195
2196         // release and unpin root (bt->page)
2197
2198         bt_unlockpage(BtLockWrite, bt->set);
2199         bt_unpinlatch (bt->set);
2200         bt_unpinpool (bt->pool);
2201         return 0;
2202 }
2203
2204 //  split already locked full node
2205 //      return unlocked and unpinned.
2206
2207 BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
2208 {
2209 uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
2210 unsigned char fencekey[256];
2211 uint tod = time(NULL);
2212 uint lvl = page->lvl;
2213 uid new_page;
2214 BtKey key;
2215
2216         //      initialize frame buffer for right node
2217
2218         memset (bt->frame, 0, bt->mgr->page_size);
2219         max = page->cnt - page->foster;
2220         cnt = max / 2;
2221         idx = 0;
2222
2223         //  split higher half of keys to bt->frame
2224         //      leaving old foster children in the left node,
2225         //      and adding a new foster child there.
2226
2227         while( cnt++ < max ) {
2228                 key = keyptr(page, cnt);
2229                 nxt -= key->len + 1;
2230                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2231                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
2232                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
2233                         bt->frame->act++;
2234                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
2235                 slotptr(bt->frame, idx)->off = nxt;
2236         }
2237
2238         // transfer right link node to new right node
2239
2240         if( page_no > ROOT_page )
2241                 memcpy (bt->frame->right, page->right, BtId);
2242
2243         bt->frame->bits = bt->mgr->page_bits;
2244         bt->frame->min = nxt;
2245         bt->frame->cnt = idx;
2246         bt->frame->lvl = lvl;
2247
2248         //      get new free page and write right frame to it.
2249
2250         if( !(new_page = bt_newpage(bt, bt->frame)) )
2251                 return bt->err;
2252
2253         //      remember fence key for new right page to add
2254         //      as foster child to the left node
2255
2256         key = keyptr(bt->frame, idx);
2257         memcpy (fencekey, key, key->len + 1);
2258
2259         //      update lower keys and foster children to continue in old page
2260
2261         memcpy (bt->frame, page, bt->mgr->page_size);
2262         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2263         nxt = bt->mgr->page_size;
2264         page->dirty = 0;
2265         page->act = 0;
2266         cnt = 0;
2267         idx = 0;
2268
2269         //  assemble page of smaller keys
2270         //      to remain in the old page
2271
2272         while( cnt++ < max / 2 ) {
2273                 key = keyptr(bt->frame, cnt);
2274                 nxt -= key->len + 1;
2275                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2276                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2277                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2278                         page->act++;
2279                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2280                 slotptr(page, idx)->off = nxt;
2281         }
2282
2283         //      insert new foster child for right page in queue
2284         //      before any of the current foster children
2285
2286         nxt -= *fencekey + 1;
2287         memcpy ((unsigned char *)page + nxt, fencekey, *fencekey + 1);
2288
2289         bt_putid (slotptr(page,++idx)->id, new_page);
2290         slotptr(page, idx)->tod = tod;
2291         slotptr(page, idx)->off = nxt;
2292         page->foster++;
2293         page->act++;
2294
2295         //  continue with old foster child keys
2296         //      note that none will be dead
2297
2298         cnt = bt->frame->cnt - bt->frame->foster;
2299
2300         while( cnt++ < bt->frame->cnt ) {
2301                 key = keyptr(bt->frame, cnt);
2302                 nxt -= key->len + 1;
2303                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2304                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2305                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2306                 slotptr(page, idx)->off = nxt;
2307                 page->act++;
2308         }
2309
2310         page->min = nxt;
2311         page->cnt = idx;
2312
2313         //      link new right page
2314
2315         bt_putid (page->right, new_page);
2316
2317         // if current page is the root page, split it
2318
2319         if( page_no == ROOT_page )
2320                 return bt_splitroot (bt, new_page);
2321
2322         //  release wr lock on our page
2323
2324         bt_unlockpage (BtLockWrite, set);
2325
2326         //  obtain ParentModification lock for current page
2327         //      to fix new fence key and oldest foster child on page
2328
2329         bt_lockpage (BtLockParent, set);
2330
2331         //  get our new fence key to insert in parent node
2332
2333         bt_lockpage (BtLockRead, set);
2334
2335         key = keyptr(page, page->cnt-1);
2336         memcpy (fencekey, key, key->len+1);
2337
2338         bt_unlockpage (BtLockRead, set);
2339
2340         if( bt_insertkey (bt, fencekey + 1, *fencekey, page_no, tod, lvl + 1) )
2341                 return bt->err;
2342
2343         //      lock our page for writing
2344
2345         bt_lockpage (BtLockRead, set);
2346
2347         //      switch old parent key from us to our oldest foster child
2348
2349         key = keyptr(page, page->cnt);
2350         memcpy (fencekey, key, key->len+1);
2351
2352         new_page = bt_getid (slotptr(page, page->cnt)->id);
2353         bt_unlockpage (BtLockRead, set);
2354
2355         if( bt_insertkey (bt, fencekey + 1, *fencekey, new_page, tod, lvl + 1) )
2356                 return bt->err;
2357
2358         //      now that it has its own parent pointer,
2359         //      remove oldest foster child from our page
2360
2361         bt_lockpage (BtLockWrite, set);
2362         memset (slotptr(page, page->cnt), 0, sizeof(BtSlot));
2363         page->dirty = 1;
2364         page->foster--;
2365         page->cnt--;
2366         page->act--;
2367
2368         bt_unlockpage (BtLockParent, set);
2369
2370         //  if this emptied page,
2371         //      undo the foster child
2372
2373         if( !page->act )
2374           if( bt_mergeleft (bt, page, pool, set, page_no, lvl) )
2375                 return bt->err;
2376
2377         //      unlock and unpin
2378
2379         bt_unlockpage (BtLockWrite, set);
2380         bt_unpinlatch (set);
2381         bt_unpinpool (pool);
2382         return 0;
2383 }
2384
2385 //  Insert new key into the btree at leaf level.
2386
2387 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
2388 {
2389 uint slot, idx;
2390 BtPage page;
2391 BtKey ptr;
2392
2393         while( 1 ) {
2394                 if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
2395                         ptr = keyptr(bt->page, slot);
2396                 else
2397                 {
2398                         if ( !bt->err )
2399                                 bt->err = BTERR_ovflw;
2400                         return bt->err;
2401                 }
2402
2403                 // if key already exists, update id and return
2404
2405                 page = bt->page;
2406
2407                 if( !keycmp (ptr, key, len) ) {
2408                         if( slotptr(page, slot)->dead )
2409                                 page->act++;
2410                         slotptr(page, slot)->dead = 0;
2411                         slotptr(page, slot)->tod = tod;
2412                         bt_putid(slotptr(page,slot)->id, id);
2413                         bt_unlockpage(BtLockWrite, bt->set);
2414                         bt_unpinlatch (bt->set);
2415                         bt_unpinpool (bt->pool);
2416                         return bt->err;
2417                 }
2418
2419                 // check if page has enough space
2420
2421                 if( slot = bt_cleanpage (bt, bt->page, len, slot) )
2422                         break;
2423
2424                 if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
2425                         return bt->err;
2426         }
2427
2428         bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
2429
2430         bt_unlockpage (BtLockWrite, bt->set);
2431         bt_unpinlatch (bt->set);
2432         bt_unpinpool (bt->pool);
2433         return 0;
2434 }
2435
2436 //  cache page of keys into cursor and return starting slot for given key
2437
2438 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2439 {
2440 uint slot;
2441
2442         // cache page for retrieval
2443         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2444                 memcpy (bt->cursor, bt->page, bt->mgr->page_size);
2445
2446         bt->cursor_page = bt->page_no;
2447
2448         bt_unlockpage(BtLockRead, bt->set);
2449         bt_unpinlatch (bt->set);
2450         bt_unpinpool (bt->pool);
2451         return slot;
2452 }
2453
2454 //  return next slot for cursor page
2455 //  or slide cursor right into next page
2456
2457 uint bt_nextkey (BtDb *bt, uint slot)
2458 {
2459 BtLatchSet *set;
2460 BtPool *pool;
2461 BtPage page;
2462 uid right;
2463
2464   do {
2465         right = bt_getid(bt->cursor->right);
2466         while( slot++ < bt->cursor->cnt - bt->cursor->foster )
2467           if( slotptr(bt->cursor,slot)->dead )
2468                 continue;
2469           else if( right || (slot < bt->cursor->cnt - bt->cursor->foster) )
2470                 return slot;
2471           else
2472                 break;
2473
2474         if( !right )
2475                 break;
2476
2477         bt->cursor_page = right;
2478         if( pool = bt_pinpool (bt, right) )
2479                 page = bt_page (bt, pool, right);
2480         else
2481                 return 0;
2482
2483         set = bt_pinlatch (bt, right);
2484     bt_lockpage(BtLockRead, set);
2485
2486         memcpy (bt->cursor, page, bt->mgr->page_size);
2487
2488         bt_unlockpage(BtLockRead, set);
2489         bt_unpinlatch (set);
2490         bt_unpinpool (pool);
2491         slot = 0;
2492   } while( 1 );
2493
2494   return bt->err = 0;
2495 }
2496
2497 BtKey bt_key(BtDb *bt, uint slot)
2498 {
2499         return keyptr(bt->cursor, slot);
2500 }
2501
2502 uid bt_uid(BtDb *bt, uint slot)
2503 {
2504         return bt_getid(slotptr(bt->cursor,slot)->id);
2505 }
2506
2507 uint bt_tod(BtDb *bt, uint slot)
2508 {
2509         return slotptr(bt->cursor,slot)->tod;
2510 }
2511
2512
2513 #ifdef STANDALONE
2514
2515 typedef struct {
2516         char type, idx;
2517         char *infile;
2518         BtMgr *mgr;
2519         int num;
2520 } ThreadArg;
2521
2522 //  standalone program to index file of keys
2523 //  then list them onto std-out
2524
2525 #ifdef unix
2526 void *index_file (void *arg)
2527 #else
2528 uint __stdcall index_file (void *arg)
2529 #endif
2530 {
2531 int line = 0, found = 0, cnt = 0;
2532 uid next, page_no = LEAF_page;  // start on first page of leaves
2533 unsigned char key[256];
2534 ThreadArg *args = arg;
2535 int ch, len = 0, slot;
2536 BtLatchSet *set;
2537 time_t tod[1];
2538 BtPool *pool;
2539 BtPage page;
2540 BtKey ptr;
2541 BtDb *bt;
2542 FILE *in;
2543
2544         bt = bt_open (args->mgr);
2545         time (tod);
2546
2547         switch(args->type | 0x20)
2548         {
2549         case 'w':
2550                 fprintf(stderr, "started indexing for %s\n", args->infile);
2551                 if( in = fopen (args->infile, "rb") )
2552                   while( ch = getc(in), ch != EOF )
2553                         if( ch == '\n' )
2554                         {
2555                           line++;
2556
2557                           if( args->num == 1 )
2558                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2559
2560                           else if( args->num )
2561                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2562
2563                           if( bt_insertkey (bt, key, len, line, *tod, 0) )
2564                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2565                           len = 0;
2566                         }
2567                         else if( len < 255 )
2568                                 key[len++] = ch;
2569                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2570                 break;
2571
2572         case 'd':
2573                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2574                 if( in = fopen (args->infile, "rb") )
2575                   while( ch = getc(in), ch != EOF )
2576                         if( ch == '\n' )
2577                         {
2578                           line++;
2579                           if( args->num == 1 )
2580                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2581
2582                           else if( args->num )
2583                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2584
2585                           if( bt_deletekey (bt, key, len) )
2586                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2587                           len = 0;
2588                         }
2589                         else if( len < 255 )
2590                                 key[len++] = ch;
2591                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2592                 break;
2593
2594         case 'f':
2595                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2596                 if( in = fopen (args->infile, "rb") )
2597                   while( ch = getc(in), ch != EOF )
2598                         if( ch == '\n' )
2599                         {
2600                           line++;
2601                           if( args->num == 1 )
2602                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2603
2604                           else if( args->num )
2605                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2606
2607                           if( bt_findkey (bt, key, len) )
2608                                 found++;
2609                           else if( bt->err )
2610                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2611                           len = 0;
2612                         }
2613                         else if( len < 255 )
2614                                 key[len++] = ch;
2615                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2616                 break;
2617
2618         case 's':
2619                 len = key[0] = 0;
2620
2621                 fprintf(stderr, "started reading\n");
2622
2623                 if( slot = bt_startkey (bt, key, len) )
2624                   slot--;
2625                 else
2626                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2627
2628                 while( slot = bt_nextkey (bt, slot) ) {
2629                         ptr = bt_key(bt, slot);
2630                         fwrite (ptr->key, ptr->len, 1, stdout);
2631                         fputc ('\n', stdout);
2632                 }
2633
2634                 break;
2635
2636         case 'c':
2637                 fprintf(stderr, "started reading\n");
2638
2639                 do {
2640                         if( pool = bt_pinpool (bt, page_no) )
2641                                 page = bt_page (bt, pool, page_no);
2642                         else
2643                                 break;
2644                         set = bt_pinlatch (bt, page_no);
2645                         bt_lockpage (BtLockRead, set);
2646                         cnt += page->act;
2647                         next = bt_getid (page->right);
2648                         bt_unlockpage (BtLockRead, set);
2649                         bt_unpinlatch (set);
2650                         bt_unpinpool (pool);
2651                 } while( page_no = next );
2652
2653                 cnt--;  // remove stopper key
2654                 fprintf(stderr, " Total keys read %d\n", cnt);
2655                 break;
2656         }
2657
2658         bt_close (bt);
2659 #ifdef unix
2660         return NULL;
2661 #else
2662         return 0;
2663 #endif
2664 }
2665
2666 typedef struct timeval timer;
2667
2668 int main (int argc, char **argv)
2669 {
2670 int idx, cnt, len, slot, err;
2671 int segsize, bits = 16;
2672 #ifdef unix
2673 pthread_t *threads;
2674 timer start, stop;
2675 #else
2676 time_t start[1], stop[1];
2677 HANDLE *threads;
2678 #endif
2679 double real_time;
2680 ThreadArg *args;
2681 uint poolsize = 0;
2682 int num = 0;
2683 char key[1];
2684 BtMgr *mgr;
2685 BtKey ptr;
2686 BtDb *bt;
2687
2688         if( argc < 3 ) {
2689                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2690                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2691                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2692                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2693                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2694                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2695                 exit(0);
2696         }
2697
2698 #ifdef unix
2699         gettimeofday(&start, NULL);
2700 #else
2701         time(start);
2702 #endif
2703
2704         if( argc > 3 )
2705                 bits = atoi(argv[3]);
2706
2707         if( argc > 4 )
2708                 poolsize = atoi(argv[4]);
2709
2710         if( !poolsize )
2711                 fprintf (stderr, "Warning: no mapped_pool\n");
2712
2713         if( poolsize > 65535 )
2714                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2715
2716         if( argc > 5 )
2717                 segsize = atoi(argv[5]);
2718         else
2719                 segsize = 4;    // 16 pages per mmap segment
2720
2721         if( argc > 6 )
2722                 num = atoi(argv[6]);
2723
2724         cnt = argc - 7;
2725 #ifdef unix
2726         threads = malloc (cnt * sizeof(pthread_t));
2727 #else
2728         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2729 #endif
2730         args = malloc (cnt * sizeof(ThreadArg));
2731
2732         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2733
2734         if( !mgr ) {
2735                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2736                 exit (1);
2737         }
2738
2739         //      fire off threads
2740
2741         for( idx = 0; idx < cnt; idx++ ) {
2742                 args[idx].infile = argv[idx + 7];
2743                 args[idx].type = argv[2][0];
2744                 args[idx].mgr = mgr;
2745                 args[idx].num = num;
2746                 args[idx].idx = idx;
2747 #ifdef unix
2748                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2749                         fprintf(stderr, "Error creating thread %d\n", err);
2750 #else
2751                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2752 #endif
2753         }
2754
2755         //      wait for termination
2756
2757 #ifdef unix
2758         for( idx = 0; idx < cnt; idx++ )
2759                 pthread_join (threads[idx], NULL);
2760         gettimeofday(&stop, NULL);
2761         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2762 #else
2763         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2764
2765         for( idx = 0; idx < cnt; idx++ )
2766                 CloseHandle(threads[idx]);
2767
2768         time (stop);
2769         real_time = 1000 * (*stop - *start);
2770 #endif
2771         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2772         bt_mgrclose (mgr);
2773 }
2774
2775 #endif  //STANDALONE