]> pd.if.org Git - btree/blob - fosterbtreed.c
Merge branch 'master' of https://github.com/malbrain/Btree-source-code
[btree] / fosterbtreed.c
1 // foster btree version d
2 // 24 DEC 2013
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <fcntl.h>
35 #include <sys/time.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #include <pthread.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <process.h>
47 #include <intrin.h>
48 #endif
49
50 #include <memory.h>
51 #include <string.h>
52
53 typedef unsigned long long      uid;
54
55 #ifndef unix
56 typedef unsigned long long      off64_t;
57 typedef unsigned short          ushort;
58 typedef unsigned int            uint;
59 #endif
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63
64 #define BT_maxbits              24                                      // maximum page size in bits
65 #define BT_minbits              9                                       // minimum page size in bits
66 #define BT_minpage              (1 << BT_minbits)       // minimum page size
67 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
68
69 /*
70 There are five lock types for each node in three independent sets: 
71 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
72 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
73 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
74 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
75 5. (set 3) ParentLock: Exclusive. Have parent adopt/delete maximum foster child from the node.
76 */
77
78 typedef enum{
79         BtLockAccess,
80         BtLockDelete,
81         BtLockRead,
82         BtLockWrite,
83         BtLockParent
84 }BtLock;
85
86 //      Define the length of the page and key pointers
87
88 #define BtId 6
89
90 //      Page key slot definition.
91
92 //      If BT_maxbits is 15 or less, you can save 4 bytes
93 //      for each key stored by making the first two uints
94 //      into ushorts.  You can also save 4 bytes by removing
95 //      the tod field from the key.
96
97 //      Keys are marked dead, but remain on the page until
98 //      it cleanup is called. The fence key (highest key) for
99 //      the page is always present, even after cleanup.
100
101 typedef struct {
102         uint off:BT_maxbits;            // page offset for key start
103         uint dead:1;                            // set for deleted key
104         uint tod;                                       // time-stamp for key
105         unsigned char id[BtId];         // id associated with key
106 } BtSlot;
107
108 //      The key structure occupies space at the upper end of
109 //      each page.  It's a length byte followed by the value
110 //      bytes.
111
112 typedef struct {
113         unsigned char len;
114         unsigned char key[1];
115 } *BtKey;
116
117 //      The first part of an index page.
118 //      It is immediately followed
119 //      by the BtSlot array of keys.
120
121 typedef struct Page {
122         uint cnt;                                       // count of keys in page
123         uint act;                                       // count of active keys
124         uint min;                                       // next key offset
125         uint foster;                            // count of foster children
126         unsigned char bits;                     // page size in bits
127         unsigned char lvl:6;            // level of page
128         unsigned char kill:1;           // page is being deleted
129         unsigned char dirty:1;          // page needs to be cleaned
130         unsigned char right[BtId];      // page number to right
131 } *BtPage;
132
133 //      latch table lock structure
134
135 typedef struct {
136 #ifdef unix
137         pthread_rwlock_t lock[1];
138 #else
139         SRWLOCK srw[1];
140 #endif
141 } BtLatch;
142
143 typedef struct {
144         BtLatch readwr[1];              // read/write page lock
145         BtLatch access[1];              // Access Intent/Page delete
146         BtLatch parent[1];              // adoption of foster children
147 } BtLatchSet;
148
149 //      The memory mapping pool table buffer manager entry
150
151 typedef struct {
152         unsigned long long int lru;     // number of times accessed
153         uid  basepage;                          // mapped base page number
154         char *map;                                      // mapped memory pointer
155         uint pin;                                       // mapped page pin counter
156         uint slot;                                      // slot index in this array
157         void *hashprev;                         // previous pool entry for the same hash idx
158         void *hashnext;                         // next pool entry for the same hash idx
159 #ifndef unix
160         HANDLE hmap;
161 #endif
162 //      array of page latch sets, one for each page in map segment
163         BtLatchSet pagelatch[0];
164 } BtPool;
165
166 //      The object structure for Btree access
167
168 typedef struct {
169         uint page_size;                         // page size    
170         uint page_bits;                         // page size in bits    
171         uint seg_bits;                          // seg size in pages in bits
172         uint mode;                                      // read-write mode
173 #ifdef unix
174         int idx;
175         char *pooladvise;                       // bit maps for pool page advisements
176 #else
177         HANDLE idx;
178 #endif
179         uint poolcnt;                           // highest page pool node in use
180         uint poolmax;                           // highest page pool node allocated
181         uint poolmask;                          // total size of pages in mmap segment - 1
182         uint hashsize;                          // size of Hash Table for pool entries
183         volatile uint evicted;          // last evicted hash table slot
184         ushort *hash;                           // hash table of pool entries
185         BtLatch *latch;                         // latches for hash table slots
186         char *nodes;                            // memory pool page segments
187 } BtMgr;
188
189 typedef struct {
190         BtMgr *mgr;                     // buffer manager for thread
191         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
192         BtPage alloc;           // frame buffer for alloc page ( page 0 )
193         BtPage cursor;          // cached frame for start/next (never mapped)
194         BtPage frame;           // spare frame for the page split (never mapped)
195         BtPage zero;            // page frame for zeroes at end of file
196         BtPage page;            // current page
197         uid page_no;            // current page number  
198         uid cursor_page;        // current cursor page number   
199         unsigned char *mem;     // frame, cursor, page memory buffer
200         int err;                        // last error
201 } BtDb;
202
203 typedef enum {
204         BTERR_ok = 0,
205         BTERR_struct,
206         BTERR_ovflw,
207         BTERR_lock,
208         BTERR_map,
209         BTERR_wrt,
210         BTERR_hash
211 } BTERR;
212
213 // B-Tree functions
214 extern void bt_close (BtDb *bt);
215 extern BtDb *bt_open (BtMgr *mgr);
216 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod);
217 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
218 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
219 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
220 extern uint bt_nextkey   (BtDb *bt, uint slot);
221
222 //      manager functions
223 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
224 void bt_mgrclose (BtMgr *mgr);
225
226 //  Helper functions to return cursor slot values
227
228 extern BtKey bt_key (BtDb *bt, uint slot);
229 extern uid bt_uid (BtDb *bt, uint slot);
230 extern uint bt_tod (BtDb *bt, uint slot);
231
232 //  BTree page number constants
233 #define ALLOC_page              0
234 #define ROOT_page               1
235 #define LEAF_page               2
236
237 //      Number of levels to create in a new BTree
238
239 #define MIN_lvl                 2
240
241 //  The page is allocated from low and hi ends.
242 //  The key offsets and row-id's are allocated
243 //  from the bottom, while the text of the key
244 //  is allocated from the top.  When the two
245 //  areas meet, the page is split into two.
246
247 //  A key consists of a length byte, two bytes of
248 //  index number (0 - 65534), and up to 253 bytes
249 //  of key value.  Duplicate keys are discarded.
250 //  Associated with each key is a 48 bit row-id.
251
252 //  The b-tree root is always located at page 1.
253 //      The first leaf page of level zero is always
254 //      located on page 2.
255
256 //      When to root page fills, it is split in two and
257 //      the tree height is raised by a new root at page
258 //      one with two keys.
259
260 //      Deleted keys are marked with a dead bit until
261 //      page cleanup The fence key for a node is always
262 //      present, even after deletion and cleanup.
263
264 //  Groups of pages called segments from the btree are
265 //  cached with memory mapping. A hash table is used to keep
266 //  track of the cached segments.  This behaviour is controlled
267 //  by the cache block size parameter to bt_open.
268
269 //  To achieve maximum concurrency one page is locked at a time
270 //  as the tree is traversed to find leaf key in question.
271
272 //      An adoption traversal leaves the parent node locked as the
273 //      tree is traversed to the level in quesiton.
274
275 //  Page 0 is dedicated to lock for new page extensions,
276 //      and chains empty pages together for reuse.
277
278 //      Empty pages are chained together through the ALLOC page and reused.
279
280 //      Access macros to address slot and key values from the page
281
282 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
283 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
284
285 void bt_putid(unsigned char *dest, uid id)
286 {
287 int i = BtId;
288
289         while( i-- )
290                 dest[i] = (unsigned char)id, id >>= 8;
291 }
292
293 uid bt_getid(unsigned char *src)
294 {
295 uid id = 0;
296 int i;
297
298         for( i = 0; i < BtId; i++ )
299                 id <<= 8, id |= *src++; 
300
301         return id;
302 }
303
304 void bt_mgrclose (BtMgr *mgr)
305 {
306 BtPool *pool;
307 uint slot;
308
309         // release mapped pages
310         //      note that slot zero is never used
311
312         for( slot = 1; slot < mgr->poolmax; slot++ ) {
313                 pool = (BtPool *)(mgr->nodes + slot * (sizeof(BtPool) + (mgr->poolmask + 1) * sizeof(BtLatchSet)));
314                 if( pool->slot )
315 #ifdef unix
316                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
317 #else
318                 {
319                         FlushViewOfFile(pool->map, 0);
320                         UnmapViewOfFile(pool->map);
321                         CloseHandle(pool->hmap);
322                 }
323 #endif
324         }
325
326 #ifdef unix
327         close (mgr->idx);
328         free (mgr->nodes);
329         free (mgr->hash);
330         free (mgr->latch);
331         free (mgr->pooladvise);
332         free (mgr);
333 #else
334         FlushFileBuffers(mgr->idx);
335         CloseHandle(mgr->idx);
336         GlobalFree (mgr->nodes);
337         GlobalFree (mgr->hash);
338         GlobalFree (mgr->latch);
339         GlobalFree (mgr);
340 #endif
341 }
342
343 //      close and release memory
344
345 void bt_close (BtDb *bt)
346 {
347 #ifdef unix
348         if ( bt->mem )
349                 free (bt->mem);
350 #else
351         if ( bt->mem)
352                 VirtualFree (bt->mem, 0, MEM_RELEASE);
353 #endif
354         free (bt);
355 }
356
357 //  open/create new btree buffer manager
358
359 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
360 //              size of mapped page pool (e.g. 8192)
361
362 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
363 {
364 uint lvl, attr, cacheblk, last, slot, idx;
365 BtPage alloc;
366 int lockmode;
367 off64_t size;
368 uint amt[1];
369 BtMgr* mgr;
370 BtKey key;
371
372 #ifdef unix
373 pthread_rwlockattr_t rwattr[1];
374 #else
375 SYSTEM_INFO sysinfo[1];
376 #endif
377
378         // determine sanity of page size and buffer pool
379
380         if( bits > BT_maxbits )
381                 bits = BT_maxbits;
382         else if( bits < BT_minbits )
383                 bits = BT_minbits;
384
385         if( !poolmax )
386                 return NULL;    // must have buffer pool
387
388 #ifdef unix
389         mgr = calloc (1, sizeof(BtMgr));
390
391         switch (mode & 0x7fff)
392         {
393         case BT_rw:
394                 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
395                 lockmode = 1;
396                 break;
397
398         case BT_ro:
399         default:
400                 mgr->idx = open ((char*)name, O_RDONLY);
401                 lockmode = 0;
402                 break;
403         }
404         if( mgr->idx == -1 )
405                 return free(mgr), NULL;
406         
407         cacheblk = 4096;        // minimum mmap segment size for unix
408
409 #else
410         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
411         attr = FILE_ATTRIBUTE_NORMAL;
412         switch (mode & 0x7fff)
413         {
414         case BT_rw:
415                 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
416                 lockmode = 1;
417                 break;
418
419         case BT_ro:
420         default:
421                 mgr->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
422                 lockmode = 0;
423                 break;
424         }
425         if( mgr->idx == INVALID_HANDLE_VALUE )
426                 return GlobalFree(mgr), NULL;
427
428         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
429         GetSystemInfo(sysinfo);
430         cacheblk = sysinfo->dwAllocationGranularity;
431 #endif
432
433 #ifdef unix
434         alloc = malloc (BT_maxpage);
435         *amt = 0;
436
437         // read minimum page size to get root info
438
439         if( size = lseek (mgr->idx, 0L, 2) ) {
440                 if( pread(mgr->idx, alloc, BT_minpage, 0) == BT_minpage )
441                         bits = alloc->bits;
442                 else
443                         return free(mgr), free(alloc), NULL;
444         } else if( mode == BT_ro )
445                 return bt_mgrclose (mgr), NULL;
446 #else
447         alloc = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
448         size = GetFileSize(mgr->idx, amt);
449
450         if( size || *amt ) {
451                 if( !ReadFile(mgr->idx, (char *)alloc, BT_minpage, amt, NULL) )
452                         return bt_mgrclose (mgr), NULL;
453                 bits = alloc->bits;
454         } else if( mode == BT_ro )
455                 return bt_mgrclose (mgr), NULL;
456 #endif
457
458         mgr->page_size = 1 << bits;
459         mgr->page_bits = bits;
460
461         mgr->poolmax = poolmax;
462         mgr->mode = mode;
463
464         if( cacheblk < mgr->page_size )
465                 cacheblk = mgr->page_size;
466
467         //  mask for partial memmaps
468
469         mgr->poolmask = (cacheblk >> bits) - 1;
470
471         //      see if requested size of pages per memmap is greater
472
473         if( (1 << segsize) > mgr->poolmask )
474                 mgr->poolmask = (1 << segsize) - 1;
475
476         mgr->seg_bits = 0;
477
478         while( (1 << mgr->seg_bits) <= mgr->poolmask )
479                 mgr->seg_bits++;
480
481         mgr->hashsize = hashsize;
482
483 #ifdef unix
484         mgr->nodes = calloc (poolmax, (sizeof(BtPool) + (mgr->poolmask + 1) * sizeof(BtLatchSet)));
485         mgr->hash = calloc (hashsize, sizeof(ushort));
486         mgr->latch = calloc (hashsize, sizeof(BtLatch));
487         mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
488 #else
489         mgr->nodes = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * (sizeof(BtPool) + (mgr->poolmask + 1) * sizeof(BtLatchSet)));
490         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
491         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtLatch));
492 #endif
493
494 #ifdef unix
495         pthread_rwlockattr_init (rwattr);
496         pthread_rwlockattr_setkind_np (rwattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
497 #endif
498
499         //  initialize buffer pool mgr latches
500
501         for( slot = 0; slot < hashsize; slot++ ) {
502 #ifdef unix
503                 pthread_rwlock_init (mgr->latch[slot].lock, rwattr);
504 #else
505                 InitializeSRWLock (mgr->latch[slot].srw);
506 #endif
507         }
508
509         //  initialize buffer pool page latches
510 #ifdef unix
511 //      pthread_rwlockattr_setpshared (rwattr, PTHREAD_PROCESS_SHARED);
512 #endif
513         for( slot = 1; slot < poolmax; slot++ ) {
514           BtLatchSet *latchset = (BtLatchSet *)(mgr->nodes + slot * (sizeof(BtPool) + (mgr->poolmask + 1) * sizeof(BtLatchSet)) + sizeof(BtPool));
515           for( idx = 0; idx < mgr->poolmask + 1; idx++ ) {
516 #ifdef  unix
517                 pthread_rwlock_init (latchset[idx].readwr->lock, rwattr);
518                 pthread_rwlock_init (latchset[idx].access->lock, rwattr);
519                 pthread_rwlock_init (latchset[idx].parent->lock, rwattr);
520 #else
521                 InitializeSRWLock (latchset[idx].readwr->srw);
522                 InitializeSRWLock (latchset[idx].access->srw);
523                 InitializeSRWLock (latchset[idx].parent->srw);
524 #endif
525           }
526         }
527
528         if( size || *amt )
529                 goto mgrxit;
530
531         // initializes an empty b-tree with root page and page of leaves
532
533         memset (alloc, 0, 1 << bits);
534         bt_putid(alloc->right, MIN_lvl+1);
535         alloc->bits = mgr->page_bits;
536
537 #ifdef unix
538         if( write (mgr->idx, alloc, mgr->page_size) < mgr->page_size )
539                 return bt_mgrclose (mgr), NULL;
540 #else
541         if( !WriteFile (mgr->idx, (char *)alloc, mgr->page_size, amt, NULL) )
542                 return bt_mgrclose (mgr), NULL;
543
544         if( *amt < mgr->page_size )
545                 return bt_mgrclose (mgr), NULL;
546 #endif
547
548         memset (alloc, 0, 1 << bits);
549         alloc->bits = mgr->page_bits;
550
551         for( lvl=MIN_lvl; lvl--; ) {
552                 slotptr(alloc, 1)->off = mgr->page_size - 3;
553                 bt_putid(slotptr(alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);           // next(lower) page number
554                 key = keyptr(alloc, 1);
555                 key->len = 2;                   // create stopper key
556                 key->key[0] = 0xff;
557                 key->key[1] = 0xff;
558                 alloc->min = mgr->page_size - 3;
559                 alloc->lvl = lvl;
560                 alloc->cnt = 1;
561                 alloc->act = 1;
562 #ifdef unix
563                 if( write (mgr->idx, alloc, mgr->page_size) < mgr->page_size )
564                         return bt_mgrclose (mgr), NULL;
565 #else
566                 if( !WriteFile (mgr->idx, (char *)alloc, mgr->page_size, amt, NULL) )
567                         return bt_mgrclose (mgr), NULL;
568
569                 if( *amt < mgr->page_size )
570                         return bt_mgrclose (mgr), NULL;
571 #endif
572         }
573
574         // create empty page area by writing last page of first
575         // segment area (other pages are zeroed by O/S)
576
577         if( mgr->poolmask ) {
578                 memset(alloc, 0, mgr->page_size);
579                 last = mgr->poolmask;
580
581                 while( last < MIN_lvl + 1 )
582                         last += mgr->poolmask + 1;
583
584 #ifdef unix
585                 pwrite(mgr->idx, alloc, mgr->page_size, last << mgr->page_bits);
586 #else
587                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
588                 if( !WriteFile (mgr->idx, (char *)alloc, mgr->page_size, amt, NULL) )
589                         return bt_mgrclose (mgr), NULL;
590                 if( *amt < mgr->page_size )
591                         return bt_mgrclose (mgr), NULL;
592 #endif
593         }
594
595 mgrxit:
596 #ifdef unix
597         free (alloc);
598 #else
599         VirtualFree (alloc, 0, MEM_RELEASE);
600 #endif
601         return mgr;
602 }
603
604 //      open BTree access method
605 //      based on buffer manager
606
607 BtDb *bt_open (BtMgr *mgr)
608 {
609 BtDb *bt = malloc (sizeof(*bt));
610
611         memset (bt, 0, sizeof(*bt));
612         bt->mgr = mgr;
613 #ifdef unix
614         bt->mem = malloc (3 *mgr->page_size);
615 #else
616         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
617 #endif
618         bt->frame = (BtPage)bt->mem;
619         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
620         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
621         return bt;
622 }
623
624 //  compare two keys, returning > 0, = 0, or < 0
625 //  as the comparison value
626
627 int keycmp (BtKey key1, unsigned char *key2, uint len2)
628 {
629 uint len1 = key1->len;
630 int ans;
631
632         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
633                 return ans;
634
635         if( len1 > len2 )
636                 return 1;
637         if( len1 < len2 )
638                 return -1;
639
640         return 0;
641 }
642
643 //      Latch Manager
644
645 void bt_readlock(BtLatch *latch)
646 {
647 #ifdef unix
648         pthread_rwlock_rdlock (latch->lock);
649 #else
650         AcquireSRWLockShared (latch->srw);
651 #endif
652 }
653
654 //      wait for other read and write latches to relinquish
655
656 void bt_writelock(BtLatch *latch)
657 {
658 #ifdef unix
659         pthread_rwlock_wrlock (latch->lock);
660 #else
661         AcquireSRWLockExclusive (latch->srw);
662 #endif
663 }
664
665 //      try to obtain write lock
666
667 //      return 1 if obtained,
668 //              0 if already write or read locked
669
670 int bt_writetry(BtLatch *latch)
671 {
672 int result = 0;
673
674 #ifdef unix
675         result = !pthread_rwlock_trywrlock (latch->lock);
676 #else
677         result = TryAcquireSRWLockExclusive (latch->srw);
678 #endif
679         return result;
680 }
681
682 //      clear write mode
683
684 void bt_releasewrite(BtLatch *latch)
685 {
686 #ifdef unix
687         pthread_rwlock_unlock (latch->lock);
688 #else
689         ReleaseSRWLockExclusive (latch->srw);
690 #endif
691 }
692
693 //      decrement reader count
694
695 void bt_releaseread(BtLatch *latch)
696 {
697 #ifdef unix
698         pthread_rwlock_unlock (latch->lock);
699 #else
700         ReleaseSRWLockShared (latch->srw);
701 #endif
702 }
703
704 //      Buffer Pool mgr
705
706 // find segment in pool
707 // must be called with hashslot idx locked
708 //      return NULL if not there
709 //      otherwise return node
710
711 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
712 {
713 BtPool *pool;
714 uint slot;
715
716         // compute start of hash chain in pool
717
718         if( slot = bt->mgr->hash[idx] ) 
719                 pool = (BtPool *)(bt->mgr->nodes + slot * (sizeof(BtPool) + (bt->mgr->poolmask + 1) * sizeof(BtLatchSet)));
720         else
721                 return NULL;
722
723         page_no &= ~bt->mgr->poolmask;
724
725         while( pool->basepage != page_no )
726           if( pool = pool->hashnext )
727                 continue;
728           else
729                 return NULL;
730
731         return pool;
732 }
733
734 // add segment to hash table
735
736 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
737 {
738 BtPool *node;
739 uint slot;
740
741         pool->hashprev = pool->hashnext = NULL;
742         pool->basepage = page_no & ~bt->mgr->poolmask;
743         pool->lru = 1;
744
745         if( slot = bt->mgr->hash[idx] ) {
746                 node = (BtPool *)(bt->mgr->nodes + slot * (sizeof(BtPool) + (bt->mgr->poolmask + 1) * sizeof(BtLatchSet)));
747                 pool->hashnext = node;
748                 node->hashprev = pool;
749         }
750
751         bt->mgr->hash[idx] = pool->slot;
752 }
753
754 //      find best segment to evict from buffer pool
755
756 BtPool *bt_findlru (BtDb *bt, uint hashslot)
757 {
758 unsigned long long int target = ~0LL;
759 BtPool *pool = NULL, *node;
760
761         if( !hashslot )
762                 return NULL;
763
764         node = (BtPool *)(bt->mgr->nodes + hashslot * (sizeof(BtPool) + (bt->mgr->poolmask + 1) * sizeof(BtLatchSet)));
765
766         //      scan pool entries under hash table slot
767
768         do {
769           if( node->pin )
770                 continue;
771           if( node->lru > target )
772                 continue;
773           target = node->lru;
774           pool = node;
775         } while( node = node->hashnext );
776
777         return pool;
778 }
779
780 //  map new buffer pool segment to virtual memory
781
782 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
783 {
784 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
785 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
786 int flag;
787
788 #ifdef unix
789         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
790         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
791         if( pool->map == MAP_FAILED )
792                 return bt->err = BTERR_map;
793         // clear out madvise issued bits
794         memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
795 #else
796         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
797         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
798         if( !pool->hmap )
799                 return bt->err = BTERR_map;
800
801         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
802         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
803         if( !pool->map )
804                 return bt->err = BTERR_map;
805 #endif
806         return bt->err = 0;
807 }
808
809 //      find or place requested page in segment-pool
810 //      return pool table entry, incrementing pin
811
812 BtPool *bt_pinpage(BtDb *bt, uid page_no)
813 {
814 BtPool *pool, *node, *next;
815 uint slot, idx, victim;
816 BtLatchSet *set;
817
818         //      lock hash table chain
819
820         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
821         bt_readlock (&bt->mgr->latch[idx]);
822
823         //      look up in hash table
824
825         if( pool = bt_findpool(bt, page_no, idx) ) {
826 #ifdef unix
827                 __sync_fetch_and_add(&pool->pin, 1);
828 #else
829                 _InterlockedIncrement (&pool->pin);
830 #endif
831                 bt_releaseread (&bt->mgr->latch[idx]);
832                 pool->lru++;
833                 return pool;
834         }
835
836         //      upgrade to write lock
837
838         bt_releaseread (&bt->mgr->latch[idx]);
839         bt_writelock (&bt->mgr->latch[idx]);
840
841         // try to find page in pool with write lock
842
843         if( pool = bt_findpool(bt, page_no, idx) ) {
844 #ifdef unix
845                 __sync_fetch_and_add(&pool->pin, 1);
846 #else
847                 _InterlockedIncrement (&pool->pin);
848 #endif
849                 bt_releasewrite (&bt->mgr->latch[idx]);
850                 pool->lru++;
851                 return pool;
852         }
853
854         // allocate a new pool node
855         // and add to hash table
856
857 #ifdef unix
858         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
859 #else
860         slot = _InterlockedIncrement (&bt->mgr->poolcnt) - 1;
861 #endif
862
863         if( ++slot < bt->mgr->poolmax ) {
864                 pool = (BtPool *)(bt->mgr->nodes + slot * (sizeof(BtPool) + (bt->mgr->poolmask + 1) * sizeof(BtLatchSet)));
865                 pool->slot = slot;
866
867                 if( bt_mapsegment(bt, pool, page_no) )
868                         return NULL;
869
870                 bt_linkhash(bt, pool, page_no, idx);
871 #ifdef unix
872                 __sync_fetch_and_add(&pool->pin, 1);
873 #else
874                 _InterlockedIncrement (&pool->pin);
875 #endif
876                 bt_releasewrite (&bt->mgr->latch[idx]);
877                 return pool;
878         }
879
880         // pool table is full
881         //      find best pool entry to evict
882
883 #ifdef unix
884         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
885 #else
886         _InterlockedDecrement (&bt->mgr->poolcnt);
887 #endif
888
889         while( 1 ) {
890 #ifdef unix
891                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
892 #else
893                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
894 #endif
895                 victim %= bt->mgr->hashsize;
896
897                 // try to get write lock
898                 //      skip entry if not obtained
899
900                 if( !bt_writetry (&bt->mgr->latch[victim]) )
901                         continue;
902
903                 //  if cache entry is empty
904                 //      or no slots are unpinned
905                 //      skip this entry
906
907                 if( !(pool = bt_findlru(bt, bt->mgr->hash[victim])) ) {
908                         bt_releasewrite (&bt->mgr->latch[victim]);
909                         continue;
910                 }
911
912                 // unlink victim pool node from hash table
913
914                 if( node = pool->hashprev )
915                         node->hashnext = pool->hashnext;
916                 else if( node = pool->hashnext )
917                         bt->mgr->hash[victim] = node->slot;
918                 else
919                         bt->mgr->hash[victim] = 0;
920
921                 if( node = pool->hashnext )
922                         node->hashprev = pool->hashprev;
923
924                 bt_releasewrite (&bt->mgr->latch[victim]);
925
926                 //      remove old file mapping
927 #ifdef unix
928                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
929 #else
930                 FlushViewOfFile(pool->map, 0);
931                 UnmapViewOfFile(pool->map);
932                 CloseHandle(pool->hmap);
933 #endif
934                 pool->map = NULL;
935
936                 //  create new pool mapping
937                 //  and link into hash table
938
939                 if( bt_mapsegment(bt, pool, page_no) )
940                         return NULL;
941
942                 bt_linkhash(bt, pool, page_no, idx);
943 #ifdef unix
944                 __sync_fetch_and_add(&pool->pin, 1);
945 #else
946                 _InterlockedIncrement (&pool->pin);
947 #endif
948                 bt_releasewrite (&bt->mgr->latch[idx]);
949                 return pool;
950         }
951 }
952
953 // place write, read, or parent lock on requested page_no.
954 //      pin to buffer pool and return page pointer
955
956 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode, BtPage *pageptr)
957 {
958 BtLatchSet *set;
959 BtPool *pool;
960 uint subpage;
961 BtPage page;
962
963         //      find/create maping in pool table
964         //        and pin our pool slot
965
966         if( pool = bt_pinpage(bt, page_no) )
967                 subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
968         else
969                 return bt->err;
970
971         set = pool->pagelatch + subpage;
972         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
973 #ifdef unix
974         {
975         uint idx = subpage / 8;
976         uint bit = subpage % 8;
977
978                 if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
979                   madvise (page, bt->mgr->page_size, MADV_WILLNEED);
980                   (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
981                 }
982         }
983 #endif
984
985         switch( mode ) {
986         case BtLockRead:
987                 bt_readlock (set->readwr);
988                 break;
989         case BtLockWrite:
990                 bt_writelock (set->readwr);
991                 break;
992         case BtLockAccess:
993                 bt_readlock (set->access);
994                 break;
995         case BtLockDelete:
996                 bt_writelock (set->access);
997                 break;
998         case BtLockParent:
999                 bt_writelock (set->parent);
1000                 break;
1001         default:
1002                 return bt->err = BTERR_lock;
1003         }
1004
1005         if( pageptr )
1006                 *pageptr = page;
1007
1008         return bt->err = 0;
1009 }
1010
1011 // remove write, read, or parent lock on requested page_no.
1012
1013 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
1014 {
1015 uint subpage, idx;
1016 BtLatchSet *set;
1017 BtPool *pool;
1018
1019         //      since page is pinned
1020         //      it should still be in the buffer pool
1021         //      and is in no danger of being a victim for reuse
1022
1023         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1024         bt_readlock (&bt->mgr->latch[idx]);
1025
1026         if( pool = bt_findpool(bt, page_no, idx) )
1027                 subpage = (uint)(page_no & bt->mgr->poolmask);
1028         else
1029                 return bt->err = BTERR_hash;
1030
1031         bt_releaseread (&bt->mgr->latch[idx]);
1032         set = pool->pagelatch + subpage;
1033
1034         switch( mode ) {
1035         case BtLockRead:
1036                 bt_releaseread (set->readwr);
1037                 break;
1038         case BtLockWrite:
1039                 bt_releasewrite (set->readwr);
1040                 break;
1041         case BtLockAccess:
1042                 bt_releaseread (set->access);
1043                 break;
1044         case BtLockDelete:
1045                 bt_releasewrite (set->access);
1046                 break;
1047         case BtLockParent:
1048                 bt_releasewrite (set->parent);
1049                 break;
1050         default:
1051                 return bt->err = BTERR_lock;
1052         }
1053
1054 #ifdef  unix
1055         __sync_fetch_and_add(&pool->pin, -1);
1056 #else
1057         _InterlockedDecrement (&pool->pin);
1058 #endif
1059         return bt->err = 0;
1060 }
1061
1062 //      deallocate a deleted page
1063 //      place on free chain out of allocator page
1064
1065 BTERR bt_freepage(BtDb *bt, uid page_no)
1066 {
1067         //  obtain delete lock on deleted page
1068
1069         if( bt_lockpage(bt, page_no, BtLockDelete, NULL) )
1070                 return bt->err;
1071
1072         //  obtain write lock on deleted page
1073
1074         if( bt_lockpage(bt, page_no, BtLockWrite, &bt->temp) )
1075                 return bt->err;
1076
1077         //      lock allocation page
1078
1079         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite, &bt->alloc) )
1080                 return bt->err;
1081
1082         //      store chain in second right
1083         bt_putid(bt->temp->right, bt_getid(bt->alloc[1].right));
1084         bt_putid(bt->alloc[1].right, page_no);
1085
1086         // unlock page zero 
1087
1088         if( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1089                 return bt->err;
1090
1091         //  remove write lock on deleted node
1092
1093         if( bt_unlockpage(bt, page_no, BtLockWrite) )
1094                 return bt->err;
1095
1096         //  remove delete lock on deleted node
1097
1098         if( bt_unlockpage(bt, page_no, BtLockDelete) )
1099                 return bt->err;
1100
1101         return 0;
1102 }
1103
1104 //      allocate a new page and write page into it
1105
1106 uid bt_newpage(BtDb *bt, BtPage page)
1107 {
1108 uid new_page;
1109 BtPage pmap;
1110 int reuse;
1111
1112         // lock page zero
1113
1114         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite, &bt->alloc) )
1115                 return 0;
1116
1117         // use empty chain first
1118         // else allocate empty page
1119
1120         if( new_page = bt_getid(bt->alloc[1].right) ) {
1121                 if( bt_lockpage (bt, new_page, BtLockWrite, &bt->temp) )
1122                         return 0;
1123                 bt_putid(bt->alloc[1].right, bt_getid(bt->temp->right));
1124                 if( bt_unlockpage (bt, new_page, BtLockWrite) )
1125                         return 0;
1126                 reuse = 1;
1127         } else {
1128                 new_page = bt_getid(bt->alloc->right);
1129                 bt_putid(bt->alloc->right, new_page+1);
1130                 reuse = 0;
1131         }
1132 #ifdef unix
1133         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1134                 return bt->err = BTERR_wrt, 0;
1135
1136         // if writing first page of pool block, zero last page in the block
1137
1138         if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
1139         {
1140                 // use zero buffer to write zeros
1141                 memset(bt->zero, 0, bt->mgr->page_size);
1142                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1143                         return bt->err = BTERR_wrt, 0;
1144         }
1145 #else
1146         //      bring new page into pool and copy page.
1147         //      this will extend the file into the new pages.
1148
1149         if( bt_lockpage(bt, new_page, BtLockWrite, &pmap) )
1150                 return 0;
1151
1152         memcpy(pmap, page, bt->mgr->page_size);
1153
1154         if( bt_unlockpage (bt, new_page, BtLockWrite) )
1155                 return 0;
1156 #endif
1157         // unlock page zero 
1158
1159         if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1160                 return 0;
1161
1162         return new_page;
1163 }
1164
1165 //  find slot in page for given key at a given level
1166
1167 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1168 {
1169 uint diff, higher = bt->page->cnt, low = 1, slot;
1170
1171         //      low is the lowest candidate, higher is already
1172         //      tested as .ge. the given key, loop ends when they meet
1173
1174         while( diff = higher - low ) {
1175                 slot = low + ( diff >> 1 );
1176                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1177                         low = slot + 1;
1178                 else
1179                         higher = slot;
1180         }
1181
1182         return higher;
1183 }
1184
1185 //  find and load page at given level for given key
1186 //      leave page rd or wr locked as requested
1187
1188 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1189 {
1190 uid page_no = ROOT_page, prevpage = 0;
1191 uint drill = 0xff, slot;
1192 uint mode, prevmode;
1193
1194   //  start at root of btree and drill down
1195
1196   do {
1197         // determine lock mode of drill level
1198         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1199
1200         bt->page_no = page_no;
1201
1202         // obtain access lock using lock chaining with Access mode
1203
1204         if( page_no > ROOT_page )
1205           if( bt_lockpage(bt, page_no, BtLockAccess, NULL) )
1206                 return 0;                                                                       
1207
1208         if( prevpage )
1209           if( bt_unlockpage(bt, prevpage, prevmode) )
1210                 return 0;
1211
1212         // obtain read lock using lock chaining
1213         // and pin page contents
1214
1215         if( bt_lockpage(bt, page_no, mode, &bt->page) )
1216                 return 0;                                                                       
1217
1218         if( page_no > ROOT_page )
1219           if( bt_unlockpage(bt, page_no, BtLockAccess) )
1220                 return 0;                                                                       
1221
1222         // re-read and re-lock root after determining actual level of root
1223
1224         if( bt->page_no == ROOT_page )
1225           if( bt->page->lvl != drill) {
1226                 drill = bt->page->lvl;
1227
1228             if( lock == BtLockWrite && drill == lvl )
1229                   if( bt_unlockpage(bt, page_no, mode) )
1230                         return 0;
1231                   else
1232                         continue;
1233           }
1234
1235         //      if page is being deleted,
1236         //      move back to preceeding page
1237
1238         if( bt->page->kill ) {
1239                 page_no = bt_getid (bt->page->right);
1240                 continue;
1241         }
1242
1243         //  find key on page at this level
1244         //  and descend to requested level
1245
1246         slot = bt_findslot (bt, key, len);
1247
1248         //      is this slot a foster child?
1249
1250         if( slot <= bt->page->cnt - bt->page->foster )
1251           if( drill == lvl )
1252                 return slot;
1253           else
1254                 drill--;
1255
1256         while( slotptr(bt->page, slot)->dead )
1257           if( slot++ < bt->page->cnt )
1258                 continue;
1259           else
1260                 return bt->err = BTERR_struct, 0;
1261
1262         //  continue down / right using overlapping locks
1263         //  to protect pages being killed or split.
1264
1265         prevmode = mode;
1266         prevpage = bt->page_no;
1267         page_no = bt_getid(slotptr(bt->page, slot)->id);
1268   } while( page_no );
1269
1270   // return error on end of chain
1271
1272   bt->err = BTERR_struct;
1273   return 0;     // return error
1274 }
1275
1276 //  find and delete key on page by marking delete flag bit
1277 //  when page becomes empty, delete it from the btree
1278
1279 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1280 {
1281 unsigned char leftkey[256], rightkey[256];
1282 uid page_no, right;
1283 uint slot, tod;
1284 BtKey ptr;
1285
1286         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1287                 ptr = keyptr(bt->page, slot);
1288         else
1289                 return bt->err;
1290
1291         // if key is found delete it, otherwise ignore request
1292
1293         if( !keycmp (ptr, key, len) )
1294                 if( slotptr(bt->page, slot)->dead == 0 ) {
1295                         slotptr(bt->page,slot)->dead = 1;
1296                         if( slot < bt->page->cnt )
1297                                 bt->page->dirty = 1;
1298                         bt->page->act--;
1299                 }
1300
1301         // return if page is not empty, or it has no right sibling
1302
1303         right = bt_getid(bt->page->right);
1304         page_no = bt->page_no;
1305
1306         if( !right || bt->page->act )
1307                 return bt_unlockpage(bt, page_no, BtLockWrite);
1308
1309         // obtain Parent lock over write lock
1310
1311         if( bt_lockpage(bt, page_no, BtLockParent, NULL) )
1312                 return bt->err;
1313
1314         // cache copy of key to delete
1315
1316         ptr = keyptr(bt->page, bt->page->cnt);
1317         memcpy(leftkey, ptr, ptr->len + 1);
1318
1319         // lock and map right page
1320
1321         if ( bt_lockpage(bt, right, BtLockWrite, &bt->temp) )
1322                 return bt->err;
1323
1324         // pull contents of next page into current empty page 
1325         memcpy (bt->page, bt->temp, bt->mgr->page_size);
1326
1327         //      cache copy of key to update
1328         ptr = keyptr(bt->temp, bt->temp->cnt);
1329         memcpy(rightkey, ptr, ptr->len + 1);
1330
1331         //  Mark right page as deleted and point it to left page
1332         //      until we can post updates at higher level.
1333
1334         bt_putid(bt->temp->right, page_no);
1335         bt->temp->kill = 1;
1336         bt->temp->cnt = 0;
1337
1338         if( bt_unlockpage(bt, right, BtLockWrite) )
1339                 return bt->err;
1340         if( bt_unlockpage(bt, page_no, BtLockWrite) )
1341                 return bt->err;
1342
1343         //  delete old lower key to consolidated node
1344
1345         if( bt_deletekey (bt, leftkey + 1, *leftkey, lvl + 1) )
1346                 return bt->err;
1347
1348         //  redirect higher key directly to consolidated node
1349
1350         if( slot = bt_loadpage (bt, rightkey+1, *rightkey, lvl+1, BtLockWrite) )
1351                 ptr = keyptr(bt->page, slot);
1352         else
1353                 return bt->err;
1354
1355         // since key already exists, update id
1356
1357         if( keycmp (ptr, rightkey+1, *rightkey) )
1358                 return bt->err = BTERR_struct;
1359
1360         slotptr(bt->page, slot)->dead = 0;
1361         bt_putid(slotptr(bt->page,slot)->id, page_no);
1362         bt_unlockpage(bt, bt->page_no, BtLockWrite);
1363
1364         //      obtain write lock and
1365         //      add right block to free chain
1366
1367         if( bt_freepage (bt, right) )
1368                 return bt->err;
1369
1370         //      remove ParentModify lock
1371
1372         if( bt_unlockpage(bt, page_no, BtLockParent) )
1373                 return bt->err;
1374         
1375         return 0;
1376 }
1377
1378 //      find key in leaf level and return row-id
1379
1380 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1381 {
1382 uint  slot;
1383 BtKey ptr;
1384 uid id;
1385
1386         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1387                 ptr = keyptr(bt->page, slot);
1388         else
1389                 return 0;
1390
1391         // if key exists, return row-id
1392         //      otherwise return 0
1393
1394         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1395                 id = bt_getid(slotptr(bt->page,slot)->id);
1396         else
1397                 id = 0;
1398
1399         if ( bt_unlockpage(bt, bt->page_no, BtLockRead) )
1400                 return 0;
1401
1402         return id;
1403 }
1404
1405 //      check page for space available,
1406 //      clean if necessary and return
1407 //      0 - page needs splitting
1408 //      1 - go ahead
1409
1410 uint bt_cleanpage(BtDb *bt, uint amt)
1411 {
1412 uint nxt = bt->mgr->page_size;
1413 BtPage page = bt->page;
1414 uint cnt = 0, idx = 0;
1415 uint max = page->cnt;
1416 BtKey key;
1417
1418         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1419                 return 1;
1420
1421         //      skip cleanup if nothing to reclaim
1422
1423         if( !page->dirty )
1424                 return 0;
1425
1426         memcpy (bt->frame, page, bt->mgr->page_size);
1427
1428         // skip page info and set rest of page to zero
1429
1430         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1431         page->dirty = 0;
1432         page->act = 0;
1433
1434         // try cleaning up page first
1435
1436         while( cnt++ < max ) {
1437                 // always leave fence key and foster children in list
1438                 if( cnt < max - page->foster && slotptr(bt->frame,cnt)->dead )
1439                         continue;
1440
1441                 // copy key
1442                 key = keyptr(bt->frame, cnt);
1443                 nxt -= key->len + 1;
1444                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1445
1446                 // copy slot
1447                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1448                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1449                         page->act++;
1450                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1451                 slotptr(page, idx)->off = nxt;
1452         }
1453
1454         page->min = nxt;
1455         page->cnt = idx;
1456
1457         //      see if page has enough space now, or does it need splitting?
1458
1459         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1460                 return 1;
1461
1462         return 0;
1463 }
1464
1465 //      add key to page
1466 //      return with page unlocked
1467
1468 BTERR bt_addkeytopage (BtDb *bt, uint slot, unsigned char *key, uint len, uid id, uint tod)
1469 {
1470 BtPage page = bt->page;
1471 uint idx;
1472
1473         // calculate next available slot and copy key into page
1474
1475         page->min -= len + 1;
1476         ((unsigned char *)page)[page->min] = len;
1477         memcpy ((unsigned char *)page + page->min +1, key, len );
1478
1479         for( idx = slot; idx < page->cnt; idx++ )
1480           if( slotptr(page, idx)->dead )
1481                 break;
1482
1483         // now insert key into array before slot
1484         // preserving the fence slot
1485
1486         if( idx == page->cnt )
1487                 idx++, page->cnt++;
1488
1489         page->act++;
1490
1491         while( idx > slot )
1492                 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1493
1494         bt_putid(slotptr(page,slot)->id, id);
1495         slotptr(page, slot)->off = page->min;
1496         slotptr(page, slot)->tod = tod;
1497         slotptr(page, slot)->dead = 0;
1498
1499         return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1500 }
1501
1502 // split the root and raise the height of the btree
1503
1504 BTERR bt_splitroot(BtDb *bt, uid right)
1505 {
1506 uint nxt = bt->mgr->page_size;
1507 unsigned char fencekey[256];
1508 BtPage root = bt->page;
1509 uid new_page;
1510 BtKey key;
1511
1512         //  Obtain an empty page to use, and copy the left page
1513         //  contents into it from the root.  Strip foster child key.
1514         //      (it's the stopper key)
1515
1516         root->act--;
1517         root->cnt--;
1518         root->foster--;
1519
1520         //      Save left fence key.
1521
1522         key = keyptr(root, root->cnt);
1523         memcpy (fencekey, key, key->len + 1);
1524
1525         //  copy the lower keys into a new left page
1526
1527         if( !(new_page = bt_newpage(bt, root)) )
1528                 return bt->err;
1529
1530         // preserve the page info at the bottom
1531         // and set rest of the root to zero
1532
1533         memset (root+1, 0, bt->mgr->page_size - sizeof(*root));
1534
1535         // insert left fence key on empty newroot page
1536
1537         nxt -= *fencekey + 1;
1538         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
1539         bt_putid(slotptr(root, 1)->id, new_page);
1540         slotptr(root, 1)->off = nxt;
1541         
1542         // insert stopper key on newroot page
1543         // and increase the root height
1544
1545         nxt -= 3;
1546         fencekey[0] = 2;
1547         fencekey[1] = 0xff;
1548         fencekey[2] = 0xff;
1549         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
1550         bt_putid(slotptr(root, 2)->id, right);
1551         slotptr(root, 2)->off = nxt;
1552
1553         bt_putid(root->right, 0);
1554         root->min = nxt;                // reset lowest used offset and key count
1555         root->cnt = 2;
1556         root->act = 2;
1557         root->lvl++;
1558
1559         // release root (bt->page)
1560
1561         return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1562 }
1563
1564 //  split already locked full node
1565 //      return unlocked.
1566
1567 BTERR bt_splitpage (BtDb *bt)
1568 {
1569 uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
1570 unsigned char fencekey[256];
1571 uid page_no = bt->page_no;
1572 BtPage page = bt->page;
1573 uint tod = time(NULL);
1574 uint lvl = page->lvl;
1575 uid new_page, right;
1576 BtKey key;
1577
1578         //      initialize frame buffer
1579
1580         memset (bt->frame, 0, bt->mgr->page_size);
1581         max = page->cnt - page->foster;
1582         tod = (uint)time(NULL);
1583         cnt = max / 2;
1584         idx = 0;
1585
1586         //  split higher half of keys to bt->frame
1587         //      leaving foster children in the left node.
1588
1589         while( cnt++ < max ) {
1590                 key = keyptr(page, cnt);
1591                 nxt -= key->len + 1;
1592                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1593                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1594                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1595                 slotptr(bt->frame, idx)->off = nxt;
1596                 bt->frame->act++;
1597         }
1598
1599         // transfer right link node
1600
1601         if( page_no > ROOT_page ) {
1602                 right = bt_getid (page->right);
1603                 bt_putid(bt->frame->right, right);
1604         }
1605
1606         bt->frame->bits = bt->mgr->page_bits;
1607         bt->frame->min = nxt;
1608         bt->frame->cnt = idx;
1609         bt->frame->lvl = lvl;
1610
1611         //      get new free page and write frame to it.
1612
1613         if( !(new_page = bt_newpage(bt, bt->frame)) )
1614                 return bt->err;
1615
1616         //      remember fence key for new page to add
1617         //      as foster child
1618
1619         key = keyptr(bt->frame, idx);
1620         memcpy (fencekey, key, key->len + 1);
1621
1622         //      update lower keys and foster children to continue in old page
1623
1624         memcpy (bt->frame, page, bt->mgr->page_size);
1625         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1626         nxt = bt->mgr->page_size;
1627         page->act = 0;
1628         cnt = 0;
1629         idx = 0;
1630
1631         //  assemble page of smaller keys
1632         //      to remain in the old page
1633
1634         while( cnt++ < max / 2 ) {
1635                 key = keyptr(bt->frame, cnt);
1636                 nxt -= key->len + 1;
1637                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1638                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1639                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1640                 slotptr(page, idx)->off = nxt;
1641                 page->act++;
1642         }
1643
1644         //      insert new foster child at beginning of the current foster children
1645
1646         nxt -= *fencekey + 1;
1647         memcpy ((unsigned char *)page + nxt, fencekey, *fencekey + 1);
1648         bt_putid (slotptr(page,++idx)->id, new_page);
1649         slotptr(page, idx)->tod = tod;
1650         slotptr(page, idx)->off = nxt;
1651         page->foster++;
1652         page->act++;
1653
1654         //  continue with old foster child keys if any
1655
1656         cnt = bt->frame->cnt - bt->frame->foster;
1657
1658         while( cnt++ < bt->frame->cnt ) {
1659                 key = keyptr(bt->frame, cnt);
1660                 nxt -= key->len + 1;
1661                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1662                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1663                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1664                 slotptr(page, idx)->off = nxt;
1665                 page->act++;
1666         }
1667
1668         page->min = nxt;
1669         page->cnt = idx;
1670
1671         //      link new right page
1672
1673         bt_putid (page->right, new_page);
1674
1675         // if current page is the root page, split it
1676
1677         if( page_no == ROOT_page )
1678                 return bt_splitroot (bt, new_page);
1679
1680         //  release wr lock on page
1681
1682         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1683                 return bt->err;
1684
1685         //  obtain ParentModification lock for current page
1686         //      to fix fence key and highest foster child on page
1687
1688         if( bt_lockpage (bt, page_no, BtLockParent, NULL) )
1689                 return bt->err;
1690
1691         //  get our highest foster child key to find in parent node
1692
1693         if( bt_lockpage (bt, page_no, BtLockRead, &page) )
1694                 return bt->err;
1695
1696         key = keyptr(page, page->cnt);
1697         memcpy (fencekey, key, key->len+1);
1698
1699         if( bt_unlockpage (bt, page_no, BtLockRead) )
1700                 return bt->err;
1701
1702 try_again:
1703
1704         do {
1705           slot = bt_loadpage (bt, fencekey + 1, *fencekey, lvl + 1, BtLockWrite);
1706
1707           if( !slot )
1708                 return bt->err;
1709
1710           // check if parent page has enough space for any possible key
1711
1712           if( bt_cleanpage (bt, 256) )
1713                 break;
1714
1715           if( bt_splitpage (bt) )
1716                 return bt->err;
1717         } while( 1 );
1718
1719         //  see if we are still a foster child from another node
1720
1721         if( bt_getid (slotptr(bt->page, slot)->id) != page_no ) {
1722                 bt_unlockpage (bt, bt->page_no, BtLockWrite);
1723 #ifdef  unix
1724                 sched_yield();
1725 #else
1726                 SwitchToThread();
1727 #endif
1728                 goto try_again;
1729         }
1730
1731         //      wait until readers from parent get their locks
1732
1733         if( bt_lockpage (bt, page_no, BtLockDelete, NULL) )
1734                 return bt->err;
1735
1736         if( bt_lockpage (bt, page_no, BtLockWrite, &page) )
1737                 return bt->err;
1738
1739         //      switch parent fence key to foster child
1740
1741         if( slotptr(page, page->cnt)->dead )
1742                 slotptr(bt->page, slot)->dead = 1;
1743         else
1744                 bt_putid (slotptr(bt->page, slot)->id, bt_getid(slotptr(page, page->cnt)->id));
1745
1746         //      remove highest foster child from our page
1747         //      add our new fence key to parent
1748
1749         page->cnt--;
1750         page->act--;
1751         page->foster--;
1752         page->dirty = 1;
1753         key = keyptr(page, page->cnt);
1754
1755         if( bt_addkeytopage (bt, slot, key->key, key->len, page_no, tod) )
1756                 return bt->err;
1757
1758         if( bt_unlockpage (bt, page_no, BtLockDelete) )
1759                 return bt->err;
1760
1761         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1762                 return bt->err;
1763
1764         return bt_unlockpage (bt, page_no, BtLockParent);
1765 }
1766
1767 //  Insert new key into the btree at leaf level.
1768
1769 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod)
1770 {
1771 uint slot, idx;
1772 BtPage page;
1773 BtKey ptr;
1774
1775         while( 1 ) {
1776                 if( slot = bt_loadpage (bt, key, len, 0, BtLockWrite) )
1777                         ptr = keyptr(bt->page, slot);
1778                 else
1779                 {
1780                         if ( !bt->err )
1781                                 bt->err = BTERR_ovflw;
1782                         return bt->err;
1783                 }
1784
1785                 // if key already exists, update id and return
1786
1787                 page = bt->page;
1788
1789                 if( !keycmp (ptr, key, len) ) {
1790                         slotptr(page, slot)->dead = 0;
1791                         slotptr(page, slot)->tod = tod;
1792                         bt_putid(slotptr(page,slot)->id, id);
1793                         return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1794                 }
1795
1796                 // check if page has enough space
1797
1798                 if( bt_cleanpage (bt, len) )
1799                         break;
1800
1801                 if( bt_splitpage (bt) )
1802                         return bt->err;
1803         }
1804
1805         return bt_addkeytopage (bt, slot, key, len, id, tod);
1806 }
1807
1808 //  cache page of keys into cursor and return starting slot for given key
1809
1810 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1811 {
1812 uint slot;
1813
1814         // cache page for retrieval
1815         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1816                 memcpy (bt->cursor, bt->page, bt->mgr->page_size);
1817         bt->cursor_page = bt->page_no;
1818         if ( bt_unlockpage(bt, bt->page_no, BtLockRead) )
1819                 return 0;
1820
1821         return slot;
1822 }
1823
1824 //  return next slot for cursor page
1825 //  or slide cursor right into next page
1826
1827 uint bt_nextkey (BtDb *bt, uint slot)
1828 {
1829 BtPage page;
1830 uid right;
1831
1832   do {
1833         right = bt_getid(bt->cursor->right);
1834         while( slot++ < bt->cursor->cnt - bt->cursor->foster )
1835           if( slotptr(bt->cursor,slot)->dead )
1836                 continue;
1837           else if( right || (slot < bt->cursor->cnt - bt->cursor->foster) )
1838                 return slot;
1839           else
1840                 break;
1841
1842         if( !right )
1843                 break;
1844
1845         bt->cursor_page = right;
1846
1847     if( bt_lockpage(bt, right, BtLockRead, &page) )
1848                 return 0;
1849
1850         memcpy (bt->cursor, page, bt->mgr->page_size);
1851
1852         if ( bt_unlockpage(bt, right, BtLockRead) )
1853                 return 0;
1854
1855         slot = 0;
1856   } while( 1 );
1857
1858   return bt->err = 0;
1859 }
1860
1861 BtKey bt_key(BtDb *bt, uint slot)
1862 {
1863         return keyptr(bt->cursor, slot);
1864 }
1865
1866 uid bt_uid(BtDb *bt, uint slot)
1867 {
1868         return bt_getid(slotptr(bt->cursor,slot)->id);
1869 }
1870
1871 uint bt_tod(BtDb *bt, uint slot)
1872 {
1873         return slotptr(bt->cursor,slot)->tod;
1874 }
1875
1876
1877 #ifdef STANDALONE
1878
1879 typedef struct {
1880         char type, idx;
1881         char *infile;
1882         BtMgr *mgr;
1883         int num;
1884 } ThreadArg;
1885
1886 //  standalone program to index file of keys
1887 //  then list them onto std-out
1888
1889 #ifdef unix
1890 void *index_file (void *arg)
1891 #else
1892 uint __stdcall index_file (void *arg)
1893 #endif
1894 {
1895 int line = 0, found = 0, cnt = 0;
1896 uid next, page_no = LEAF_page;  // start on first page of leaves
1897 unsigned char key[256];
1898 ThreadArg *args = arg;
1899 int ch, len = 0, slot;
1900 time_t tod[1];
1901 BtPage page;
1902 BtKey ptr;
1903 BtDb *bt;
1904 FILE *in;
1905
1906         bt = bt_open (args->mgr);
1907         time (tod);
1908
1909         switch(args->type | 0x20)
1910         {
1911         case 'w':
1912                 fprintf(stderr, "started indexing for %s\n", args->infile);
1913                 if( in = fopen (args->infile, "rb") )
1914                   while( ch = getc(in), ch != EOF )
1915                         if( ch == '\n' )
1916                         {
1917                           line++;
1918
1919                           if( args->num == 1 )
1920                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
1921
1922                           else if( args->num )
1923                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
1924
1925                           if( bt_insertkey (bt, key, len, line, *tod) )
1926                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
1927                           len = 0;
1928                         }
1929                         else if( len < 255 )
1930                                 key[len++] = ch;
1931                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
1932                 break;
1933
1934         case 'd':
1935                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
1936                 if( in = fopen (args->infile, "rb") )
1937                   while( ch = getc(in), ch != EOF )
1938                         if( ch == '\n' )
1939                         {
1940                           line++;
1941                           if( args->num == 1 )
1942                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
1943
1944                           else if( args->num )
1945                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
1946
1947                           if( bt_deletekey (bt, key, len, 0) )
1948                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
1949                           len = 0;
1950                         }
1951                         else if( len < 255 )
1952                                 key[len++] = ch;
1953                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
1954                 break;
1955
1956         case 'f':
1957                 fprintf(stderr, "started finding keys for %s\n", args->infile);
1958                 if( in = fopen (args->infile, "rb") )
1959                   while( ch = getc(in), ch != EOF )
1960                         if( ch == '\n' )
1961                         {
1962                           line++;
1963                           if( args->num == 1 )
1964                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
1965
1966                           else if( args->num )
1967                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
1968
1969                           if( bt_findkey (bt, key, len) )
1970                                 found++;
1971                           else if( bt->err )
1972                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
1973                           len = 0;
1974                         }
1975                         else if( len < 255 )
1976                                 key[len++] = ch;
1977                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
1978                 break;
1979
1980         case 's':
1981                 len = key[0] = 0;
1982
1983                 fprintf(stderr, "started reading\n");
1984
1985                 if( slot = bt_startkey (bt, key, len) )
1986                   slot--;
1987                 else
1988                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
1989
1990                 while( slot = bt_nextkey (bt, slot) ) {
1991                         ptr = bt_key(bt, slot);
1992                         fwrite (ptr->key, ptr->len, 1, stdout);
1993                         fputc ('\n', stdout);
1994                 }
1995
1996                 break;
1997
1998         case 'c':
1999                 fprintf(stderr, "started reading\n");
2000
2001                 do {
2002                         bt_lockpage (bt, page_no, BtLockRead, &page);
2003                         cnt += page->act;
2004                         next = bt_getid (page->right);
2005                         bt_unlockpage (bt, page_no, BtLockRead);
2006                 } while( page_no = next );
2007
2008                 cnt--;  // remove stopper key
2009                 fprintf(stderr, " Total keys read %d\n", cnt);
2010                 break;
2011         }
2012
2013         bt_close (bt);
2014 #ifdef unix
2015         return NULL;
2016 #else
2017         return 0;
2018 #endif
2019 }
2020
2021 typedef struct timeval timer;
2022
2023 int main (int argc, char **argv)
2024 {
2025 int idx, cnt, len, slot, err;
2026 int segsize, bits = 16;
2027 #ifdef unix
2028 pthread_t *threads;
2029 timer start, stop;
2030 #else
2031 time_t start[1], stop[1];
2032 HANDLE *threads;
2033 #endif
2034 double real_time;
2035 ThreadArg *args;
2036 uint poolsize = 0;
2037 int num = 0;
2038 char key[1];
2039 BtMgr *mgr;
2040 BtKey ptr;
2041 BtDb *bt;
2042
2043         if( argc < 3 ) {
2044                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2045                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2046                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2047                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2048                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2049                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2050                 exit(0);
2051         }
2052
2053 #ifdef unix
2054         gettimeofday(&start, NULL);
2055 #else
2056         time(start);
2057 #endif
2058
2059         if( argc > 3 )
2060                 bits = atoi(argv[3]);
2061
2062         if( argc > 4 )
2063                 poolsize = atoi(argv[4]);
2064
2065         if( !poolsize )
2066                 fprintf (stderr, "Warning: no mapped_pool\n");
2067
2068         if( poolsize > 65535 )
2069                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2070
2071         if( argc > 5 )
2072                 segsize = atoi(argv[5]);
2073         else
2074                 segsize = 4;    // 16 pages per mmap segment
2075
2076         if( argc > 6 )
2077                 num = atoi(argv[6]);
2078
2079         cnt = argc - 7;
2080 #ifdef unix
2081         threads = malloc (cnt * sizeof(pthread_t));
2082 #else
2083         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2084 #endif
2085         args = malloc (cnt * sizeof(ThreadArg));
2086
2087         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2088
2089         if( !mgr ) {
2090                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2091                 exit (1);
2092         }
2093
2094         //      fire off threads
2095
2096         for( idx = 0; idx < cnt; idx++ ) {
2097                 args[idx].infile = argv[idx + 7];
2098                 args[idx].type = argv[2][0];
2099                 args[idx].mgr = mgr;
2100                 args[idx].num = num;
2101                 args[idx].idx = idx;
2102 #ifdef unix
2103                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2104                         fprintf(stderr, "Error creating thread %d\n", err);
2105 #else
2106                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2107 #endif
2108         }
2109
2110         //      wait for termination
2111
2112 #ifdef unix
2113         for( idx = 0; idx < cnt; idx++ )
2114                 pthread_join (threads[idx], NULL);
2115         gettimeofday(&stop, NULL);
2116         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2117 #else
2118         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2119
2120         for( idx = 0; idx < cnt; idx++ )
2121                 CloseHandle(threads[idx]);
2122
2123         time (stop);
2124         real_time = 1000 * (*stop - *start);
2125 #endif
2126         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2127         bt_mgrclose (mgr);
2128 }
2129
2130 #endif  //STANDALONE