]> pd.if.org Git - btree/blob - fosterbtreee.c
Make bt_splitpage more conservative wrt posting fence keys
[btree] / fosterbtreee.c
1 // foster btree version e
2 // 29 JAN 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <fcntl.h>
35 #include <sys/time.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #include <pthread.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <process.h>
47 #include <intrin.h>
48 #endif
49
50 #include <memory.h>
51 #include <string.h>
52
53 typedef unsigned long long      uid;
54
55 #ifndef unix
56 typedef unsigned long long      off64_t;
57 typedef unsigned short          ushort;
58 typedef unsigned int            uint;
59 #endif
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63
64 #define BT_latchtable   128                                     // number of latch manager slots
65
66 #define BT_maxbits              24                                      // maximum page size in bits
67 #define BT_minbits              9                                       // minimum page size in bits
68 #define BT_minpage              (1 << BT_minbits)       // minimum page size
69 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
70
71 /*
72 There are five lock types for each node in three independent sets: 
73 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
74 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
75 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
76 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
77 5. (set 3) ParentLock: Exclusive. Have parent adopt/delete maximum foster child from the node.
78 */
79
80 typedef enum{
81         BtLockAccess,
82         BtLockDelete,
83         BtLockRead,
84         BtLockWrite,
85         BtLockParent
86 }BtLock;
87
88 //      Define the length of the page and key pointers
89
90 #define BtId 6
91
92 //      Page key slot definition.
93
94 //      If BT_maxbits is 15 or less, you can save 4 bytes
95 //      for each key stored by making the first two uints
96 //      into ushorts.  You can also save 4 bytes by removing
97 //      the tod field from the key.
98
99 //      Keys are marked dead, but remain on the page until
100 //      cleanup is called. The fence key (highest key) for
101 //      the page is always present, even after cleanup.
102
103 typedef struct {
104         uint off:BT_maxbits;            // page offset for key start
105         uint dead:1;                            // set for deleted key
106         uint tod;                                       // time-stamp for key
107         unsigned char id[BtId];         // id associated with key
108 } BtSlot;
109
110 //      The key structure occupies space at the upper end of
111 //      each page.  It's a length byte followed by the value
112 //      bytes.
113
114 typedef struct {
115         unsigned char len;
116         unsigned char key[1];
117 } *BtKey;
118
119 //      The first part of an index page.
120 //      It is immediately followed
121 //      by the BtSlot array of keys.
122
123 typedef struct Page {
124         volatile uint cnt;                      // count of keys in page
125         volatile uint act;                      // count of active keys
126         volatile uint min;                      // next key offset
127         volatile uint foster;           // count of foster children
128         unsigned char bits;                     // page size in bits
129         unsigned char lvl:7;            // level of page
130         unsigned char dirty:1;          // page needs to be cleaned
131         unsigned char right[BtId];      // page number to right
132 } *BtPage;
133
134 //      mode & definition for hash latch implementation
135
136 enum {
137         Mutex = 1,
138         Write = 2,
139         Pending = 4,
140         Share = 8
141 } LockMode;
142
143 // mutex locks the other fields
144 // exclusive is set for write access
145 // share is count of read accessors
146
147 typedef struct {
148         volatile ushort mutex:1;
149         volatile ushort exclusive:1;
150         volatile ushort pending:1;
151         volatile ushort share:13;
152 } BtSpinLatch;
153
154 //  hash table entries
155
156 typedef struct {
157         BtSpinLatch latch[1];
158         volatile ushort slot;           // Latch table entry at head of chain
159 } BtHashEntry;
160
161 //      latch manager table structure
162
163 typedef struct {
164 #ifdef unix
165         pthread_rwlock_t lock[1];
166 #else
167         SRWLOCK srw[1];
168 #endif
169 } BtLatch;
170
171 typedef struct {
172         BtLatch readwr[1];                      // read/write page lock
173         BtLatch access[1];                      // Access Intent/Page delete
174         BtLatch parent[1];                      // adoption of foster children
175         BtSpinLatch busy[1];            // slot is being moved between chains
176         volatile ushort next;           // next entry in hash table chain
177         volatile ushort prev;           // prev entry in hash table chain
178         volatile ushort pin;            // number of outstanding locks
179         volatile ushort hash;           // hash slot entry is under
180         volatile uid page_no;           // latch set page number
181 } BtLatchSet;
182
183 //      The memory mapping pool table buffer manager entry
184
185 typedef struct {
186         unsigned long long int lru;     // number of times accessed
187         uid  basepage;                          // mapped base page number
188         char *map;                                      // mapped memory pointer
189         ushort pin;                                     // mapped page pin counter
190         ushort slot;                            // slot index in this array
191         void *hashprev;                         // previous pool entry for the same hash idx
192         void *hashnext;                         // next pool entry for the same hash idx
193 #ifndef unix
194         HANDLE hmap;                            // Windows memory mapping handle
195 #endif
196 } BtPool;
197
198 //      structure for latch manager on ALLOC_page
199
200 typedef struct {
201         struct Page alloc[2];           // next & free page_nos in right ptr
202         BtSpinLatch lock[1];            // allocation area lite latch
203         ushort latchdeployed;           // highest number of latch entries deployed
204         ushort nlatchpage;                      // number of latch pages at BT_latch
205         ushort latchtotal;                      // number of page latch entries
206         ushort latchhash;                       // number of latch hash table slots
207         ushort latchvictim;                     // next latch entry to examine
208         BtHashEntry table[0];           // the hash table
209 } BtLatchMgr;
210
211 //      The object structure for Btree access
212
213 typedef struct {
214         uint page_size;                         // page size    
215         uint page_bits;                         // page size in bits    
216         uint seg_bits;                          // seg size in pages in bits
217         uint mode;                                      // read-write mode
218 #ifdef unix
219         int idx;
220 #else
221         HANDLE idx;
222 #endif
223         ushort poolcnt;                         // highest page pool node in use
224         ushort poolmax;                         // highest page pool node allocated
225         ushort poolmask;                        // total number of pages in mmap segment - 1
226         ushort hashsize;                        // size of Hash Table for pool entries
227         ushort evicted;                         // last evicted hash table slot
228         ushort *hash;                           // hash table of pool entries
229         BtPool *pool;                           // memory pool page segments
230         BtSpinLatch *latch;                     // latches for pool hash slots
231         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
232         BtLatchSet *latchsets;          // mapped latch set from latch pages
233 #ifndef unix
234         HANDLE halloc;                          // allocation and latch table handle
235 #endif
236 } BtMgr;
237
238 typedef struct {
239         BtMgr *mgr;                     // buffer manager for thread
240         BtPage cursor;          // cached frame for start/next (never mapped)
241         BtPage frame;           // spare frame for the page split (never mapped)
242         BtPage zero;            // page frame for zeroes at end of file
243         BtPage page;            // current page
244         uid page_no;            // current page number  
245         uid cursor_page;        // current cursor page number   
246         BtLatchSet *set;        // current page latch set
247         BtPool *pool;           // current page pool
248         unsigned char *mem;     // frame, cursor, page memory buffer
249         int foster;                     // last search was to foster child
250         int found;                      // last delete was found
251         int err;                        // last error
252 } BtDb;
253
254 typedef enum {
255         BTERR_ok = 0,
256         BTERR_struct,
257         BTERR_ovflw,
258         BTERR_lock,
259         BTERR_map,
260         BTERR_wrt,
261         BTERR_hash,
262         BTERR_latch
263 } BTERR;
264
265 // B-Tree functions
266 extern void bt_close (BtDb *bt);
267 extern BtDb *bt_open (BtMgr *mgr);
268 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
269 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
270 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
271 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
272 extern uint bt_nextkey   (BtDb *bt, uint slot);
273
274 //      manager functions
275 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
276 void bt_mgrclose (BtMgr *mgr);
277
278 //      internal functions
279 BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
280 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
281 BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
282
283 //  Helper functions to return cursor slot values
284
285 extern BtKey bt_key (BtDb *bt, uint slot);
286 extern uid bt_uid (BtDb *bt, uint slot);
287 extern uint bt_tod (BtDb *bt, uint slot);
288
289 //  BTree page number constants
290 #define ALLOC_page              0       // allocation & lock manager hash table
291 #define ROOT_page               1       // root of the btree
292 #define LEAF_page               2       // first page of leaves
293 #define LATCH_page              3       // pages for lock manager
294
295 //      Number of levels to create in a new BTree
296
297 #define MIN_lvl                 2
298
299 //  The page is allocated from low and hi ends.
300 //  The key offsets and row-id's are allocated
301 //  from the bottom, while the text of the key
302 //  is allocated from the top.  When the two
303 //  areas meet, the page is split into two.
304
305 //  A key consists of a length byte, two bytes of
306 //  index number (0 - 65534), and up to 253 bytes
307 //  of key value.  Duplicate keys are discarded.
308 //  Associated with each key is a 48 bit row-id.
309
310 //  The b-tree root is always located at page 1.
311 //      The first leaf page of level zero is always
312 //      located on page 2.
313
314 //      When to root page fills, it is split in two and
315 //      the tree height is raised by a new root at page
316 //      one with two keys.
317
318 //      Deleted keys are marked with a dead bit until
319 //      page cleanup The fence key for a node is always
320 //      present, even after deletion and cleanup.
321
322 //  Groups of pages called segments from the btree are
323 //  cached with memory mapping. A hash table is used to keep
324 //  track of the cached segments.  This behaviour is controlled
325 //  by the cache block size parameter to bt_open.
326
327 //  To achieve maximum concurrency one page is locked at a time
328 //  as the tree is traversed to find leaf key in question.
329
330 //      An adoption traversal leaves the parent node locked as the
331 //      tree is traversed to the level in quesiton.
332
333 //  Page 0 is dedicated to lock for new page extensions,
334 //      and chains empty pages together for reuse.
335
336 //      Empty pages are chained together through the ALLOC page and reused.
337
338 //      Access macros to address slot and key values from the page
339
340 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
341 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
342
343 void bt_putid(unsigned char *dest, uid id)
344 {
345 int i = BtId;
346
347         while( i-- )
348                 dest[i] = (unsigned char)id, id >>= 8;
349 }
350
351 uid bt_getid(unsigned char *src)
352 {
353 uid id = 0;
354 int i;
355
356         for( i = 0; i < BtId; i++ )
357                 id <<= 8, id |= *src++; 
358
359         return id;
360 }
361
362 //      wait until write lock mode is clear
363 //      and add 1 to the share count
364
365 void bt_spinreadlock(BtSpinLatch *latch)
366 {
367 ushort prev;
368
369   do {
370 #ifdef unix
371         while( __sync_fetch_and_or((ushort *)latch, Mutex) & Mutex )
372                 sched_yield();
373 #else
374         while( _InterlockedOr16((ushort *)latch, Mutex) & Mutex )
375                 SwitchToThread();
376 #endif
377
378         //  see if exclusive request is granted or pending
379
380         if( prev = !(latch->exclusive | latch->pending) )
381 #ifdef unix
382                 __sync_fetch_and_add((ushort *)latch, Share);
383 #else
384                 _InterlockedExchangeAdd16 ((ushort *)latch, Share);
385 #endif
386
387 #ifdef unix
388         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
389 #else
390         _InterlockedAnd16((ushort *)latch, ~Mutex);
391 #endif
392         if( prev )
393                 return;
394 #ifdef  unix
395   } while( sched_yield(), 1 );
396 #else
397   } while( SwitchToThread(), 1 );
398 #endif
399 }
400
401 //      wait for other read and write latches to relinquish
402
403 void bt_spinwritelock(BtSpinLatch *latch)
404 {
405   do {
406 #ifdef unix
407         while( __sync_fetch_and_or((ushort *)latch, Mutex | Pending) & Mutex )
408                 sched_yield();
409 #else
410         while( _InterlockedOr16((ushort *)latch, Mutex | Pending) & Mutex )
411                 SwitchToThread();
412 #endif
413         if( !(latch->share | latch->exclusive) ) {
414 #ifdef unix
415                 __sync_fetch_and_or((ushort *)latch, Write);
416                 __sync_fetch_and_and ((ushort *)latch, ~(Mutex | Pending));
417 #else
418                 _InterlockedOr16((ushort *)latch, Write);
419                 _InterlockedAnd16((ushort *)latch, ~(Mutex | Pending));
420 #endif
421                 return;
422         }
423
424 #ifdef unix
425         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
426 #else
427         _InterlockedAnd16((ushort *)latch, ~Mutex);
428 #endif
429 #ifdef  unix
430         sched_yield();
431 #else
432         SwitchToThread();
433 #endif
434   } while( 1 );
435 }
436
437 //      try to obtain write lock
438
439 //      return 1 if obtained,
440 //              0 otherwise
441
442 int bt_spinwritetry(BtSpinLatch *latch)
443 {
444 ushort prev;
445
446 #ifdef unix
447         if( prev = __sync_fetch_and_or((ushort *)latch, Mutex), prev & Mutex )
448                 return 0;
449 #else
450         if( prev = _InterlockedOr16((ushort *)latch, Mutex), prev & Mutex )
451                 return 0;
452 #endif
453         //      take write access if all bits are clear
454
455         if( !prev )
456 #ifdef unix
457                 __sync_fetch_and_or ((ushort *)latch, Write);
458 #else
459                 _InterlockedOr16((ushort *)latch, Write);
460 #endif
461
462 #ifdef unix
463         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
464 #else
465         _InterlockedAnd16((ushort *)latch, ~Mutex);
466 #endif
467         return !prev;
468 }
469
470 //      clear write mode
471
472 void bt_spinreleasewrite(BtSpinLatch *latch)
473 {
474 #ifdef unix
475         __sync_fetch_and_and ((ushort *)latch, ~Write);
476 #else
477         _InterlockedAnd16((ushort *)latch, ~Write);
478 #endif
479 }
480
481 //      decrement reader count
482
483 void bt_spinreleaseread(BtSpinLatch *latch)
484 {
485 #ifdef unix
486         __sync_fetch_and_add((ushort *)latch, -Share);
487 #else
488         _InterlockedExchangeAdd16 ((ushort *)latch, -Share);
489 #endif
490 }
491
492 void bt_initlockset (BtLatchSet *set, int reuse)
493 {
494 #ifdef unix
495 pthread_rwlockattr_t rwattr[1];
496
497         if( reuse ) {
498                 pthread_rwlock_destroy (set->readwr->lock);
499                 pthread_rwlock_destroy (set->access->lock);
500                 pthread_rwlock_destroy (set->parent->lock);
501         }
502
503         pthread_rwlockattr_init (rwattr);
504         pthread_rwlockattr_setkind_np (rwattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
505         pthread_rwlockattr_setpshared (rwattr, PTHREAD_PROCESS_SHARED);
506
507         pthread_rwlock_init (set->readwr->lock, rwattr);
508         pthread_rwlock_init (set->access->lock, rwattr);
509         pthread_rwlock_init (set->parent->lock, rwattr);
510         pthread_rwlockattr_destroy (rwattr);
511 #else
512         InitializeSRWLock (set->readwr->srw);
513         InitializeSRWLock (set->access->srw);
514         InitializeSRWLock (set->parent->srw);
515 #endif
516 }
517
518 //      link latch table entry into latch hash table
519
520 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
521 {
522 BtLatchSet *set = bt->mgr->latchsets + victim;
523
524         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
525                 bt->mgr->latchsets[set->next].prev = victim;
526
527         bt->mgr->latchmgr->table[hashidx].slot = victim;
528         set->page_no = page_no;
529         set->hash = hashidx;
530         set->prev = 0;
531 }
532
533 void bt_unpinlatch (BtLatchSet *set)
534 {
535 #ifdef unix
536         __sync_fetch_and_add(&set->pin, -1);
537 #else
538         _InterlockedDecrement16 (&set->pin);
539 #endif
540 }
541
542 //      find existing latchset or inspire new one
543 //      return with latchset pinned
544
545 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
546 {
547 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
548 ushort slot, avail = 0, victim, idx;
549 BtLatchSet *set;
550
551         //  obtain read lock on hash table entry
552
553         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
554
555         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
556         {
557                 set = bt->mgr->latchsets + slot;
558                 if( page_no == set->page_no )
559                         break;
560         } while( slot = set->next );
561
562         if( slot ) {
563 #ifdef unix
564                 __sync_fetch_and_add(&set->pin, 1);
565 #else
566                 _InterlockedIncrement16 (&set->pin);
567 #endif
568         }
569
570     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
571
572         if( slot )
573                 return set;
574
575   //  try again, this time with write lock
576
577   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
578
579   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
580   {
581         set = bt->mgr->latchsets + slot;
582         if( page_no == set->page_no )
583                 break;
584         if( !set->pin && !avail )
585                 avail = slot;
586   } while( slot = set->next );
587
588   //  found our entry, or take over an unpinned one
589
590   if( slot || (slot = avail) ) {
591         set = bt->mgr->latchsets + slot;
592 #ifdef unix
593         __sync_fetch_and_add(&set->pin, 1);
594 #else
595         _InterlockedIncrement16 (&set->pin);
596 #endif
597         set->page_no = page_no;
598         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
599         return set;
600   }
601
602         //  see if there are any unused entries
603 #ifdef unix
604         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
605 #else
606         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
607 #endif
608
609         if( victim < bt->mgr->latchmgr->latchtotal ) {
610                 set = bt->mgr->latchsets + victim;
611 #ifdef unix
612                 __sync_fetch_and_add(&set->pin, 1);
613 #else
614                 _InterlockedIncrement16 (&set->pin);
615 #endif
616                 bt_initlockset (set, 0);
617                 bt_latchlink (bt, hashidx, victim, page_no);
618                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
619                 return set;
620         }
621
622 #ifdef unix
623         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
624 #else
625         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
626 #endif
627   //  find and reuse previous lock entry
628
629   while( 1 ) {
630 #ifdef unix
631         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
632 #else
633         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
634 #endif
635         //      we don't use slot zero
636
637         if( victim %= bt->mgr->latchmgr->latchtotal )
638                 set = bt->mgr->latchsets + victim;
639         else
640                 continue;
641
642         //      take control of our slot
643         //      from other threads
644
645         if( set->pin || !bt_spinwritetry (set->busy) )
646                 continue;
647
648         idx = set->hash;
649
650         // try to get write lock on hash chain
651         //      skip entry if not obtained
652         //      or has outstanding locks
653
654         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
655                 bt_spinreleasewrite (set->busy);
656                 continue;
657         }
658
659         if( set->pin ) {
660                 bt_spinreleasewrite (set->busy);
661                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
662                 continue;
663         }
664
665         //  unlink our available victim from its hash chain
666
667         if( set->prev )
668                 bt->mgr->latchsets[set->prev].next = set->next;
669         else
670                 bt->mgr->latchmgr->table[idx].slot = set->next;
671
672         if( set->next )
673                 bt->mgr->latchsets[set->next].prev = set->prev;
674
675         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
676 #ifdef unix
677         __sync_fetch_and_add(&set->pin, 1);
678 #else
679         _InterlockedIncrement16 (&set->pin);
680 #endif
681         bt_initlockset (set, 1);
682         bt_latchlink (bt, hashidx, victim, page_no);
683         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
684         bt_spinreleasewrite (set->busy);
685         return set;
686   }
687 }
688
689 void bt_mgrclose (BtMgr *mgr)
690 {
691 BtPool *pool;
692 uint slot;
693
694         // release mapped pages
695         //      note that slot zero is never used
696
697         for( slot = 1; slot < mgr->poolmax; slot++ ) {
698                 pool = mgr->pool + slot;
699                 if( pool->slot )
700 #ifdef unix
701                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
702 #else
703                 {
704                         FlushViewOfFile(pool->map, 0);
705                         UnmapViewOfFile(pool->map);
706                         CloseHandle(pool->hmap);
707                 }
708 #endif
709         }
710
711 #ifdef unix
712         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
713         munmap (mgr->latchmgr, mgr->page_size);
714 #else
715         FlushViewOfFile(mgr->latchmgr, 0);
716         UnmapViewOfFile(mgr->latchmgr);
717         CloseHandle(mgr->halloc);
718 #endif
719 #ifdef unix
720         close (mgr->idx);
721         free (mgr->pool);
722         free (mgr->hash);
723         free (mgr->latch);
724         free (mgr);
725 #else
726         FlushFileBuffers(mgr->idx);
727         CloseHandle(mgr->idx);
728         GlobalFree (mgr->pool);
729         GlobalFree (mgr->hash);
730         GlobalFree (mgr->latch);
731         GlobalFree (mgr);
732 #endif
733 }
734
735 //      close and release memory
736
737 void bt_close (BtDb *bt)
738 {
739 #ifdef unix
740         if ( bt->mem )
741                 free (bt->mem);
742 #else
743         if ( bt->mem)
744                 VirtualFree (bt->mem, 0, MEM_RELEASE);
745 #endif
746         free (bt);
747 }
748
749 //  open/create new btree buffer manager
750
751 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
752 //              size of mapped page pool (e.g. 8192)
753
754 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
755 {
756 uint lvl, attr, cacheblk, last, slot, idx;
757 uint nlatchpage, latchhash;
758 BtLatchMgr *latchmgr;
759 off64_t size;
760 uint amt[1];
761 BtMgr* mgr;
762 BtKey key;
763 int flag;
764
765 #ifndef unix
766 SYSTEM_INFO sysinfo[1];
767 #endif
768
769         // determine sanity of page size and buffer pool
770
771         if( bits > BT_maxbits )
772                 bits = BT_maxbits;
773         else if( bits < BT_minbits )
774                 bits = BT_minbits;
775
776         if( !poolmax )
777                 return NULL;    // must have buffer pool
778
779 #ifdef unix
780         mgr = calloc (1, sizeof(BtMgr));
781
782         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
783
784         if( mgr->idx == -1 )
785                 return free(mgr), NULL;
786         
787         cacheblk = 4096;        // minimum mmap segment size for unix
788
789 #else
790         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
791         attr = FILE_ATTRIBUTE_NORMAL;
792         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
793
794         if( mgr->idx == INVALID_HANDLE_VALUE )
795                 return GlobalFree(mgr), NULL;
796
797         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
798         GetSystemInfo(sysinfo);
799         cacheblk = sysinfo->dwAllocationGranularity;
800 #endif
801
802 #ifdef unix
803         latchmgr = malloc (BT_maxpage);
804         *amt = 0;
805
806         // read minimum page size to get root info
807
808         if( size = lseek (mgr->idx, 0L, 2) ) {
809                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
810                         bits = latchmgr->alloc->bits;
811                 else
812                         return free(mgr), free(latchmgr), NULL;
813         } else if( mode == BT_ro )
814                 return free(latchmgr), free (mgr), NULL;
815 #else
816         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
817         size = GetFileSize(mgr->idx, amt);
818
819         if( size || *amt ) {
820                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
821                         return bt_mgrclose (mgr), NULL;
822                 bits = latchmgr->alloc->bits;
823         } else if( mode == BT_ro )
824                 return bt_mgrclose (mgr), NULL;
825 #endif
826
827         mgr->page_size = 1 << bits;
828         mgr->page_bits = bits;
829
830         mgr->poolmax = poolmax;
831         mgr->mode = mode;
832
833         if( cacheblk < mgr->page_size )
834                 cacheblk = mgr->page_size;
835
836         //  mask for partial memmaps
837
838         mgr->poolmask = (cacheblk >> bits) - 1;
839
840         //      see if requested size of pages per memmap is greater
841
842         if( (1 << segsize) > mgr->poolmask )
843                 mgr->poolmask = (1 << segsize) - 1;
844
845         mgr->seg_bits = 0;
846
847         while( (1 << mgr->seg_bits) <= mgr->poolmask )
848                 mgr->seg_bits++;
849
850         mgr->hashsize = hashsize;
851
852 #ifdef unix
853         mgr->pool = calloc (poolmax, sizeof(BtPool));
854         mgr->hash = calloc (hashsize, sizeof(ushort));
855         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
856 #else
857         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
858         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
859         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
860 #endif
861
862         if( size || *amt )
863                 goto mgrlatch;
864
865         // initialize an empty b-tree with latch page, root page, page of leaves
866         // and page(s) of latches
867
868         memset (latchmgr, 0, 1 << bits);
869         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
870         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
871         latchmgr->alloc->bits = mgr->page_bits;
872
873         latchmgr->nlatchpage = nlatchpage;
874         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
875
876         //  initialize latch manager
877
878         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
879
880         //      size of hash table = total number of latchsets
881
882         if( latchhash > latchmgr->latchtotal )
883                 latchhash = latchmgr->latchtotal;
884
885         latchmgr->latchhash = latchhash;
886
887 #ifdef unix
888         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
889                 return free(latchmgr), bt_mgrclose (mgr), NULL;
890 #else
891         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
892                 return bt_mgrclose (mgr), NULL;
893
894         if( *amt < mgr->page_size )
895                 return bt_mgrclose (mgr), NULL;
896 #endif
897
898         memset (latchmgr, 0, 1 << bits);
899         latchmgr->alloc->bits = mgr->page_bits;
900
901         for( lvl=MIN_lvl; lvl--; ) {
902                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
903                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
904                 key = keyptr(latchmgr->alloc, 1);
905                 key->len = 2;                   // create stopper key
906                 key->key[0] = 0xff;
907                 key->key[1] = 0xff;
908                 latchmgr->alloc->min = mgr->page_size - 3;
909                 latchmgr->alloc->lvl = lvl;
910                 latchmgr->alloc->cnt = 1;
911                 latchmgr->alloc->act = 1;
912 #ifdef unix
913                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
914                         return bt_mgrclose (mgr), NULL;
915 #else
916                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
917                         return bt_mgrclose (mgr), NULL;
918
919                 if( *amt < mgr->page_size )
920                         return bt_mgrclose (mgr), NULL;
921 #endif
922         }
923
924         // clear out latch manager locks
925         //      and rest of pages to round out segment
926
927         memset(latchmgr, 0, mgr->page_size);
928         last = MIN_lvl + 1;
929
930         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
931 #ifdef unix
932                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
933 #else
934                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
935                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
936                         return bt_mgrclose (mgr), NULL;
937                 if( *amt < mgr->page_size )
938                         return bt_mgrclose (mgr), NULL;
939 #endif
940                 last++;
941         }
942
943 mgrlatch:
944 #ifdef unix
945         flag = PROT_READ | PROT_WRITE;
946         mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
947         if( mgr->latchmgr == MAP_FAILED )
948                 return bt_mgrclose (mgr), NULL;
949         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
950         if( mgr->latchsets == MAP_FAILED )
951                 return bt_mgrclose (mgr), NULL;
952 #else
953         flag = PAGE_READWRITE;
954         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
955         if( !mgr->halloc )
956                 return bt_mgrclose (mgr), NULL;
957
958         flag = FILE_MAP_WRITE;
959         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
960         if( !mgr->latchmgr )
961                 return GetLastError(), bt_mgrclose (mgr), NULL;
962
963         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
964 #endif
965
966 #ifdef unix
967         free (latchmgr);
968 #else
969         VirtualFree (latchmgr, 0, MEM_RELEASE);
970 #endif
971         return mgr;
972 }
973
974 //      open BTree access method
975 //      based on buffer manager
976
977 BtDb *bt_open (BtMgr *mgr)
978 {
979 BtDb *bt = malloc (sizeof(*bt));
980
981         memset (bt, 0, sizeof(*bt));
982         bt->mgr = mgr;
983 #ifdef unix
984         bt->mem = malloc (3 *mgr->page_size);
985 #else
986         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
987 #endif
988         bt->frame = (BtPage)bt->mem;
989         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
990         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
991
992         memset(bt->zero, 0, mgr->page_size);
993         return bt;
994 }
995
996 //  compare two keys, returning > 0, = 0, or < 0
997 //  as the comparison value
998
999 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1000 {
1001 uint len1 = key1->len;
1002 int ans;
1003
1004         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1005                 return ans;
1006
1007         if( len1 > len2 )
1008                 return 1;
1009         if( len1 < len2 )
1010                 return -1;
1011
1012         return 0;
1013 }
1014
1015 //      Latch Manager
1016
1017 void bt_readlock(BtLatch *latch)
1018 {
1019 #ifdef unix
1020         pthread_rwlock_rdlock (latch->lock);
1021 #else
1022         AcquireSRWLockShared (latch->srw);
1023 #endif
1024 }
1025
1026 //      wait for other read and write latches to relinquish
1027
1028 void bt_writelock(BtLatch *latch)
1029 {
1030 #ifdef unix
1031         pthread_rwlock_wrlock (latch->lock);
1032 #else
1033         AcquireSRWLockExclusive (latch->srw);
1034 #endif
1035 }
1036
1037 //      try to obtain write lock
1038
1039 //      return 1 if obtained,
1040 //              0 if already write or read locked
1041
1042 int bt_writetry(BtLatch *latch)
1043 {
1044 int result = 0;
1045
1046 #ifdef unix
1047         result = !pthread_rwlock_trywrlock (latch->lock);
1048 #else
1049         result = TryAcquireSRWLockExclusive (latch->srw);
1050 #endif
1051         return result;
1052 }
1053
1054 //      clear write mode
1055
1056 void bt_releasewrite(BtLatch *latch)
1057 {
1058 #ifdef unix
1059         pthread_rwlock_unlock (latch->lock);
1060 #else
1061         ReleaseSRWLockExclusive (latch->srw);
1062 #endif
1063 }
1064
1065 //      decrement reader count
1066
1067 void bt_releaseread(BtLatch *latch)
1068 {
1069 #ifdef unix
1070         pthread_rwlock_unlock (latch->lock);
1071 #else
1072         ReleaseSRWLockShared (latch->srw);
1073 #endif
1074 }
1075
1076 //      Buffer Pool mgr
1077
1078 // find segment in pool
1079 // must be called with hashslot idx locked
1080 //      return NULL if not there
1081 //      otherwise return node
1082
1083 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1084 {
1085 BtPool *pool;
1086 uint slot;
1087
1088         // compute start of hash chain in pool
1089
1090         if( slot = bt->mgr->hash[idx] ) 
1091                 pool = bt->mgr->pool + slot;
1092         else
1093                 return NULL;
1094
1095         page_no &= ~bt->mgr->poolmask;
1096
1097         while( pool->basepage != page_no )
1098           if( pool = pool->hashnext )
1099                 continue;
1100           else
1101                 return NULL;
1102
1103         return pool;
1104 }
1105
1106 // add segment to hash table
1107
1108 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1109 {
1110 BtPool *node;
1111 uint slot;
1112
1113         pool->hashprev = pool->hashnext = NULL;
1114         pool->basepage = page_no & ~bt->mgr->poolmask;
1115         pool->lru = 1;
1116
1117         if( slot = bt->mgr->hash[idx] ) {
1118                 node = bt->mgr->pool + slot;
1119                 pool->hashnext = node;
1120                 node->hashprev = pool;
1121         }
1122
1123         bt->mgr->hash[idx] = pool->slot;
1124 }
1125
1126 //      find best segment to evict from buffer pool
1127
1128 BtPool *bt_findlru (BtDb *bt, uint hashslot)
1129 {
1130 unsigned long long int target = ~0LL;
1131 BtPool *pool = NULL, *node;
1132
1133         if( !hashslot )
1134                 return NULL;
1135
1136         node = bt->mgr->pool + hashslot;
1137
1138         //      scan pool entries under hash table slot
1139
1140         do {
1141           if( node->pin )
1142                 continue;
1143           if( node->lru > target )
1144                 continue;
1145           target = node->lru;
1146           pool = node;
1147         } while( node = node->hashnext );
1148
1149         return pool;
1150 }
1151
1152 //  map new buffer pool segment to virtual memory
1153
1154 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1155 {
1156 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1157 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1158 int flag;
1159
1160 #ifdef unix
1161         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1162         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1163         if( pool->map == MAP_FAILED )
1164                 return bt->err = BTERR_map;
1165 #else
1166         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1167         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1168         if( !pool->hmap )
1169                 return bt->err = BTERR_map;
1170
1171         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1172         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1173         if( !pool->map )
1174                 return bt->err = BTERR_map;
1175 #endif
1176         return bt->err = 0;
1177 }
1178
1179 //      calculate page within pool
1180
1181 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1182 {
1183 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1184 BtPage page;
1185
1186         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1187         return page;
1188 }
1189
1190 //  release pool pin
1191
1192 void bt_unpinpool (BtPool *pool)
1193 {
1194 #ifdef unix
1195         __sync_fetch_and_add(&pool->pin, -1);
1196 #else
1197         _InterlockedDecrement16 (&pool->pin);
1198 #endif
1199 }
1200
1201 //      find or place requested page in segment-pool
1202 //      return pool table entry, incrementing pin
1203
1204 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1205 {
1206 BtPool *pool, *node, *next;
1207 uint slot, idx, victim;
1208 BtLatchSet *set;
1209
1210         //      lock hash table chain
1211
1212         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1213         bt_spinreadlock (&bt->mgr->latch[idx]);
1214
1215         //      look up in hash table
1216
1217         if( pool = bt_findpool(bt, page_no, idx) ) {
1218 #ifdef unix
1219                 __sync_fetch_and_add(&pool->pin, 1);
1220 #else
1221                 _InterlockedIncrement16 (&pool->pin);
1222 #endif
1223                 bt_spinreleaseread (&bt->mgr->latch[idx]);
1224                 pool->lru++;
1225                 return pool;
1226         }
1227
1228         //      upgrade to write lock
1229
1230         bt_spinreleaseread (&bt->mgr->latch[idx]);
1231         bt_spinwritelock (&bt->mgr->latch[idx]);
1232
1233         // try to find page in pool with write lock
1234
1235         if( pool = bt_findpool(bt, page_no, idx) ) {
1236 #ifdef unix
1237                 __sync_fetch_and_add(&pool->pin, 1);
1238 #else
1239                 _InterlockedIncrement16 (&pool->pin);
1240 #endif
1241                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1242                 pool->lru++;
1243                 return pool;
1244         }
1245
1246         // allocate a new pool node
1247         // and add to hash table
1248
1249 #ifdef unix
1250         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1251 #else
1252         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1253 #endif
1254
1255         if( ++slot < bt->mgr->poolmax ) {
1256                 pool = bt->mgr->pool + slot;
1257                 pool->slot = slot;
1258
1259                 if( bt_mapsegment(bt, pool, page_no) )
1260                         return NULL;
1261
1262                 bt_linkhash(bt, pool, page_no, idx);
1263 #ifdef unix
1264                 __sync_fetch_and_add(&pool->pin, 1);
1265 #else
1266                 _InterlockedIncrement16 (&pool->pin);
1267 #endif
1268                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1269                 return pool;
1270         }
1271
1272         // pool table is full
1273         //      find best pool entry to evict
1274
1275 #ifdef unix
1276         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1277 #else
1278         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1279 #endif
1280
1281         while( 1 ) {
1282 #ifdef unix
1283                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1284 #else
1285                 victim = _InterlockedIncrement16 (&bt->mgr->evicted) - 1;
1286 #endif
1287                 victim %= bt->mgr->hashsize;
1288
1289                 // try to get write lock
1290                 //      skip entry if not obtained
1291
1292                 if( !bt_spinwritetry (&bt->mgr->latch[victim]) )
1293                         continue;
1294
1295                 //  if cache entry is empty
1296                 //      or no slots are unpinned
1297                 //      skip this entry
1298
1299                 if( !(pool = bt_findlru(bt, bt->mgr->hash[victim])) ) {
1300                         bt_spinreleasewrite (&bt->mgr->latch[victim]);
1301                         continue;
1302                 }
1303
1304                 // unlink victim pool node from hash table
1305
1306                 if( node = pool->hashprev )
1307                         node->hashnext = pool->hashnext;
1308                 else if( node = pool->hashnext )
1309                         bt->mgr->hash[victim] = node->slot;
1310                 else
1311                         bt->mgr->hash[victim] = 0;
1312
1313                 if( node = pool->hashnext )
1314                         node->hashprev = pool->hashprev;
1315
1316                 bt_spinreleasewrite (&bt->mgr->latch[victim]);
1317
1318                 //      remove old file mapping
1319 #ifdef unix
1320                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1321 #else
1322                 FlushViewOfFile(pool->map, 0);
1323                 UnmapViewOfFile(pool->map);
1324                 CloseHandle(pool->hmap);
1325 #endif
1326                 pool->map = NULL;
1327
1328                 //  create new pool mapping
1329                 //  and link into hash table
1330
1331                 if( bt_mapsegment(bt, pool, page_no) )
1332                         return NULL;
1333
1334                 bt_linkhash(bt, pool, page_no, idx);
1335 #ifdef unix
1336                 __sync_fetch_and_add(&pool->pin, 1);
1337 #else
1338                 _InterlockedIncrement16 (&pool->pin);
1339 #endif
1340                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1341                 return pool;
1342         }
1343 }
1344
1345 // place write, read, or parent lock on requested page_no.
1346 //      pin to buffer pool and return latchset pointer
1347
1348 void bt_lockpage(BtLock mode, BtLatchSet *set)
1349 {
1350         switch( mode ) {
1351         case BtLockRead:
1352                 bt_readlock (set->readwr);
1353                 break;
1354         case BtLockWrite:
1355                 bt_writelock (set->readwr);
1356                 break;
1357         case BtLockAccess:
1358                 bt_readlock (set->access);
1359                 break;
1360         case BtLockDelete:
1361                 bt_writelock (set->access);
1362                 break;
1363         case BtLockParent:
1364                 bt_writelock (set->parent);
1365                 break;
1366         }
1367 }
1368
1369 // remove write, read, or parent lock on requested page_no.
1370
1371 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1372 {
1373         switch( mode ) {
1374         case BtLockRead:
1375                 bt_releaseread (set->readwr);
1376                 break;
1377         case BtLockWrite:
1378                 bt_releasewrite (set->readwr);
1379                 break;
1380         case BtLockAccess:
1381                 bt_releaseread (set->access);
1382                 break;
1383         case BtLockDelete:
1384                 bt_releasewrite (set->access);
1385                 break;
1386         case BtLockParent:
1387                 bt_releasewrite (set->parent);
1388                 break;
1389         }
1390 }
1391
1392 //      allocate a new page and write page into it
1393
1394 uid bt_newpage(BtDb *bt, BtPage page)
1395 {
1396 BtLatchSet *set;
1397 BtPool *pool;
1398 uid new_page;
1399 BtPage pmap;
1400 int reuse;
1401
1402         //      lock allocation page
1403
1404         bt_spinwritelock(bt->mgr->latchmgr->lock);
1405
1406         // use empty chain first
1407         // else allocate empty page
1408
1409         if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
1410                 if( pool = bt_pinpool (bt, new_page) )
1411                         pmap = bt_page (bt, pool, new_page);
1412                 else
1413                         return 0;
1414                 bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
1415                 bt_unpinpool (pool);
1416                 reuse = 1;
1417         } else {
1418                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1419                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1420                 reuse = 0;
1421         }
1422 #ifdef unix
1423         // if writing first page of pool block, zero last page in the block
1424
1425         if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
1426         {
1427                 // use zero buffer to write zeros
1428                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1429                         return bt->err = BTERR_wrt, 0;
1430         }
1431
1432         // unlock allocation latch
1433
1434         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1435
1436         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1437                 return bt->err = BTERR_wrt, 0;
1438
1439 #else
1440         // unlock allocation latch
1441
1442         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1443
1444         //      bring new page into pool and copy page.
1445         //      this will extend the file into the new pages.
1446         //      NB -- no latch required
1447
1448         if( pool = bt_pinpool (bt, new_page) )
1449                 pmap = bt_page (bt, pool, new_page);
1450         else
1451                 return 0;
1452
1453         memcpy(pmap, page, bt->mgr->page_size);
1454         bt_unpinpool (pool);
1455 #endif
1456         return new_page;
1457 }
1458
1459 //  find slot in page for given key at a given level
1460
1461 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1462 {
1463 uint diff, higher = bt->page->cnt, low = 1, slot;
1464 uint good = 0;
1465
1466         //      if no right link
1467         //        make stopper key an infinite fence value
1468         //        by setting the good flag
1469
1470         if( bt_getid (bt->page->right) )
1471                 higher++;
1472         else
1473                 good++;
1474
1475         //      low is the next candidate.
1476         //  loop ends when they meet
1477
1478         //  if good, higher is already
1479         //      tested as .ge. the given key.
1480
1481         while( diff = higher - low ) {
1482                 slot = low + ( diff >> 1 );
1483                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1484                         low = slot + 1;
1485                 else
1486                         higher = slot, good++;
1487         }
1488
1489         //      return zero if key is on right link page
1490
1491         return good ? higher : 0;
1492 }
1493
1494 //  find and load page at given level for given key
1495 //      leave page rd or wr locked as requested
1496
1497 uint bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
1498 {
1499 uid page_no = ROOT_page, prevpage = 0;
1500 BtLatchSet *set, *prevset;
1501 uint drill = 0xff, slot;
1502 uint mode, prevmode;
1503 BtPool *prevpool;
1504 int foster = 0;
1505
1506   //  start at root of btree and drill down
1507
1508   do {
1509         // determine lock mode of drill level
1510         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1511
1512         //      obtain latch set for this page
1513
1514         bt->set = bt_pinlatch (bt, page_no);
1515         bt->page_no = page_no;
1516
1517         // pin page contents
1518
1519         if( bt->pool = bt_pinpool (bt, page_no) )
1520                 bt->page = bt_page (bt, bt->pool, page_no);
1521         else
1522                 return 0;
1523
1524         // obtain access lock using lock chaining with Access mode
1525
1526         if( page_no > ROOT_page )
1527           bt_lockpage(BtLockAccess, bt->set);
1528
1529         //  now unlock and unpin our (possibly foster) parent
1530
1531         if( prevpage ) {
1532           bt_unlockpage(prevmode, prevset);
1533           bt_unpinlatch (prevset);
1534           bt_unpinpool (prevpool);
1535           prevpage = 0;
1536         }
1537
1538         // obtain read lock using lock chaining
1539
1540         bt_lockpage(mode, bt->set);
1541
1542         if( page_no > ROOT_page )
1543           bt_unlockpage(BtLockAccess, bt->set);
1544
1545         // re-read and re-lock root after determining actual level of root
1546
1547         if( page_no == ROOT_page )
1548           if( bt->page->lvl != drill) {
1549                 drill = bt->page->lvl;
1550
1551             if( lock == BtLockWrite && drill == lvl ) {
1552                   bt_unlockpage(mode, bt->set);
1553                   bt_unpinlatch (bt->set);
1554                   bt_unpinpool (bt->pool);
1555                   continue;
1556                 }
1557           }
1558
1559         //  find key on page at this level
1560         //  and either descend to requested level
1561         //      or return key slot
1562
1563         if( slot = bt_findslot (bt, key, len) ) {
1564           //    is this slot < foster child area
1565           //    on the requested level?
1566
1567           //    if so, return actual slot even if dead
1568
1569           if( slot <= bt->page->cnt - bt->page->foster )
1570            if( drill == lvl )
1571                 return bt->foster = foster, slot;
1572
1573           //    find next active slot
1574           //    note: foster children are never dead
1575
1576           while( slotptr(bt->page, slot)->dead )
1577            if( slot++ < bt->page->cnt )
1578                 continue;
1579            else {
1580                 //  we are waiting for fence key posting
1581                 page_no = bt_getid(bt->page->right);
1582                 goto slideright;
1583           }
1584
1585          //     is this slot < foster child area
1586          //     if so, drill to next level
1587
1588          if( slot <= bt->page->cnt - bt->page->foster )
1589                 foster = 0, drill--;
1590          else
1591                 foster = 1;
1592
1593           //  continue right onto foster child
1594           //    or down to next level.
1595
1596           page_no = bt_getid(slotptr(bt->page, slot)->id);
1597
1598         //  or slide right into next page
1599
1600         } else {
1601                 page_no = bt_getid(bt->page->right);
1602                 foster = 1;
1603         }
1604
1605 slideright:
1606         prevpage = bt->page_no;
1607         prevpool = bt->pool;
1608         prevset = bt->set;
1609         prevmode = mode;
1610
1611   } while( page_no );
1612
1613   // return error on end of chain
1614
1615   bt->err = BTERR_struct;
1616   return 0;     // return error
1617 }
1618
1619 //  remove empty page from the B-tree
1620 //      by pulling our right node left over ourselves
1621
1622 //  call with bt->page, etc, set to page's locked parent
1623 //      returns with page locked.
1624
1625 BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
1626 {
1627 BtLatchSet *rset, *pset, *rpset;
1628 BtPool *rpool, *ppool, *rppool;
1629 BtPage rpage, ppage, rppage;
1630 uid right, parent, rparent;
1631 BtKey ptr;
1632 uint idx;
1633
1634         //      cache node's parent page
1635
1636         parent = bt->page_no;
1637         ppage = bt->page;
1638         ppool = bt->pool;
1639         pset = bt->set;
1640
1641         // lock and map our right page
1642         // note that it cannot be our foster child
1643         // since the our node is empty
1644         //      and it cannot be NULL because of the stopper
1645         //      in the last right page
1646
1647         bt_lockpage (BtLockWrite, set);
1648
1649         // if we aren't dead yet
1650
1651         if( page->act )
1652                 goto rmergexit;
1653
1654         if( right = bt_getid (page->right) )
1655           if( rpool = bt_pinpool (bt, right) )
1656                 rpage = bt_page (bt, rpool, right);
1657           else
1658                 return bt->err;
1659         else
1660                 return bt->err = BTERR_struct;
1661
1662         rset = bt_pinlatch (bt, right);
1663
1664         //      find our right neighbor
1665
1666         if( ppage->act > 1 ) {
1667          for( idx = slot; idx++ < ppage->cnt; )
1668           if( !slotptr(ppage, idx)->dead )
1669                 break;
1670
1671          if( idx > ppage->cnt )
1672                 return bt->err = BTERR_struct;
1673
1674          //  redirect right neighbor in parent to left node
1675
1676          bt_putid(slotptr(ppage,idx)->id, page_no);
1677         }
1678
1679         //      if parent has only our deleted page, e.g. no right neighbor
1680         //      prepare to merge parent itself
1681
1682         if( ppage->act == 1 ) {
1683           if( rparent = bt_getid (ppage->right) )
1684            if( rppool = bt_pinpool (bt, rparent) )
1685                 rppage = bt_page (bt, rppool, rparent);
1686            else
1687                 return bt->err;
1688           else
1689                 return bt->err = BTERR_struct;
1690
1691           rpset = bt_pinlatch (bt, rparent);
1692           bt_lockpage (BtLockWrite, rpset);
1693
1694           // find our right neighbor on right parent page
1695
1696           for( idx = 0; idx++ < rppage->cnt; )
1697                 if( !slotptr(rppage, idx)->dead ) {
1698                   bt_putid (slotptr(rppage, idx)->id, page_no);
1699                   break;
1700                 }
1701
1702           if( idx > rppage->cnt )
1703                 return bt->err = BTERR_struct;
1704         }
1705
1706         //      now that there are no more pointers to our right node
1707         //      we can wait for delete lock on it
1708
1709         bt_lockpage(BtLockDelete, rset);
1710         bt_lockpage(BtLockWrite, rset);
1711
1712         // pull contents of right page into our empty page 
1713
1714         memcpy (page, rpage, bt->mgr->page_size);
1715
1716         // ready to release right parent lock
1717         //      now that we have a new page in place
1718
1719         if( ppage->act == 1 ) {
1720           bt_unlockpage (BtLockWrite, rpset);
1721           bt_unpinlatch (rpset);
1722           bt_unpinpool (rppool);
1723         }
1724
1725         //      add killed right block to free chain
1726         //      lock latch mgr
1727
1728         bt_spinwritelock(bt->mgr->latchmgr->lock);
1729
1730         //      store free chain in allocation page second right
1731
1732         bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1733         bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
1734
1735         // unlock latch mgr and right page
1736
1737         bt_unlockpage(BtLockDelete, rset);
1738         bt_unlockpage(BtLockWrite, rset);
1739         bt_unpinlatch (rset);
1740         bt_unpinpool (rpool);
1741
1742         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1743
1744         // delete our obsolete fence key from our parent
1745
1746         slotptr(ppage, slot)->dead = 1;
1747         ppage->dirty = 1;
1748
1749         //      if our parent now empty
1750         //      remove it from the tree
1751
1752         if( ppage->act-- == 1 )
1753           if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
1754                 return bt->err;
1755
1756 rmergexit:
1757         bt_unlockpage (BtLockWrite, pset);
1758         bt_unpinlatch (pset);
1759         bt_unpinpool (ppool);
1760
1761         bt->found = 1;
1762         return bt->err = 0;
1763 }
1764
1765 //  remove empty page from the B-tree
1766 //      try merging left first.  If no left
1767 //      sibling, then merge right.
1768
1769 //      call with page loaded and locked,
1770 //  return with page locked.
1771
1772 BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
1773 {
1774 unsigned char fencekey[256], postkey[256];
1775 uint slot, idx, postfence = 0;
1776 BtLatchSet *lset, *pset;
1777 BtPool *lpool, *ppool;
1778 BtPage lpage, ppage;
1779 uid left, parent;
1780 BtKey ptr;
1781
1782         ptr = keyptr(page, page->cnt);
1783         memcpy(fencekey, ptr, ptr->len + 1);
1784         bt_unlockpage (BtLockWrite, set);
1785
1786         //      load and lock our parent
1787
1788 retry:
1789         if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
1790                 return bt->err;
1791
1792         parent = bt->page_no;
1793         ppage = bt->page;
1794         ppool = bt->pool;
1795         pset = bt->set;
1796
1797         //      wait until we are not a foster child
1798
1799         if( bt->foster ) {
1800                 bt_unlockpage (BtLockWrite, pset);
1801                 bt_unpinlatch (pset);
1802                 bt_unpinpool (ppool);
1803 #ifdef unix
1804                 sched_yield();
1805 #else
1806                 SwitchToThread();
1807 #endif
1808                 goto retry;
1809         }
1810
1811         //  find our left neighbor in our parent page
1812
1813         for( idx = slot; --idx; )
1814           if( !slotptr(ppage, idx)->dead )
1815                 break;
1816
1817         //      if no left neighbor, do right merge
1818
1819         if( !idx )
1820                 return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
1821
1822         // lock and map our left neighbor's page
1823
1824         left = bt_getid (slotptr(ppage, idx)->id);
1825
1826         if( lpool = bt_pinpool (bt, left) )
1827                 lpage = bt_page (bt, lpool, left);
1828         else
1829                 return bt->err;
1830
1831         lset = bt_pinlatch (bt, left);
1832         bt_lockpage(BtLockWrite, lset);
1833
1834         //  wait until foster sibling is in our parent
1835
1836         if( bt_getid (lpage->right) != page_no ) {
1837                 bt_unlockpage (BtLockWrite, pset);
1838                 bt_unpinlatch (pset);
1839                 bt_unpinpool (ppool);
1840                 bt_unlockpage (BtLockWrite, lset);
1841                 bt_unpinlatch (lset);
1842                 bt_unpinpool (lpool);
1843 #ifdef linux
1844                 sched_yield();
1845 #else
1846                 SwitchToThread();
1847 #endif
1848                 goto retry;
1849         }
1850
1851         //  since our page will have no more pointers to it,
1852         //      obtain Delete lock and wait for write locks to clear
1853
1854         bt_lockpage(BtLockDelete, set);
1855         bt_lockpage(BtLockWrite, set);
1856
1857         //      if we aren't dead yet,
1858         //      get ready for exit
1859
1860         if( page->act ) {
1861                 bt_unlockpage(BtLockDelete, set);
1862                 bt_unlockpage(BtLockWrite, lset);
1863                 bt_unpinlatch (lset);
1864                 bt_unpinpool (lpool);
1865                 goto lmergexit;
1866         }
1867
1868         //      are we are the fence key for our parent?
1869         //      if so, grab our old fence key
1870
1871         if( postfence = slot == ppage->cnt ) {
1872                 ptr = keyptr (ppage, ppage->cnt);
1873                 memcpy(fencekey, ptr, ptr->len + 1);
1874                 memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
1875
1876                 // clear out other dead slots
1877
1878                 while( --ppage->cnt )
1879                   if( slotptr(ppage, ppage->cnt)->dead )
1880                         memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
1881                   else
1882                         break;
1883
1884                 ptr = keyptr (ppage, ppage->cnt);
1885                 memcpy(postkey, ptr, ptr->len + 1);
1886         } else
1887                 slotptr(ppage,slot)->dead = 1;
1888
1889         ppage->dirty = 1;
1890         ppage->act--;
1891
1892         //      push our right neighbor pointer to our left
1893
1894         memcpy (lpage->right, page->right, BtId);
1895
1896         //      add ourselves to free chain
1897         //      lock latch mgr
1898
1899         bt_spinwritelock(bt->mgr->latchmgr->lock);
1900
1901         //      store free chain in allocation page second right
1902         bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1903         bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
1904
1905         // unlock latch mgr and pages
1906
1907         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1908         bt_unlockpage(BtLockWrite, lset);
1909         bt_unpinlatch (lset);
1910         bt_unpinpool (lpool);
1911
1912         // release our node's delete lock
1913
1914         bt_unlockpage(BtLockDelete, set);
1915
1916 lmergexit:
1917         bt_unlockpage (BtLockWrite, pset);
1918         bt_unpinpool (ppool);
1919
1920         //  do we need to post parent's fence key in its parent?
1921
1922         if( !postfence || parent == ROOT_page ) {
1923                 bt_unpinlatch (pset);
1924                 bt->found = 1;
1925                 return bt->err = 0;
1926         }
1927
1928         //      interlock parent fence post
1929
1930         bt_lockpage (BtLockParent, pset);
1931
1932         //      load parent's parent page
1933 posttry:
1934         if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
1935                 return bt->err;
1936
1937         if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
1938           if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
1939                 return bt->err;
1940           else
1941                 goto posttry;
1942
1943         page = bt->page;
1944
1945         page->min -= *postkey + 1;
1946         ((unsigned char *)page)[page->min] = *postkey;
1947         memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
1948         slotptr(page, slot)->off = page->min;
1949
1950         bt_unlockpage (BtLockParent, pset);
1951         bt_unpinlatch (pset);
1952
1953         bt_unlockpage (BtLockWrite, bt->set);
1954         bt_unpinlatch (bt->set);
1955         bt_unpinpool (bt->pool);
1956
1957         bt->found = 1;
1958         return bt->err = 0;
1959 }
1960
1961 //  find and delete key on page by marking delete flag bit
1962 //  if page becomes empty, delete it from the btree
1963
1964 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
1965 {
1966 BtLatchSet *set;
1967 BtPool *pool;
1968 BtPage page;
1969 uid page_no;
1970 BtKey ptr;
1971 uint slot;
1972
1973         if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
1974                 return bt->err;
1975
1976         page_no = bt->page_no;
1977         page = bt->page;
1978         pool = bt->pool;
1979         set = bt->set;
1980
1981         // if key is found delete it, otherwise ignore request
1982
1983         ptr = keyptr(page, slot);
1984
1985         if( bt->found = !keycmp (ptr, key, len) )
1986           if( bt->found = slotptr(page, slot)->dead == 0 ) {
1987                 slotptr(page,slot)->dead = 1;
1988                   if( slot < page->cnt )
1989                         page->dirty = 1;
1990                   if( !--page->act )
1991                         if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
1992                           return bt->err;
1993                 }
1994
1995         bt_unlockpage(BtLockWrite, set);
1996         bt_unpinlatch (set);
1997         bt_unpinpool (pool);
1998         return bt->err = 0;
1999 }
2000
2001 //      find key in leaf level and return row-id
2002
2003 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
2004 {
2005 uint  slot;
2006 BtKey ptr;
2007 uid id;
2008
2009         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2010                 ptr = keyptr(bt->page, slot);
2011         else
2012                 return 0;
2013
2014         // if key exists, return row-id
2015         //      otherwise return 0
2016
2017         if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
2018                 id = bt_getid(slotptr(bt->page,slot)->id);
2019         else
2020                 id = 0;
2021
2022         bt_unlockpage (BtLockRead, bt->set);
2023         bt_unpinlatch (bt->set);
2024         bt_unpinpool (bt->pool);
2025         return id;
2026 }
2027
2028 //      check page for space available,
2029 //      clean if necessary and return
2030 //      0 - page needs splitting
2031 //      >0  new slot value
2032
2033 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
2034 {
2035 uint nxt = bt->mgr->page_size;
2036 uint cnt = 0, idx = 0;
2037 uint max = page->cnt;
2038 uint newslot = max;
2039 BtKey key;
2040
2041         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2042                 return slot;
2043
2044         //      skip cleanup if nothing to reclaim
2045
2046         if( !page->dirty )
2047                 return 0;
2048
2049         memcpy (bt->frame, page, bt->mgr->page_size);
2050
2051         // skip page info and set rest of page to zero
2052
2053         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2054         page->dirty = 0;
2055         page->act = 0;
2056
2057         // try cleaning up page first
2058
2059         // always leave fence key in the array
2060         // otherwise, remove deleted key
2061
2062         // note: foster children are never dead
2063
2064         while( cnt++ < max ) {
2065                 if( cnt == slot )
2066                         newslot = idx + 1;
2067                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
2068                         continue;
2069
2070                 // copy key
2071
2072                 key = keyptr(bt->frame, cnt);
2073                 nxt -= key->len + 1;
2074                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2075
2076                 // copy slot
2077                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
2078                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2079                         page->act++;
2080                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2081                 slotptr(page, idx)->off = nxt;
2082         }
2083
2084         page->min = nxt;
2085         page->cnt = idx;
2086
2087         //      see if page has enough space now, or does it need splitting?
2088
2089         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2090                 return newslot;
2091
2092         return 0;
2093 }
2094
2095 //      add key to current page
2096 //      page must already be writelocked
2097
2098 void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
2099 {
2100 uint idx;
2101
2102         // find next available dead slot and copy key onto page
2103         // note that foster children on the page are never dead
2104
2105         // look for next hole, but stay back from the fence key
2106
2107         for( idx = slot; idx < page->cnt; idx++ )
2108           if( slotptr(page, idx)->dead )
2109                 break;
2110
2111         if( idx == page->cnt )
2112                 idx++, page->cnt++;
2113
2114         page->act++;
2115
2116         // now insert key into array before slot
2117
2118         while( idx > slot )
2119                 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
2120
2121         page->min -= len + 1;
2122         ((unsigned char *)page)[page->min] = len;
2123         memcpy ((unsigned char *)page + page->min +1, key, len );
2124
2125         bt_putid(slotptr(page,slot)->id, id);
2126         slotptr(page, slot)->off = page->min;
2127         slotptr(page, slot)->tod = tod;
2128         slotptr(page, slot)->dead = 0;
2129 }
2130
2131 // split the root and raise the height of the btree
2132 //      call with current page locked and page no of foster child
2133 //      return with current page (root) unlocked
2134
2135 BTERR bt_splitroot(BtDb *bt, uid right)
2136 {
2137 uint nxt = bt->mgr->page_size;
2138 unsigned char fencekey[256];
2139 BtPage root = bt->page;
2140 uid new_page;
2141 BtKey key;
2142
2143         //  Obtain an empty page to use, and copy the left page
2144         //  contents into it from the root.  Strip foster child key.
2145         //      (it's the stopper key)
2146
2147         memset (slotptr(root, root->cnt), 0, sizeof(BtSlot));
2148         root->dirty = 1;
2149         root->foster--;
2150         root->act--;
2151         root->cnt--;
2152
2153         //      Save left fence key.
2154
2155         key = keyptr(root, root->cnt);
2156         memcpy (fencekey, key, key->len + 1);
2157
2158         //  copy the lower keys into a new left page
2159
2160         if( !(new_page = bt_newpage(bt, root)) )
2161                 return bt->err;
2162
2163         // preserve the page info at the bottom
2164         // and set rest of the root to zero
2165
2166         memset (root+1, 0, bt->mgr->page_size - sizeof(*root));
2167
2168         // insert left fence key on empty newroot page
2169
2170         nxt -= *fencekey + 1;
2171         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
2172         bt_putid(slotptr(root, 1)->id, new_page);
2173         slotptr(root, 1)->off = nxt;
2174         
2175         // insert stopper key on newroot page
2176         // and increase the root height
2177
2178         nxt -= 3;
2179         fencekey[0] = 2;
2180         fencekey[1] = 0xff;
2181         fencekey[2] = 0xff;
2182         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
2183         bt_putid(slotptr(root, 2)->id, right);
2184         slotptr(root, 2)->off = nxt;
2185
2186         bt_putid(root->right, 0);
2187         root->min = nxt;                // reset lowest used offset and key count
2188         root->cnt = 2;
2189         root->act = 2;
2190         root->lvl++;
2191
2192         // release and unpin root (bt->page)
2193
2194         bt_unlockpage(BtLockWrite, bt->set);
2195         bt_unpinlatch (bt->set);
2196         bt_unpinpool (bt->pool);
2197         return 0;
2198 }
2199
2200 //  split already locked full node
2201 //      return unlocked and unpinned.
2202
2203 BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
2204 {
2205 uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
2206 unsigned char fencekey[256];
2207 uint tod = time(NULL);
2208 uint lvl = page->lvl;
2209 uid new_page;
2210 BtKey key;
2211
2212         //      initialize frame buffer for right node
2213
2214         memset (bt->frame, 0, bt->mgr->page_size);
2215         max = page->cnt - page->foster;
2216         cnt = max / 2;
2217         idx = 0;
2218
2219         //  split higher half of keys to bt->frame
2220         //      leaving old foster children in the left node,
2221         //      and adding a new foster child there.
2222
2223         while( cnt++ < max ) {
2224                 key = keyptr(page, cnt);
2225                 nxt -= key->len + 1;
2226                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2227                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
2228                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
2229                         bt->frame->act++;
2230                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
2231                 slotptr(bt->frame, idx)->off = nxt;
2232         }
2233
2234         // transfer right link node to new right node
2235
2236         if( page_no > ROOT_page )
2237                 memcpy (bt->frame->right, page->right, BtId);
2238
2239         bt->frame->bits = bt->mgr->page_bits;
2240         bt->frame->min = nxt;
2241         bt->frame->cnt = idx;
2242         bt->frame->lvl = lvl;
2243
2244         //      get new free page and write right frame to it.
2245
2246         if( !(new_page = bt_newpage(bt, bt->frame)) )
2247                 return bt->err;
2248
2249         //      remember fence key for new right page to add
2250         //      as foster child to the left node
2251
2252         key = keyptr(bt->frame, idx);
2253         memcpy (fencekey, key, key->len + 1);
2254
2255         //      update lower keys and foster children to continue in old page
2256
2257         memcpy (bt->frame, page, bt->mgr->page_size);
2258         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2259         nxt = bt->mgr->page_size;
2260         page->dirty = 0;
2261         page->act = 0;
2262         cnt = 0;
2263         idx = 0;
2264
2265         //  assemble page of smaller keys
2266         //      to remain in the old page
2267
2268         while( cnt++ < max / 2 ) {
2269                 key = keyptr(bt->frame, cnt);
2270                 nxt -= key->len + 1;
2271                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2272                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2273                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2274                         page->act++;
2275                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2276                 slotptr(page, idx)->off = nxt;
2277         }
2278
2279         //      insert new foster child for right page in queue
2280         //      before any of the current foster children
2281
2282         nxt -= *fencekey + 1;
2283         memcpy ((unsigned char *)page + nxt, fencekey, *fencekey + 1);
2284
2285         bt_putid (slotptr(page,++idx)->id, new_page);
2286         slotptr(page, idx)->tod = tod;
2287         slotptr(page, idx)->off = nxt;
2288         page->foster++;
2289         page->act++;
2290
2291         //  continue with old foster child keys
2292         //      note that none will be dead
2293
2294         cnt = bt->frame->cnt - bt->frame->foster;
2295
2296         while( cnt++ < bt->frame->cnt ) {
2297                 key = keyptr(bt->frame, cnt);
2298                 nxt -= key->len + 1;
2299                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2300                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2301                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2302                 slotptr(page, idx)->off = nxt;
2303                 page->act++;
2304         }
2305
2306         page->min = nxt;
2307         page->cnt = idx;
2308
2309         //      link new right page
2310
2311         bt_putid (page->right, new_page);
2312
2313         // if current page is the root page, split it
2314
2315         if( page_no == ROOT_page )
2316                 return bt_splitroot (bt, new_page);
2317
2318         //  release wr lock on our page
2319
2320         bt_unlockpage (BtLockWrite, set);
2321
2322         //  obtain ParentModification lock for current page
2323         //      to fix new fence key and oldest foster child on page
2324
2325         bt_lockpage (BtLockParent, set);
2326
2327         //  get our new fence key to insert in parent node
2328
2329         bt_lockpage (BtLockRead, set);
2330
2331         key = keyptr(page, page->cnt-1);
2332         memcpy (fencekey, key, key->len+1);
2333
2334         bt_unlockpage (BtLockRead, set);
2335
2336         if( bt_insertkey (bt, fencekey + 1, *fencekey, page_no, tod, lvl + 1) )
2337                 return bt->err;
2338
2339         //      lock our page for writing
2340
2341         bt_lockpage (BtLockRead, set);
2342
2343         //      switch old parent key from us to our oldest foster child
2344
2345         key = keyptr(page, page->cnt);
2346         memcpy (fencekey, key, key->len+1);
2347
2348         new_page = bt_getid (slotptr(page, page->cnt)->id);
2349         bt_unlockpage (BtLockRead, set);
2350
2351         if( bt_insertkey (bt, fencekey + 1, *fencekey, new_page, tod, lvl + 1) )
2352                 return bt->err;
2353
2354         //      now that it has its own parent pointer,
2355         //      remove oldest foster child from our page
2356
2357         bt_lockpage (BtLockWrite, set);
2358         memset (slotptr(page, page->cnt), 0, sizeof(BtSlot));
2359         page->dirty = 1;
2360         page->foster--;
2361         page->cnt--;
2362         page->act--;
2363
2364         bt_unlockpage (BtLockParent, set);
2365
2366         //  if this emptied page,
2367         //      undo the foster child
2368
2369         if( !page->act )
2370           if( bt_mergeleft (bt, page, pool, set, page_no, lvl) )
2371                 return bt->err;
2372
2373         //      unlock and unpin
2374
2375         bt_unlockpage (BtLockWrite, set);
2376         bt_unpinlatch (set);
2377         bt_unpinpool (pool);
2378         return 0;
2379 }
2380
2381 //  Insert new key into the btree at leaf level.
2382
2383 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
2384 {
2385 uint slot, idx;
2386 BtPage page;
2387 BtKey ptr;
2388
2389         while( 1 ) {
2390                 if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
2391                         ptr = keyptr(bt->page, slot);
2392                 else
2393                 {
2394                         if ( !bt->err )
2395                                 bt->err = BTERR_ovflw;
2396                         return bt->err;
2397                 }
2398
2399                 // if key already exists, update id and return
2400
2401                 page = bt->page;
2402
2403                 if( !keycmp (ptr, key, len) ) {
2404                         if( slotptr(page, slot)->dead )
2405                                 page->act++;
2406                         slotptr(page, slot)->dead = 0;
2407                         slotptr(page, slot)->tod = tod;
2408                         bt_putid(slotptr(page,slot)->id, id);
2409                         bt_unlockpage(BtLockWrite, bt->set);
2410                         bt_unpinlatch (bt->set);
2411                         bt_unpinpool (bt->pool);
2412                         return bt->err;
2413                 }
2414
2415                 // check if page has enough space
2416
2417                 if( slot = bt_cleanpage (bt, bt->page, len, slot) )
2418                         break;
2419
2420                 if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
2421                         return bt->err;
2422         }
2423
2424         bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
2425
2426         bt_unlockpage (BtLockWrite, bt->set);
2427         bt_unpinlatch (bt->set);
2428         bt_unpinpool (bt->pool);
2429         return 0;
2430 }
2431
2432 //  cache page of keys into cursor and return starting slot for given key
2433
2434 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2435 {
2436 uint slot;
2437
2438         // cache page for retrieval
2439         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2440                 memcpy (bt->cursor, bt->page, bt->mgr->page_size);
2441
2442         bt->cursor_page = bt->page_no;
2443
2444         bt_unlockpage(BtLockRead, bt->set);
2445         bt_unpinlatch (bt->set);
2446         bt_unpinpool (bt->pool);
2447         return slot;
2448 }
2449
2450 //  return next slot for cursor page
2451 //  or slide cursor right into next page
2452
2453 uint bt_nextkey (BtDb *bt, uint slot)
2454 {
2455 BtLatchSet *set;
2456 BtPool *pool;
2457 BtPage page;
2458 uid right;
2459
2460   do {
2461         right = bt_getid(bt->cursor->right);
2462         while( slot++ < bt->cursor->cnt - bt->cursor->foster )
2463           if( slotptr(bt->cursor,slot)->dead )
2464                 continue;
2465           else if( right || (slot < bt->cursor->cnt - bt->cursor->foster) )
2466                 return slot;
2467           else
2468                 break;
2469
2470         if( !right )
2471                 break;
2472
2473         bt->cursor_page = right;
2474         if( pool = bt_pinpool (bt, right) )
2475                 page = bt_page (bt, pool, right);
2476         else
2477                 return 0;
2478
2479         set = bt_pinlatch (bt, right);
2480     bt_lockpage(BtLockRead, set);
2481
2482         memcpy (bt->cursor, page, bt->mgr->page_size);
2483
2484         bt_unlockpage(BtLockRead, set);
2485         bt_unpinlatch (set);
2486         bt_unpinpool (pool);
2487         slot = 0;
2488   } while( 1 );
2489
2490   return bt->err = 0;
2491 }
2492
2493 BtKey bt_key(BtDb *bt, uint slot)
2494 {
2495         return keyptr(bt->cursor, slot);
2496 }
2497
2498 uid bt_uid(BtDb *bt, uint slot)
2499 {
2500         return bt_getid(slotptr(bt->cursor,slot)->id);
2501 }
2502
2503 uint bt_tod(BtDb *bt, uint slot)
2504 {
2505         return slotptr(bt->cursor,slot)->tod;
2506 }
2507
2508
2509 #ifdef STANDALONE
2510
2511 void bt_latchaudit (BtDb *bt)
2512 {
2513 ushort idx, hashidx;
2514 BtLatchSet *set;
2515 BtPool *pool;
2516 BtPage page;
2517 uid page_no;
2518
2519 #ifdef unix
2520         for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
2521                 set = bt->mgr->latchsets + idx;
2522                 if( set->pin ) {
2523                         fprintf(stderr, "latchset %d pinned\n", idx);
2524                         set->pin = 0;
2525                 }
2526         }
2527
2528         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2529           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2530                 set = bt->mgr->latchsets + idx;
2531                 if( set->hash != hashidx )
2532                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
2533                 if( set->pin )
2534                         fprintf(stderr, "latchset %d pinned\n", idx);
2535           } while( idx = set->next );
2536         }
2537         page_no = LEAF_page;
2538
2539         while( page_no ) {
2540                 fprintf(stderr, "page: %.6x\n", (uint)page_no);
2541                 pool = bt_pinpool (bt, page_no);
2542                 page = bt_page (bt, pool, page_no);
2543             page_no = bt_getid(page->right);
2544                 bt_unpinpool (pool);
2545         }
2546
2547         page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
2548
2549         while( page_no ) {
2550                 fprintf(stderr, "free: %.6x\n", (uint)page_no);
2551                 pool = bt_pinpool (bt, page_no);
2552                 page = bt_page (bt, pool, page_no);
2553             page_no = bt_getid(page->right);
2554                 bt_unpinpool (pool);
2555         }
2556 #endif
2557 }
2558
2559 typedef struct {
2560         char type, idx;
2561         char *infile;
2562         BtMgr *mgr;
2563         int num;
2564 } ThreadArg;
2565
2566 //  standalone program to index file of keys
2567 //  then list them onto std-out
2568
2569 #ifdef unix
2570 void *index_file (void *arg)
2571 #else
2572 uint __stdcall index_file (void *arg)
2573 #endif
2574 {
2575 int line = 0, found = 0, cnt = 0;
2576 uid next, page_no = LEAF_page;  // start on first page of leaves
2577 unsigned char key[256];
2578 ThreadArg *args = arg;
2579 int ch, len = 0, slot;
2580 BtLatchSet *set;
2581 time_t tod[1];
2582 BtPool *pool;
2583 BtPage page;
2584 BtKey ptr;
2585 BtDb *bt;
2586 FILE *in;
2587
2588         bt = bt_open (args->mgr);
2589         time (tod);
2590
2591         switch(args->type | 0x20)
2592         {
2593         case 'a':
2594                 fprintf(stderr, "started latch mgr audit\n");
2595                 bt_latchaudit (bt);
2596                 fprintf(stderr, "finished latch mgr audit\n");
2597                 break;
2598
2599         case 'w':
2600                 fprintf(stderr, "started indexing for %s\n", args->infile);
2601                 if( in = fopen (args->infile, "rb") )
2602                   while( ch = getc(in), ch != EOF )
2603                         if( ch == '\n' )
2604                         {
2605                           line++;
2606
2607                           if( args->num == 1 )
2608                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2609
2610                           else if( args->num )
2611                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2612
2613                           if( bt_insertkey (bt, key, len, line, *tod, 0) )
2614                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2615                           len = 0;
2616                         }
2617                         else if( len < 255 )
2618                                 key[len++] = ch;
2619                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2620                 break;
2621
2622         case 'd':
2623                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2624                 if( in = fopen (args->infile, "rb") )
2625                   while( ch = getc(in), ch != EOF )
2626                         if( ch == '\n' )
2627                         {
2628                           line++;
2629                           if( args->num == 1 )
2630                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2631
2632                           else if( args->num )
2633                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2634
2635                           if( bt_deletekey (bt, key, len) )
2636                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2637                           len = 0;
2638                         }
2639                         else if( len < 255 )
2640                                 key[len++] = ch;
2641                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2642                 break;
2643
2644         case 'f':
2645                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2646                 if( in = fopen (args->infile, "rb") )
2647                   while( ch = getc(in), ch != EOF )
2648                         if( ch == '\n' )
2649                         {
2650                           line++;
2651                           if( args->num == 1 )
2652                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2653
2654                           else if( args->num )
2655                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2656
2657                           if( bt_findkey (bt, key, len) )
2658                                 found++;
2659                           else if( bt->err )
2660                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2661                           len = 0;
2662                         }
2663                         else if( len < 255 )
2664                                 key[len++] = ch;
2665                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2666                 break;
2667
2668         case 's':
2669                 len = key[0] = 0;
2670
2671                 fprintf(stderr, "started reading\n");
2672
2673                 if( slot = bt_startkey (bt, key, len) )
2674                   slot--;
2675                 else
2676                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2677
2678                 while( slot = bt_nextkey (bt, slot) ) {
2679                         ptr = bt_key(bt, slot);
2680                         fwrite (ptr->key, ptr->len, 1, stdout);
2681                         fputc ('\n', stdout);
2682                 }
2683
2684                 break;
2685
2686         case 'c':
2687                 fprintf(stderr, "started reading\n");
2688
2689                 do {
2690                         if( pool = bt_pinpool (bt, page_no) )
2691                                 page = bt_page (bt, pool, page_no);
2692                         else
2693                                 break;
2694                         set = bt_pinlatch (bt, page_no);
2695                         bt_lockpage (BtLockRead, set);
2696                         cnt += page->act;
2697                         next = bt_getid (page->right);
2698                         bt_unlockpage (BtLockRead, set);
2699                         bt_unpinlatch (set);
2700                         bt_unpinpool (pool);
2701                 } while( page_no = next );
2702
2703                 cnt--;  // remove stopper key
2704                 fprintf(stderr, " Total keys read %d\n", cnt);
2705                 break;
2706         }
2707
2708         bt_close (bt);
2709 #ifdef unix
2710         return NULL;
2711 #else
2712         return 0;
2713 #endif
2714 }
2715
2716 typedef struct timeval timer;
2717
2718 int main (int argc, char **argv)
2719 {
2720 int idx, cnt, len, slot, err;
2721 int segsize, bits = 16;
2722 #ifdef unix
2723 pthread_t *threads;
2724 timer start, stop;
2725 #else
2726 time_t start[1], stop[1];
2727 HANDLE *threads;
2728 #endif
2729 double real_time;
2730 ThreadArg *args;
2731 uint poolsize = 0;
2732 int num = 0;
2733 char key[1];
2734 BtMgr *mgr;
2735 BtKey ptr;
2736 BtDb *bt;
2737
2738         if( argc < 3 ) {
2739                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2740                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2741                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2742                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2743                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2744                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2745                 exit(0);
2746         }
2747
2748 #ifdef unix
2749         gettimeofday(&start, NULL);
2750 #else
2751         time(start);
2752 #endif
2753
2754         if( argc > 3 )
2755                 bits = atoi(argv[3]);
2756
2757         if( argc > 4 )
2758                 poolsize = atoi(argv[4]);
2759
2760         if( !poolsize )
2761                 fprintf (stderr, "Warning: no mapped_pool\n");
2762
2763         if( poolsize > 65535 )
2764                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2765
2766         if( argc > 5 )
2767                 segsize = atoi(argv[5]);
2768         else
2769                 segsize = 4;    // 16 pages per mmap segment
2770
2771         if( argc > 6 )
2772                 num = atoi(argv[6]);
2773
2774         cnt = argc - 7;
2775 #ifdef unix
2776         threads = malloc (cnt * sizeof(pthread_t));
2777 #else
2778         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2779 #endif
2780         args = malloc (cnt * sizeof(ThreadArg));
2781
2782         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2783
2784         if( !mgr ) {
2785                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2786                 exit (1);
2787         }
2788
2789         //      fire off threads
2790
2791         for( idx = 0; idx < cnt; idx++ ) {
2792                 args[idx].infile = argv[idx + 7];
2793                 args[idx].type = argv[2][0];
2794                 args[idx].mgr = mgr;
2795                 args[idx].num = num;
2796                 args[idx].idx = idx;
2797 #ifdef unix
2798                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2799                         fprintf(stderr, "Error creating thread %d\n", err);
2800 #else
2801                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2802 #endif
2803         }
2804
2805         //      wait for termination
2806
2807 #ifdef unix
2808         for( idx = 0; idx < cnt; idx++ )
2809                 pthread_join (threads[idx], NULL);
2810         gettimeofday(&stop, NULL);
2811         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2812 #else
2813         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2814
2815         for( idx = 0; idx < cnt; idx++ )
2816                 CloseHandle(threads[idx]);
2817
2818         time (stop);
2819         real_time = 1000 * (*stop - *start);
2820 #endif
2821         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2822         bt_mgrclose (mgr);
2823 }
2824
2825 #endif  //STANDALONE