]> pd.if.org Git - btree/blob - threads2h.c
46742c487497d05185dd795028a54aadb46ff4db
[btree] / threads2h.c
1 // btree version threads2h pthread rw lock/SRW version
2 //      with fixed bt_deletekey code
3 // 17 FEB 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #endif
30
31 #ifdef unix
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <fcntl.h>
36 #include <sys/time.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #include <pthread.h>
40 #else
41 #define WIN32_LEAN_AND_MEAN
42 #include <windows.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <time.h>
46 #include <fcntl.h>
47 #include <process.h>
48 #include <intrin.h>
49 #endif
50
51 #include <memory.h>
52 #include <string.h>
53 #include <stddef.h>
54
55 typedef unsigned long long      uid;
56
57 #ifndef unix
58 typedef unsigned long long      off64_t;
59 typedef unsigned short          ushort;
60 typedef unsigned int            uint;
61 #endif
62
63 #define BT_latchtable   128                                     // number of latch manager slots
64
65 #define BT_ro 0x6f72    // ro
66 #define BT_rw 0x7772    // rw
67
68 #define BT_maxbits              24                                      // maximum page size in bits
69 #define BT_minbits              9                                       // minimum page size in bits
70 #define BT_minpage              (1 << BT_minbits)       // minimum page size
71 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
72
73 /*
74 There are five lock types for each node in three independent sets: 
75 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
76 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
77 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
78 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
79 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
80 */
81
82 typedef enum{
83         BtLockAccess,
84         BtLockDelete,
85         BtLockRead,
86         BtLockWrite,
87         BtLockParent
88 } BtLock;
89
90 //      mode & definition for latch implementation
91
92 // exclusive is set for write access
93 // share is count of read accessors
94 // grant write lock when share == 0
95
96 typedef struct {
97         volatile unsigned char mutex;
98         volatile unsigned char exclusive:1;
99         volatile unsigned char pending:1;
100         volatile ushort share;
101 } BtSpinLatch;
102
103 //  hash table entries
104
105 typedef struct {
106         BtSpinLatch latch[1];
107         volatile ushort slot;           // Latch table entry at head of chain
108 } BtHashEntry;
109
110 //      latch manager table structure
111
112 typedef struct {
113 #ifdef unix
114         pthread_rwlock_t lock[1];
115 #else
116         SRWLOCK srw[1];
117 #endif
118 } BtLatch;
119
120 typedef struct {
121         BtLatch readwr[1];                      // read/write page lock
122         BtLatch access[1];                      // Access Intent/Page delete
123         BtLatch parent[1];                      // Posting of fence key in parent
124         BtSpinLatch busy[1];            // slot is being moved between chains
125         volatile ushort next;           // next entry in hash table chain
126         volatile ushort prev;           // prev entry in hash table chain
127         volatile ushort pin;            // number of outstanding locks
128         volatile ushort hash;           // hash slot entry is under
129         volatile uid page_no;           // latch set page number
130 } BtLatchSet;
131
132 //      Define the length of the page and key pointers
133
134 #define BtId 6
135
136 //      Page key slot definition.
137
138 //      If BT_maxbits is 15 or less, you can save 4 bytes
139 //      for each key stored by making the first two uints
140 //      into ushorts.  You can also save 4 bytes by removing
141 //      the tod field from the key.
142
143 //      Keys are marked dead, but remain on the page until
144 //      it cleanup is called. The fence key (highest key) for
145 //      the page is always present, even after cleanup.
146
147 typedef struct {
148         uint off:BT_maxbits;            // page offset for key start
149         uint dead:1;                            // set for deleted key
150         uint tod;                                       // time-stamp for key
151         unsigned char id[BtId];         // id associated with key
152 } BtSlot;
153
154 //      The key structure occupies space at the upper end of
155 //      each page.  It's a length byte followed by the value
156 //      bytes.
157
158 typedef struct {
159         unsigned char len;
160         unsigned char key[1];
161 } *BtKey;
162
163 //      The first part of an index page.
164 //      It is immediately followed
165 //      by the BtSlot array of keys.
166
167 typedef struct BtPage_ {
168         uint cnt;                                       // count of keys in page
169         uint act;                                       // count of active keys
170         uint min;                                       // next key offset
171         unsigned char bits:7;           // page size in bits
172         unsigned char free:1;           // page is on free list
173         unsigned char lvl:5;            // level of page
174         unsigned char kill:1;           // page is being killed
175         unsigned char dirty:1;          // page has deleted keys
176         unsigned char posted:1;         // page fence is posted
177         unsigned char right[BtId];      // page number to right
178 } *BtPage;
179
180 //      The memory mapping pool table buffer manager entry
181
182 typedef struct {
183         unsigned long long int lru;     // number of times accessed
184         uid  basepage;                          // mapped base page number
185         char *map;                                      // mapped memory pointer
186         ushort slot;                            // slot index in this array
187         ushort pin;                                     // mapped page pin counter
188         void *hashprev;                         // previous pool entry for the same hash idx
189         void *hashnext;                         // next pool entry for the same hash idx
190 #ifndef unix
191         HANDLE hmap;                            // Windows memory mapping handle
192 #endif
193 } BtPool;
194
195 //  The loadpage interface object
196
197 typedef struct {
198         uid page_no;            // current page number
199         BtPage page;            // current page pointer
200         BtPool *pool;           // current page pool
201         BtLatchSet *latch;      // current page latch set
202 } BtPageSet;
203
204 //      structure for latch manager on ALLOC_page
205
206 typedef struct {
207         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
208         BtSpinLatch lock[1];            // allocation area lite latch
209         ushort latchdeployed;           // highest number of latch entries deployed
210         ushort nlatchpage;                      // number of latch pages at BT_latch
211         ushort latchtotal;                      // number of page latch entries
212         ushort latchhash;                       // number of latch hash table slots
213         ushort latchvictim;                     // next latch entry to examine
214         BtHashEntry table[0];           // the hash table
215 } BtLatchMgr;
216
217 //      The object structure for Btree access
218
219 typedef struct {
220         uint page_size;                         // page size    
221         uint page_bits;                         // page size in bits    
222         uint seg_bits;                          // seg size in pages in bits
223         uint mode;                                      // read-write mode
224 #ifdef unix
225         int idx;
226 #else
227         HANDLE idx;
228 #endif
229         ushort poolcnt;                         // highest page pool node in use
230         ushort poolmax;                         // highest page pool node allocated
231         ushort poolmask;                        // total number of pages in mmap segment - 1
232         ushort hashsize;                        // size of Hash Table for pool entries
233         volatile uint evicted;          // last evicted hash table slot
234         ushort *hash;                           // pool index for hash entries
235         BtSpinLatch *latch;                     // latches for hash table slots
236         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
237         BtLatchSet *latchsets;          // mapped latch set from latch pages
238         BtPool *pool;                           // memory pool page segments
239 #ifndef unix
240         HANDLE halloc;                          // allocation and latch table handle
241 #endif
242 } BtMgr;
243
244 typedef struct {
245         BtMgr *mgr;                     // buffer manager for thread
246         BtPage cursor;          // cached frame for start/next (never mapped)
247         BtPage frame;           // spare frame for the page split (never mapped)
248         BtPage zero;            // page frame for zeroes at end of file
249         uid cursor_page;        // current cursor page number   
250         unsigned char *mem;     // frame, cursor, page memory buffer
251         int found;                      // last delete or insert was found
252         int err;                        // last error
253 } BtDb;
254
255 typedef enum {
256         BTERR_ok = 0,
257         BTERR_struct,
258         BTERR_ovflw,
259         BTERR_lock,
260         BTERR_map,
261         BTERR_wrt,
262         BTERR_hash
263 } BTERR;
264
265 // B-Tree functions
266 extern void bt_close (BtDb *bt);
267 extern BtDb *bt_open (BtMgr *mgr);
268 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
269 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
270 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
271 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
272 extern uint bt_nextkey   (BtDb *bt, uint slot);
273
274 //      manager functions
275 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
276 void bt_mgrclose (BtMgr *mgr);
277
278 //  Helper functions to return slot values
279
280 extern BtKey bt_key (BtDb *bt, uint slot);
281 extern uid bt_uid (BtDb *bt, uint slot);
282 extern uint bt_tod (BtDb *bt, uint slot);
283
284 //  BTree page number constants
285 #define ALLOC_page              0       // allocation & lock manager hash table
286 #define ROOT_page               1       // root of the btree
287 #define LEAF_page               2       // first page of leaves
288 #define LATCH_page              3       // pages for lock manager
289
290 //      Number of levels to create in a new BTree
291
292 #define MIN_lvl                 2
293
294 //  The page is allocated from low and hi ends.
295 //  The key offsets and row-id's are allocated
296 //  from the bottom, while the text of the key
297 //  is allocated from the top.  When the two
298 //  areas meet, the page is split into two.
299
300 //  A key consists of a length byte, two bytes of
301 //  index number (0 - 65534), and up to 253 bytes
302 //  of key value.  Duplicate keys are discarded.
303 //  Associated with each key is a 48 bit row-id.
304
305 //  The b-tree root is always located at page 1.
306 //      The first leaf page of level zero is always
307 //      located on page 2.
308
309 //      The b-tree pages are linked with next
310 //      pointers to facilitate enumerators,
311 //      and provide for concurrency.
312
313 //      When to root page fills, it is split in two and
314 //      the tree height is raised by a new root at page
315 //      one with two keys.
316
317 //      Deleted keys are marked with a dead bit until
318 //      page cleanup The fence key for a node is
319 //      present in a special array.
320
321 //  Groups of pages called segments from the btree are optionally
322 //  cached with a memory mapped pool. A hash table is used to keep
323 //  track of the cached segments.  This behaviour is controlled
324 //  by the cache block size parameter to bt_open.
325
326 //  To achieve maximum concurrency one page is locked at a time
327 //  as the tree is traversed to find leaf key in question. The right
328 //  page numbers are used in cases where the page is being split,
329 //      or consolidated.
330
331 //  Page 0 is dedicated to lock for new page extensions,
332 //      and chains empty pages together for reuse.
333
334 //      The ParentModification lock on a node is obtained to serialize posting
335 //      or changing the fence key for a node.
336
337 //      Empty pages are chained together through the ALLOC page and reused.
338
339 //      Access macros to address slot and key values from the page.
340 //      Page slots use 1 based indexing.
341
342 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
343 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
344
345 void bt_putid(unsigned char *dest, uid id)
346 {
347 int i = BtId;
348
349         while( i-- )
350                 dest[i] = (unsigned char)id, id >>= 8;
351 }
352
353 uid bt_getid(unsigned char *src)
354 {
355 uid id = 0;
356 int i;
357
358         for( i = 0; i < BtId; i++ )
359                 id <<= 8, id |= *src++; 
360
361         return id;
362 }
363
364 //      Latch Manager
365
366 //      wait until write lock mode is clear
367 //      and add 1 to the share count
368
369 void bt_spinreadlock(BtSpinLatch *latch)
370 {
371 ushort prev;
372
373   do {
374         //      obtain latch mutex
375 #ifdef unix
376         if( __sync_lock_test_and_set(&latch->mutex, 1) )
377                 continue;
378 #else
379         if( _InterlockedExchange8(&latch->mutex, 1) )
380                 continue;
381 #endif
382         //  see if exclusive request is granted or pending
383
384         if( prev = !(latch->exclusive | latch->pending) )
385                 latch->share++;
386
387 #ifdef unix
388         __sync_lock_release (&latch->mutex);
389 #else
390         _InterlockedExchange8(&latch->mutex, 0);
391 #endif
392
393         if( prev )
394                 return;
395
396 #ifdef  unix
397   } while( sched_yield(), 1 );
398 #else
399   } while( SwitchToThread(), 1 );
400 #endif
401 }
402
403 //      wait for other read and write latches to relinquish
404
405 void bt_spinwritelock(BtSpinLatch *latch)
406 {
407 uint prev;
408
409   do {
410 #ifdef  unix
411         if( __sync_lock_test_and_set(&latch->mutex, 1) )
412                 continue;
413 #else
414         if( _InterlockedExchange8(&latch->mutex, 1) )
415                 continue;
416 #endif
417         if( prev = !(latch->share | latch->exclusive) )
418                 latch->exclusive = 1, latch->pending = 0;
419         else
420                 latch->pending = 1;
421 #ifdef unix
422         __sync_lock_release (&latch->mutex);
423 #else
424         _InterlockedExchange8(&latch->mutex, 0);
425 #endif
426         if( prev )
427                 return;
428 #ifdef  unix
429   } while( sched_yield(), 1 );
430 #else
431   } while( SwitchToThread(), 1 );
432 #endif
433 }
434
435 //      try to obtain write lock
436
437 //      return 1 if obtained,
438 //              0 otherwise
439
440 int bt_spinwritetry(BtSpinLatch *latch)
441 {
442 uint prev;
443
444 #ifdef unix
445         if( __sync_lock_test_and_set(&latch->mutex, 1) )
446                 return 0;
447 #else
448         if( _InterlockedExchange8(&latch->mutex, 1) )
449                 return 0;
450 #endif
451         //      take write access if all bits are clear
452
453         if( prev = !(latch->exclusive | latch->share) )
454                 latch->exclusive = 1;
455
456 #ifdef unix
457         __sync_lock_release (&latch->mutex);
458 #else
459         _InterlockedExchange8(&latch->mutex, 0);
460 #endif
461         return prev;
462 }
463
464 //      clear write mode
465
466 void bt_spinreleasewrite(BtSpinLatch *latch)
467 {
468         //      obtain latch mutex
469 #ifdef unix
470         while( __sync_lock_test_and_set(&latch->mutex, 1) )
471                 sched_yield();
472 #else
473         while( _InterlockedExchange8(&latch->mutex, 1) )
474                 SwitchToThread();
475 #endif
476         latch->exclusive = 0;
477 #ifdef unix
478         __sync_lock_release (&latch->mutex);
479 #else
480         _InterlockedExchange8(&latch->mutex, 0);
481 #endif
482 }
483
484 //      decrement reader count
485
486 void bt_spinreleaseread(BtSpinLatch *latch)
487 {
488 #ifdef unix
489         while( __sync_lock_test_and_set(&latch->mutex, 1) )
490                 sched_yield();
491 #else
492         while( _InterlockedExchange8(&latch->mutex, 1) )
493                 SwitchToThread();
494 #endif
495         latch->share--;
496 #ifdef unix
497         __sync_lock_release (&latch->mutex);
498 #else
499         _InterlockedExchange8(&latch->mutex, 0);
500 #endif
501 }
502
503 void bt_readlock(BtLatch *latch)
504 {
505 #ifdef unix
506         pthread_rwlock_rdlock (latch->lock);
507 #else
508         AcquireSRWLockShared (latch->srw);
509 #endif
510 }
511
512 //      wait for other read and write latches to relinquish
513
514 void bt_writelock(BtLatch *latch)
515 {
516 #ifdef unix
517         pthread_rwlock_wrlock (latch->lock);
518 #else
519         AcquireSRWLockExclusive (latch->srw);
520 #endif
521 }
522
523 //      try to obtain write lock
524
525 //      return 1 if obtained,
526 //              0 if already write or read locked
527
528 int bt_writetry(BtLatch *latch)
529 {
530 int result = 0;
531
532 #ifdef unix
533         result = !pthread_rwlock_trywrlock (latch->lock);
534 #else
535         result = TryAcquireSRWLockExclusive (latch->srw);
536 #endif
537         return result;
538 }
539
540 //      clear write mode
541
542 void bt_releasewrite(BtLatch *latch)
543 {
544 #ifdef unix
545         pthread_rwlock_unlock (latch->lock);
546 #else
547         ReleaseSRWLockExclusive (latch->srw);
548 #endif
549 }
550
551 //      decrement reader count
552
553 void bt_releaseread(BtLatch *latch)
554 {
555 #ifdef unix
556         pthread_rwlock_unlock (latch->lock);
557 #else
558         ReleaseSRWLockShared (latch->srw);
559 #endif
560 }
561
562 void bt_initlockset (BtLatchSet *set)
563 {
564 #ifdef unix
565 pthread_rwlockattr_t rwattr[1];
566
567         pthread_rwlockattr_init (rwattr);
568         pthread_rwlockattr_setpshared (rwattr, PTHREAD_PROCESS_SHARED);
569
570         pthread_rwlock_init (set->readwr->lock, rwattr);
571         pthread_rwlock_init (set->access->lock, rwattr);
572         pthread_rwlock_init (set->parent->lock, rwattr);
573         pthread_rwlockattr_destroy (rwattr);
574 #else
575         InitializeSRWLock (set->readwr->srw);
576         InitializeSRWLock (set->access->srw);
577         InitializeSRWLock (set->parent->srw);
578 #endif
579 }
580
581 //      link latch table entry into latch hash table
582
583 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
584 {
585 BtLatchSet *set = bt->mgr->latchsets + victim;
586
587         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
588                 bt->mgr->latchsets[set->next].prev = victim;
589
590         bt->mgr->latchmgr->table[hashidx].slot = victim;
591         set->page_no = page_no;
592         set->hash = hashidx;
593         set->prev = 0;
594 }
595
596 //      release latch pin
597
598 void bt_unpinlatch (BtLatchSet *set)
599 {
600 #ifdef unix
601         __sync_fetch_and_add(&set->pin, -1);
602 #else
603         _InterlockedDecrement16 (&set->pin);
604 #endif
605 }
606
607 //      find existing latchset or inspire new one
608 //      return with latchset pinned
609
610 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
611 {
612 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
613 ushort slot, avail = 0, victim, idx;
614 BtLatchSet *set;
615
616         //  obtain read lock on hash table entry
617
618         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
619
620         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
621         {
622                 set = bt->mgr->latchsets + slot;
623                 if( page_no == set->page_no )
624                         break;
625         } while( slot = set->next );
626
627         if( slot ) {
628 #ifdef unix
629                 __sync_fetch_and_add(&set->pin, 1);
630 #else
631                 _InterlockedIncrement16 (&set->pin);
632 #endif
633         }
634
635     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
636
637         if( slot )
638                 return set;
639
640   //  try again, this time with write lock
641
642   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
643
644   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
645   {
646         set = bt->mgr->latchsets + slot;
647         if( page_no == set->page_no )
648                 break;
649         if( !set->pin && !avail )
650                 avail = slot;
651   } while( slot = set->next );
652
653   //  found our entry, or take over an unpinned one
654
655   if( slot || (slot = avail) ) {
656         set = bt->mgr->latchsets + slot;
657 #ifdef unix
658         __sync_fetch_and_add(&set->pin, 1);
659 #else
660         _InterlockedIncrement16 (&set->pin);
661 #endif
662         set->page_no = page_no;
663         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
664         return set;
665   }
666
667         //  see if there are any unused entries
668 #ifdef unix
669         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
670 #else
671         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
672 #endif
673
674         if( victim < bt->mgr->latchmgr->latchtotal ) {
675                 set = bt->mgr->latchsets + victim;
676 #ifdef unix
677                 __sync_fetch_and_add(&set->pin, 1);
678 #else
679                 _InterlockedIncrement16 (&set->pin);
680 #endif
681                 bt_initlockset (set);
682                 bt_latchlink (bt, hashidx, victim, page_no);
683                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
684                 return set;
685         }
686
687 #ifdef unix
688         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
689 #else
690         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
691 #endif
692   //  find and reuse previous lock entry
693
694   while( 1 ) {
695 #ifdef unix
696         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
697 #else
698         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
699 #endif
700         //      we don't use slot zero
701
702         if( victim %= bt->mgr->latchmgr->latchtotal )
703                 set = bt->mgr->latchsets + victim;
704         else
705                 continue;
706
707         //      take control of our slot
708         //      from other threads
709
710         if( set->pin || !bt_spinwritetry (set->busy) )
711                 continue;
712
713         idx = set->hash;
714
715         // try to get write lock on hash chain
716         //      skip entry if not obtained
717         //      or has outstanding locks
718
719         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
720                 bt_spinreleasewrite (set->busy);
721                 continue;
722         }
723
724         if( set->pin ) {
725                 bt_spinreleasewrite (set->busy);
726                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
727                 continue;
728         }
729
730         //  unlink our available victim from its hash chain
731
732         if( set->prev )
733                 bt->mgr->latchsets[set->prev].next = set->next;
734         else
735                 bt->mgr->latchmgr->table[idx].slot = set->next;
736
737         if( set->next )
738                 bt->mgr->latchsets[set->next].prev = set->prev;
739
740         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
741 #ifdef unix
742         __sync_fetch_and_add(&set->pin, 1);
743 #else
744         _InterlockedIncrement16 (&set->pin);
745 #endif
746         bt_latchlink (bt, hashidx, victim, page_no);
747         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
748         bt_spinreleasewrite (set->busy);
749         return set;
750   }
751 }
752
753 void bt_mgrclose (BtMgr *mgr)
754 {
755 BtPool *pool;
756 uint slot;
757
758         // release mapped pages
759         //      note that slot zero is never used
760
761         for( slot = 1; slot < mgr->poolmax; slot++ ) {
762                 pool = mgr->pool + slot;
763                 if( pool->slot )
764 #ifdef unix
765                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
766 #else
767                 {
768                         FlushViewOfFile(pool->map, 0);
769                         UnmapViewOfFile(pool->map);
770                         CloseHandle(pool->hmap);
771                 }
772 #endif
773         }
774
775 #ifdef unix
776         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
777         munmap (mgr->latchmgr, mgr->page_size);
778 #else
779         FlushViewOfFile(mgr->latchmgr, 0);
780         UnmapViewOfFile(mgr->latchmgr);
781         CloseHandle(mgr->halloc);
782 #endif
783 #ifdef unix
784         close (mgr->idx);
785         free (mgr->pool);
786         free (mgr->hash);
787         free (mgr->latch);
788         free (mgr);
789 #else
790         FlushFileBuffers(mgr->idx);
791         CloseHandle(mgr->idx);
792         GlobalFree (mgr->pool);
793         GlobalFree (mgr->hash);
794         GlobalFree (mgr->latch);
795         GlobalFree (mgr);
796 #endif
797 }
798
799 //      close and release memory
800
801 void bt_close (BtDb *bt)
802 {
803 #ifdef unix
804         if ( bt->mem )
805                 free (bt->mem);
806 #else
807         if ( bt->mem)
808                 VirtualFree (bt->mem, 0, MEM_RELEASE);
809 #endif
810         free (bt);
811 }
812
813 //  open/create new btree buffer manager
814
815 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
816 //              size of mapped page pool (e.g. 8192)
817
818 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
819 {
820 uint lvl, attr, cacheblk, last, slot, idx;
821 uint nlatchpage, latchhash;
822 BtLatchMgr *latchmgr;
823 off64_t size;
824 uint amt[1];
825 BtMgr* mgr;
826 BtKey key;
827 int flag;
828
829 #ifndef unix
830 SYSTEM_INFO sysinfo[1];
831 #endif
832
833         // determine sanity of page size and buffer pool
834
835         if( bits > BT_maxbits )
836                 bits = BT_maxbits;
837         else if( bits < BT_minbits )
838                 bits = BT_minbits;
839
840         if( !poolmax )
841                 return NULL;    // must have buffer pool
842
843 #ifdef unix
844         mgr = calloc (1, sizeof(BtMgr));
845
846         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
847
848         if( mgr->idx == -1 )
849                 return free(mgr), NULL;
850         
851         cacheblk = 4096;        // minimum mmap segment size for unix
852
853 #else
854         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
855         attr = FILE_ATTRIBUTE_NORMAL;
856         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
857
858         if( mgr->idx == INVALID_HANDLE_VALUE )
859                 return GlobalFree(mgr), NULL;
860
861         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
862         GetSystemInfo(sysinfo);
863         cacheblk = sysinfo->dwAllocationGranularity;
864 #endif
865
866 #ifdef unix
867         latchmgr = malloc (BT_maxpage);
868         *amt = 0;
869
870         // read minimum page size to get root info
871
872         if( size = lseek (mgr->idx, 0L, 2) ) {
873                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
874                         bits = latchmgr->alloc->bits;
875                 else
876                         return free(mgr), free(latchmgr), NULL;
877         } else if( mode == BT_ro )
878                 return free(latchmgr), bt_mgrclose (mgr), NULL;
879 #else
880         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
881         size = GetFileSize(mgr->idx, amt);
882
883         if( size || *amt ) {
884                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
885                         return bt_mgrclose (mgr), NULL;
886                 bits = latchmgr->alloc->bits;
887         } else if( mode == BT_ro )
888                 return bt_mgrclose (mgr), NULL;
889 #endif
890
891         mgr->page_size = 1 << bits;
892         mgr->page_bits = bits;
893
894         mgr->poolmax = poolmax;
895         mgr->mode = mode;
896
897         if( cacheblk < mgr->page_size )
898                 cacheblk = mgr->page_size;
899
900         //  mask for partial memmaps
901
902         mgr->poolmask = (cacheblk >> bits) - 1;
903
904         //      see if requested size of pages per memmap is greater
905
906         if( (1 << segsize) > mgr->poolmask )
907                 mgr->poolmask = (1 << segsize) - 1;
908
909         mgr->seg_bits = 0;
910
911         while( (1 << mgr->seg_bits) <= mgr->poolmask )
912                 mgr->seg_bits++;
913
914         mgr->hashsize = hashsize;
915
916 #ifdef unix
917         mgr->pool = calloc (poolmax, sizeof(BtPool));
918         mgr->hash = calloc (hashsize, sizeof(ushort));
919         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
920 #else
921         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
922         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
923         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
924 #endif
925
926         if( size || *amt )
927                 goto mgrlatch;
928
929         // initialize an empty b-tree with latch page, root page, page of leaves
930         // and page(s) of latches
931
932         memset (latchmgr, 0, 1 << bits);
933         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
934         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
935         latchmgr->alloc->bits = mgr->page_bits;
936
937         latchmgr->nlatchpage = nlatchpage;
938         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
939
940         //  initialize latch manager
941
942         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
943
944         //      size of hash table = total number of latchsets
945
946         if( latchhash > latchmgr->latchtotal )
947                 latchhash = latchmgr->latchtotal;
948
949         latchmgr->latchhash = latchhash;
950
951 #ifdef unix
952         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
953                 return bt_mgrclose (mgr), NULL;
954 #else
955         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
956                 return bt_mgrclose (mgr), NULL;
957
958         if( *amt < mgr->page_size )
959                 return bt_mgrclose (mgr), NULL;
960 #endif
961
962         memset (latchmgr, 0, 1 << bits);
963         latchmgr->alloc->bits = mgr->page_bits;
964
965         for( lvl=MIN_lvl; lvl--; ) {
966                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
967                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
968                 key = keyptr(latchmgr->alloc, 1);
969                 key->len = 2;           // create stopper key
970                 key->key[0] = 0xff;
971                 key->key[1] = 0xff;
972                 latchmgr->alloc->min = mgr->page_size - 3;
973                 latchmgr->alloc->lvl = lvl;
974                 latchmgr->alloc->cnt = 1;
975                 latchmgr->alloc->act = 1;
976 #ifdef unix
977                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
978                         return bt_mgrclose (mgr), NULL;
979 #else
980                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
981                         return bt_mgrclose (mgr), NULL;
982
983                 if( *amt < mgr->page_size )
984                         return bt_mgrclose (mgr), NULL;
985 #endif
986         }
987
988         // clear out latch manager locks
989         //      and rest of pages to round out segment
990
991         memset(latchmgr, 0, mgr->page_size);
992         last = MIN_lvl + 1;
993
994         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
995 #ifdef unix
996                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
997 #else
998                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
999                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1000                         return bt_mgrclose (mgr), NULL;
1001                 if( *amt < mgr->page_size )
1002                         return bt_mgrclose (mgr), NULL;
1003 #endif
1004                 last++;
1005         }
1006
1007 mgrlatch:
1008 #ifdef unix
1009         flag = PROT_READ | PROT_WRITE;
1010         mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1011         if( mgr->latchmgr == MAP_FAILED )
1012                 return bt_mgrclose (mgr), NULL;
1013         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1014         if( mgr->latchsets == MAP_FAILED )
1015                 return bt_mgrclose (mgr), NULL;
1016 #else
1017         flag = PAGE_READWRITE;
1018         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1019         if( !mgr->halloc )
1020                 return bt_mgrclose (mgr), NULL;
1021
1022         flag = FILE_MAP_WRITE;
1023         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1024         if( !mgr->latchmgr )
1025                 return GetLastError(), bt_mgrclose (mgr), NULL;
1026
1027         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1028 #endif
1029
1030 #ifdef unix
1031         free (latchmgr);
1032 #else
1033         VirtualFree (latchmgr, 0, MEM_RELEASE);
1034 #endif
1035         return mgr;
1036 }
1037
1038 //      open BTree access method
1039 //      based on buffer manager
1040
1041 BtDb *bt_open (BtMgr *mgr)
1042 {
1043 BtDb *bt = malloc (sizeof(*bt));
1044
1045         memset (bt, 0, sizeof(*bt));
1046         bt->mgr = mgr;
1047 #ifdef unix
1048         bt->mem = malloc (3 *mgr->page_size);
1049 #else
1050         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1051 #endif
1052         bt->frame = (BtPage)bt->mem;
1053         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
1054         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
1055
1056         memset (bt->zero, 0, mgr->page_size);
1057         return bt;
1058 }
1059
1060 //  compare two keys, returning > 0, = 0, or < 0
1061 //  as the comparison value
1062
1063 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1064 {
1065 uint len1 = key1->len;
1066 int ans;
1067
1068         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1069                 return ans;
1070
1071         if( len1 > len2 )
1072                 return 1;
1073         if( len1 < len2 )
1074                 return -1;
1075
1076         return 0;
1077 }
1078
1079 //      Buffer Pool mgr
1080
1081 // find segment in pool
1082 // must be called with hashslot idx locked
1083 //      return NULL if not there
1084 //      otherwise return node
1085
1086 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1087 {
1088 BtPool *pool;
1089 uint slot;
1090
1091         // compute start of hash chain in pool
1092
1093         if( slot = bt->mgr->hash[idx] ) 
1094                 pool = bt->mgr->pool + slot;
1095         else
1096                 return NULL;
1097
1098         page_no &= ~bt->mgr->poolmask;
1099
1100         while( pool->basepage != page_no )
1101           if( pool = pool->hashnext )
1102                 continue;
1103           else
1104                 return NULL;
1105
1106         return pool;
1107 }
1108
1109 // add segment to hash table
1110
1111 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1112 {
1113 BtPool *node;
1114 uint slot;
1115
1116         pool->hashprev = pool->hashnext = NULL;
1117         pool->basepage = page_no & ~bt->mgr->poolmask;
1118         pool->lru = 1;
1119
1120         if( slot = bt->mgr->hash[idx] ) {
1121                 node = bt->mgr->pool + slot;
1122                 pool->hashnext = node;
1123                 node->hashprev = pool;
1124         }
1125
1126         bt->mgr->hash[idx] = pool->slot;
1127 }
1128
1129 //      find best segment to evict from buffer pool
1130
1131 BtPool *bt_findlru (BtDb *bt, uint hashslot)
1132 {
1133 unsigned long long int target = ~0LL;
1134 BtPool *pool = NULL, *node;
1135
1136         if( !hashslot )
1137                 return NULL;
1138
1139         node = bt->mgr->pool + hashslot;
1140
1141         //  scan pool entries under hash table slot
1142
1143         do {
1144           if( node->pin )
1145                 continue;
1146           if( node->lru > target )
1147                 continue;
1148           target = node->lru;
1149           pool = node;
1150         } while( node = node->hashnext );
1151
1152         return pool;
1153 }
1154
1155 //  map new buffer pool segment to virtual memory
1156
1157 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1158 {
1159 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1160 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1161 int flag;
1162
1163 #ifdef unix
1164         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1165         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1166         if( pool->map == MAP_FAILED )
1167                 return bt->err = BTERR_map;
1168 #else
1169         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1170         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1171         if( !pool->hmap )
1172                 return bt->err = BTERR_map;
1173
1174         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1175         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1176         if( !pool->map )
1177                 return bt->err = BTERR_map;
1178 #endif
1179         return bt->err = 0;
1180 }
1181
1182 //      calculate page within pool
1183
1184 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1185 {
1186 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1187 BtPage page;
1188
1189         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1190         return page;
1191 }
1192
1193 //  release pool pin
1194
1195 void bt_unpinpool (BtPool *pool)
1196 {
1197 #ifdef unix
1198         __sync_fetch_and_add(&pool->pin, -1);
1199 #else
1200         _InterlockedDecrement16 (&pool->pin);
1201 #endif
1202 }
1203
1204 //      find or place requested page in segment-pool
1205 //      return pool table entry, incrementing pin
1206
1207 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1208 {
1209 BtPool *pool, *node, *next;
1210 uint slot, idx, victim;
1211
1212         //      lock hash table chain
1213
1214         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1215         bt_spinreadlock (&bt->mgr->latch[idx]);
1216
1217         //      look up in hash table
1218
1219         if( pool = bt_findpool(bt, page_no, idx) ) {
1220 #ifdef unix
1221                 __sync_fetch_and_add(&pool->pin, 1);
1222 #else
1223                 _InterlockedIncrement16 (&pool->pin);
1224 #endif
1225                 bt_spinreleaseread (&bt->mgr->latch[idx]);
1226                 pool->lru++;
1227                 return pool;
1228         }
1229
1230         //      upgrade to write lock
1231
1232         bt_spinreleaseread (&bt->mgr->latch[idx]);
1233         bt_spinwritelock (&bt->mgr->latch[idx]);
1234
1235         // try to find page in pool with write lock
1236
1237         if( pool = bt_findpool(bt, page_no, idx) ) {
1238 #ifdef unix
1239                 __sync_fetch_and_add(&pool->pin, 1);
1240 #else
1241                 _InterlockedIncrement16 (&pool->pin);
1242 #endif
1243                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1244                 pool->lru++;
1245                 return pool;
1246         }
1247
1248         // allocate a new pool node
1249         // and add to hash table
1250
1251 #ifdef unix
1252         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1253 #else
1254         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1255 #endif
1256
1257         if( ++slot < bt->mgr->poolmax ) {
1258                 pool = bt->mgr->pool + slot;
1259                 pool->slot = slot;
1260
1261                 if( bt_mapsegment(bt, pool, page_no) )
1262                         return NULL;
1263
1264                 bt_linkhash(bt, pool, page_no, idx);
1265 #ifdef unix
1266                 __sync_fetch_and_add(&pool->pin, 1);
1267 #else
1268                 _InterlockedIncrement16 (&pool->pin);
1269 #endif
1270                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1271                 return pool;
1272         }
1273
1274         // pool table is full
1275         //      find best pool entry to evict
1276
1277 #ifdef unix
1278         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1279 #else
1280         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1281 #endif
1282
1283         while( 1 ) {
1284 #ifdef unix
1285                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1286 #else
1287                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1288 #endif
1289                 victim %= bt->mgr->hashsize;
1290
1291                 // try to get write lock
1292                 //      skip entry if not obtained
1293
1294                 if( !bt_spinwritetry (&bt->mgr->latch[victim]) )
1295                         continue;
1296
1297                 //  if pool entry is empty
1298                 //      or any pages are pinned
1299                 //      skip this entry
1300
1301                 if( !(pool = bt_findlru(bt, bt->mgr->hash[victim])) ) {
1302                         bt_spinreleasewrite (&bt->mgr->latch[victim]);
1303                         continue;
1304                 }
1305
1306                 // unlink victim pool node from hash table
1307
1308                 if( node = pool->hashprev )
1309                         node->hashnext = pool->hashnext;
1310                 else if( node = pool->hashnext )
1311                         bt->mgr->hash[victim] = node->slot;
1312                 else
1313                         bt->mgr->hash[victim] = 0;
1314
1315                 if( node = pool->hashnext )
1316                         node->hashprev = pool->hashprev;
1317
1318                 bt_spinreleasewrite (&bt->mgr->latch[victim]);
1319
1320                 //      remove old file mapping
1321 #ifdef unix
1322                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1323 #else
1324                 FlushViewOfFile(pool->map, 0);
1325                 UnmapViewOfFile(pool->map);
1326                 CloseHandle(pool->hmap);
1327 #endif
1328                 pool->map = NULL;
1329
1330                 //  create new pool mapping
1331                 //  and link into hash table
1332
1333                 if( bt_mapsegment(bt, pool, page_no) )
1334                         return NULL;
1335
1336                 bt_linkhash(bt, pool, page_no, idx);
1337 #ifdef unix
1338                 __sync_fetch_and_add(&pool->pin, 1);
1339 #else
1340                 _InterlockedIncrement16 (&pool->pin);
1341 #endif
1342                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1343                 return pool;
1344         }
1345 }
1346
1347 // place write, read, or parent lock on requested page_no.
1348
1349 void bt_lockpage(BtLock mode, BtLatchSet *set)
1350 {
1351         switch( mode ) {
1352         case BtLockRead:
1353                 bt_readlock (set->readwr);
1354                 break;
1355         case BtLockWrite:
1356                 bt_writelock (set->readwr);
1357                 break;
1358         case BtLockAccess:
1359                 bt_readlock (set->access);
1360                 break;
1361         case BtLockDelete:
1362                 bt_writelock (set->access);
1363                 break;
1364         case BtLockParent:
1365                 bt_writelock (set->parent);
1366                 break;
1367         }
1368 }
1369
1370 // remove write, read, or parent lock on requested page
1371
1372 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1373 {
1374         switch( mode ) {
1375         case BtLockRead:
1376                 bt_releaseread (set->readwr);
1377                 break;
1378         case BtLockWrite:
1379                 bt_releasewrite (set->readwr);
1380                 break;
1381         case BtLockAccess:
1382                 bt_releaseread (set->access);
1383                 break;
1384         case BtLockDelete:
1385                 bt_releasewrite (set->access);
1386                 break;
1387         case BtLockParent:
1388                 bt_releasewrite (set->parent);
1389                 break;
1390         }
1391 }
1392
1393 //      allocate a new page and write page into it
1394
1395 uid bt_newpage(BtDb *bt, BtPage page)
1396 {
1397 BtPageSet set[1];
1398 uid new_page;
1399 int reuse;
1400
1401         //      lock allocation page
1402
1403         bt_spinwritelock(bt->mgr->latchmgr->lock);
1404
1405         // use empty chain first
1406         // else allocate empty page
1407
1408         if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
1409                 if( set->pool = bt_pinpool (bt, new_page) )
1410                         set->page = bt_page (bt, set->pool, new_page);
1411                 else
1412                         return 0;
1413
1414                 bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(set->page->right));
1415                 bt_unpinpool (set->pool);
1416                 reuse = 1;
1417         } else {
1418                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1419                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1420                 reuse = 0;
1421         }
1422 #ifdef unix
1423         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1424                 return bt->err = BTERR_wrt, 0;
1425
1426         // if writing first page of pool block, zero last page in the block
1427
1428         if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
1429         {
1430                 // use zero buffer to write zeros
1431                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1432                         return bt->err = BTERR_wrt, 0;
1433         }
1434 #else
1435         //      bring new page into pool and copy page.
1436         //      this will extend the file into the new pages.
1437
1438         if( set->pool = bt_pinpool (bt, new_page) )
1439                 set->page = bt_page (bt, set->pool, new_page);
1440         else
1441                 return 0;
1442
1443         memcpy(set->page, page, bt->mgr->page_size);
1444         bt_unpinpool (set->pool);
1445 #endif
1446         // unlock allocation latch and return new page no
1447
1448         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1449         return new_page;
1450 }
1451
1452 //  find slot in page for given key at a given level
1453
1454 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1455 {
1456 uint diff, higher = set->page->cnt, low = 1, slot;
1457 uint good = 0;
1458
1459         //        make stopper key an infinite fence value
1460
1461         if( bt_getid (set->page->right) )
1462                 higher++;
1463         else
1464                 good++;
1465
1466         //      low is the lowest candidate.
1467         //  loop ends when they meet
1468
1469         //  higher is already
1470         //      tested as .ge. the passed key.
1471
1472         while( diff = higher - low ) {
1473                 slot = low + ( diff >> 1 );
1474                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1475                         low = slot + 1;
1476                 else
1477                         higher = slot, good++;
1478         }
1479
1480         //      return zero if key is on right link page
1481
1482         return good ? higher : 0;
1483 }
1484
1485 //  find and load page at given level for given key
1486 //      leave page rd or wr locked as requested
1487
1488 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1489 {
1490 uid page_no = ROOT_page, prevpage = 0;
1491 uint drill = 0xff, slot;
1492 BtLatchSet *prevlatch;
1493 uint mode, prevmode;
1494 BtPool *prevpool;
1495
1496   //  start at root of btree and drill down
1497
1498   do {
1499         // determine lock mode of drill level
1500         mode = (drill == lvl) ? lock : BtLockRead; 
1501
1502         set->latch = bt_pinlatch (bt, page_no);
1503         set->page_no = page_no;
1504
1505         // pin page contents
1506
1507         if( set->pool = bt_pinpool (bt, page_no) )
1508                 set->page = bt_page (bt, set->pool, page_no);
1509         else
1510                 return 0;
1511
1512         // obtain access lock using lock chaining with Access mode
1513
1514         if( page_no > ROOT_page )
1515           bt_lockpage(BtLockAccess, set->latch);
1516
1517         //      release & unpin parent page
1518
1519         if( prevpage ) {
1520           bt_unlockpage(prevmode, prevlatch);
1521           bt_unpinlatch (prevlatch);
1522           bt_unpinpool (prevpool);
1523           prevpage = 0;
1524         }
1525
1526         // obtain read lock using lock chaining
1527
1528         bt_lockpage(mode, set->latch);
1529
1530         if( set->page->free )
1531                 return bt->err = BTERR_struct, 0;
1532
1533         if( page_no > ROOT_page )
1534           bt_unlockpage(BtLockAccess, set->latch);
1535
1536         // re-read and re-lock root after determining actual level of root
1537
1538         if( set->page->lvl != drill) {
1539                 if ( set->page_no != ROOT_page )
1540                         return bt->err = BTERR_struct, 0;
1541                         
1542                 drill = set->page->lvl;
1543
1544                 if( lock != BtLockRead && drill == lvl ) {
1545                   bt_unlockpage(mode, set->latch);
1546                   bt_unpinlatch (set->latch);
1547                   bt_unpinpool (set->pool);
1548                   continue;
1549                 }
1550         }
1551
1552         prevpage = set->page_no;
1553         prevlatch = set->latch;
1554         prevpool = set->pool;
1555         prevmode = mode;
1556
1557         //  find key on page at this level
1558         //  and descend to requested level
1559
1560         if( !set->page->kill )
1561          if( slot = bt_findslot (set, key, len) ) {
1562           if( drill == lvl )
1563                 return slot;
1564
1565           while( slotptr(set->page, slot)->dead )
1566                 if( slot++ < set->page->cnt )
1567                         continue;
1568                 else
1569                         goto slideright;
1570
1571           page_no = bt_getid(slotptr(set->page, slot)->id);
1572           drill--;
1573           continue;
1574          }
1575
1576         //  or slide right into next page
1577
1578 slideright:
1579         page_no = bt_getid(set->page->right);
1580
1581   } while( page_no );
1582
1583   // return error on end of right chain
1584
1585   bt->err = BTERR_struct;
1586   return 0;     // return error
1587 }
1588
1589 //      return page to free list
1590 //      page must be delete & write locked
1591
1592 void bt_freepage (BtDb *bt, BtPageSet *set)
1593 {
1594         //      lock allocation page
1595
1596         bt_spinwritelock (bt->mgr->latchmgr->lock);
1597
1598         //      store chain in second right
1599         bt_putid(set->page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1600         bt_putid(bt->mgr->latchmgr->alloc[1].right, set->page_no);
1601         set->page->free = 1;
1602
1603         // unlock released page
1604
1605         bt_unlockpage (BtLockDelete, set->latch);
1606         bt_unlockpage (BtLockWrite, set->latch);
1607         bt_unpinlatch (set->latch);
1608         bt_unpinpool (set->pool);
1609
1610         // unlock allocation page
1611
1612         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1613 }
1614
1615 //      a fence key was deleted from a page
1616 //      push new fence value upwards
1617
1618 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1619 {
1620 unsigned char leftkey[256], rightkey[256];
1621 uid page_no;
1622 BtKey ptr;
1623
1624         //      remove the old fence value
1625
1626         ptr = keyptr(set->page, set->page->cnt);
1627         memcpy (rightkey, ptr, ptr->len + 1);
1628
1629         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1630         set->page->dirty = 1;
1631
1632         ptr = keyptr(set->page, set->page->cnt);
1633         memcpy (leftkey, ptr, ptr->len + 1);
1634         page_no = set->page_no;
1635
1636         bt_lockpage (BtLockParent, set->latch);
1637         bt_unlockpage (BtLockWrite, set->latch);
1638
1639         //      insert new (now smaller) fence key
1640
1641         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, page_no, time(NULL)) )
1642           return bt->err;
1643
1644         //      now delete old fence key
1645
1646         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1647                 return bt->err;
1648
1649         bt_unlockpage (BtLockParent, set->latch);
1650         bt_unpinlatch(set->latch);
1651         bt_unpinpool (set->pool);
1652         return 0;
1653 }
1654
1655 //      root has a single child
1656 //      collapse a level from the tree
1657
1658 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1659 {
1660 BtPageSet child[1];
1661 uint idx;
1662
1663   // find the child entry and promote as new root contents
1664
1665   do {
1666         for( idx = 0; idx++ < root->page->cnt; )
1667           if( !slotptr(root->page, idx)->dead )
1668                 break;
1669
1670         child->page_no = bt_getid (slotptr(root->page, idx)->id);
1671
1672         child->latch = bt_pinlatch (bt, child->page_no);
1673         bt_lockpage (BtLockDelete, child->latch);
1674         bt_lockpage (BtLockWrite, child->latch);
1675
1676         if( child->pool = bt_pinpool (bt, child->page_no) )
1677                 child->page = bt_page (bt, child->pool, child->page_no);
1678         else
1679                 return bt->err;
1680
1681         memcpy (root->page, child->page, bt->mgr->page_size);
1682         bt_freepage (bt, child);
1683
1684   } while( root->page->lvl > 1 && root->page->act == 1 );
1685
1686   bt_unlockpage (BtLockWrite, root->latch);
1687   bt_unpinlatch (root->latch);
1688   bt_unpinpool (root->pool);
1689   return 0;
1690 }
1691
1692 //  find and delete key on page by marking delete flag bit
1693 //  if page becomes empty, delete it from the btree
1694
1695 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1696 {
1697 unsigned char lowerfence[256], higherfence[256];
1698 uint slot, idx, dirty = 0, fence, found;
1699 BtPageSet set[1], right[1];
1700 BtKey ptr;
1701
1702         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1703                 ptr = keyptr(set->page, slot);
1704         else
1705                 return bt->err;
1706
1707         //      are we deleting a fence slot?
1708
1709         fence = slot == set->page->cnt;
1710
1711         // if key is found delete it, otherwise ignore request
1712
1713         if( found = !keycmp (ptr, key, len) )
1714           if( found = slotptr(set->page, slot)->dead == 0 ) {
1715                 dirty = slotptr(set->page, slot)->dead = 1;
1716                 set->page->dirty = 1;
1717                 set->page->act--;
1718
1719                 // collapse empty slots
1720
1721                 while( idx = set->page->cnt - 1 )
1722                   if( slotptr(set->page, idx)->dead ) {
1723                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1724                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1725                   } else
1726                         break;
1727                   }
1728
1729         //      did we delete a fence key in an upper level?
1730
1731         if( dirty && lvl && set->page->act && fence )
1732           if( bt_fixfence (bt, set, lvl) )
1733                 return bt->err;
1734           else
1735                 return bt->found = found, 0;
1736
1737         //      is this a collapsed root?
1738
1739         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1740           if( bt_collapseroot (bt, set) )
1741                 return bt->err;
1742           else
1743                 return bt->found = found, 0;
1744
1745         //      return if page is not empty
1746
1747         if( set->page->act ) {
1748                 bt_unlockpage(BtLockWrite, set->latch);
1749                 bt_unpinlatch (set->latch);
1750                 bt_unpinpool (set->pool);
1751                 return bt->found = found, 0;
1752         }
1753
1754         //      cache copy of fence key
1755         //      to post in parent
1756
1757         ptr = keyptr(set->page, set->page->cnt);
1758         memcpy (lowerfence, ptr, ptr->len + 1);
1759
1760         //      obtain lock on right page
1761
1762         right->page_no = bt_getid(set->page->right);
1763         right->latch = bt_pinlatch (bt, right->page_no);
1764         bt_lockpage (BtLockWrite, right->latch);
1765
1766         // pin page contents
1767
1768         if( right->pool = bt_pinpool (bt, right->page_no) )
1769                 right->page = bt_page (bt, right->pool, right->page_no);
1770         else
1771                 return 0;
1772
1773         if( right->page->kill )
1774                 return bt->err = BTERR_struct;
1775
1776         // pull contents of right peer into our empty page
1777
1778         memcpy (set->page, right->page, bt->mgr->page_size);
1779
1780         // cache copy of key to update
1781
1782         ptr = keyptr(right->page, right->page->cnt);
1783         memcpy (higherfence, ptr, ptr->len + 1);
1784
1785         // mark right page deleted and point it to left page
1786         //      until we can post parent updates
1787
1788         bt_putid (right->page->right, set->page_no);
1789         right->page->kill = 1;
1790
1791         bt_lockpage (BtLockParent, right->latch);
1792         bt_unlockpage (BtLockWrite, right->latch);
1793
1794         bt_lockpage (BtLockParent, set->latch);
1795         bt_unlockpage (BtLockWrite, set->latch);
1796
1797         // redirect higher key directly to our new node contents
1798
1799         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, set->page_no, time(NULL)) )
1800           return bt->err;
1801
1802         //      delete old lower key to our node
1803
1804         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1805           return bt->err;
1806
1807         //      obtain delete and write locks to right node
1808
1809         bt_unlockpage (BtLockParent, right->latch);
1810         bt_lockpage (BtLockDelete, right->latch);
1811         bt_lockpage (BtLockWrite, right->latch);
1812         bt_freepage (bt, right);
1813
1814         bt_unlockpage (BtLockParent, set->latch);
1815         bt_unpinlatch (set->latch);
1816         bt_unpinpool (set->pool);
1817         bt->found = found;
1818         return 0;
1819 }
1820
1821 //      find key in leaf level and return row-id
1822
1823 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1824 {
1825 BtPageSet set[1];
1826 uint  slot;
1827 uid id = 0;
1828 BtKey ptr;
1829
1830         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
1831                 ptr = keyptr(set->page, slot);
1832         else
1833                 return 0;
1834
1835         // if key exists, return row-id
1836         //      otherwise return 0
1837
1838         if( slot <= set->page->cnt )
1839           if( !keycmp (ptr, key, len) )
1840                 id = bt_getid(slotptr(set->page,slot)->id);
1841
1842         bt_unlockpage (BtLockRead, set->latch);
1843         bt_unpinlatch (set->latch);
1844         bt_unpinpool (set->pool);
1845         return id;
1846 }
1847
1848 //      check page for space available,
1849 //      clean if necessary and return
1850 //      0 - page needs splitting
1851 //      >0  new slot value
1852
1853 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
1854 {
1855 uint nxt = bt->mgr->page_size;
1856 uint cnt = 0, idx = 0;
1857 uint max = page->cnt;
1858 uint newslot = max;
1859 BtKey key;
1860
1861         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1862                 return slot;
1863
1864         //      skip cleanup if nothing to reclaim
1865
1866         if( !page->dirty )
1867                 return 0;
1868
1869         memcpy (bt->frame, page, bt->mgr->page_size);
1870
1871         // skip page info and set rest of page to zero
1872
1873         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1874         page->dirty = 0;
1875         page->act = 0;
1876
1877         // try cleaning up page first
1878         // by removing deleted keys
1879
1880         while( cnt++ < max ) {
1881                 if( cnt == slot )
1882                         newslot = idx + 1;
1883                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1884                         continue;
1885
1886                 // copy the key across
1887
1888                 key = keyptr(bt->frame, cnt);
1889                 nxt -= key->len + 1;
1890                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1891
1892                 // copy slot
1893
1894                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1895                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1896                         page->act++;
1897                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1898                 slotptr(page, idx)->off = nxt;
1899         }
1900
1901         page->min = nxt;
1902         page->cnt = idx;
1903
1904         //      see if page has enough space now, or does it need splitting?
1905
1906         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1907                 return newslot;
1908
1909         return 0;
1910 }
1911
1912 // split the root and raise the height of the btree
1913
1914 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1915 {
1916 uint nxt = bt->mgr->page_size;
1917 uid left;
1918
1919         //  Obtain an empty page to use, and copy the current
1920         //  root contents into it, e.g. lower keys
1921
1922         if( !(left = bt_newpage(bt, root->page)) )
1923                 return bt->err;
1924
1925         // preserve the page info at the bottom
1926         // of higher keys and set rest to zero
1927
1928         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1929
1930         // insert lower keys page fence key on newroot page as first key
1931
1932         nxt -= *leftkey + 1;
1933         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1934         bt_putid(slotptr(root->page, 1)->id, left);
1935         slotptr(root->page, 1)->off = nxt;
1936         
1937         // insert stopper key on newroot page
1938         // and increase the root height
1939
1940         nxt -= 3;
1941         ((unsigned char *)root->page)[nxt] = 2;
1942         ((unsigned char *)root->page)[nxt+1] = 0xff;
1943         ((unsigned char *)root->page)[nxt+2] = 0xff;
1944         bt_putid(slotptr(root->page, 2)->id, page_no2);
1945         slotptr(root->page, 2)->off = nxt;
1946
1947         bt_putid(root->page->right, 0);
1948         root->page->min = nxt;          // reset lowest used offset and key count
1949         root->page->cnt = 2;
1950         root->page->act = 2;
1951         root->page->lvl++;
1952
1953         // release and unpin root
1954
1955         bt_unlockpage(BtLockWrite, root->latch);
1956         bt_unpinlatch (root->latch);
1957         bt_unpinpool (root->pool);
1958         return 0;
1959 }
1960
1961 //  split already locked full node
1962 //      return unlocked.
1963
1964 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1965 {
1966 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1967 unsigned char fencekey[256];
1968 uint lvl = set->page->lvl;
1969 uint prev;
1970 uid right;
1971 BtKey key;
1972
1973         //  split higher half of keys to bt->frame
1974
1975         memset (bt->frame, 0, bt->mgr->page_size);
1976         max = set->page->cnt;
1977         cnt = max / 2;
1978         idx = 0;
1979
1980         while( cnt++ < max ) {
1981                 key = keyptr(set->page, cnt);
1982                 nxt -= key->len + 1;
1983                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1984
1985                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
1986                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1987                         bt->frame->act++;
1988                 slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
1989                 slotptr(bt->frame, idx)->off = nxt;
1990         }
1991
1992         bt->frame->bits = bt->mgr->page_bits;
1993         bt->frame->min = nxt;
1994         bt->frame->cnt = idx;
1995         bt->frame->lvl = lvl;
1996
1997         // link right node
1998
1999         if( set->page_no > ROOT_page )
2000                 memcpy (bt->frame->right, set->page->right, BtId);
2001
2002         //      get new free page and write higher keys to it.
2003
2004         if( !(right = bt_newpage(bt, bt->frame)) )
2005                 return bt->err;
2006
2007         //      update lower keys to continue in old page
2008
2009         memcpy (bt->frame, set->page, bt->mgr->page_size);
2010         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2011         nxt = bt->mgr->page_size;
2012         set->page->posted = 0;
2013         set->page->dirty = 0;
2014         set->page->act = 0;
2015         cnt = 0;
2016         idx = 0;
2017
2018         //  assemble page of smaller keys
2019
2020         while( cnt++ < max / 2 ) {
2021                 key = keyptr(bt->frame, cnt);
2022                 nxt -= key->len + 1;
2023                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2024                 memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2025                 slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2026                 slotptr(set->page, idx)->off = nxt;
2027                 set->page->act++;
2028         }
2029
2030         // remember fence key for smaller page
2031
2032         memcpy(fencekey, key, key->len + 1);
2033         bt_putid(set->page->right, right);
2034         set->page->min = nxt;
2035         set->page->cnt = idx;
2036
2037         // if current page is the root page, split it
2038
2039         if( set->page_no == ROOT_page )
2040                 return bt_splitroot (bt, set, fencekey, right);
2041
2042         right = 0;
2043
2044         // insert new fences in their parent pages
2045
2046         while( 1 ) {
2047                 bt_lockpage (BtLockParent, set->latch);
2048
2049                 key = keyptr (set->page, set->page->cnt);
2050                 memcpy (fencekey, key, key->len + 1);
2051                 prev = set->page->posted;
2052
2053                 if( right && prev ) {
2054                         bt_unlockpage (BtLockParent, set->latch);
2055                         bt_unlockpage (BtLockWrite, set->latch);
2056                         bt_unpinlatch (set->latch);
2057                         bt_unpinpool (set->pool);
2058                         return 0;
2059                 }
2060
2061                 right = bt_getid (set->page->right);
2062                 set->page->posted = 1;
2063
2064                 bt_unlockpage (BtLockWrite, set->latch);
2065
2066                 // insert new fence for reformulated left block of smaller keys
2067
2068                 if( !prev )
2069                   if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, set->page_no, time(NULL)) )
2070                         return bt->err;
2071
2072                 bt_unlockpage (BtLockParent, set->latch);
2073                 bt_unpinlatch (set->latch);
2074                 bt_unpinpool (set->pool);
2075
2076                 if( !(set->page_no = right) )
2077                         break;
2078
2079                 set->latch = bt_pinlatch (bt, right);
2080
2081                 if( set->pool = bt_pinpool (bt, right) )
2082                         set->page = bt_page (bt, set->pool, right);
2083                 else
2084                         return bt->err;
2085
2086                 bt_lockpage (BtLockWrite, set->latch);
2087         }
2088
2089         return 0;
2090 }
2091
2092 //  Insert new key into the btree at given level.
2093
2094 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
2095 {
2096 BtPageSet set[1];
2097 uint slot, idx;
2098 BtKey ptr;
2099
2100         while( 1 ) {
2101                 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2102                         ptr = keyptr(set->page, slot);
2103                 else
2104                 {
2105                         if ( !bt->err )
2106                                 bt->err = BTERR_ovflw;
2107                         return bt->err;
2108                 }
2109
2110                 // if key already exists, update id and return
2111
2112                 if( !keycmp (ptr, key, len) ) {
2113                         if( slotptr(set->page, slot)->dead )
2114                                 set->page->act++;
2115                         slotptr(set->page, slot)->dead = 0;
2116                         slotptr(set->page, slot)->tod = tod;
2117                         bt_putid(slotptr(set->page,slot)->id, id);
2118                         bt_unlockpage(BtLockWrite, set->latch);
2119                         bt_unpinlatch (set->latch);
2120                         bt_unpinpool (set->pool);
2121                         return 0;
2122                 }
2123
2124                 // check if page has enough space
2125
2126                 if( slot = bt_cleanpage (bt, set->page, len, slot) )
2127                         break;
2128
2129                 if( bt_splitpage (bt, set) )
2130                         return bt->err;
2131         }
2132
2133         // calculate next available slot and copy key into page
2134
2135         set->page->min -= len + 1; // reset lowest used offset
2136         ((unsigned char *)set->page)[set->page->min] = len;
2137         memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
2138
2139         for( idx = slot; idx < set->page->cnt; idx++ )
2140           if( slotptr(set->page, idx)->dead )
2141                 break;
2142
2143         // now insert key into array before slot
2144
2145         if( idx == set->page->cnt )
2146                 idx++, set->page->cnt++;
2147
2148         set->page->act++;
2149
2150         while( idx > slot )
2151                 *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2152
2153         bt_putid(slotptr(set->page,slot)->id, id);
2154         slotptr(set->page, slot)->off = set->page->min;
2155         slotptr(set->page, slot)->tod = tod;
2156         slotptr(set->page, slot)->dead = 0;
2157
2158         bt_unlockpage (BtLockWrite, set->latch);
2159         bt_unpinlatch (set->latch);
2160         bt_unpinpool (set->pool);
2161         return 0;
2162 }
2163
2164 //  cache page of keys into cursor and return starting slot for given key
2165
2166 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2167 {
2168 BtPageSet set[1];
2169 uint slot;
2170
2171         // cache page for retrieval
2172
2173         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2174           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2175         else
2176           return 0;
2177
2178         bt->cursor_page = set->page_no;
2179
2180         bt_unlockpage(BtLockRead, set->latch);
2181         bt_unpinlatch (set->latch);
2182         bt_unpinpool (set->pool);
2183         return slot;
2184 }
2185
2186 //  return next slot for cursor page
2187 //  or slide cursor right into next page
2188
2189 uint bt_nextkey (BtDb *bt, uint slot)
2190 {
2191 BtPageSet set[1];
2192 uid right;
2193
2194   do {
2195         right = bt_getid(bt->cursor->right);
2196
2197         while( slot++ < bt->cursor->cnt )
2198           if( slotptr(bt->cursor,slot)->dead )
2199                 continue;
2200           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2201                 return slot;
2202           else
2203                 break;
2204
2205         if( !right )
2206                 break;
2207
2208         bt->cursor_page = right;
2209
2210         if( set->pool = bt_pinpool (bt, right) )
2211                 set->page = bt_page (bt, set->pool, right);
2212         else
2213                 return 0;
2214
2215         set->latch = bt_pinlatch (bt, right);
2216     bt_lockpage(BtLockRead, set->latch);
2217
2218         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2219
2220         bt_unlockpage(BtLockRead, set->latch);
2221         bt_unpinlatch (set->latch);
2222         bt_unpinpool (set->pool);
2223         slot = 0;
2224
2225   } while( 1 );
2226
2227   return bt->err = 0;
2228 }
2229
2230 BtKey bt_key(BtDb *bt, uint slot)
2231 {
2232         return keyptr(bt->cursor, slot);
2233 }
2234
2235 uid bt_uid(BtDb *bt, uint slot)
2236 {
2237         return bt_getid(slotptr(bt->cursor,slot)->id);
2238 }
2239
2240 uint bt_tod(BtDb *bt, uint slot)
2241 {
2242         return slotptr(bt->cursor,slot)->tod;
2243 }
2244
2245
2246 #ifdef STANDALONE
2247
2248 void bt_latchaudit (BtDb *bt)
2249 {
2250 ushort idx, hashidx;
2251 uid next, page_no;
2252 BtPageSet set[1];
2253 BtKey ptr;
2254
2255 #ifdef unix
2256         for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
2257                 set->latch = bt->mgr->latchsets + idx;
2258                 if( set->latch->pin ) {
2259                         fprintf(stderr, "latchset %d pinned for page %.6x\n", idx, set->latch->page_no);
2260                         set->latch->pin = 0;
2261                 }
2262         }
2263
2264         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2265           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2266                 set->latch = bt->mgr->latchsets + idx;
2267                 if( set->latch->hash != hashidx )
2268                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
2269                 if( set->latch->pin )
2270                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, set->latch->page_no);
2271           } while( idx = set->latch->next );
2272         }
2273
2274         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2275         page_no = LEAF_page;
2276
2277         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2278                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits);
2279                 if( !bt->frame->free )
2280                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2281                   ptr = keyptr(bt->frame, idx+1);
2282                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2283                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2284                  }
2285
2286                 if( page_no > LEAF_page )
2287                         next = page_no + 1;
2288                 page_no = next;
2289         }
2290 #endif
2291 }
2292
2293 typedef struct {
2294         char type, idx;
2295         char *infile;
2296         BtMgr *mgr;
2297         int num;
2298 } ThreadArg;
2299
2300 //  standalone program to index file of keys
2301 //  then list them onto std-out
2302
2303 #ifdef unix
2304 void *index_file (void *arg)
2305 #else
2306 uint __stdcall index_file (void *arg)
2307 #endif
2308 {
2309 int line = 0, found = 0, cnt = 0;
2310 uid next, page_no = LEAF_page;  // start on first page of leaves
2311 unsigned char key[256];
2312 ThreadArg *args = arg;
2313 int ch, len = 0, slot;
2314 BtPageSet set[1];
2315 time_t tod[1];
2316 BtKey ptr;
2317 BtDb *bt;
2318 FILE *in;
2319
2320         bt = bt_open (args->mgr);
2321         time (tod);
2322
2323         switch(args->type | 0x20)
2324         {
2325         case 'a':
2326                 fprintf(stderr, "started latch mgr audit\n");
2327                 bt_latchaudit (bt);
2328                 fprintf(stderr, "finished latch mgr audit\n");
2329                 break;
2330
2331         case 'w':
2332                 fprintf(stderr, "started indexing for %s\n", args->infile);
2333                 if( in = fopen (args->infile, "rb") )
2334                   while( ch = getc(in), ch != EOF )
2335                         if( ch == '\n' )
2336                         {
2337                           line++;
2338
2339                           if( args->num == 1 )
2340                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2341
2342                           else if( args->num )
2343                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2344
2345                           if( bt_insertkey (bt, key, len, 0, line, *tod) )
2346                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2347                           len = 0;
2348                         }
2349                         else if( len < 255 )
2350                                 key[len++] = ch;
2351                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2352                 break;
2353
2354         case 'd':
2355                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2356                 if( in = fopen (args->infile, "rb") )
2357                   while( ch = getc(in), ch != EOF )
2358                         if( ch == '\n' )
2359                         {
2360                           line++;
2361                           if( args->num == 1 )
2362                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2363
2364                           else if( args->num )
2365                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2366
2367                           if( bt_deletekey (bt, key, len, 0) )
2368                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2369                           len = 0;
2370                         }
2371                         else if( len < 255 )
2372                                 key[len++] = ch;
2373                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2374                 break;
2375
2376         case 'f':
2377                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2378                 if( in = fopen (args->infile, "rb") )
2379                   while( ch = getc(in), ch != EOF )
2380                         if( ch == '\n' )
2381                         {
2382                           line++;
2383                           if( args->num == 1 )
2384                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2385
2386                           else if( args->num )
2387                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2388
2389                           if( bt_findkey (bt, key, len) )
2390                                 found++;
2391                           else if( bt->err )
2392                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2393                           else
2394                                 fprintf(stderr, "Unable to find key %.*s line %d\n", len, key, line);
2395                           len = 0;
2396                         }
2397                         else if( len < 255 )
2398                                 key[len++] = ch;
2399                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2400                 break;
2401
2402         case 's':
2403                 fprintf(stderr, "started scanning\n");
2404                 do {
2405                         if( set->pool = bt_pinpool (bt, page_no) )
2406                                 set->page = bt_page (bt, set->pool, page_no);
2407                         else
2408                                 break;
2409                         set->latch = bt_pinlatch (bt, page_no);
2410                         bt_lockpage (BtLockRead, set->latch);
2411                         next = bt_getid (set->page->right);
2412                         cnt += set->page->act;
2413
2414                         for( slot = 0; slot++ < set->page->cnt; )
2415                          if( next || slot < set->page->cnt )
2416                           if( !slotptr(set->page, slot)->dead ) {
2417                                 ptr = keyptr(set->page, slot);
2418                                 fwrite (ptr->key, ptr->len, 1, stdout);
2419                                 fputc ('\n', stdout);
2420                           }
2421
2422                         bt_unlockpage (BtLockRead, set->latch);
2423                         bt_unpinlatch (set->latch);
2424                         bt_unpinpool (set->pool);
2425                 } while( page_no = next );
2426
2427                 cnt--;  // remove stopper key
2428                 fprintf(stderr, " Total keys read %d\n", cnt);
2429                 break;
2430
2431         case 'c':
2432                 fprintf(stderr, "started counting\n");
2433                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2434                 page_no = LEAF_page;
2435
2436                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2437                 uid off = page_no << bt->mgr->page_bits;
2438 #ifdef unix
2439                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2440 #else
2441                 DWORD amt[1];
2442
2443                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2444
2445                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2446                         return bt->err = BTERR_map;
2447
2448                   if( *amt <  bt->mgr->page_size )
2449                         return bt->err = BTERR_map;
2450 #endif
2451                         if( !bt->frame->free && !bt->frame->lvl )
2452                                 cnt += bt->frame->act;
2453                         if( page_no > LEAF_page )
2454                                 next = page_no + 1;
2455                         page_no = next;
2456                 }
2457                 
2458                 cnt--;  // remove stopper key
2459                 fprintf(stderr, " Total keys read %d\n", cnt);
2460                 break;
2461         }
2462
2463         bt_close (bt);
2464 #ifdef unix
2465         return NULL;
2466 #else
2467         return 0;
2468 #endif
2469 }
2470
2471 typedef struct timeval timer;
2472
2473 int main (int argc, char **argv)
2474 {
2475 int idx, cnt, len, slot, err;
2476 int segsize, bits = 16;
2477 #ifdef unix
2478 pthread_t *threads;
2479 timer start, stop;
2480 #else
2481 time_t start[1], stop[1];
2482 HANDLE *threads;
2483 #endif
2484 double real_time;
2485 ThreadArg *args;
2486 uint poolsize = 0;
2487 int num = 0;
2488 char key[1];
2489 BtMgr *mgr;
2490 BtKey ptr;
2491 BtDb *bt;
2492
2493         if( argc < 3 ) {
2494                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2495                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2496                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2497                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2498                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2499                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2500                 exit(0);
2501         }
2502
2503 #ifdef unix
2504         gettimeofday(&start, NULL);
2505 #else
2506         time(start);
2507 #endif
2508
2509         if( argc > 3 )
2510                 bits = atoi(argv[3]);
2511
2512         if( argc > 4 )
2513                 poolsize = atoi(argv[4]);
2514
2515         if( !poolsize )
2516                 fprintf (stderr, "Warning: no mapped_pool\n");
2517
2518         if( poolsize > 65535 )
2519                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2520
2521         if( argc > 5 )
2522                 segsize = atoi(argv[5]);
2523         else
2524                 segsize = 4;    // 16 pages per mmap segment
2525
2526         if( argc > 6 )
2527                 num = atoi(argv[6]);
2528
2529         cnt = argc - 7;
2530 #ifdef unix
2531         threads = malloc (cnt * sizeof(pthread_t));
2532 #else
2533         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2534 #endif
2535         args = malloc (cnt * sizeof(ThreadArg));
2536
2537         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2538
2539         if( !mgr ) {
2540                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2541                 exit (1);
2542         }
2543
2544         //      fire off threads
2545
2546         for( idx = 0; idx < cnt; idx++ ) {
2547                 args[idx].infile = argv[idx + 7];
2548                 args[idx].type = argv[2][0];
2549                 args[idx].mgr = mgr;
2550                 args[idx].num = num;
2551                 args[idx].idx = idx;
2552 #ifdef unix
2553                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2554                         fprintf(stderr, "Error creating thread %d\n", err);
2555 #else
2556                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2557 #endif
2558         }
2559
2560         //      wait for termination
2561
2562 #ifdef unix
2563         for( idx = 0; idx < cnt; idx++ )
2564                 pthread_join (threads[idx], NULL);
2565         gettimeofday(&stop, NULL);
2566         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2567 #else
2568         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2569
2570         for( idx = 0; idx < cnt; idx++ )
2571                 CloseHandle(threads[idx]);
2572
2573         time (stop);
2574         real_time = 1000 * (*stop - *start);
2575 #endif
2576         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2577         bt_mgrclose (mgr);
2578 }
2579
2580 #endif  //STANDALONE