]> pd.if.org Git - btree/blob - fosterbtreee.c
rework broken bt_deletekey
[btree] / fosterbtreee.c
1 // foster btree version e
2 // 29 JAN 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <fcntl.h>
35 #include <sys/time.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #include <pthread.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <process.h>
47 #include <intrin.h>
48 #endif
49
50 #include <memory.h>
51 #include <string.h>
52
53 typedef unsigned long long      uid;
54
55 #ifndef unix
56 typedef unsigned long long      off64_t;
57 typedef unsigned short          ushort;
58 typedef unsigned int            uint;
59 #endif
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63
64 #define BT_latchtable   128                                     // number of latch manager slots
65
66 #define BT_maxbits              24                                      // maximum page size in bits
67 #define BT_minbits              9                                       // minimum page size in bits
68 #define BT_minpage              (1 << BT_minbits)       // minimum page size
69 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
70
71 /*
72 There are five lock types for each node in three independent sets: 
73 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
74 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
75 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
76 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
77 5. (set 3) ParentLock: Exclusive. Have parent adopt/delete maximum foster child from the node.
78 */
79
80 typedef enum{
81         BtLockAccess,
82         BtLockDelete,
83         BtLockRead,
84         BtLockWrite,
85         BtLockParent
86 }BtLock;
87
88 //      Define the length of the page and key pointers
89
90 #define BtId 6
91
92 //      Page key slot definition.
93
94 //      If BT_maxbits is 15 or less, you can save 4 bytes
95 //      for each key stored by making the first two uints
96 //      into ushorts.  You can also save 4 bytes by removing
97 //      the tod field from the key.
98
99 //      Keys are marked dead, but remain on the page until
100 //      cleanup is called. The fence key (highest key) for
101 //      the page is always present, even after cleanup.
102
103 typedef struct {
104         uint off:BT_maxbits;            // page offset for key start
105         uint dead:1;                            // set for deleted key
106         uint tod;                                       // time-stamp for key
107         unsigned char id[BtId];         // id associated with key
108 } BtSlot;
109
110 //      The key structure occupies space at the upper end of
111 //      each page.  It's a length byte followed by the value
112 //      bytes.
113
114 typedef struct {
115         unsigned char len;
116         unsigned char key[1];
117 } *BtKey;
118
119 //      The first part of an index page.
120 //      It is immediately followed
121 //      by the BtSlot array of keys.
122
123 typedef struct Page {
124         volatile uint cnt;                      // count of keys in page
125         volatile uint act;                      // count of active keys
126         volatile uint min;                      // next key offset
127         volatile uint foster;           // count of foster children
128         unsigned char bits;                     // page size in bits
129         unsigned char lvl:7;            // level of page
130         unsigned char dirty:1;          // page needs to be cleaned
131         unsigned char right[BtId];      // page number to right
132 } *BtPage;
133
134 //      mode & definition for hash latch implementation
135
136 enum {
137         Mutex = 1,
138         Write = 2,
139         Pending = 4,
140         Share = 8
141 } LockMode;
142
143 // mutex locks the other fields
144 // exclusive is set for write access
145 // share is count of read accessors
146
147 typedef struct {
148         volatile ushort mutex:1;
149         volatile ushort exclusive:1;
150         volatile ushort pending:1;
151         volatile ushort share:13;
152 } BtSpinLatch;
153
154 //  hash table entries
155
156 typedef struct {
157         BtSpinLatch latch[1];
158         volatile ushort slot;           // Latch table entry at head of chain
159 } BtHashEntry;
160
161 //      latch manager table structure
162
163 typedef struct {
164 #ifdef unix
165         pthread_rwlock_t lock[1];
166 #else
167         SRWLOCK srw[1];
168 #endif
169 } BtLatch;
170
171 typedef struct {
172         BtLatch readwr[1];                      // read/write page lock
173         BtLatch access[1];                      // Access Intent/Page delete
174         BtLatch parent[1];                      // adoption of foster children
175         BtSpinLatch busy[1];            // slot is being moved between chains
176         volatile ushort next;           // next entry in hash table chain
177         volatile ushort prev;           // prev entry in hash table chain
178         volatile ushort pin;            // number of outstanding locks
179         volatile ushort hash;           // hash slot entry is under
180         volatile uid page_no;           // latch set page number
181 } BtLatchSet;
182
183 //      The memory mapping pool table buffer manager entry
184
185 typedef struct {
186         unsigned long long int lru;     // number of times accessed
187         uid  basepage;                          // mapped base page number
188         char *map;                                      // mapped memory pointer
189         ushort pin;                                     // mapped page pin counter
190         ushort slot;                            // slot index in this array
191         void *hashprev;                         // previous pool entry for the same hash idx
192         void *hashnext;                         // next pool entry for the same hash idx
193 #ifndef unix
194         HANDLE hmap;                            // Windows memory mapping handle
195 #endif
196 } BtPool;
197
198 //      structure for latch manager on ALLOC_page
199
200 typedef struct {
201         struct Page alloc[2];           // next & free page_nos in right ptr
202         BtSpinLatch lock[1];            // allocation area lite latch
203         ushort latchdeployed;           // highest number of latch entries deployed
204         ushort nlatchpage;                      // number of latch pages at BT_latch
205         ushort latchtotal;                      // number of page latch entries
206         ushort latchhash;                       // number of latch hash table slots
207         ushort latchvictim;                     // next latch entry to examine
208         BtHashEntry table[0];           // the hash table
209 } BtLatchMgr;
210
211 //      The object structure for Btree access
212
213 typedef struct {
214         uint page_size;                         // page size    
215         uint page_bits;                         // page size in bits    
216         uint seg_bits;                          // seg size in pages in bits
217         uint mode;                                      // read-write mode
218 #ifdef unix
219         int idx;
220         char *pooladvise;                       // bit maps for pool page advisements
221 #else
222         HANDLE idx;
223 #endif
224         ushort poolcnt;                         // highest page pool node in use
225         ushort poolmax;                         // highest page pool node allocated
226         ushort poolmask;                        // total number of pages in mmap segment - 1
227         ushort hashsize;                        // size of Hash Table for pool entries
228         ushort evicted;                         // last evicted hash table slot
229         ushort *hash;                           // hash table of pool entries
230         BtPool *pool;                           // memory pool page segments
231         BtSpinLatch *latch;                     // latches for pool hash slots
232         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
233         BtLatchSet *latchsets;          // mapped latch set from latch pages
234 #ifndef unix
235         HANDLE halloc;                          // allocation and latch table handle
236 #endif
237 } BtMgr;
238
239 typedef struct {
240         BtMgr *mgr;                     // buffer manager for thread
241         BtPage cursor;          // cached frame for start/next (never mapped)
242         BtPage frame;           // spare frame for the page split (never mapped)
243         BtPage zero;            // page frame for zeroes at end of file
244         BtPage page;            // current page
245         uid page_no;            // current page number  
246         uid cursor_page;        // current cursor page number   
247         BtLatchSet *set;        // current page latch set
248         BtPool *pool;           // current page pool
249         unsigned char *mem;     // frame, cursor, page memory buffer
250         int foster;                     // last search was to foster child
251         int found;                      // last delete was found
252         int err;                        // last error
253 } BtDb;
254
255 typedef enum {
256         BTERR_ok = 0,
257         BTERR_struct,
258         BTERR_ovflw,
259         BTERR_lock,
260         BTERR_map,
261         BTERR_wrt,
262         BTERR_hash,
263         BTERR_latch
264 } BTERR;
265
266 // B-Tree functions
267 extern void bt_close (BtDb *bt);
268 extern BtDb *bt_open (BtMgr *mgr);
269 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
270 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
271 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
272 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
273 extern uint bt_nextkey   (BtDb *bt, uint slot);
274
275 //      manager functions
276 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
277 void bt_mgrclose (BtMgr *mgr);
278
279 //      internal functions
280 BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
281 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
282 BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
283
284 //  Helper functions to return cursor slot values
285
286 extern BtKey bt_key (BtDb *bt, uint slot);
287 extern uid bt_uid (BtDb *bt, uint slot);
288 extern uint bt_tod (BtDb *bt, uint slot);
289
290 //  BTree page number constants
291 #define ALLOC_page              0       // allocation & lock manager hash table
292 #define ROOT_page               1       // root of the btree
293 #define LEAF_page               2       // first page of leaves
294 #define LATCH_page              3       // pages for lock manager
295
296 //      Number of levels to create in a new BTree
297
298 #define MIN_lvl                 2
299
300 //  The page is allocated from low and hi ends.
301 //  The key offsets and row-id's are allocated
302 //  from the bottom, while the text of the key
303 //  is allocated from the top.  When the two
304 //  areas meet, the page is split into two.
305
306 //  A key consists of a length byte, two bytes of
307 //  index number (0 - 65534), and up to 253 bytes
308 //  of key value.  Duplicate keys are discarded.
309 //  Associated with each key is a 48 bit row-id.
310
311 //  The b-tree root is always located at page 1.
312 //      The first leaf page of level zero is always
313 //      located on page 2.
314
315 //      When to root page fills, it is split in two and
316 //      the tree height is raised by a new root at page
317 //      one with two keys.
318
319 //      Deleted keys are marked with a dead bit until
320 //      page cleanup The fence key for a node is always
321 //      present, even after deletion and cleanup.
322
323 //  Groups of pages called segments from the btree are
324 //  cached with memory mapping. A hash table is used to keep
325 //  track of the cached segments.  This behaviour is controlled
326 //  by the cache block size parameter to bt_open.
327
328 //  To achieve maximum concurrency one page is locked at a time
329 //  as the tree is traversed to find leaf key in question.
330
331 //      An adoption traversal leaves the parent node locked as the
332 //      tree is traversed to the level in quesiton.
333
334 //  Page 0 is dedicated to lock for new page extensions,
335 //      and chains empty pages together for reuse.
336
337 //      Empty pages are chained together through the ALLOC page and reused.
338
339 //      Access macros to address slot and key values from the page
340
341 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
342 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
343
344 void bt_putid(unsigned char *dest, uid id)
345 {
346 int i = BtId;
347
348         while( i-- )
349                 dest[i] = (unsigned char)id, id >>= 8;
350 }
351
352 uid bt_getid(unsigned char *src)
353 {
354 uid id = 0;
355 int i;
356
357         for( i = 0; i < BtId; i++ )
358                 id <<= 8, id |= *src++; 
359
360         return id;
361 }
362
363 //      wait until write lock mode is clear
364 //      and add 1 to the share count
365
366 void bt_spinreadlock(BtSpinLatch *latch)
367 {
368 ushort prev;
369
370   do {
371 #ifdef unix
372         while( __sync_fetch_and_or((ushort *)latch, Mutex) & Mutex )
373                 sched_yield();
374 #else
375         while( _InterlockedOr16((ushort *)latch, Mutex) & Mutex )
376                 SwitchToThread();
377 #endif
378
379         //  see if exclusive request is granted or pending
380
381         if( prev = !(latch->exclusive | latch->pending) )
382 #ifdef unix
383                 __sync_fetch_and_add((ushort *)latch, Share);
384 #else
385                 _InterlockedExchangeAdd16 ((ushort *)latch, Share);
386 #endif
387
388 #ifdef unix
389         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
390 #else
391         _InterlockedAnd16((ushort *)latch, ~Mutex);
392 #endif
393         if( prev )
394                 return;
395 #ifdef  unix
396   } while( sched_yield(), 1 );
397 #else
398   } while( SwitchToThread(), 1 );
399 #endif
400 }
401
402 //      wait for other read and write latches to relinquish
403
404 void bt_spinwritelock(BtSpinLatch *latch)
405 {
406   do {
407 #ifdef unix
408         while( __sync_fetch_and_or((ushort *)latch, Mutex | Pending) & Mutex )
409                 sched_yield();
410 #else
411         while( _InterlockedOr16((ushort *)latch, Mutex | Pending) & Mutex )
412                 SwitchToThread();
413 #endif
414         if( !(latch->share | latch->exclusive) ) {
415 #ifdef unix
416                 __sync_fetch_and_or((ushort *)latch, Write);
417                 __sync_fetch_and_and ((ushort *)latch, ~(Mutex | Pending));
418 #else
419                 _InterlockedOr16((ushort *)latch, Write);
420                 _InterlockedAnd16((ushort *)latch, ~(Mutex | Pending));
421 #endif
422                 return;
423         }
424
425 #ifdef unix
426         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
427 #else
428         _InterlockedAnd16((ushort *)latch, ~Mutex);
429 #endif
430 #ifdef  unix
431         sched_yield();
432 #else
433         SwitchToThread();
434 #endif
435   } while( 1 );
436 }
437
438 //      try to obtain write lock
439
440 //      return 1 if obtained,
441 //              0 otherwise
442
443 int bt_spinwritetry(BtSpinLatch *latch)
444 {
445 ushort prev;
446
447 #ifdef unix
448         if( prev = __sync_fetch_and_or((ushort *)latch, Mutex), prev & Mutex )
449                 return 0;
450 #else
451         if( prev = _InterlockedOr16((ushort *)latch, Mutex), prev & Mutex )
452                 return 0;
453 #endif
454         //      take write access if all bits are clear
455
456         if( !prev )
457 #ifdef unix
458                 __sync_fetch_and_or ((ushort *)latch, Write);
459 #else
460                 _InterlockedOr16((ushort *)latch, Write);
461 #endif
462
463 #ifdef unix
464         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
465 #else
466         _InterlockedAnd16((ushort *)latch, ~Mutex);
467 #endif
468         return !prev;
469 }
470
471 //      clear write mode
472
473 void bt_spinreleasewrite(BtSpinLatch *latch)
474 {
475 #ifdef unix
476         __sync_fetch_and_and ((ushort *)latch, ~Write);
477 #else
478         _InterlockedAnd16((ushort *)latch, ~Write);
479 #endif
480 }
481
482 //      decrement reader count
483
484 void bt_spinreleaseread(BtSpinLatch *latch)
485 {
486 #ifdef unix
487         __sync_fetch_and_add((ushort *)latch, -Share);
488 #else
489         _InterlockedExchangeAdd16 ((ushort *)latch, -Share);
490 #endif
491 }
492
493 void bt_initlockset (BtLatchSet *set, int reuse)
494 {
495 #ifdef unix
496 pthread_rwlockattr_t rwattr[1];
497
498         if( reuse ) {
499                 pthread_rwlock_destroy (set->readwr->lock);
500                 pthread_rwlock_destroy (set->access->lock);
501                 pthread_rwlock_destroy (set->parent->lock);
502         }
503
504         pthread_rwlockattr_init (rwattr);
505         pthread_rwlockattr_setkind_np (rwattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
506         pthread_rwlockattr_setpshared (rwattr, PTHREAD_PROCESS_SHARED);
507
508         pthread_rwlock_init (set->readwr->lock, rwattr);
509         pthread_rwlock_init (set->access->lock, rwattr);
510         pthread_rwlock_init (set->parent->lock, rwattr);
511         pthread_rwlockattr_destroy (rwattr);
512 #else
513         InitializeSRWLock (set->readwr->srw);
514         InitializeSRWLock (set->access->srw);
515         InitializeSRWLock (set->parent->srw);
516 #endif
517 }
518
519 //      link latch table entry into latch hash table
520
521 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
522 {
523 BtLatchSet *set = bt->mgr->latchsets + victim;
524
525         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
526                 bt->mgr->latchsets[set->next].prev = victim;
527
528         bt->mgr->latchmgr->table[hashidx].slot = victim;
529         set->page_no = page_no;
530         set->hash = hashidx;
531         set->prev = 0;
532 }
533
534 void bt_unpinlatch (BtLatchSet *set)
535 {
536 #ifdef unix
537         __sync_fetch_and_add(&set->pin, -1);
538 #else
539         _InterlockedDecrement16 (&set->pin);
540 #endif
541 }
542
543 //      find existing latchset or inspire new one
544 //      return with latchset pinned
545
546 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
547 {
548 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
549 ushort slot, avail = 0, victim, idx;
550 BtLatchSet *set;
551
552         //  obtain read lock on hash table entry
553
554         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
555
556         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
557         {
558                 set = bt->mgr->latchsets + slot;
559                 if( page_no == set->page_no )
560                         break;
561         } while( slot = set->next );
562
563         if( slot ) {
564 #ifdef unix
565                 __sync_fetch_and_add(&set->pin, 1);
566 #else
567                 _InterlockedIncrement16 (&set->pin);
568 #endif
569         }
570
571     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
572
573         if( slot )
574                 return set;
575
576   //  try again, this time with write lock
577
578   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
579
580   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
581   {
582         set = bt->mgr->latchsets + slot;
583         if( page_no == set->page_no )
584                 break;
585         if( !set->pin && !avail )
586                 avail = slot;
587   } while( slot = set->next );
588
589   //  found our entry, or take over an unpinned one
590
591   if( slot || (slot = avail) ) {
592         set = bt->mgr->latchsets + slot;
593 #ifdef unix
594         __sync_fetch_and_add(&set->pin, 1);
595 #else
596         _InterlockedIncrement16 (&set->pin);
597 #endif
598         set->page_no = page_no;
599         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
600         return set;
601   }
602
603         //  see if there are any unused entries
604 #ifdef unix
605         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
606 #else
607         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
608 #endif
609
610         if( victim < bt->mgr->latchmgr->latchtotal ) {
611                 set = bt->mgr->latchsets + victim;
612 #ifdef unix
613                 __sync_fetch_and_add(&set->pin, 1);
614 #else
615                 _InterlockedIncrement16 (&set->pin);
616 #endif
617                 bt_initlockset (set, 0);
618                 bt_latchlink (bt, hashidx, victim, page_no);
619                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
620                 return set;
621         }
622
623 #ifdef unix
624         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
625 #else
626         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
627 #endif
628   //  find and reuse previous lock entry
629
630   while( 1 ) {
631 #ifdef unix
632         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
633 #else
634         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
635 #endif
636         //      we don't use slot zero
637
638         if( victim %= bt->mgr->latchmgr->latchtotal )
639                 set = bt->mgr->latchsets + victim;
640         else
641                 continue;
642
643         //      take control of our slot
644         //      from other threads
645
646         if( set->pin || !bt_spinwritetry (set->busy) )
647                 continue;
648
649         idx = set->hash;
650
651         // try to get write lock on hash chain
652         //      skip entry if not obtained
653         //      or has outstanding locks
654
655         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
656                 bt_spinreleasewrite (set->busy);
657                 continue;
658         }
659
660         if( set->pin ) {
661                 bt_spinreleasewrite (set->busy);
662                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
663                 continue;
664         }
665
666         //  unlink our available victim from its hash chain
667
668         if( set->prev )
669                 bt->mgr->latchsets[set->prev].next = set->next;
670         else
671                 bt->mgr->latchmgr->table[idx].slot = set->next;
672
673         if( set->next )
674                 bt->mgr->latchsets[set->next].prev = set->prev;
675
676         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
677 #ifdef unix
678         __sync_fetch_and_add(&set->pin, 1);
679 #else
680         _InterlockedIncrement16 (&set->pin);
681 #endif
682         bt_initlockset (set, 1);
683         bt_latchlink (bt, hashidx, victim, page_no);
684         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
685         bt_spinreleasewrite (set->busy);
686         return set;
687   }
688 }
689
690 void bt_mgrclose (BtMgr *mgr)
691 {
692 BtPool *pool;
693 uint slot;
694
695         // release mapped pages
696         //      note that slot zero is never used
697
698         for( slot = 1; slot < mgr->poolmax; slot++ ) {
699                 pool = mgr->pool + slot;
700                 if( pool->slot )
701 #ifdef unix
702                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
703 #else
704                 {
705                         FlushViewOfFile(pool->map, 0);
706                         UnmapViewOfFile(pool->map);
707                         CloseHandle(pool->hmap);
708                 }
709 #endif
710         }
711
712 #ifdef unix
713         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
714         munmap (mgr->latchmgr, mgr->page_size);
715 #else
716         FlushViewOfFile(mgr->latchmgr, 0);
717         UnmapViewOfFile(mgr->latchmgr);
718         CloseHandle(mgr->halloc);
719 #endif
720 #ifdef unix
721         close (mgr->idx);
722         free (mgr->pool);
723         free (mgr->hash);
724         free (mgr->latch);
725         free (mgr->pooladvise);
726         free (mgr);
727 #else
728         FlushFileBuffers(mgr->idx);
729         CloseHandle(mgr->idx);
730         GlobalFree (mgr->pool);
731         GlobalFree (mgr->hash);
732         GlobalFree (mgr->latch);
733         GlobalFree (mgr);
734 #endif
735 }
736
737 //      close and release memory
738
739 void bt_close (BtDb *bt)
740 {
741 #ifdef unix
742         if ( bt->mem )
743                 free (bt->mem);
744 #else
745         if ( bt->mem)
746                 VirtualFree (bt->mem, 0, MEM_RELEASE);
747 #endif
748         free (bt);
749 }
750
751 //  open/create new btree buffer manager
752
753 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
754 //              size of mapped page pool (e.g. 8192)
755
756 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
757 {
758 uint lvl, attr, cacheblk, last, slot, idx;
759 uint nlatchpage, latchhash;
760 BtLatchMgr *latchmgr;
761 off64_t size;
762 uint amt[1];
763 BtMgr* mgr;
764 BtKey key;
765 int flag;
766
767 #ifndef unix
768 SYSTEM_INFO sysinfo[1];
769 #endif
770
771         // determine sanity of page size and buffer pool
772
773         if( bits > BT_maxbits )
774                 bits = BT_maxbits;
775         else if( bits < BT_minbits )
776                 bits = BT_minbits;
777
778         if( !poolmax )
779                 return NULL;    // must have buffer pool
780
781 #ifdef unix
782         mgr = calloc (1, sizeof(BtMgr));
783
784         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
785
786         if( mgr->idx == -1 )
787                 return free(mgr), NULL;
788         
789         cacheblk = 4096;        // minimum mmap segment size for unix
790
791 #else
792         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
793         attr = FILE_ATTRIBUTE_NORMAL;
794         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
795
796         if( mgr->idx == INVALID_HANDLE_VALUE )
797                 return GlobalFree(mgr), NULL;
798
799         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
800         GetSystemInfo(sysinfo);
801         cacheblk = sysinfo->dwAllocationGranularity;
802 #endif
803
804 #ifdef unix
805         latchmgr = malloc (BT_maxpage);
806         *amt = 0;
807
808         // read minimum page size to get root info
809
810         if( size = lseek (mgr->idx, 0L, 2) ) {
811                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
812                         bits = latchmgr->alloc->bits;
813                 else
814                         return free(mgr), free(latchmgr), NULL;
815         } else if( mode == BT_ro )
816                 return free(latchmgr), free (mgr), NULL;
817 #else
818         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
819         size = GetFileSize(mgr->idx, amt);
820
821         if( size || *amt ) {
822                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
823                         return bt_mgrclose (mgr), NULL;
824                 bits = latchmgr->alloc->bits;
825         } else if( mode == BT_ro )
826                 return bt_mgrclose (mgr), NULL;
827 #endif
828
829         mgr->page_size = 1 << bits;
830         mgr->page_bits = bits;
831
832         mgr->poolmax = poolmax;
833         mgr->mode = mode;
834
835         if( cacheblk < mgr->page_size )
836                 cacheblk = mgr->page_size;
837
838         //  mask for partial memmaps
839
840         mgr->poolmask = (cacheblk >> bits) - 1;
841
842         //      see if requested size of pages per memmap is greater
843
844         if( (1 << segsize) > mgr->poolmask )
845                 mgr->poolmask = (1 << segsize) - 1;
846
847         mgr->seg_bits = 0;
848
849         while( (1 << mgr->seg_bits) <= mgr->poolmask )
850                 mgr->seg_bits++;
851
852         mgr->hashsize = hashsize;
853
854 #ifdef unix
855         mgr->pool = calloc (poolmax, sizeof(BtPool));
856         mgr->hash = calloc (hashsize, sizeof(ushort));
857         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
858         mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
859 #else
860         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
861         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
862         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
863 #endif
864
865         if( size || *amt )
866                 goto mgrlatch;
867
868         // initialize an empty b-tree with latch page, root page, page of leaves
869         // and page(s) of latches
870
871         memset (latchmgr, 0, 1 << bits);
872         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
873         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
874         latchmgr->alloc->bits = mgr->page_bits;
875
876         latchmgr->nlatchpage = nlatchpage;
877         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
878
879         //  initialize latch manager
880
881         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
882
883         //      size of hash table = total number of latchsets
884
885         if( latchhash > latchmgr->latchtotal )
886                 latchhash = latchmgr->latchtotal;
887
888         latchmgr->latchhash = latchhash;
889
890 #ifdef unix
891         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
892                 return free(latchmgr), bt_mgrclose (mgr), NULL;
893 #else
894         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
895                 return bt_mgrclose (mgr), NULL;
896
897         if( *amt < mgr->page_size )
898                 return bt_mgrclose (mgr), NULL;
899 #endif
900
901         memset (latchmgr, 0, 1 << bits);
902         latchmgr->alloc->bits = mgr->page_bits;
903
904         for( lvl=MIN_lvl; lvl--; ) {
905                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
906                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
907                 key = keyptr(latchmgr->alloc, 1);
908                 key->len = 2;                   // create stopper key
909                 key->key[0] = 0xff;
910                 key->key[1] = 0xff;
911                 latchmgr->alloc->min = mgr->page_size - 3;
912                 latchmgr->alloc->lvl = lvl;
913                 latchmgr->alloc->cnt = 1;
914                 latchmgr->alloc->act = 1;
915 #ifdef unix
916                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
917                         return bt_mgrclose (mgr), NULL;
918 #else
919                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
920                         return bt_mgrclose (mgr), NULL;
921
922                 if( *amt < mgr->page_size )
923                         return bt_mgrclose (mgr), NULL;
924 #endif
925         }
926
927         // clear out latch manager locks
928         //      and rest of pages to round out segment
929
930         memset(latchmgr, 0, mgr->page_size);
931         last = MIN_lvl + 1;
932
933         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
934 #ifdef unix
935                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
936 #else
937                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
938                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
939                         return bt_mgrclose (mgr), NULL;
940                 if( *amt < mgr->page_size )
941                         return bt_mgrclose (mgr), NULL;
942 #endif
943                 last++;
944         }
945
946 mgrlatch:
947 #ifdef unix
948         flag = PROT_READ | PROT_WRITE;
949         mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
950         if( mgr->latchmgr == MAP_FAILED )
951                 return bt_mgrclose (mgr), NULL;
952         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
953         if( mgr->latchsets == MAP_FAILED )
954                 return bt_mgrclose (mgr), NULL;
955 #else
956         flag = PAGE_READWRITE;
957         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
958         if( !mgr->halloc )
959                 return bt_mgrclose (mgr), NULL;
960
961         flag = FILE_MAP_WRITE;
962         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
963         if( !mgr->latchmgr )
964                 return GetLastError(), bt_mgrclose (mgr), NULL;
965
966         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
967 #endif
968
969 #ifdef unix
970         free (latchmgr);
971 #else
972         VirtualFree (latchmgr, 0, MEM_RELEASE);
973 #endif
974         return mgr;
975 }
976
977 //      open BTree access method
978 //      based on buffer manager
979
980 BtDb *bt_open (BtMgr *mgr)
981 {
982 BtDb *bt = malloc (sizeof(*bt));
983
984         memset (bt, 0, sizeof(*bt));
985         bt->mgr = mgr;
986 #ifdef unix
987         bt->mem = malloc (3 *mgr->page_size);
988 #else
989         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
990 #endif
991         bt->frame = (BtPage)bt->mem;
992         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
993         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
994
995         memset(bt->zero, 0, mgr->page_size);
996         return bt;
997 }
998
999 //  compare two keys, returning > 0, = 0, or < 0
1000 //  as the comparison value
1001
1002 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1003 {
1004 uint len1 = key1->len;
1005 int ans;
1006
1007         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1008                 return ans;
1009
1010         if( len1 > len2 )
1011                 return 1;
1012         if( len1 < len2 )
1013                 return -1;
1014
1015         return 0;
1016 }
1017
1018 //      Latch Manager
1019
1020 void bt_readlock(BtLatch *latch)
1021 {
1022 #ifdef unix
1023         pthread_rwlock_rdlock (latch->lock);
1024 #else
1025         AcquireSRWLockShared (latch->srw);
1026 #endif
1027 }
1028
1029 //      wait for other read and write latches to relinquish
1030
1031 void bt_writelock(BtLatch *latch)
1032 {
1033 #ifdef unix
1034         pthread_rwlock_wrlock (latch->lock);
1035 #else
1036         AcquireSRWLockExclusive (latch->srw);
1037 #endif
1038 }
1039
1040 //      try to obtain write lock
1041
1042 //      return 1 if obtained,
1043 //              0 if already write or read locked
1044
1045 int bt_writetry(BtLatch *latch)
1046 {
1047 int result = 0;
1048
1049 #ifdef unix
1050         result = !pthread_rwlock_trywrlock (latch->lock);
1051 #else
1052         result = TryAcquireSRWLockExclusive (latch->srw);
1053 #endif
1054         return result;
1055 }
1056
1057 //      clear write mode
1058
1059 void bt_releasewrite(BtLatch *latch)
1060 {
1061 #ifdef unix
1062         pthread_rwlock_unlock (latch->lock);
1063 #else
1064         ReleaseSRWLockExclusive (latch->srw);
1065 #endif
1066 }
1067
1068 //      decrement reader count
1069
1070 void bt_releaseread(BtLatch *latch)
1071 {
1072 #ifdef unix
1073         pthread_rwlock_unlock (latch->lock);
1074 #else
1075         ReleaseSRWLockShared (latch->srw);
1076 #endif
1077 }
1078
1079 //      Buffer Pool mgr
1080
1081 // find segment in pool
1082 // must be called with hashslot idx locked
1083 //      return NULL if not there
1084 //      otherwise return node
1085
1086 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1087 {
1088 BtPool *pool;
1089 uint slot;
1090
1091         // compute start of hash chain in pool
1092
1093         if( slot = bt->mgr->hash[idx] ) 
1094                 pool = bt->mgr->pool + slot;
1095         else
1096                 return NULL;
1097
1098         page_no &= ~bt->mgr->poolmask;
1099
1100         while( pool->basepage != page_no )
1101           if( pool = pool->hashnext )
1102                 continue;
1103           else
1104                 return NULL;
1105
1106         return pool;
1107 }
1108
1109 // add segment to hash table
1110
1111 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1112 {
1113 BtPool *node;
1114 uint slot;
1115
1116         pool->hashprev = pool->hashnext = NULL;
1117         pool->basepage = page_no & ~bt->mgr->poolmask;
1118         pool->lru = 1;
1119
1120         if( slot = bt->mgr->hash[idx] ) {
1121                 node = bt->mgr->pool + slot;
1122                 pool->hashnext = node;
1123                 node->hashprev = pool;
1124         }
1125
1126         bt->mgr->hash[idx] = pool->slot;
1127 }
1128
1129 //      find best segment to evict from buffer pool
1130
1131 BtPool *bt_findlru (BtDb *bt, uint hashslot)
1132 {
1133 unsigned long long int target = ~0LL;
1134 BtPool *pool = NULL, *node;
1135
1136         if( !hashslot )
1137                 return NULL;
1138
1139         node = bt->mgr->pool + hashslot;
1140
1141         //      scan pool entries under hash table slot
1142
1143         do {
1144           if( node->pin )
1145                 continue;
1146           if( node->lru > target )
1147                 continue;
1148           target = node->lru;
1149           pool = node;
1150         } while( node = node->hashnext );
1151
1152         return pool;
1153 }
1154
1155 //  map new buffer pool segment to virtual memory
1156
1157 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1158 {
1159 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1160 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1161 int flag;
1162
1163 #ifdef unix
1164         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1165         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1166         if( pool->map == MAP_FAILED )
1167                 return bt->err = BTERR_map;
1168         // clear out madvise issued bits
1169         memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
1170 #else
1171         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1172         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1173         if( !pool->hmap )
1174                 return bt->err = BTERR_map;
1175
1176         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1177         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1178         if( !pool->map )
1179                 return bt->err = BTERR_map;
1180 #endif
1181         return bt->err = 0;
1182 }
1183
1184 //      calculate page within pool
1185
1186 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1187 {
1188 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1189 BtPage page;
1190
1191         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1192 #ifdef unix
1193         {
1194         uint idx = subpage / 8;
1195         uint bit = subpage % 8;
1196
1197                 if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
1198                   madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1199                   (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
1200                 }
1201         }
1202 #endif
1203         return page;
1204 }
1205
1206 //  release pool pin
1207
1208 void bt_unpinpool (BtPool *pool)
1209 {
1210 #ifdef unix
1211         __sync_fetch_and_add(&pool->pin, -1);
1212 #else
1213         _InterlockedDecrement16 (&pool->pin);
1214 #endif
1215 }
1216
1217 //      find or place requested page in segment-pool
1218 //      return pool table entry, incrementing pin
1219
1220 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1221 {
1222 BtPool *pool, *node, *next;
1223 uint slot, idx, victim;
1224 BtLatchSet *set;
1225
1226         //      lock hash table chain
1227
1228         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1229         bt_spinreadlock (&bt->mgr->latch[idx]);
1230
1231         //      look up in hash table
1232
1233         if( pool = bt_findpool(bt, page_no, idx) ) {
1234 #ifdef unix
1235                 __sync_fetch_and_add(&pool->pin, 1);
1236 #else
1237                 _InterlockedIncrement16 (&pool->pin);
1238 #endif
1239                 bt_spinreleaseread (&bt->mgr->latch[idx]);
1240                 pool->lru++;
1241                 return pool;
1242         }
1243
1244         //      upgrade to write lock
1245
1246         bt_spinreleaseread (&bt->mgr->latch[idx]);
1247         bt_spinwritelock (&bt->mgr->latch[idx]);
1248
1249         // try to find page in pool with write lock
1250
1251         if( pool = bt_findpool(bt, page_no, idx) ) {
1252 #ifdef unix
1253                 __sync_fetch_and_add(&pool->pin, 1);
1254 #else
1255                 _InterlockedIncrement16 (&pool->pin);
1256 #endif
1257                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1258                 pool->lru++;
1259                 return pool;
1260         }
1261
1262         // allocate a new pool node
1263         // and add to hash table
1264
1265 #ifdef unix
1266         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1267 #else
1268         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1269 #endif
1270
1271         if( ++slot < bt->mgr->poolmax ) {
1272                 pool = bt->mgr->pool + slot;
1273                 pool->slot = slot;
1274
1275                 if( bt_mapsegment(bt, pool, page_no) )
1276                         return NULL;
1277
1278                 bt_linkhash(bt, pool, page_no, idx);
1279 #ifdef unix
1280                 __sync_fetch_and_add(&pool->pin, 1);
1281 #else
1282                 _InterlockedIncrement16 (&pool->pin);
1283 #endif
1284                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1285                 return pool;
1286         }
1287
1288         // pool table is full
1289         //      find best pool entry to evict
1290
1291 #ifdef unix
1292         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1293 #else
1294         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1295 #endif
1296
1297         while( 1 ) {
1298 #ifdef unix
1299                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1300 #else
1301                 victim = _InterlockedIncrement16 (&bt->mgr->evicted) - 1;
1302 #endif
1303                 victim %= bt->mgr->hashsize;
1304
1305                 // try to get write lock
1306                 //      skip entry if not obtained
1307
1308                 if( !bt_spinwritetry (&bt->mgr->latch[victim]) )
1309                         continue;
1310
1311                 //  if cache entry is empty
1312                 //      or no slots are unpinned
1313                 //      skip this entry
1314
1315                 if( !(pool = bt_findlru(bt, bt->mgr->hash[victim])) ) {
1316                         bt_spinreleasewrite (&bt->mgr->latch[victim]);
1317                         continue;
1318                 }
1319
1320                 // unlink victim pool node from hash table
1321
1322                 if( node = pool->hashprev )
1323                         node->hashnext = pool->hashnext;
1324                 else if( node = pool->hashnext )
1325                         bt->mgr->hash[victim] = node->slot;
1326                 else
1327                         bt->mgr->hash[victim] = 0;
1328
1329                 if( node = pool->hashnext )
1330                         node->hashprev = pool->hashprev;
1331
1332                 bt_spinreleasewrite (&bt->mgr->latch[victim]);
1333
1334                 //      remove old file mapping
1335 #ifdef unix
1336                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1337 #else
1338                 FlushViewOfFile(pool->map, 0);
1339                 UnmapViewOfFile(pool->map);
1340                 CloseHandle(pool->hmap);
1341 #endif
1342                 pool->map = NULL;
1343
1344                 //  create new pool mapping
1345                 //  and link into hash table
1346
1347                 if( bt_mapsegment(bt, pool, page_no) )
1348                         return NULL;
1349
1350                 bt_linkhash(bt, pool, page_no, idx);
1351 #ifdef unix
1352                 __sync_fetch_and_add(&pool->pin, 1);
1353 #else
1354                 _InterlockedIncrement16 (&pool->pin);
1355 #endif
1356                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1357                 return pool;
1358         }
1359 }
1360
1361 // place write, read, or parent lock on requested page_no.
1362 //      pin to buffer pool and return latchset pointer
1363
1364 void bt_lockpage(BtLock mode, BtLatchSet *set)
1365 {
1366         switch( mode ) {
1367         case BtLockRead:
1368                 bt_readlock (set->readwr);
1369                 break;
1370         case BtLockWrite:
1371                 bt_writelock (set->readwr);
1372                 break;
1373         case BtLockAccess:
1374                 bt_readlock (set->access);
1375                 break;
1376         case BtLockDelete:
1377                 bt_writelock (set->access);
1378                 break;
1379         case BtLockParent:
1380                 bt_writelock (set->parent);
1381                 break;
1382         }
1383 }
1384
1385 // remove write, read, or parent lock on requested page_no.
1386
1387 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1388 {
1389         switch( mode ) {
1390         case BtLockRead:
1391                 bt_releaseread (set->readwr);
1392                 break;
1393         case BtLockWrite:
1394                 bt_releasewrite (set->readwr);
1395                 break;
1396         case BtLockAccess:
1397                 bt_releaseread (set->access);
1398                 break;
1399         case BtLockDelete:
1400                 bt_releasewrite (set->access);
1401                 break;
1402         case BtLockParent:
1403                 bt_releasewrite (set->parent);
1404                 break;
1405         }
1406 }
1407
1408 //      allocate a new page and write page into it
1409
1410 uid bt_newpage(BtDb *bt, BtPage page)
1411 {
1412 BtLatchSet *set;
1413 BtPool *pool;
1414 uid new_page;
1415 BtPage pmap;
1416 int reuse;
1417
1418         //      lock allocation page
1419
1420         bt_spinwritelock(bt->mgr->latchmgr->lock);
1421
1422         // use empty chain first
1423         // else allocate empty page
1424
1425         if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
1426                 if( pool = bt_pinpool (bt, new_page) )
1427                         pmap = bt_page (bt, pool, new_page);
1428                 else
1429                         return 0;
1430                 bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
1431                 bt_unpinpool (pool);
1432                 reuse = 1;
1433         } else {
1434                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1435                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1436                 reuse = 0;
1437         }
1438 #ifdef unix
1439         // if writing first page of pool block, zero last page in the block
1440
1441         if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
1442         {
1443                 // use zero buffer to write zeros
1444                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1445                         return bt->err = BTERR_wrt, 0;
1446         }
1447
1448         // unlock allocation latch
1449
1450         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1451
1452         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1453                 return bt->err = BTERR_wrt, 0;
1454
1455 #else
1456         // unlock allocation latch
1457
1458         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1459
1460         //      bring new page into pool and copy page.
1461         //      this will extend the file into the new pages.
1462         //      NB -- no latch required
1463
1464         if( pool = bt_pinpool (bt, new_page) )
1465                 pmap = bt_page (bt, pool, new_page);
1466         else
1467                 return 0;
1468
1469         memcpy(pmap, page, bt->mgr->page_size);
1470         bt_unpinpool (pool);
1471 #endif
1472         return new_page;
1473 }
1474
1475 //  find slot in page for given key at a given level
1476
1477 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1478 {
1479 uint diff, higher = bt->page->cnt, low = 1, slot;
1480
1481         //      low is the lowest candidate, higher is already
1482         //      tested as .ge. the given key, loop ends when they meet
1483
1484         while( diff = higher - low ) {
1485                 slot = low + ( diff >> 1 );
1486                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1487                         low = slot + 1;
1488                 else
1489                         higher = slot;
1490         }
1491
1492         return higher;
1493 }
1494
1495 //  find and load page at given level for given key
1496 //      leave page rd or wr locked as requested
1497
1498 uint bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
1499 {
1500 uid page_no = ROOT_page, prevpage = 0;
1501 BtLatchSet *set, *prevset;
1502 uint drill = 0xff, slot;
1503 uint mode, prevmode;
1504 BtPool *prevpool;
1505 int foster = 0;
1506
1507   //  start at root of btree and drill down
1508
1509   do {
1510         // determine lock mode of drill level
1511         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1512
1513         //      obtain latch set for this page
1514
1515         bt->set = bt_pinlatch (bt, page_no);
1516         bt->page_no = page_no;
1517
1518         // pin page contents
1519
1520         if( bt->pool = bt_pinpool (bt, page_no) )
1521                 bt->page = bt_page (bt, bt->pool, page_no);
1522         else
1523                 return 0;
1524
1525         // obtain access lock using lock chaining with Access mode
1526
1527         if( page_no > ROOT_page )
1528           bt_lockpage(BtLockAccess, bt->set);
1529
1530         //  now unlock and unpin our (possibly foster) parent
1531
1532         if( prevpage ) {
1533           bt_unlockpage(prevmode, prevset);
1534           bt_unpinlatch (prevset);
1535           bt_unpinpool (prevpool);
1536           prevpage = 0;
1537         }
1538
1539         // obtain read lock using lock chaining
1540
1541         bt_lockpage(mode, bt->set);
1542
1543         if( page_no > ROOT_page )
1544           bt_unlockpage(BtLockAccess, bt->set);
1545
1546         // re-read and re-lock root after determining actual level of root
1547
1548         if( page_no == ROOT_page )
1549           if( bt->page->lvl != drill) {
1550                 drill = bt->page->lvl;
1551
1552             if( lock == BtLockWrite && drill == lvl ) {
1553                   bt_unlockpage(mode, bt->set);
1554                   bt_unpinlatch (bt->set);
1555                   bt_unpinpool (bt->pool);
1556                   continue;
1557                 }
1558           }
1559
1560         prevpage = bt->page_no;
1561         prevpool = bt->pool;
1562         prevset = bt->set;
1563         prevmode = mode;
1564
1565         //      were we supposed to find a foster child?
1566         //      if so, slide right onto it
1567
1568         if( keycmp (keyptr(bt->page,bt->page->cnt), key, len) < 0 ) {
1569                 page_no = bt_getid(bt->page->right);
1570                 foster = 1;
1571                 continue;
1572         }
1573
1574         //  find key on page at this level
1575         //  and either descend to requested level
1576         //      or return key slot
1577
1578         slot = bt_findslot (bt, key, len);
1579
1580         //      is this slot < foster child area
1581         //      on the requested level?
1582
1583         //      if so, return actual slot even if dead
1584
1585         if( slot <= bt->page->cnt - bt->page->foster )
1586           if( drill == lvl )
1587                 return bt->foster = foster, slot;
1588
1589         //      find next active slot
1590
1591         //      note: foster children are never dead
1592
1593         while( slotptr(bt->page, slot)->dead )
1594           if( slot++ < bt->page->cnt )
1595                 continue;
1596           else {
1597                 //  we are waiting for fence key posting
1598                 page_no = bt_getid(bt->page->right);
1599                 continue;
1600           }
1601
1602         //      is this slot < foster child area
1603         //      if so, drill to next level
1604
1605         if( slot <= bt->page->cnt - bt->page->foster )
1606                 foster = 0, drill--;
1607         else
1608                 foster = 1;
1609
1610         //  continue right onto foster child
1611         //      or down to next level.
1612
1613         page_no = bt_getid(slotptr(bt->page, slot)->id);
1614
1615   } while( page_no );
1616
1617   // return error on end of chain
1618
1619   bt->err = BTERR_struct;
1620   return 0;     // return error
1621 }
1622
1623 //  remove empty page from the B-tree
1624 //      by pulling our right node left over ourselves
1625
1626 //  call with bt->page, etc, set to page's locked parent
1627 //      returns with page locked.
1628
1629 BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
1630 {
1631 BtLatchSet *rset, *pset, *rpset;
1632 BtPool *rpool, *ppool, *rppool;
1633 BtPage rpage, ppage, rppage;
1634 uid right, parent, rparent;
1635 BtKey ptr;
1636 uint idx;
1637
1638         //      cache node's parent page
1639
1640         parent = bt->page_no;
1641         ppage = bt->page;
1642         ppool = bt->pool;
1643         pset = bt->set;
1644
1645         // lock and map our right page
1646         // note that it cannot be our foster child
1647         // since the our node is empty
1648         //      and it cannot be NULL because of the stopper
1649         //      in the last right page
1650
1651         bt_lockpage (BtLockWrite, set);
1652
1653         // if we aren't dead yet
1654
1655         if( page->act )
1656                 goto rmergexit;
1657
1658         if( right = bt_getid (page->right) )
1659           if( rpool = bt_pinpool (bt, right) )
1660                 rpage = bt_page (bt, rpool, right);
1661           else
1662                 return bt->err;
1663         else
1664                 return bt->err = BTERR_struct;
1665
1666         rset = bt_pinlatch (bt, right);
1667
1668         //      find our right neighbor
1669
1670         if( ppage->act > 1 ) {
1671          for( idx = slot; idx++ < ppage->cnt; )
1672           if( !slotptr(ppage, idx)->dead )
1673                 break;
1674
1675          if( idx > ppage->cnt )
1676                 return bt->err = BTERR_struct;
1677
1678          //  redirect right neighbor in parent to left node
1679
1680          bt_putid(slotptr(ppage,idx)->id, page_no);
1681         }
1682
1683         //      if parent has only our deleted page, e.g. no right neighbor
1684         //      prepare to merge parent itself
1685
1686         if( ppage->act == 1 ) {
1687           if( rparent = bt_getid (ppage->right) )
1688            if( rppool = bt_pinpool (bt, rparent) )
1689                 rppage = bt_page (bt, rppool, rparent);
1690            else
1691                 return bt->err;
1692           else
1693                 return bt->err = BTERR_struct;
1694
1695           rpset = bt_pinlatch (bt, rparent);
1696           bt_lockpage (BtLockWrite, rpset);
1697
1698           // find our right neighbor on right parent page
1699
1700           for( idx = 0; idx++ < rppage->cnt; )
1701                 if( !slotptr(rppage, idx)->dead ) {
1702                   bt_putid (slotptr(rppage, idx)->id, page_no);
1703                   break;
1704                 }
1705
1706           if( idx > rppage->cnt )
1707                 return bt->err = BTERR_struct;
1708         }
1709
1710         //      now that there are no more pointers to our right node
1711         //      we can wait for delete lock on it
1712
1713         bt_lockpage(BtLockDelete, rset);
1714         bt_lockpage(BtLockWrite, rset);
1715
1716         // pull contents of right page into our empty page 
1717
1718         memcpy (page, rpage, bt->mgr->page_size);
1719
1720         // ready to release right parent lock
1721         //      now that we have a new page in place
1722
1723         if( ppage->act == 1 ) {
1724           bt_unlockpage (BtLockWrite, rpset);
1725           bt_unpinlatch (rpset);
1726           bt_unpinpool (rppool);
1727         }
1728
1729         //      add killed right block to free chain
1730         //      lock latch mgr
1731
1732         bt_spinwritelock(bt->mgr->latchmgr->lock);
1733
1734         //      store free chain in allocation page second right
1735
1736         bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1737         bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
1738
1739         // unlock latch mgr and right page
1740
1741         bt_unlockpage(BtLockDelete, rset);
1742         bt_unlockpage(BtLockWrite, rset);
1743         bt_unpinlatch (rset);
1744         bt_unpinpool (rpool);
1745
1746         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1747
1748         // delete our obsolete fence key from our parent
1749
1750         slotptr(ppage, slot)->dead = 1;
1751         ppage->dirty = 1;
1752
1753         //      if our parent now empty
1754         //      remove it from the tree
1755
1756         if( ppage->act-- == 1 )
1757           if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
1758                 return bt->err;
1759
1760 rmergexit:
1761         bt_unlockpage (BtLockWrite, pset);
1762         bt_unpinlatch (pset);
1763         bt_unpinpool (ppool);
1764
1765         bt->found = 1;
1766         return bt->err = 0;
1767 }
1768
1769 //  remove empty page from the B-tree
1770 //      try merging left first.  If no left
1771 //      sibling, then merge right.
1772
1773 //      call with page loaded and locked,
1774 //  return with page locked.
1775
1776 BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
1777 {
1778 unsigned char fencekey[256], postkey[256];
1779 uint slot, idx, postfence = 0;
1780 BtLatchSet *lset, *pset;
1781 BtPool *lpool, *ppool;
1782 BtPage lpage, ppage;
1783 uid left, parent;
1784 BtKey ptr;
1785
1786         ptr = keyptr(page, page->cnt);
1787         memcpy(fencekey, ptr, ptr->len + 1);
1788         bt_unlockpage (BtLockWrite, set);
1789
1790         //      load and lock our parent
1791
1792 retry:
1793         if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
1794                 return bt->err;
1795
1796         parent = bt->page_no;
1797         ppage = bt->page;
1798         ppool = bt->pool;
1799         pset = bt->set;
1800
1801         //      wait until we are not a foster child
1802
1803         if( bt->foster ) {
1804                 bt_unlockpage (BtLockWrite, pset);
1805                 bt_unpinlatch (pset);
1806                 bt_unpinpool (ppool);
1807 #ifdef unix
1808                 sched_yield();
1809 #else
1810                 SwitchToThread();
1811 #endif
1812                 goto retry;
1813         }
1814
1815         //  find our left neighbor in our parent page
1816
1817         for( idx = slot; --idx; )
1818           if( !slotptr(ppage, idx)->dead )
1819                 break;
1820
1821         //      if no left neighbor, do right merge
1822
1823         if( !idx )
1824                 return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
1825
1826         // lock and map our left neighbor's page
1827
1828         left = bt_getid (slotptr(ppage, idx)->id);
1829
1830         if( lpool = bt_pinpool (bt, left) )
1831                 lpage = bt_page (bt, lpool, left);
1832         else
1833                 return bt->err;
1834
1835         lset = bt_pinlatch (bt, left);
1836         bt_lockpage(BtLockWrite, lset);
1837
1838         //  wait until foster sibling is in our parent
1839
1840         if( bt_getid (lpage->right) != page_no ) {
1841                 bt_unlockpage (BtLockWrite, pset);
1842                 bt_unpinlatch (pset);
1843                 bt_unpinpool (ppool);
1844                 bt_unlockpage (BtLockWrite, lset);
1845                 bt_unpinlatch (lset);
1846                 bt_unpinpool (lpool);
1847 #ifdef linux
1848                 sched_yield();
1849 #else
1850                 SwitchToThread();
1851 #endif
1852                 goto retry;
1853         }
1854
1855         //  since our page will have no more pointers to it,
1856         //      obtain Delete lock and wait for write locks to clear
1857
1858         bt_lockpage(BtLockDelete, set);
1859         bt_lockpage(BtLockWrite, set);
1860
1861         //      if we aren't dead yet,
1862         //      get ready for exit
1863
1864         if( page->act ) {
1865                 bt_unlockpage(BtLockDelete, set);
1866                 bt_unlockpage(BtLockWrite, lset);
1867                 bt_unpinlatch (lset);
1868                 bt_unpinpool (lpool);
1869                 goto lmergexit;
1870         }
1871
1872         //      are we are the fence key for our parent?
1873         //      if so, grab our old fence key
1874
1875         if( postfence = slot == ppage->cnt ) {
1876                 ptr = keyptr (ppage, ppage->cnt);
1877                 memcpy(fencekey, ptr, ptr->len + 1);
1878                 memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
1879
1880                 // clear out other dead slots
1881
1882                 while( --ppage->cnt )
1883                   if( slotptr(ppage, ppage->cnt)->dead )
1884                         memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
1885                   else
1886                         break;
1887
1888                 ptr = keyptr (ppage, ppage->cnt);
1889                 memcpy(postkey, ptr, ptr->len + 1);
1890         } else
1891                 slotptr(ppage,slot)->dead = 1;
1892
1893         ppage->dirty = 1;
1894         ppage->act--;
1895
1896         //      push our right neighbor pointer to our left
1897
1898         memcpy (lpage->right, page->right, BtId);
1899
1900         //      add ourselves to free chain
1901         //      lock latch mgr
1902
1903         bt_spinwritelock(bt->mgr->latchmgr->lock);
1904
1905         //      store free chain in allocation page second right
1906         bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1907         bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
1908
1909         // unlock latch mgr and pages
1910
1911         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1912         bt_unlockpage(BtLockWrite, lset);
1913         bt_unpinlatch (lset);
1914         bt_unpinpool (lpool);
1915
1916         // release our node's delete lock
1917
1918         bt_unlockpage(BtLockDelete, set);
1919
1920 lmergexit:
1921         bt_unlockpage (BtLockWrite, pset);
1922         bt_unpinpool (ppool);
1923
1924         //  do we need to post parent's fence key in its parent?
1925
1926         if( !postfence || parent == ROOT_page ) {
1927                 bt_unpinlatch (pset);
1928                 bt->found = 1;
1929                 return bt->err = 0;
1930         }
1931
1932         //      interlock parent fence post
1933
1934         bt_lockpage (BtLockParent, pset);
1935
1936         //      load parent's parent page
1937 posttry:
1938         if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
1939                 return bt->err;
1940
1941         if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
1942           if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
1943                 return bt->err;
1944           else
1945                 goto posttry;
1946
1947         page = bt->page;
1948
1949         page->min -= *postkey + 1;
1950         ((unsigned char *)page)[page->min] = *postkey;
1951         memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
1952         slotptr(page, slot)->off = page->min;
1953
1954         bt_unlockpage (BtLockParent, pset);
1955         bt_unpinlatch (pset);
1956
1957         bt_unlockpage (BtLockWrite, bt->set);
1958         bt_unpinlatch (bt->set);
1959         bt_unpinpool (bt->pool);
1960
1961         bt->found = 1;
1962         return bt->err = 0;
1963 }
1964
1965 //  find and delete key on page by marking delete flag bit
1966 //  if page becomes empty, delete it from the btree
1967
1968 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
1969 {
1970 BtLatchSet *set;
1971 BtPool *pool;
1972 BtPage page;
1973 uid page_no;
1974 BtKey ptr;
1975 uint slot;
1976
1977         if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
1978                 return bt->err;
1979
1980         page_no = bt->page_no;
1981         page = bt->page;
1982         pool = bt->pool;
1983         set = bt->set;
1984
1985         // if key is found delete it, otherwise ignore request
1986
1987         ptr = keyptr(page, slot);
1988
1989         if( bt->found = !keycmp (ptr, key, len) )
1990           if( bt->found = slotptr(page, slot)->dead == 0 ) {
1991                 slotptr(page,slot)->dead = 1;
1992                   if( slot < page->cnt )
1993                         page->dirty = 1;
1994                   if( !--page->act )
1995                         if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
1996                           return bt->err;
1997                 }
1998
1999         bt_unlockpage(BtLockWrite, set);
2000         bt_unpinlatch (set);
2001         bt_unpinpool (pool);
2002         return bt->err = 0;
2003 }
2004
2005 //      find key in leaf level and return row-id
2006
2007 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
2008 {
2009 uint  slot;
2010 BtKey ptr;
2011 uid id;
2012
2013         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2014                 ptr = keyptr(bt->page, slot);
2015         else
2016                 return 0;
2017
2018         // if key exists, return row-id
2019         //      otherwise return 0
2020
2021         if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
2022                 id = bt_getid(slotptr(bt->page,slot)->id);
2023         else
2024                 id = 0;
2025
2026         bt_unlockpage (BtLockRead, bt->set);
2027         bt_unpinlatch (bt->set);
2028         bt_unpinpool (bt->pool);
2029         return id;
2030 }
2031
2032 //      check page for space available,
2033 //      clean if necessary and return
2034 //      0 - page needs splitting
2035 //      >0  new slot value
2036
2037 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
2038 {
2039 uint nxt = bt->mgr->page_size;
2040 uint cnt = 0, idx = 0;
2041 uint max = page->cnt;
2042 uint newslot = max;
2043 BtKey key;
2044
2045         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2046                 return slot;
2047
2048         //      skip cleanup if nothing to reclaim
2049
2050         if( !page->dirty )
2051                 return 0;
2052
2053         memcpy (bt->frame, page, bt->mgr->page_size);
2054
2055         // skip page info and set rest of page to zero
2056
2057         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2058         page->dirty = 0;
2059         page->act = 0;
2060
2061         // try cleaning up page first
2062
2063         // always leave fence key in the array
2064         // otherwise, remove deleted key
2065
2066         // note: foster children are never dead
2067
2068         while( cnt++ < max ) {
2069                 if( cnt == slot )
2070                         newslot = idx + 1;
2071                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
2072                         continue;
2073
2074                 // copy key
2075
2076                 key = keyptr(bt->frame, cnt);
2077                 nxt -= key->len + 1;
2078                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2079
2080                 // copy slot
2081                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
2082                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2083                         page->act++;
2084                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2085                 slotptr(page, idx)->off = nxt;
2086         }
2087
2088         page->min = nxt;
2089         page->cnt = idx;
2090
2091         //      see if page has enough space now, or does it need splitting?
2092
2093         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2094                 return newslot;
2095
2096         return 0;
2097 }
2098
2099 //      add key to current page
2100 //      page must already be writelocked
2101
2102 void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
2103 {
2104 uint idx;
2105
2106         // find next available dead slot and copy key onto page
2107         // note that foster children on the page are never dead
2108
2109         // look for next hole, but stay back from the fence key
2110
2111         for( idx = slot; idx < page->cnt; idx++ )
2112           if( slotptr(page, idx)->dead )
2113                 break;
2114
2115         if( idx == page->cnt )
2116                 idx++, page->cnt++;
2117
2118         page->act++;
2119
2120         // now insert key into array before slot
2121
2122         while( idx > slot )
2123                 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
2124
2125         page->min -= len + 1;
2126         ((unsigned char *)page)[page->min] = len;
2127         memcpy ((unsigned char *)page + page->min +1, key, len );
2128
2129         bt_putid(slotptr(page,slot)->id, id);
2130         slotptr(page, slot)->off = page->min;
2131         slotptr(page, slot)->tod = tod;
2132         slotptr(page, slot)->dead = 0;
2133 }
2134
2135 // split the root and raise the height of the btree
2136 //      call with current page locked and page no of foster child
2137 //      return with current page (root) unlocked
2138
2139 BTERR bt_splitroot(BtDb *bt, uid right)
2140 {
2141 uint nxt = bt->mgr->page_size;
2142 unsigned char fencekey[256];
2143 BtPage root = bt->page;
2144 uid new_page;
2145 BtKey key;
2146
2147         //  Obtain an empty page to use, and copy the left page
2148         //  contents into it from the root.  Strip foster child key.
2149         //      (it's the stopper key)
2150
2151         memset (slotptr(root, root->cnt), 0, sizeof(BtSlot));
2152         root->dirty = 1;
2153         root->foster--;
2154         root->act--;
2155         root->cnt--;
2156
2157         //      Save left fence key.
2158
2159         key = keyptr(root, root->cnt);
2160         memcpy (fencekey, key, key->len + 1);
2161
2162         //  copy the lower keys into a new left page
2163
2164         if( !(new_page = bt_newpage(bt, root)) )
2165                 return bt->err;
2166
2167         // preserve the page info at the bottom
2168         // and set rest of the root to zero
2169
2170         memset (root+1, 0, bt->mgr->page_size - sizeof(*root));
2171
2172         // insert left fence key on empty newroot page
2173
2174         nxt -= *fencekey + 1;
2175         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
2176         bt_putid(slotptr(root, 1)->id, new_page);
2177         slotptr(root, 1)->off = nxt;
2178         
2179         // insert stopper key on newroot page
2180         // and increase the root height
2181
2182         nxt -= 3;
2183         fencekey[0] = 2;
2184         fencekey[1] = 0xff;
2185         fencekey[2] = 0xff;
2186         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
2187         bt_putid(slotptr(root, 2)->id, right);
2188         slotptr(root, 2)->off = nxt;
2189
2190         bt_putid(root->right, 0);
2191         root->min = nxt;                // reset lowest used offset and key count
2192         root->cnt = 2;
2193         root->act = 2;
2194         root->lvl++;
2195
2196         // release and unpin root (bt->page)
2197
2198         bt_unlockpage(BtLockWrite, bt->set);
2199         bt_unpinlatch (bt->set);
2200         bt_unpinpool (bt->pool);
2201         return 0;
2202 }
2203
2204 //  split already locked full node
2205 //      return unlocked and unpinned.
2206
2207 BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
2208 {
2209 uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
2210 unsigned char fencekey[256];
2211 uint tod = time(NULL);
2212 uint lvl = page->lvl;
2213 uid new_page;
2214 BtKey key;
2215
2216         //      initialize frame buffer for right node
2217
2218         memset (bt->frame, 0, bt->mgr->page_size);
2219         max = page->cnt - page->foster;
2220         cnt = max / 2;
2221         idx = 0;
2222
2223         //  split higher half of keys to bt->frame
2224         //      leaving old foster children in the left node,
2225         //      and adding a new foster child there.
2226
2227         while( cnt++ < max ) {
2228                 key = keyptr(page, cnt);
2229                 nxt -= key->len + 1;
2230                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2231                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
2232                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
2233                         bt->frame->act++;
2234                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
2235                 slotptr(bt->frame, idx)->off = nxt;
2236         }
2237
2238         // transfer right link node to new right node
2239
2240         if( page_no > ROOT_page )
2241                 memcpy (bt->frame->right, page->right, BtId);
2242
2243         bt->frame->bits = bt->mgr->page_bits;
2244         bt->frame->min = nxt;
2245         bt->frame->cnt = idx;
2246         bt->frame->lvl = lvl;
2247
2248         //      get new free page and write right frame to it.
2249
2250         if( !(new_page = bt_newpage(bt, bt->frame)) )
2251                 return bt->err;
2252
2253         //      remember fence key for new right page to add
2254         //      as foster child to the left node
2255
2256         key = keyptr(bt->frame, idx);
2257         memcpy (fencekey, key, key->len + 1);
2258
2259         //      update lower keys and foster children to continue in old page
2260
2261         memcpy (bt->frame, page, bt->mgr->page_size);
2262         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2263         nxt = bt->mgr->page_size;
2264         page->dirty = 0;
2265         page->act = 0;
2266         cnt = 0;
2267         idx = 0;
2268
2269         //  assemble page of smaller keys
2270         //      to remain in the old page
2271
2272         while( cnt++ < max / 2 ) {
2273                 key = keyptr(bt->frame, cnt);
2274                 nxt -= key->len + 1;
2275                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2276                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2277                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2278                         page->act++;
2279                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2280                 slotptr(page, idx)->off = nxt;
2281         }
2282
2283         //      insert new foster child for right page in queue
2284         //      before any of the current foster children
2285
2286         nxt -= *fencekey + 1;
2287         memcpy ((unsigned char *)page + nxt, fencekey, *fencekey + 1);
2288
2289         bt_putid (slotptr(page,++idx)->id, new_page);
2290         slotptr(page, idx)->tod = tod;
2291         slotptr(page, idx)->off = nxt;
2292         page->foster++;
2293         page->act++;
2294
2295         //  continue with old foster child keys
2296         //      note that none will be dead
2297
2298         cnt = bt->frame->cnt - bt->frame->foster;
2299
2300         while( cnt++ < bt->frame->cnt ) {
2301                 key = keyptr(bt->frame, cnt);
2302                 nxt -= key->len + 1;
2303                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2304                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2305                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2306                 slotptr(page, idx)->off = nxt;
2307                 page->act++;
2308         }
2309
2310         page->min = nxt;
2311         page->cnt = idx;
2312
2313         //      link new right page
2314
2315         bt_putid (page->right, new_page);
2316
2317         // if current page is the root page, split it
2318
2319         if( page_no == ROOT_page )
2320                 return bt_splitroot (bt, new_page);
2321
2322         //  release wr lock on our page
2323
2324         bt_unlockpage (BtLockWrite, set);
2325
2326         //  obtain ParentModification lock for current page
2327         //      to fix new fence key and oldest foster child on page
2328
2329         bt_lockpage (BtLockParent, set);
2330
2331         //  get our new fence key to insert in parent node
2332
2333         bt_lockpage (BtLockRead, set);
2334
2335         key = keyptr(page, page->cnt-1);
2336         memcpy (fencekey, key, key->len+1);
2337
2338         bt_unlockpage (BtLockRead, set);
2339
2340         if( bt_insertkey (bt, fencekey + 1, *fencekey, page_no, tod, lvl + 1) )
2341                 return bt->err;
2342
2343         //      lock our page for writing
2344
2345         bt_lockpage (BtLockRead, set);
2346
2347         //      switch old parent key from us to our oldest foster child
2348
2349         key = keyptr(page, page->cnt);
2350         memcpy (fencekey, key, key->len+1);
2351
2352         new_page = bt_getid (slotptr(page, page->cnt)->id);
2353         bt_unlockpage (BtLockRead, set);
2354
2355         if( bt_insertkey (bt, fencekey + 1, *fencekey, new_page, tod, lvl + 1) )
2356                 return bt->err;
2357
2358         //      now that it has its own parent pointer,
2359         //      remove oldest foster child from our page
2360
2361         bt_lockpage (BtLockWrite, set);
2362         memset (slotptr(page, page->cnt), 0, sizeof(BtSlot));
2363         page->dirty = 1;
2364         page->foster--;
2365         page->cnt--;
2366         page->act--;
2367
2368         bt_unlockpage (BtLockParent, set);
2369
2370         //  if this emptied page,
2371         //      undo the foster child
2372
2373         if( !page->act )
2374           if( bt_mergeleft (bt, page, pool, set, page_no, lvl) )
2375                 return bt->err;
2376
2377         //      unlock and unpin
2378
2379         bt_unlockpage (BtLockWrite, set);
2380         bt_unpinlatch (set);
2381         bt_unpinpool (pool);
2382         return 0;
2383 }
2384
2385 //  Insert new key into the btree at leaf level.
2386
2387 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
2388 {
2389 uint slot, idx;
2390 BtPage page;
2391 BtKey ptr;
2392
2393         while( 1 ) {
2394                 if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
2395                         ptr = keyptr(bt->page, slot);
2396                 else
2397                 {
2398                         if ( !bt->err )
2399                                 bt->err = BTERR_ovflw;
2400                         return bt->err;
2401                 }
2402
2403                 // if key already exists, update id and return
2404
2405                 page = bt->page;
2406
2407                 if( !keycmp (ptr, key, len) ) {
2408                         if( slotptr(page, slot)->dead )
2409                                 page->act++;
2410                         slotptr(page, slot)->dead = 0;
2411                         slotptr(page, slot)->tod = tod;
2412                         bt_putid(slotptr(page,slot)->id, id);
2413                         bt_unlockpage(BtLockWrite, bt->set);
2414                         bt_unpinlatch (bt->set);
2415                         bt_unpinpool (bt->pool);
2416                         return bt->err;
2417                 }
2418
2419                 // check if page has enough space
2420
2421                 if( slot = bt_cleanpage (bt, bt->page, len, slot) )
2422                         break;
2423
2424                 if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
2425                         return bt->err;
2426         }
2427
2428         bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
2429
2430         bt_unlockpage (BtLockWrite, bt->set);
2431         bt_unpinlatch (bt->set);
2432         bt_unpinpool (bt->pool);
2433         return 0;
2434 }
2435
2436 //  cache page of keys into cursor and return starting slot for given key
2437
2438 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2439 {
2440 uint slot;
2441
2442         // cache page for retrieval
2443         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2444                 memcpy (bt->cursor, bt->page, bt->mgr->page_size);
2445
2446         bt->cursor_page = bt->page_no;
2447
2448         bt_unlockpage(BtLockRead, bt->set);
2449         bt_unpinlatch (bt->set);
2450         bt_unpinpool (bt->pool);
2451         return slot;
2452 }
2453
2454 //  return next slot for cursor page
2455 //  or slide cursor right into next page
2456
2457 uint bt_nextkey (BtDb *bt, uint slot)
2458 {
2459 BtLatchSet *set;
2460 BtPool *pool;
2461 BtPage page;
2462 uid right;
2463
2464   do {
2465         right = bt_getid(bt->cursor->right);
2466         while( slot++ < bt->cursor->cnt - bt->cursor->foster )
2467           if( slotptr(bt->cursor,slot)->dead )
2468                 continue;
2469           else if( right || (slot < bt->cursor->cnt - bt->cursor->foster) )
2470                 return slot;
2471           else
2472                 break;
2473
2474         if( !right )
2475                 break;
2476
2477         bt->cursor_page = right;
2478         if( pool = bt_pinpool (bt, right) )
2479                 page = bt_page (bt, pool, right);
2480         else
2481                 return 0;
2482
2483         set = bt_pinlatch (bt, right);
2484     bt_lockpage(BtLockRead, set);
2485
2486         memcpy (bt->cursor, page, bt->mgr->page_size);
2487
2488         bt_unlockpage(BtLockRead, set);
2489         bt_unpinlatch (set);
2490         bt_unpinpool (pool);
2491         slot = 0;
2492   } while( 1 );
2493
2494   return bt->err = 0;
2495 }
2496
2497 BtKey bt_key(BtDb *bt, uint slot)
2498 {
2499         return keyptr(bt->cursor, slot);
2500 }
2501
2502 uid bt_uid(BtDb *bt, uint slot)
2503 {
2504         return bt_getid(slotptr(bt->cursor,slot)->id);
2505 }
2506
2507 uint bt_tod(BtDb *bt, uint slot)
2508 {
2509         return slotptr(bt->cursor,slot)->tod;
2510 }
2511
2512
2513 #ifdef STANDALONE
2514
2515 typedef struct {
2516         char type, idx;
2517         char *infile;
2518         BtMgr *mgr;
2519         int num;
2520 } ThreadArg;
2521
2522 //  standalone program to index file of keys
2523 //  then list them onto std-out
2524
2525 #ifdef unix
2526 void *index_file (void *arg)
2527 #else
2528 uint __stdcall index_file (void *arg)
2529 #endif
2530 {
2531 int line = 0, found = 0, cnt = 0;
2532 uid next, page_no = LEAF_page;  // start on first page of leaves
2533 unsigned char key[256];
2534 ThreadArg *args = arg;
2535 int ch, len = 0, slot;
2536 BtLatchSet *set;
2537 time_t tod[1];
2538 BtPool *pool;
2539 BtPage page;
2540 BtKey ptr;
2541 BtDb *bt;
2542 FILE *in;
2543
2544         bt = bt_open (args->mgr);
2545         time (tod);
2546
2547         switch(args->type | 0x20)
2548         {
2549         case 'w':
2550                 fprintf(stderr, "started indexing for %s\n", args->infile);
2551                 if( in = fopen (args->infile, "rb") )
2552                   while( ch = getc(in), ch != EOF )
2553                         if( ch == '\n' )
2554                         {
2555                           line++;
2556
2557                           if( args->num == 1 )
2558                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2559
2560                           else if( args->num )
2561                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2562
2563                           if( bt_insertkey (bt, key, len, line, *tod, 0) )
2564                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2565                           len = 0;
2566                         }
2567                         else if( len < 255 )
2568                                 key[len++] = ch;
2569                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2570                 break;
2571
2572         case 'd':
2573                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2574                 if( in = fopen (args->infile, "rb") )
2575                   while( ch = getc(in), ch != EOF )
2576                         if( ch == '\n' )
2577                         {
2578                           line++;
2579                           if( args->num == 1 )
2580                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2581
2582                           else if( args->num )
2583                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2584
2585                           if( bt_deletekey (bt, key, len) )
2586                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2587                           len = 0;
2588                         }
2589                         else if( len < 255 )
2590                                 key[len++] = ch;
2591                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2592                 break;
2593
2594         case 'f':
2595                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2596                 if( in = fopen (args->infile, "rb") )
2597                   while( ch = getc(in), ch != EOF )
2598                         if( ch == '\n' )
2599                         {
2600                           line++;
2601                           if( args->num == 1 )
2602                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2603
2604                           else if( args->num )
2605                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2606
2607                           if( bt_findkey (bt, key, len) )
2608                                 found++;
2609                           else if( bt->err )
2610                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2611                           len = 0;
2612                         }
2613                         else if( len < 255 )
2614                                 key[len++] = ch;
2615                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2616                 break;
2617
2618         case 's':
2619                 len = key[0] = 0;
2620
2621                 fprintf(stderr, "started reading\n");
2622
2623                 if( slot = bt_startkey (bt, key, len) )
2624                   slot--;
2625                 else
2626                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2627
2628                 while( slot = bt_nextkey (bt, slot) ) {
2629                         ptr = bt_key(bt, slot);
2630                         fwrite (ptr->key, ptr->len, 1, stdout);
2631                         fputc ('\n', stdout);
2632                 }
2633
2634                 break;
2635
2636         case 'c':
2637                 fprintf(stderr, "started reading\n");
2638
2639                 do {
2640                         if( pool = bt_pinpool (bt, page_no) )
2641                                 page = bt_page (bt, pool, page_no);
2642                         else
2643                                 break;
2644                         set = bt_pinlatch (bt, page_no);
2645                         bt_lockpage (BtLockRead, set);
2646                         cnt += page->act;
2647                         next = bt_getid (page->right);
2648                         bt_unlockpage (BtLockRead, set);
2649                         bt_unpinlatch (set);
2650                         bt_unpinpool (pool);
2651                 } while( page_no = next );
2652
2653                 cnt--;  // remove stopper key
2654                 fprintf(stderr, " Total keys read %d\n", cnt);
2655                 break;
2656         }
2657
2658         bt_close (bt);
2659 #ifdef unix
2660         return NULL;
2661 #else
2662         return 0;
2663 #endif
2664 }
2665
2666 typedef struct timeval timer;
2667
2668 int main (int argc, char **argv)
2669 {
2670 int idx, cnt, len, slot, err;
2671 int segsize, bits = 16;
2672 #ifdef unix
2673 pthread_t *threads;
2674 timer start, stop;
2675 #else
2676 time_t start[1], stop[1];
2677 HANDLE *threads;
2678 #endif
2679 double real_time;
2680 ThreadArg *args;
2681 uint poolsize = 0;
2682 int num = 0;
2683 char key[1];
2684 BtMgr *mgr;
2685 BtKey ptr;
2686 BtDb *bt;
2687
2688         if( argc < 3 ) {
2689                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2690                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2691                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2692                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2693                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2694                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2695                 exit(0);
2696         }
2697
2698 #ifdef unix
2699         gettimeofday(&start, NULL);
2700 #else
2701         time(start);
2702 #endif
2703
2704         if( argc > 3 )
2705                 bits = atoi(argv[3]);
2706
2707         if( argc > 4 )
2708                 poolsize = atoi(argv[4]);
2709
2710         if( !poolsize )
2711                 fprintf (stderr, "Warning: no mapped_pool\n");
2712
2713         if( poolsize > 65535 )
2714                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2715
2716         if( argc > 5 )
2717                 segsize = atoi(argv[5]);
2718         else
2719                 segsize = 4;    // 16 pages per mmap segment
2720
2721         if( argc > 6 )
2722                 num = atoi(argv[6]);
2723
2724         cnt = argc - 7;
2725 #ifdef unix
2726         threads = malloc (cnt * sizeof(pthread_t));
2727 #else
2728         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2729 #endif
2730         args = malloc (cnt * sizeof(ThreadArg));
2731
2732         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2733
2734         if( !mgr ) {
2735                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2736                 exit (1);
2737         }
2738
2739         //      fire off threads
2740
2741         for( idx = 0; idx < cnt; idx++ ) {
2742                 args[idx].infile = argv[idx + 7];
2743                 args[idx].type = argv[2][0];
2744                 args[idx].mgr = mgr;
2745                 args[idx].num = num;
2746                 args[idx].idx = idx;
2747 #ifdef unix
2748                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2749                         fprintf(stderr, "Error creating thread %d\n", err);
2750 #else
2751                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2752 #endif
2753         }
2754
2755         //      wait for termination
2756
2757 #ifdef unix
2758         for( idx = 0; idx < cnt; idx++ )
2759                 pthread_join (threads[idx], NULL);
2760         gettimeofday(&stop, NULL);
2761         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2762 #else
2763         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2764
2765         for( idx = 0; idx < cnt; idx++ )
2766                 CloseHandle(threads[idx]);
2767
2768         time (stop);
2769         real_time = 1000 * (*stop - *start);
2770 #endif
2771         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2772         bt_mgrclose (mgr);
2773 }
2774
2775 #endif  //STANDALONE