]> pd.if.org Git - btree/blob - threadskv3.c
Fix errors in threadskv1 and threadskv3
[btree] / threadskv3.c
1 // btree version threadskv3 sched_yield version
2 //      with reworked bt_deletekey code
3 //      and phase-fair reader writer lock
4 //      and librarian page split code
5 // 02 SEP 2014
6
7 // author: karl malbrain, malbrain@cal.berkeley.edu
8
9 /*
10 This work, including the source code, documentation
11 and related data, is placed into the public domain.
12
13 The orginal author is Karl Malbrain.
14
15 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
16 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
17 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
18 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
19 RESULTING FROM THE USE, MODIFICATION, OR
20 REDISTRIBUTION OF THIS SOFTWARE.
21 */
22
23 // Please see the project home page for documentation
24 // code.google.com/p/high-concurrency-btree
25
26 #define _FILE_OFFSET_BITS 64
27 #define _LARGEFILE64_SOURCE
28
29 #ifdef linux
30 #define _GNU_SOURCE
31 #endif
32
33 #ifdef unix
34 #include <unistd.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <fcntl.h>
38 #include <sys/time.h>
39 #include <sys/mman.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #else
43 #define WIN32_LEAN_AND_MEAN
44 #include <windows.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <time.h>
48 #include <fcntl.h>
49 #include <process.h>
50 #include <intrin.h>
51 #endif
52
53 #include <memory.h>
54 #include <string.h>
55 #include <stddef.h>
56
57 typedef unsigned long long      uid;
58
59 #ifndef unix
60 typedef unsigned long long      off64_t;
61 typedef unsigned short          ushort;
62 typedef unsigned int            uint;
63 #endif
64
65 #define BT_latchtable   128                                     // number of latch manager slots
66
67 #define BT_ro 0x6f72    // ro
68 #define BT_rw 0x7772    // rw
69
70 #define BT_maxbits              24                                      // maximum page size in bits
71 #define BT_minbits              9                                       // minimum page size in bits
72 #define BT_minpage              (1 << BT_minbits)       // minimum page size
73 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
74
75 /*
76 There are five lock types for each node in three independent sets: 
77 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
78 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
79 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
80 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
81 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
82 */
83
84 typedef enum{
85         BtLockAccess,
86         BtLockDelete,
87         BtLockRead,
88         BtLockWrite,
89         BtLockParent
90 } BtLock;
91
92 //      definition for phase-fair reader/writer lock implementation
93
94 typedef struct {
95         ushort rin[1];
96         ushort rout[1];
97         ushort ticket[1];
98         ushort serving[1];
99 } RWLock;
100
101 #define PHID 0x1
102 #define PRES 0x2
103 #define MASK 0x3
104 #define RINC 0x4
105
106 //      definition for spin latch implementation
107
108 // exclusive is set for write access
109 // share is count of read accessors
110 // grant write lock when share == 0
111
112 volatile typedef struct {
113         ushort exclusive:1;
114         ushort pending:1;
115         ushort share:14;
116 } BtSpinLatch;
117
118 #define XCL 1
119 #define PEND 2
120 #define BOTH 3
121 #define SHARE 4
122
123 //  hash table entries
124
125 typedef struct {
126         BtSpinLatch latch[1];
127         volatile ushort slot;           // Latch table entry at head of chain
128 } BtHashEntry;
129
130 //      latch manager table structure
131
132 typedef struct {
133         RWLock readwr[1];               // read/write page lock
134         RWLock access[1];               // Access Intent/Page delete
135         RWLock parent[1];               // Posting of fence key in parent
136         BtSpinLatch busy[1];            // slot is being moved between chains
137         volatile ushort next;           // next entry in hash table chain
138         volatile ushort prev;           // prev entry in hash table chain
139         volatile ushort pin;            // number of outstanding locks
140         volatile ushort hash;           // hash slot entry is under
141         volatile uid page_no;           // latch set page number
142 } BtLatchSet;
143
144 //      Define the length of the page and key pointers
145
146 #define BtId 6
147
148 //      Page key slot definition.
149
150 //      If BT_maxbits is 15 or less, you can save 2 bytes
151 //      for each key stored by making the two uints
152 //      into ushorts.
153
154 //      Keys are marked dead, but remain on the page until
155 //      it cleanup is called. The fence key (highest key) for
156 //      the page is always present, even after cleanup.
157
158 typedef struct {
159         uint off:BT_maxbits;            // page offset for key start
160         uint librarian:1;                       // set for librarian slot
161         uint dead:1;                            // set for deleted key
162 } BtSlot;
163
164 //      The key structure occupies space at the upper end of
165 //      each page.  It's a length byte followed by the key
166 //      bytes.
167
168 typedef struct {
169         unsigned char len;
170         unsigned char key[1];
171 } *BtKey;
172
173 //      the value structure also occupies space at the upper
174 //      end of the page.
175
176 typedef struct {
177         unsigned char len;
178         unsigned char value[1];
179 } *BtVal;
180
181 //      The first part of an index page.
182 //      It is immediately followed
183 //      by the BtSlot array of keys.
184
185 typedef struct BtPage_ {
186         uint cnt;                                       // count of keys in page
187         uint act;                                       // count of active keys
188         uint min;                                       // next key offset
189         uint garbage;                           // page garbage in bytes
190         unsigned char bits:7;           // page size in bits
191         unsigned char free:1;           // page is on free chain
192         unsigned char lvl:7;            // level of page
193         unsigned char kill:1;           // page is being deleted
194         unsigned char right[BtId];      // page number to right
195 } *BtPage;
196
197 //      The memory mapping pool table buffer manager entry
198
199 typedef struct {
200         uid  basepage;                          // mapped base page number
201         char *map;                                      // mapped memory pointer
202         ushort slot;                            // slot index in this array
203         ushort pin;                                     // mapped page pin counter
204         void *hashprev;                         // previous pool entry for the same hash idx
205         void *hashnext;                         // next pool entry for the same hash idx
206 #ifndef unix
207         HANDLE hmap;                            // Windows memory mapping handle
208 #endif
209 } BtPool;
210
211 #define CLOCK_bit 0x8000                // bit in pool->pin
212
213 //  The loadpage interface object
214
215 typedef struct {
216         uid page_no;            // current page number
217         BtPage page;            // current page pointer
218         BtPool *pool;           // current page pool
219         BtLatchSet *latch;      // current page latch set
220 } BtPageSet;
221
222 //      structure for latch manager on ALLOC_page
223
224 typedef struct {
225         struct BtPage_ alloc[1];        // next page_no in right ptr
226         unsigned char chain[BtId];      // head of free page_nos chain
227         BtSpinLatch lock[1];            // allocation area lite latch
228         ushort latchdeployed;           // highest number of latch entries deployed
229         ushort nlatchpage;                      // number of latch pages at BT_latch
230         ushort latchtotal;                      // number of page latch entries
231         ushort latchhash;                       // number of latch hash table slots
232         ushort latchvictim;                     // next latch entry to examine
233         BtHashEntry table[0];           // the hash table
234 } BtLatchMgr;
235
236 //      The object structure for Btree access
237
238 typedef struct {
239         uint page_size;                         // page size    
240         uint page_bits;                         // page size in bits    
241         uint seg_bits;                          // seg size in pages in bits
242         uint mode;                                      // read-write mode
243 #ifdef unix
244         int idx;
245 #else
246         HANDLE idx;
247 #endif
248         ushort poolcnt;                         // highest page pool node in use
249         ushort poolmax;                         // highest page pool node allocated
250         ushort poolmask;                        // total number of pages in mmap segment - 1
251         ushort hashsize;                        // size of Hash Table for pool entries
252         volatile uint evicted;          // last evicted hash table slot
253         ushort *hash;                           // pool index for hash entries
254         BtSpinLatch *latch;                     // latches for hash table slots
255         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
256         BtLatchSet *latchsets;          // mapped latch set from latch pages
257         BtPool *pool;                           // memory pool page segments
258 #ifndef unix
259         HANDLE halloc;                          // allocation and latch table handle
260 #endif
261 } BtMgr;
262
263 typedef struct {
264         BtMgr *mgr;                     // buffer manager for thread
265         BtPage cursor;          // cached frame for start/next (never mapped)
266         BtPage frame;           // spare frame for the page split (never mapped)
267         uid cursor_page;        // current cursor page number   
268         unsigned char *mem;     // frame, cursor, page memory buffer
269         int found;                      // last delete or insert was found
270         int err;                        // last error
271 } BtDb;
272
273 typedef enum {
274         BTERR_ok = 0,
275         BTERR_struct,
276         BTERR_ovflw,
277         BTERR_lock,
278         BTERR_map,
279         BTERR_wrt,
280         BTERR_hash
281 } BTERR;
282
283 // B-Tree functions
284 extern void bt_close (BtDb *bt);
285 extern BtDb *bt_open (BtMgr *mgr);
286 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
287 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
288 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
289 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
290 extern uint bt_nextkey   (BtDb *bt, uint slot);
291
292 //      manager functions
293 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
294 void bt_mgrclose (BtMgr *mgr);
295
296 //  Helper functions to return slot values
297 //      from the cursor page.
298
299 extern BtKey bt_key (BtDb *bt, uint slot);
300 extern BtVal bt_val (BtDb *bt, uint slot);
301
302 //  BTree page number constants
303 #define ALLOC_page              0       // allocation & latch manager hash table
304 #define ROOT_page               1       // root of the btree
305 #define LEAF_page               2       // first page of leaves
306 #define LATCH_page              3       // pages for latch manager
307
308 //      Number of levels to create in a new BTree
309
310 #define MIN_lvl                 2
311
312 //  The page is allocated from low and hi ends.
313 //  The key slots are allocated from the bottom,
314 //      while the text and value of the key
315 //  are allocated from the top.  When the two
316 //  areas meet, the page is split into two.
317
318 //  A key consists of a length byte, two bytes of
319 //  index number (0 - 65534), and up to 253 bytes
320 //  of key value.  Duplicate keys are discarded.
321 //  Associated with each key is a value byte string
322 //      containing any value desired.
323
324 //  The b-tree root is always located at page 1.
325 //      The first leaf page of level zero is always
326 //      located on page 2.
327
328 //      The b-tree pages are linked with next
329 //      pointers to facilitate enumerators,
330 //      and provide for concurrency.
331
332 //      When to root page fills, it is split in two and
333 //      the tree height is raised by a new root at page
334 //      one with two keys.
335
336 //      Deleted keys are marked with a dead bit until
337 //      page cleanup. The fence key for a leaf node is
338 //      always present
339
340 //  Groups of pages called segments from the btree are optionally
341 //  cached with a memory mapped pool. A hash table is used to keep
342 //  track of the cached segments.  This behaviour is controlled
343 //  by the cache block size parameter to bt_open.
344
345 //  To achieve maximum concurrency one page is locked at a time
346 //  as the tree is traversed to find leaf key in question. The right
347 //  page numbers are used in cases where the page is being split,
348 //      or consolidated.
349
350 //  Page 0 is dedicated to lock for new page extensions,
351 //      and chains empty pages together for reuse. It also
352 //      contains the latch manager hash table.
353
354 //      The ParentModification lock on a node is obtained to serialize posting
355 //      or changing the fence key for a node.
356
357 //      Empty pages are chained together through the ALLOC page and reused.
358
359 //      Access macros to address slot and key values from the page
360 //      Page slots use 1 based indexing.
361
362 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
363 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
364 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
365
366 void bt_putid(unsigned char *dest, uid id)
367 {
368 int i = BtId;
369
370         while( i-- )
371                 dest[i] = (unsigned char)id, id >>= 8;
372 }
373
374 uid bt_getid(unsigned char *src)
375 {
376 uid id = 0;
377 int i;
378
379         for( i = 0; i < BtId; i++ )
380                 id <<= 8, id |= *src++; 
381
382         return id;
383 }
384
385 //      Phase-Fair reader/writer lock implementation
386
387 void WriteLock (RWLock *lock)
388 {
389 ushort w, r, tix;
390
391 #ifdef unix
392         tix = __sync_fetch_and_add (lock->ticket, 1);
393 #else
394         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
395 #endif
396         // wait for our ticket to come up
397
398         while( tix != lock->serving[0] )
399 #ifdef unix
400                 sched_yield();
401 #else
402                 SwitchToThread ();
403 #endif
404
405         w = PRES | (tix & PHID);
406 #ifdef  unix
407         r = __sync_fetch_and_add (lock->rin, w);
408 #else
409         r = _InterlockedExchangeAdd16 (lock->rin, w);
410 #endif
411         while( r != *lock->rout )
412 #ifdef unix
413                 sched_yield();
414 #else
415                 SwitchToThread();
416 #endif
417 }
418
419 void WriteRelease (RWLock *lock)
420 {
421 #ifdef unix
422         __sync_fetch_and_and (lock->rin, ~MASK);
423 #else
424         _InterlockedAnd16 (lock->rin, ~MASK);
425 #endif
426         lock->serving[0]++;
427 }
428
429 void ReadLock (RWLock *lock)
430 {
431 ushort w;
432 #ifdef unix
433         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
434 #else
435         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
436 #endif
437         if( w )
438           while( w == (*lock->rin & MASK) )
439 #ifdef unix
440                 sched_yield ();
441 #else
442                 SwitchToThread ();
443 #endif
444 }
445
446 void ReadRelease (RWLock *lock)
447 {
448 #ifdef unix
449         __sync_fetch_and_add (lock->rout, RINC);
450 #else
451         _InterlockedExchangeAdd16 (lock->rout, RINC);
452 #endif
453 }
454
455 //      Spin Latch Manager
456
457 //      wait until write lock mode is clear
458 //      and add 1 to the share count
459
460 void bt_spinreadlock(BtSpinLatch *latch)
461 {
462 ushort prev;
463
464   do {
465 #ifdef unix
466         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
467 #else
468         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
469 #endif
470         //  see if exclusive request is granted or pending
471
472         if( !(prev & BOTH) )
473                 return;
474 #ifdef unix
475         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
476 #else
477         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
478 #endif
479 #ifdef  unix
480   } while( sched_yield(), 1 );
481 #else
482   } while( SwitchToThread(), 1 );
483 #endif
484 }
485
486 //      wait for other read and write latches to relinquish
487
488 void bt_spinwritelock(BtSpinLatch *latch)
489 {
490 ushort prev;
491
492   do {
493 #ifdef  unix
494         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
495 #else
496         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
497 #endif
498         if( !(prev & XCL) )
499           if( !(prev & ~BOTH) )
500                 return;
501           else
502 #ifdef unix
503                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
504 #else
505                 _InterlockedAnd16((ushort *)latch, ~XCL);
506 #endif
507 #ifdef  unix
508   } while( sched_yield(), 1 );
509 #else
510   } while( SwitchToThread(), 1 );
511 #endif
512 }
513
514 //      try to obtain write lock
515
516 //      return 1 if obtained,
517 //              0 otherwise
518
519 int bt_spinwritetry(BtSpinLatch *latch)
520 {
521 ushort prev;
522
523 #ifdef  unix
524         prev = __sync_fetch_and_or((ushort *)latch, XCL);
525 #else
526         prev = _InterlockedOr16((ushort *)latch, XCL);
527 #endif
528         //      take write access if all bits are clear
529
530         if( !(prev & XCL) )
531           if( !(prev & ~BOTH) )
532                 return 1;
533           else
534 #ifdef unix
535                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
536 #else
537                 _InterlockedAnd16((ushort *)latch, ~XCL);
538 #endif
539         return 0;
540 }
541
542 //      clear write mode
543
544 void bt_spinreleasewrite(BtSpinLatch *latch)
545 {
546 #ifdef unix
547         __sync_fetch_and_and((ushort *)latch, ~BOTH);
548 #else
549         _InterlockedAnd16((ushort *)latch, ~BOTH);
550 #endif
551 }
552
553 //      decrement reader count
554
555 void bt_spinreleaseread(BtSpinLatch *latch)
556 {
557 #ifdef unix
558         __sync_fetch_and_add((ushort *)latch, -SHARE);
559 #else
560         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
561 #endif
562 }
563
564 //      link latch table entry into latch hash table
565
566 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
567 {
568 BtLatchSet *set = bt->mgr->latchsets + victim;
569
570         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
571                 bt->mgr->latchsets[set->next].prev = victim;
572
573         bt->mgr->latchmgr->table[hashidx].slot = victim;
574         set->page_no = page_no;
575         set->hash = hashidx;
576         set->prev = 0;
577 }
578
579 //      release latch pin
580
581 void bt_unpinlatch (BtLatchSet *set)
582 {
583 #ifdef unix
584         __sync_fetch_and_add(&set->pin, -1);
585 #else
586         _InterlockedDecrement16 (&set->pin);
587 #endif
588 }
589
590 //      find existing latchset or inspire new one
591 //      return with latchset pinned
592
593 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
594 {
595 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
596 ushort slot, avail = 0, victim, idx;
597 BtLatchSet *set;
598
599         //  obtain read lock on hash table entry
600
601         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
602
603         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
604         {
605                 set = bt->mgr->latchsets + slot;
606                 if( page_no == set->page_no )
607                         break;
608         } while( slot = set->next );
609
610         if( slot ) {
611 #ifdef unix
612                 __sync_fetch_and_add(&set->pin, 1);
613 #else
614                 _InterlockedIncrement16 (&set->pin);
615 #endif
616         }
617
618     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
619
620         if( slot )
621                 return set;
622
623   //  try again, this time with write lock
624
625   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
626
627   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
628   {
629         set = bt->mgr->latchsets + slot;
630         if( page_no == set->page_no )
631                 break;
632         if( !set->pin && !avail )
633                 avail = slot;
634   } while( slot = set->next );
635
636   //  found our entry, or take over an unpinned one
637
638   if( slot || (slot = avail) ) {
639         set = bt->mgr->latchsets + slot;
640 #ifdef unix
641         __sync_fetch_and_add(&set->pin, 1);
642 #else
643         _InterlockedIncrement16 (&set->pin);
644 #endif
645         set->page_no = page_no;
646         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
647         return set;
648   }
649
650         //  see if there are any unused entries
651 #ifdef unix
652         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
653 #else
654         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
655 #endif
656
657         if( victim < bt->mgr->latchmgr->latchtotal ) {
658                 set = bt->mgr->latchsets + victim;
659 #ifdef unix
660                 __sync_fetch_and_add(&set->pin, 1);
661 #else
662                 _InterlockedIncrement16 (&set->pin);
663 #endif
664                 bt_latchlink (bt, hashidx, victim, page_no);
665                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
666                 return set;
667         }
668
669 #ifdef unix
670         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
671 #else
672         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
673 #endif
674   //  find and reuse previous lock entry
675
676   while( 1 ) {
677 #ifdef unix
678         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
679 #else
680         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
681 #endif
682         //      we don't use slot zero
683
684         if( victim %= bt->mgr->latchmgr->latchtotal )
685                 set = bt->mgr->latchsets + victim;
686         else
687                 continue;
688
689         //      take control of our slot
690         //      from other threads
691
692         if( set->pin || !bt_spinwritetry (set->busy) )
693                 continue;
694
695         idx = set->hash;
696
697         // try to get write lock on hash chain
698         //      skip entry if not obtained
699         //      or has outstanding locks
700
701         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
702                 bt_spinreleasewrite (set->busy);
703                 continue;
704         }
705
706         if( set->pin ) {
707                 bt_spinreleasewrite (set->busy);
708                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
709                 continue;
710         }
711
712         //  unlink our available victim from its hash chain
713
714         if( set->prev )
715                 bt->mgr->latchsets[set->prev].next = set->next;
716         else
717                 bt->mgr->latchmgr->table[idx].slot = set->next;
718
719         if( set->next )
720                 bt->mgr->latchsets[set->next].prev = set->prev;
721
722         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
723 #ifdef unix
724         __sync_fetch_and_add(&set->pin, 1);
725 #else
726         _InterlockedIncrement16 (&set->pin);
727 #endif
728         bt_latchlink (bt, hashidx, victim, page_no);
729         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
730         bt_spinreleasewrite (set->busy);
731         return set;
732   }
733 }
734
735 void bt_mgrclose (BtMgr *mgr)
736 {
737 BtPool *pool;
738 uint slot;
739
740         // release mapped pages
741         //      note that slot zero is never used
742
743         for( slot = 1; slot < mgr->poolmax; slot++ ) {
744                 pool = mgr->pool + slot;
745                 if( pool->slot )
746 #ifdef unix
747                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
748 #else
749                 {
750                         FlushViewOfFile(pool->map, 0);
751                         UnmapViewOfFile(pool->map);
752                         CloseHandle(pool->hmap);
753                 }
754 #endif
755         }
756
757 #ifdef unix
758         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
759         munmap (mgr->latchmgr, 2 * mgr->page_size);
760 #else
761         FlushViewOfFile(mgr->latchmgr, 0);
762         UnmapViewOfFile(mgr->latchmgr);
763         CloseHandle(mgr->halloc);
764 #endif
765 #ifdef unix
766         close (mgr->idx);
767         free (mgr->pool);
768         free (mgr->hash);
769         free ((void *)mgr->latch);
770         free (mgr);
771 #else
772         FlushFileBuffers(mgr->idx);
773         CloseHandle(mgr->idx);
774         GlobalFree (mgr->pool);
775         GlobalFree (mgr->hash);
776         GlobalFree ((void *)mgr->latch);
777         GlobalFree (mgr);
778 #endif
779 }
780
781 //      close and release memory
782
783 void bt_close (BtDb *bt)
784 {
785 #ifdef unix
786         if( bt->mem )
787                 free (bt->mem);
788 #else
789         if( bt->mem)
790                 VirtualFree (bt->mem, 0, MEM_RELEASE);
791 #endif
792         free (bt);
793 }
794
795 //  open/create new btree buffer manager
796
797 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
798 //              size of mapped page pool (e.g. 8192)
799
800 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
801 {
802 uint lvl, attr, cacheblk, last, slot, idx;
803 uint nlatchpage, latchhash;
804 unsigned char value[BtId];
805 BtLatchMgr *latchmgr;
806 off64_t size;
807 uint amt[1];
808 BtMgr* mgr;
809 BtKey key;
810 BtVal val;
811 int flag;
812
813 #ifndef unix
814 SYSTEM_INFO sysinfo[1];
815 #endif
816
817         // determine sanity of page size and buffer pool
818
819         if( bits > BT_maxbits )
820                 bits = BT_maxbits;
821         else if( bits < BT_minbits )
822                 bits = BT_minbits;
823
824         if( !poolmax )
825                 return NULL;    // must have buffer pool
826
827 #ifdef unix
828         mgr = calloc (1, sizeof(BtMgr));
829
830         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
831
832         if( mgr->idx == -1 )
833                 return free(mgr), NULL;
834         
835         cacheblk = 4096;        // minimum mmap segment size for unix
836
837 #else
838         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
839         attr = FILE_ATTRIBUTE_NORMAL;
840         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
841
842         if( mgr->idx == INVALID_HANDLE_VALUE )
843                 return GlobalFree(mgr), NULL;
844
845         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
846         GetSystemInfo(sysinfo);
847         cacheblk = sysinfo->dwAllocationGranularity;
848 #endif
849
850 #ifdef unix
851         latchmgr = malloc (BT_maxpage);
852         *amt = 0;
853
854         // read minimum page size to get root info
855
856         if( size = lseek (mgr->idx, 0L, 2) ) {
857                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
858                         bits = latchmgr->alloc->bits;
859                 else
860                         return free(mgr), free(latchmgr), NULL;
861         } else if( mode == BT_ro )
862                 return free(latchmgr), bt_mgrclose (mgr), NULL;
863 #else
864         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
865         size = GetFileSize(mgr->idx, amt);
866
867         if( size || *amt ) {
868                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
869                         return bt_mgrclose (mgr), NULL;
870                 bits = latchmgr->alloc->bits;
871         } else if( mode == BT_ro )
872                 return bt_mgrclose (mgr), NULL;
873 #endif
874
875         mgr->page_size = 1 << bits;
876         mgr->page_bits = bits;
877
878         mgr->poolmax = poolmax;
879         mgr->mode = mode;
880
881         if( cacheblk < mgr->page_size )
882                 cacheblk = mgr->page_size;
883
884         //  mask for partial memmaps
885
886         mgr->poolmask = (cacheblk >> bits) - 1;
887
888         //      see if requested size of pages per memmap is greater
889
890         if( (1 << segsize) > mgr->poolmask )
891                 mgr->poolmask = (1 << segsize) - 1;
892
893         mgr->seg_bits = 0;
894
895         while( (1 << mgr->seg_bits) <= mgr->poolmask )
896                 mgr->seg_bits++;
897
898         mgr->hashsize = hashsize;
899
900 #ifdef unix
901         mgr->pool = calloc (poolmax, sizeof(BtPool));
902         mgr->hash = calloc (hashsize, sizeof(ushort));
903         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
904 #else
905         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
906         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
907         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
908 #endif
909
910         if( size || *amt )
911                 goto mgrlatch;
912
913         // initialize an empty b-tree with latch page, root page, page of leaves
914         // and page(s) of latches
915
916         memset (latchmgr, 0, 1 << bits);
917         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
918         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
919         latchmgr->alloc->bits = mgr->page_bits;
920
921         latchmgr->nlatchpage = nlatchpage;
922         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
923
924         //  initialize latch manager
925
926         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
927
928         //      size of hash table = total number of latchsets
929
930         if( latchhash > latchmgr->latchtotal )
931                 latchhash = latchmgr->latchtotal;
932
933         latchmgr->latchhash = latchhash;
934
935 #ifdef unix
936         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
937                 return bt_mgrclose (mgr), NULL;
938 #else
939         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
940                 return bt_mgrclose (mgr), NULL;
941
942         if( *amt < mgr->page_size )
943                 return bt_mgrclose (mgr), NULL;
944 #endif
945
946         memset (latchmgr, 0, 1 << bits);
947         latchmgr->alloc->bits = mgr->page_bits;
948
949         for( lvl=MIN_lvl; lvl--; ) {
950                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
951                 key = keyptr(latchmgr->alloc, 1);
952                 key->len = 2;           // create stopper key
953                 key->key[0] = 0xff;
954                 key->key[1] = 0xff;
955
956                 bt_putid(value, MIN_lvl - lvl + 1);
957                 val = valptr(latchmgr->alloc, 1);
958                 val->len = lvl ? BtId : 0;
959                 memcpy (val->value, value, val->len);
960
961                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
962                 latchmgr->alloc->lvl = lvl;
963                 latchmgr->alloc->cnt = 1;
964                 latchmgr->alloc->act = 1;
965 #ifdef unix
966                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
967                         return bt_mgrclose (mgr), NULL;
968 #else
969                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
970                         return bt_mgrclose (mgr), NULL;
971
972                 if( *amt < mgr->page_size )
973                         return bt_mgrclose (mgr), NULL;
974 #endif
975         }
976
977         // clear out latch manager locks
978         //      and rest of pages to round out segment
979
980         memset(latchmgr, 0, mgr->page_size);
981         last = MIN_lvl + 1;
982
983         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
984 #ifdef unix
985                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
986 #else
987                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
988                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
989                         return bt_mgrclose (mgr), NULL;
990                 if( *amt < mgr->page_size )
991                         return bt_mgrclose (mgr), NULL;
992 #endif
993                 last++;
994         }
995
996 mgrlatch:
997 #ifdef unix
998         // mlock the root page and the latchmgr page
999
1000         flag = PROT_READ | PROT_WRITE;
1001         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1002         if( mgr->latchmgr == MAP_FAILED )
1003                 return bt_mgrclose (mgr), NULL;
1004         mlock (mgr->latchmgr, 2 * mgr->page_size);
1005
1006         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1007         if( mgr->latchsets == MAP_FAILED )
1008                 return bt_mgrclose (mgr), NULL;
1009         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1010 #else
1011         flag = PAGE_READWRITE;
1012         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1013         if( !mgr->halloc )
1014                 return bt_mgrclose (mgr), NULL;
1015
1016         flag = FILE_MAP_WRITE;
1017         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1018         if( !mgr->latchmgr )
1019                 return GetLastError(), bt_mgrclose (mgr), NULL;
1020
1021         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1022 #endif
1023
1024 #ifdef unix
1025         free (latchmgr);
1026 #else
1027         VirtualFree (latchmgr, 0, MEM_RELEASE);
1028 #endif
1029         return mgr;
1030 }
1031
1032 //      open BTree access method
1033 //      based on buffer manager
1034
1035 BtDb *bt_open (BtMgr *mgr)
1036 {
1037 BtDb *bt = malloc (sizeof(*bt));
1038
1039         memset (bt, 0, sizeof(*bt));
1040         bt->mgr = mgr;
1041 #ifdef unix
1042         bt->mem = malloc (2 *mgr->page_size);
1043 #else
1044         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1045 #endif
1046         bt->frame = (BtPage)bt->mem;
1047         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1048         return bt;
1049 }
1050
1051 //  compare two keys, returning > 0, = 0, or < 0
1052 //  as the comparison value
1053
1054 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1055 {
1056 uint len1 = key1->len;
1057 int ans;
1058
1059         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1060                 return ans;
1061
1062         if( len1 > len2 )
1063                 return 1;
1064         if( len1 < len2 )
1065                 return -1;
1066
1067         return 0;
1068 }
1069
1070 //      Buffer Pool mgr
1071
1072 // find segment in pool
1073 // must be called with hashslot idx locked
1074 //      return NULL if not there
1075 //      otherwise return node
1076
1077 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1078 {
1079 BtPool *pool;
1080 uint slot;
1081
1082         // compute start of hash chain in pool
1083
1084         if( slot = bt->mgr->hash[idx] ) 
1085                 pool = bt->mgr->pool + slot;
1086         else
1087                 return NULL;
1088
1089         page_no &= ~bt->mgr->poolmask;
1090
1091         while( pool->basepage != page_no )
1092           if( pool = pool->hashnext )
1093                 continue;
1094           else
1095                 return NULL;
1096
1097         return pool;
1098 }
1099
1100 // add segment to hash table
1101
1102 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1103 {
1104 BtPool *node;
1105 uint slot;
1106
1107         pool->hashprev = pool->hashnext = NULL;
1108         pool->basepage = page_no & ~bt->mgr->poolmask;
1109         pool->pin = CLOCK_bit + 1;
1110
1111         if( slot = bt->mgr->hash[idx] ) {
1112                 node = bt->mgr->pool + slot;
1113                 pool->hashnext = node;
1114                 node->hashprev = pool;
1115         }
1116
1117         bt->mgr->hash[idx] = pool->slot;
1118 }
1119
1120 //  map new buffer pool segment to virtual memory
1121
1122 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1123 {
1124 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1125 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1126 int flag;
1127
1128 #ifdef unix
1129         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1130         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1131         if( pool->map == MAP_FAILED )
1132                 return bt->err = BTERR_map;
1133
1134 #else
1135         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1136         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1137         if( !pool->hmap )
1138                 return bt->err = BTERR_map;
1139
1140         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1141         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1142         if( !pool->map )
1143                 return bt->err = BTERR_map;
1144 #endif
1145         return bt->err = 0;
1146 }
1147
1148 //      calculate page within pool
1149
1150 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1151 {
1152 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1153 BtPage page;
1154
1155         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1156 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1157         return page;
1158 }
1159
1160 //  release pool pin
1161
1162 void bt_unpinpool (BtPool *pool)
1163 {
1164 #ifdef unix
1165         __sync_fetch_and_add(&pool->pin, -1);
1166 #else
1167         _InterlockedDecrement16 (&pool->pin);
1168 #endif
1169 }
1170
1171 //      find or place requested page in segment-pool
1172 //      return pool table entry, incrementing pin
1173
1174 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1175 {
1176 uint slot, hashidx, idx, victim;
1177 BtPool *pool, *node, *next;
1178
1179         //      lock hash table chain
1180
1181         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1182         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1183
1184         //      look up in hash table
1185
1186         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1187 #ifdef unix
1188                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1189                 __sync_fetch_and_add(&pool->pin, 1);
1190 #else
1191                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1192                 _InterlockedIncrement16 (&pool->pin);
1193 #endif
1194                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1195                 return pool;
1196         }
1197
1198         // allocate a new pool node
1199         // and add to hash table
1200
1201 #ifdef unix
1202         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1203 #else
1204         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1205 #endif
1206
1207         if( ++slot < bt->mgr->poolmax ) {
1208                 pool = bt->mgr->pool + slot;
1209                 pool->slot = slot;
1210
1211                 if( bt_mapsegment(bt, pool, page_no) )
1212                         return NULL;
1213
1214                 bt_linkhash(bt, pool, page_no, hashidx);
1215                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1216                 return pool;
1217         }
1218
1219         // pool table is full
1220         //      find best pool entry to evict
1221
1222 #ifdef unix
1223         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1224 #else
1225         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1226 #endif
1227
1228         while( 1 ) {
1229 #ifdef unix
1230                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1231 #else
1232                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1233 #endif
1234                 victim %= bt->mgr->poolmax;
1235                 pool = bt->mgr->pool + victim;
1236                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1237
1238                 if( !victim )
1239                         continue;
1240
1241                 // try to get write lock
1242                 //      skip entry if not obtained
1243
1244                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1245                         continue;
1246
1247                 //      skip this entry if
1248                 //      page is pinned
1249                 //  or clock bit is set
1250
1251                 if( pool->pin ) {
1252 #ifdef unix
1253                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1254 #else
1255                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1256 #endif
1257                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1258                         continue;
1259                 }
1260
1261                 // unlink victim pool node from hash table
1262
1263                 if( node = pool->hashprev )
1264                         node->hashnext = pool->hashnext;
1265                 else if( node = pool->hashnext )
1266                         bt->mgr->hash[idx] = node->slot;
1267                 else
1268                         bt->mgr->hash[idx] = 0;
1269
1270                 if( node = pool->hashnext )
1271                         node->hashprev = pool->hashprev;
1272
1273                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1274
1275                 //      remove old file mapping
1276 #ifdef unix
1277                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1278 #else
1279 //              FlushViewOfFile(pool->map, 0);
1280                 UnmapViewOfFile(pool->map);
1281                 CloseHandle(pool->hmap);
1282 #endif
1283                 pool->map = NULL;
1284
1285                 //  create new pool mapping
1286                 //  and link into hash table
1287
1288                 if( bt_mapsegment(bt, pool, page_no) )
1289                         return NULL;
1290
1291                 bt_linkhash(bt, pool, page_no, hashidx);
1292                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1293                 return pool;
1294         }
1295 }
1296
1297 // place write, read, or parent lock on requested page_no.
1298
1299 void bt_lockpage(BtLock mode, BtLatchSet *set)
1300 {
1301         switch( mode ) {
1302         case BtLockRead:
1303                 ReadLock (set->readwr);
1304                 break;
1305         case BtLockWrite:
1306                 WriteLock (set->readwr);
1307                 break;
1308         case BtLockAccess:
1309                 ReadLock (set->access);
1310                 break;
1311         case BtLockDelete:
1312                 WriteLock (set->access);
1313                 break;
1314         case BtLockParent:
1315                 WriteLock (set->parent);
1316                 break;
1317         }
1318 }
1319
1320 // remove write, read, or parent lock on requested page
1321
1322 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1323 {
1324         switch( mode ) {
1325         case BtLockRead:
1326                 ReadRelease (set->readwr);
1327                 break;
1328         case BtLockWrite:
1329                 WriteRelease (set->readwr);
1330                 break;
1331         case BtLockAccess:
1332                 ReadRelease (set->access);
1333                 break;
1334         case BtLockDelete:
1335                 WriteRelease (set->access);
1336                 break;
1337         case BtLockParent:
1338                 WriteRelease (set->parent);
1339                 break;
1340         }
1341 }
1342
1343 //      allocate a new page and write page into it
1344
1345 uid bt_newpage(BtDb *bt, BtPage page)
1346 {
1347 BtPageSet set[1];
1348 uid new_page;
1349 int blk;
1350
1351         //      lock allocation page
1352
1353         bt_spinwritelock(bt->mgr->latchmgr->lock);
1354
1355         // use empty chain first
1356         // else allocate empty page
1357
1358         if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1359                 if( set->pool = bt_pinpool (bt, new_page) )
1360                         set->page = bt_page (bt, set->pool, new_page);
1361                 else
1362                         return 0;
1363
1364                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1365                 bt_unpinpool (set->pool);
1366         } else {
1367                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1368                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1369 #ifdef unix
1370                 // if writing first page of pool block, set file length thru last page
1371
1372                 if( (new_page & bt->mgr->poolmask) == 0 )
1373                         ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1374 #endif
1375         }
1376 #ifdef unix
1377         // unlock allocation latch
1378
1379         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1380 #endif
1381
1382         //      bring new page into pool and copy page.
1383         //      this will extend the file into the new pages on WIN32.
1384
1385         if( set->pool = bt_pinpool (bt, new_page) )
1386                 set->page = bt_page (bt, set->pool, new_page);
1387         else
1388                 return 0;
1389
1390         memcpy(set->page, page, bt->mgr->page_size);
1391         bt_unpinpool (set->pool);
1392
1393 #ifndef unix
1394         // unlock allocation latch
1395
1396         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1397 #endif
1398         return new_page;
1399 }
1400
1401 //  find slot in page for given key at a given level
1402
1403 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1404 {
1405 uint diff, higher = set->page->cnt, low = 1, slot;
1406 uint good = 0;
1407
1408         //        make stopper key an infinite fence value
1409
1410         if( bt_getid (set->page->right) )
1411                 higher++;
1412         else
1413                 good++;
1414
1415         //      low is the lowest candidate.
1416         //  loop ends when they meet
1417
1418         //  higher is already
1419         //      tested as .ge. the passed key.
1420
1421         while( diff = higher - low ) {
1422                 slot = low + ( diff >> 1 );
1423                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1424                         low = slot + 1;
1425                 else
1426                         higher = slot, good++;
1427         }
1428
1429         //      return zero if key is on right link page
1430
1431         return good ? higher : 0;
1432 }
1433
1434 //  find and load page at given level for given key
1435 //      leave page rd or wr locked as requested
1436
1437 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1438 {
1439 uid page_no = ROOT_page, prevpage = 0;
1440 uint drill = 0xff, slot;
1441 BtLatchSet *prevlatch;
1442 uint mode, prevmode;
1443 BtPool *prevpool;
1444
1445   //  start at root of btree and drill down
1446
1447   do {
1448         // determine lock mode of drill level
1449         mode = (drill == lvl) ? lock : BtLockRead; 
1450
1451         set->latch = bt_pinlatch (bt, page_no);
1452         set->page_no = page_no;
1453
1454         // pin page contents
1455
1456         if( set->pool = bt_pinpool (bt, page_no) )
1457                 set->page = bt_page (bt, set->pool, page_no);
1458         else
1459                 return 0;
1460
1461         // obtain access lock using lock chaining with Access mode
1462
1463         if( page_no > ROOT_page )
1464           bt_lockpage(BtLockAccess, set->latch);
1465
1466         //      release & unpin parent page
1467
1468         if( prevpage ) {
1469           bt_unlockpage(prevmode, prevlatch);
1470           bt_unpinlatch (prevlatch);
1471           bt_unpinpool (prevpool);
1472           prevpage = 0;
1473         }
1474
1475         // obtain read lock using lock chaining
1476
1477         bt_lockpage(mode, set->latch);
1478
1479         if( set->page->free )
1480                 return bt->err = BTERR_struct, 0;
1481
1482         if( page_no > ROOT_page )
1483           bt_unlockpage(BtLockAccess, set->latch);
1484
1485         // re-read and re-lock root after determining actual level of root
1486
1487         if( set->page->lvl != drill) {
1488                 if( set->page_no != ROOT_page )
1489                         return bt->err = BTERR_struct, 0;
1490                         
1491                 drill = set->page->lvl;
1492
1493                 if( lock != BtLockRead && drill == lvl ) {
1494                   bt_unlockpage(mode, set->latch);
1495                   bt_unpinlatch (set->latch);
1496                   bt_unpinpool (set->pool);
1497                   continue;
1498                 }
1499         }
1500
1501         prevpage = set->page_no;
1502         prevlatch = set->latch;
1503         prevpool = set->pool;
1504         prevmode = mode;
1505
1506         //  find key on page at this level
1507         //  and descend to requested level
1508
1509         if( set->page->kill )
1510           goto slideright;
1511
1512         if( slot = bt_findslot (set, key, len) ) {
1513           if( drill == lvl )
1514                 return slot;
1515
1516           while( slotptr(set->page, slot)->dead )
1517                 if( slot++ < set->page->cnt )
1518                         continue;
1519                 else
1520                         goto slideright;
1521
1522           page_no = bt_getid(valptr(set->page, slot)->value);
1523           drill--;
1524           continue;
1525         }
1526
1527         //  or slide right into next page
1528
1529 slideright:
1530         page_no = bt_getid(set->page->right);
1531
1532   } while( page_no );
1533
1534   // return error on end of right chain
1535
1536   bt->err = BTERR_struct;
1537   return 0;     // return error
1538 }
1539
1540 //      return page to free list
1541 //      page must be delete & write locked
1542
1543 void bt_freepage (BtDb *bt, BtPageSet *set)
1544 {
1545         //      lock allocation page
1546
1547         bt_spinwritelock (bt->mgr->latchmgr->lock);
1548
1549         //      store chain
1550         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1551         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1552         set->page->free = 1;
1553
1554         // unlock released page
1555
1556         bt_unlockpage (BtLockDelete, set->latch);
1557         bt_unlockpage (BtLockWrite, set->latch);
1558         bt_unpinlatch (set->latch);
1559         bt_unpinpool (set->pool);
1560
1561         // unlock allocation page
1562
1563         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1564 }
1565
1566 //      a fence key was deleted from a page
1567 //      push new fence value upwards
1568
1569 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1570 {
1571 unsigned char leftkey[256], rightkey[256];
1572 unsigned char value[BtId];
1573 BtKey ptr;
1574 BtVal val;
1575
1576         //      remove the old fence value
1577
1578         ptr = keyptr(set->page, set->page->cnt);
1579         val = valptr(set->page, set->page->cnt);
1580         memcpy (rightkey, ptr, ptr->len + 1);
1581         set->page->garbage += ptr->len + val->len + 2;
1582         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1583
1584         ptr = keyptr(set->page, set->page->cnt);
1585         memcpy (leftkey, ptr, ptr->len + 1);
1586
1587         bt_lockpage (BtLockParent, set->latch);
1588         bt_unlockpage (BtLockWrite, set->latch);
1589
1590         //      insert new (now smaller) fence key
1591
1592         bt_putid (value, set->page_no);
1593
1594         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
1595           return bt->err;
1596
1597         //      now delete old fence key
1598
1599         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1600                 return bt->err;
1601
1602         bt_unlockpage (BtLockParent, set->latch);
1603         bt_unpinlatch(set->latch);
1604         bt_unpinpool (set->pool);
1605         return 0;
1606 }
1607
1608 //      root has a single child
1609 //      collapse a level from the tree
1610
1611 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1612 {
1613 BtPageSet child[1];
1614 uint idx;
1615
1616   // find the child entry and promote as new root contents
1617
1618   do {
1619         for( idx = 0; idx++ < root->page->cnt; )
1620           if( !slotptr(root->page, idx)->dead )
1621                 break;
1622
1623         child->page_no = bt_getid (valptr(root->page, idx)->value);
1624
1625         child->latch = bt_pinlatch (bt, child->page_no);
1626         bt_lockpage (BtLockDelete, child->latch);
1627         bt_lockpage (BtLockWrite, child->latch);
1628
1629         if( child->pool = bt_pinpool (bt, child->page_no) )
1630                 child->page = bt_page (bt, child->pool, child->page_no);
1631         else
1632                 return bt->err;
1633
1634         memcpy (root->page, child->page, bt->mgr->page_size);
1635         bt_freepage (bt, child);
1636
1637   } while( root->page->lvl > 1 && root->page->act == 1 );
1638
1639   bt_unlockpage (BtLockWrite, root->latch);
1640   bt_unpinlatch (root->latch);
1641   bt_unpinpool (root->pool);
1642   return 0;
1643 }
1644
1645 //  find and delete key on page by marking delete flag bit
1646 //  if page becomes empty, delete it from the btree
1647
1648 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1649 {
1650 unsigned char lowerfence[256], higherfence[256];
1651 uint slot, idx, dirty = 0, fence, found;
1652 BtPageSet set[1], right[1];
1653 unsigned char value[BtId];
1654 BtKey ptr, tst;
1655 BtVal val;
1656
1657         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1658                 ptr = keyptr(set->page, slot);
1659         else
1660                 return bt->err;
1661
1662         // if librarian slot, advance to real slot
1663
1664         if( slotptr(set->page, slot)->librarian )
1665                 ptr = keyptr(set->page, ++slot);
1666
1667         //      are we deleting a fence slot?
1668
1669         fence = slot == set->page->cnt;
1670
1671         // if key is found delete it, otherwise ignore request
1672
1673         if( found = !keycmp (ptr, key, len) )
1674           if( found = slotptr(set->page, slot)->dead == 0 ) {
1675                 val = valptr(set->page,slot);
1676                 dirty = slotptr(set->page, slot)->dead = 1;
1677                 set->page->garbage += ptr->len + val->len + 2;
1678                 set->page->act--;
1679
1680                 // collapse empty slots beneath our slot
1681
1682                 while( idx = set->page->cnt - 1 )
1683                   if( slotptr(set->page, idx)->dead ) {
1684                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1685                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1686                   } else
1687                         break;
1688           }
1689
1690         //      did we delete a fence key in an upper level?
1691
1692         if( dirty && lvl && set->page->act && fence )
1693           if( bt_fixfence (bt, set, lvl) )
1694                 return bt->err;
1695           else
1696                 return bt->found = found, 0;
1697
1698         //      is this a collapsed root?
1699
1700         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1701           if( bt_collapseroot (bt, set) )
1702                 return bt->err;
1703           else
1704                 return bt->found = found, 0;
1705
1706         //      return if page is not empty
1707
1708         if( set->page->act ) {
1709                 bt_unlockpage(BtLockWrite, set->latch);
1710                 bt_unpinlatch (set->latch);
1711                 bt_unpinpool (set->pool);
1712                 return bt->found = found, 0;
1713         }
1714
1715         //      cache copy of fence key
1716         //      to post in parent
1717
1718         ptr = keyptr(set->page, set->page->cnt);
1719         memcpy (lowerfence, ptr, ptr->len + 1);
1720
1721         //      obtain lock on right page
1722
1723         right->page_no = bt_getid(set->page->right);
1724         right->latch = bt_pinlatch (bt, right->page_no);
1725         bt_lockpage (BtLockWrite, right->latch);
1726
1727         // pin page contents
1728
1729         if( right->pool = bt_pinpool (bt, right->page_no) )
1730                 right->page = bt_page (bt, right->pool, right->page_no);
1731         else
1732                 return 0;
1733
1734         if( right->page->kill )
1735                 return bt->err = BTERR_struct;
1736
1737         // pull contents of right peer into our empty page
1738
1739         memcpy (set->page, right->page, bt->mgr->page_size);
1740
1741         // cache copy of key to update
1742
1743         ptr = keyptr(right->page, right->page->cnt);
1744         memcpy (higherfence, ptr, ptr->len + 1);
1745
1746         // mark right page deleted and point it to left page
1747         //      until we can post parent updates
1748
1749         bt_putid (right->page->right, set->page_no);
1750         right->page->kill = 1;
1751
1752         bt_lockpage (BtLockParent, right->latch);
1753         bt_unlockpage (BtLockWrite, right->latch);
1754
1755         bt_lockpage (BtLockParent, set->latch);
1756         bt_unlockpage (BtLockWrite, set->latch);
1757
1758         // redirect higher key directly to our new node contents
1759
1760         bt_putid (value, set->page_no);
1761
1762         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId) )
1763           return bt->err;
1764
1765         //      delete old lower key to our node
1766
1767         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1768           return bt->err;
1769
1770         //      obtain delete and write locks to right node
1771
1772         bt_unlockpage (BtLockParent, right->latch);
1773         bt_lockpage (BtLockDelete, right->latch);
1774         bt_lockpage (BtLockWrite, right->latch);
1775         bt_freepage (bt, right);
1776
1777         bt_unlockpage (BtLockParent, set->latch);
1778         bt_unpinlatch (set->latch);
1779         bt_unpinpool (set->pool);
1780         bt->found = found;
1781         return 0;
1782 }
1783
1784 //      find key in leaf level and return number of value bytes
1785 //      or (-1) if not found
1786
1787 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1788 {
1789 BtPageSet set[1];
1790 uint  slot;
1791 BtKey ptr;
1792 BtVal val;
1793 int ret;
1794
1795         if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1796                 ptr = keyptr(set->page, slot);
1797         else
1798                 return 0;
1799
1800         //      skip librarian slot place holder
1801
1802         if( slotptr(set->page, slot)->librarian )
1803                 ptr = keyptr(set->page, ++slot);
1804
1805         // if key exists, return >= 0 value bytes copied
1806         //      otherwise return (-1)
1807
1808         if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
1809                 val = valptr (set->page,slot);
1810                 if( valmax > val->len )
1811                         valmax = val->len;
1812                 memcpy (value, val->value, valmax);
1813                 ret = valmax;
1814         } else
1815                 ret = -1;
1816
1817         bt_unlockpage (BtLockRead, set->latch);
1818         bt_unpinlatch (set->latch);
1819         bt_unpinpool (set->pool);
1820         return ret;
1821 }
1822
1823 //      check page for space available,
1824 //      clean if necessary and return
1825 //      0 - page needs splitting
1826 //      >0  new slot value
1827
1828 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1829 {
1830 uint nxt = bt->mgr->page_size;
1831 uint cnt = 0, idx = 0;
1832 uint max = page->cnt;
1833 uint newslot = max;
1834 BtKey key;
1835 BtVal val;
1836
1837         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1838                 return slot;
1839
1840         //      skip cleanup and proceed to split
1841         //      if there's not enough garbage
1842
1843         if( page->garbage + page->min < 2 * page->act * sizeof(BtSlot) + sizeof(*page) + nxt / 3 )
1844                 return 0;
1845
1846         memcpy (bt->frame, page, bt->mgr->page_size);
1847
1848         // skip page info and set rest of page to zero
1849
1850         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1851         page->garbage = 0;
1852         page->act = 0;
1853
1854         // clean up page first by
1855         // removing deleted keys
1856
1857         while( cnt++ < max ) {
1858                 if( cnt == slot )
1859                         newslot = idx + 2;
1860                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1861                         continue;
1862
1863                 // copy the value across
1864
1865                 val = valptr(bt->frame, cnt);
1866                 nxt -= val->len + 1;
1867                 ((unsigned char *)page)[nxt] = val->len;
1868                 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1869
1870                 // copy the key across
1871
1872                 key = keyptr(bt->frame, cnt);
1873                 nxt -= key->len + 1;
1874                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1875
1876                 // make a librarian slot
1877
1878                 slotptr(page, ++idx)->off = nxt;
1879                 slotptr(page, idx)->librarian = 1;
1880                 slotptr(page, idx)->dead = 1;
1881
1882                 // set up the slot
1883
1884                 slotptr(page, ++idx)->off = nxt;
1885
1886                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1887                         page->act++;
1888         }
1889
1890         page->min = nxt;
1891         page->cnt = idx;
1892
1893         //      see if page has enough space now, or does it need splitting?
1894
1895         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1896                 return newslot;
1897
1898         return 0;
1899 }
1900
1901 // split the root and raise the height of the btree
1902
1903 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1904 {
1905 uint nxt = bt->mgr->page_size;
1906 unsigned char value[BtId];
1907 uid left;
1908
1909         //  Obtain an empty page to use, and copy the current
1910         //  root contents into it, e.g. lower keys
1911
1912         if( !(left = bt_newpage(bt, root->page)) )
1913                 return bt->err;
1914
1915         // preserve the page info at the bottom
1916         // of higher keys and set rest to zero
1917
1918         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1919
1920         // insert lower keys page fence key on newroot page as first key
1921
1922         nxt -= BtId + 1;
1923         bt_putid (value, left);
1924         ((unsigned char *)root->page)[nxt] = BtId;
1925         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
1926
1927         nxt -= *leftkey + 1;
1928         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1929         slotptr(root->page, 1)->off = nxt;
1930         
1931         // insert stopper key on newroot page
1932         // and increase the root height
1933
1934         nxt -= 3 + BtId + 1;
1935         ((unsigned char *)root->page)[nxt] = 2;
1936         ((unsigned char *)root->page)[nxt+1] = 0xff;
1937         ((unsigned char *)root->page)[nxt+2] = 0xff;
1938
1939         bt_putid (value, page_no2);
1940         ((unsigned char *)root->page)[nxt+3] = BtId;
1941         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
1942         slotptr(root->page, 2)->off = nxt;
1943
1944         bt_putid(root->page->right, 0);
1945         root->page->min = nxt;          // reset lowest used offset and key count
1946         root->page->cnt = 2;
1947         root->page->act = 2;
1948         root->page->lvl++;
1949
1950         // release and unpin root
1951
1952         bt_unlockpage(BtLockWrite, root->latch);
1953         bt_unpinlatch (root->latch);
1954         bt_unpinpool (root->pool);
1955         return 0;
1956 }
1957
1958 //  split already locked full node
1959 //      return unlocked.
1960
1961 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1962 {
1963 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1964 unsigned char fencekey[256], rightkey[256];
1965 unsigned char value[BtId];
1966 uint lvl = set->page->lvl;
1967 BtPageSet right[1];
1968 uint prev;
1969 BtKey key;
1970 BtVal val;
1971
1972         //  split higher half of keys to bt->frame
1973
1974         memset (bt->frame, 0, bt->mgr->page_size);
1975         max = set->page->cnt;
1976         cnt = max / 2;
1977         idx = 0;
1978
1979         while( cnt++ < max ) {
1980                 if( slotptr(set->page, cnt)->dead && cnt < max )
1981                         continue;
1982                 val = valptr(set->page, cnt);
1983                 nxt -= val->len + 1;
1984                 ((unsigned char *)bt->frame)[nxt] = val->len;
1985                 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
1986
1987                 key = keyptr(set->page, cnt);
1988                 nxt -= key->len + 1;
1989                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1990
1991                 //      add librarian slot
1992
1993                 slotptr(bt->frame, ++idx)->off = nxt;
1994                 slotptr(bt->frame, idx)->librarian = 1;
1995                 slotptr(bt->frame, idx)->dead = 1;
1996
1997                 //  add actual slot
1998
1999                 slotptr(bt->frame, ++idx)->off = nxt;
2000
2001                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2002                         bt->frame->act++;
2003         }
2004
2005         // remember existing fence key for new page to the right
2006
2007         memcpy (rightkey, key, key->len + 1);
2008
2009         bt->frame->bits = bt->mgr->page_bits;
2010         bt->frame->min = nxt;
2011         bt->frame->cnt = idx;
2012         bt->frame->lvl = lvl;
2013
2014         // link right node
2015
2016         if( set->page_no > ROOT_page )
2017                 memcpy (bt->frame->right, set->page->right, BtId);
2018
2019         //      get new free page and write higher keys to it.
2020
2021         if( !(right->page_no = bt_newpage(bt, bt->frame)) )
2022                 return bt->err;
2023
2024         //      update lower keys to continue in old page
2025
2026         memcpy (bt->frame, set->page, bt->mgr->page_size);
2027         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2028         nxt = bt->mgr->page_size;
2029         set->page->garbage = 0;
2030         set->page->act = 0;
2031         max /= 2;
2032         cnt = 0;
2033         idx = 0;
2034
2035         if( slotptr(bt->frame, max)->librarian )
2036                 max--;
2037
2038         //  assemble page of smaller keys
2039
2040         while( cnt++ < max ) {
2041                 if( slotptr(bt->frame, cnt)->dead )
2042                         continue;
2043                 val = valptr(bt->frame, cnt);
2044                 nxt -= val->len + 1;
2045                 ((unsigned char *)set->page)[nxt] = val->len;
2046                 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2047
2048                 key = keyptr(bt->frame, cnt);
2049                 nxt -= key->len + 1;
2050                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2051
2052                 //      add librarian slot
2053
2054                 slotptr(set->page, ++idx)->off = nxt;
2055                 slotptr(set->page, idx)->librarian = 1;
2056                 slotptr(set->page, idx)->dead = 1;
2057
2058                 //      add actual slot
2059
2060                 slotptr(set->page, ++idx)->off = nxt;
2061                 set->page->act++;
2062         }
2063
2064         // remember fence key for smaller page
2065
2066         memcpy(fencekey, key, key->len + 1);
2067
2068         bt_putid(set->page->right, right->page_no);
2069         set->page->min = nxt;
2070         set->page->cnt = idx;
2071
2072         // if current page is the root page, split it
2073
2074         if( set->page_no == ROOT_page )
2075                 return bt_splitroot (bt, set, fencekey, right->page_no);
2076
2077         // insert new fences in their parent pages
2078
2079         right->latch = bt_pinlatch (bt, right->page_no);
2080         bt_lockpage (BtLockParent, right->latch);
2081
2082         bt_lockpage (BtLockParent, set->latch);
2083         bt_unlockpage (BtLockWrite, set->latch);
2084
2085         // insert new fence for reformulated left block of smaller keys
2086
2087         bt_putid (value, set->page_no);
2088
2089         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId) )
2090                 return bt->err;
2091
2092         // switch fence for right block of larger keys to new right page
2093
2094         bt_putid (value, right->page_no);
2095
2096         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId) )
2097                 return bt->err;
2098
2099         bt_unlockpage (BtLockParent, set->latch);
2100         bt_unpinlatch (set->latch);
2101         bt_unpinpool (set->pool);
2102
2103         bt_unlockpage (BtLockParent, right->latch);
2104         bt_unpinlatch (right->latch);
2105         return 0;
2106 }
2107 //  Insert new key into the btree at given level.
2108
2109 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
2110 {
2111 BtPageSet set[1];
2112 uint slot, idx;
2113 uint reuse;
2114 BtKey ptr;
2115 BtKey tst;
2116 BtVal val;
2117
2118         while( 1 ) {
2119                 if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
2120                         ptr = keyptr(set->page, slot);
2121                 else {
2122                         if( !bt->err )
2123                                 bt->err = BTERR_ovflw;
2124                         return bt->err;
2125                 }
2126
2127                 // if librarian slot == found slot, advance to real slot
2128
2129                 if( slotptr(set->page, slot)->librarian )
2130                   if( !keycmp (ptr, key, keylen) )
2131                         ptr = keyptr(set->page, ++slot);
2132
2133                 // if key already exists, update value and return
2134
2135                 if( reuse = !keycmp (ptr, key, keylen) )
2136                   if( val = valptr(set->page, slot), val->len >= vallen ) {
2137                         if( slotptr(set->page, slot)->dead )
2138                                 set->page->act++;
2139                         set->page->garbage += val->len - vallen;
2140                         slotptr(set->page, slot)->dead = 0;
2141                         val->len = vallen;
2142                         memcpy (val->value, value, vallen);
2143                         bt_unlockpage(BtLockWrite, set->latch);
2144                         bt_unpinlatch (set->latch);
2145                         bt_unpinpool (set->pool);
2146                         return 0;
2147                   } else {
2148                         if( !slotptr(set->page, slot)->dead ) {
2149                                 set->page->garbage += val->len + ptr->len + 2;
2150                                 set->page->act--;
2151                         }
2152                         slotptr(set->page, slot)->dead = 1;
2153                   }
2154
2155                 //      if found slot > desired slot and previous slot
2156                 //      is a librarian slot, use it
2157
2158                 if( !reuse && slot > 1 )
2159                   if( slotptr(set->page, slot-1)->librarian )
2160                         slot--;
2161
2162                 // check if page has enough space
2163
2164                 if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
2165                         break;
2166
2167                 if( bt_splitpage (bt, set) )
2168                         return bt->err;
2169         }
2170
2171         // calculate next available slot and copy key into page
2172
2173         set->page->min -= vallen + 1; // reset lowest used offset
2174         ((unsigned char *)set->page)[set->page->min] = vallen;
2175         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2176
2177         set->page->min -= keylen + 1; // reset lowest used offset
2178         ((unsigned char *)set->page)[set->page->min] = keylen;
2179         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2180
2181         for( idx = slot; idx < set->page->cnt; idx++ )
2182           if( slotptr(set->page, idx)->dead )
2183                 break;
2184
2185         // now insert key into array before slot
2186
2187         if( !reuse && idx == set->page->cnt )
2188                 idx++, set->page->cnt++;
2189
2190         set->page->act++;
2191
2192         while( idx > slot )
2193                 *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2194
2195         slotptr(set->page, slot)->off = set->page->min;
2196         slotptr(set->page, slot)->librarian = 0;
2197         slotptr(set->page, slot)->dead = 0;
2198
2199         bt_unlockpage (BtLockWrite, set->latch);
2200         bt_unpinlatch (set->latch);
2201         bt_unpinpool (set->pool);
2202         return 0;
2203 }
2204
2205 //  cache page of keys into cursor and return starting slot for given key
2206
2207 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2208 {
2209 BtPageSet set[1];
2210 uint slot;
2211
2212         // cache page for retrieval
2213
2214         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2215           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2216         else
2217           return 0;
2218
2219         bt->cursor_page = set->page_no;
2220
2221         bt_unlockpage(BtLockRead, set->latch);
2222         bt_unpinlatch (set->latch);
2223         bt_unpinpool (set->pool);
2224         return slot;
2225 }
2226
2227 //  return next slot for cursor page
2228 //  or slide cursor right into next page
2229
2230 uint bt_nextkey (BtDb *bt, uint slot)
2231 {
2232 BtPageSet set[1];
2233 uid right;
2234
2235   do {
2236         right = bt_getid(bt->cursor->right);
2237
2238         while( slot++ < bt->cursor->cnt )
2239           if( slotptr(bt->cursor,slot)->dead )
2240                 continue;
2241           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2242                 return slot;
2243           else
2244                 break;
2245
2246         if( !right )
2247                 break;
2248
2249         bt->cursor_page = right;
2250
2251         if( set->pool = bt_pinpool (bt, right) )
2252                 set->page = bt_page (bt, set->pool, right);
2253         else
2254                 return 0;
2255
2256         set->latch = bt_pinlatch (bt, right);
2257     bt_lockpage(BtLockRead, set->latch);
2258
2259         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2260
2261         bt_unlockpage(BtLockRead, set->latch);
2262         bt_unpinlatch (set->latch);
2263         bt_unpinpool (set->pool);
2264         slot = 0;
2265
2266   } while( 1 );
2267
2268   return bt->err = 0;
2269 }
2270
2271 BtKey bt_key(BtDb *bt, uint slot)
2272 {
2273         return keyptr(bt->cursor, slot);
2274 }
2275
2276 BtVal bt_val(BtDb *bt, uint slot)
2277 {
2278         return valptr(bt->cursor,slot);
2279 }
2280
2281 #ifdef STANDALONE
2282
2283 #ifndef unix
2284 double getCpuTime(int type)
2285 {
2286 FILETIME crtime[1];
2287 FILETIME xittime[1];
2288 FILETIME systime[1];
2289 FILETIME usrtime[1];
2290 SYSTEMTIME timeconv[1];
2291 double ans = 0;
2292
2293         memset (timeconv, 0, sizeof(SYSTEMTIME));
2294
2295         switch( type ) {
2296         case 0:
2297                 GetSystemTimeAsFileTime (xittime);
2298                 FileTimeToSystemTime (xittime, timeconv);
2299                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2300                 break;
2301         case 1:
2302                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2303                 FileTimeToSystemTime (usrtime, timeconv);
2304                 break;
2305         case 2:
2306                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2307                 FileTimeToSystemTime (systime, timeconv);
2308                 break;
2309         }
2310
2311         ans += (double)timeconv->wHour * 3600;
2312         ans += (double)timeconv->wMinute * 60;
2313         ans += (double)timeconv->wSecond;
2314         ans += (double)timeconv->wMilliseconds / 1000;
2315         return ans;
2316 }
2317 #else
2318 #include <time.h>
2319 #include <sys/resource.h>
2320
2321 double getCpuTime(int type)
2322 {
2323 struct rusage used[1];
2324 struct timeval tv[1];
2325
2326         switch( type ) {
2327         case 0:
2328                 gettimeofday(tv, NULL);
2329                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2330
2331         case 1:
2332                 getrusage(RUSAGE_SELF, used);
2333                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2334
2335         case 2:
2336                 getrusage(RUSAGE_SELF, used);
2337                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2338         }
2339
2340         return 0;
2341 }
2342 #endif
2343
2344 uint bt_latchaudit (BtDb *bt)
2345 {
2346 ushort idx, hashidx;
2347 uid next, page_no;
2348 BtLatchSet *latch;
2349 uint cnt = 0;
2350 BtKey ptr;
2351
2352 #ifdef unix
2353         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2354 #endif
2355         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2356                 fprintf(stderr, "Alloc page locked\n");
2357         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2358
2359         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2360                 latch = bt->mgr->latchsets + idx;
2361                 if( *latch->readwr->rin & MASK )
2362                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2363                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2364
2365                 if( *latch->access->rin & MASK )
2366                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2367                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2368
2369                 if( *latch->parent->rin & MASK )
2370                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2371                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2372
2373                 if( latch->pin ) {
2374                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2375                         latch->pin = 0;
2376                 }
2377         }
2378
2379         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2380           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2381                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2382
2383           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2384
2385           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2386                 latch = bt->mgr->latchsets + idx;
2387                 if( *(ushort *)latch->busy )
2388                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2389                 *(ushort *)latch->busy = 0;
2390                 if( latch->pin )
2391                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2392           } while( idx = latch->next );
2393         }
2394
2395         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2396         page_no = LEAF_page;
2397
2398         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2399         off64_t off = page_no << bt->mgr->page_bits;
2400 #ifdef unix
2401                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2402 #else
2403                 DWORD amt[1];
2404
2405                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2406
2407                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2408                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2409
2410                   if( *amt <  bt->mgr->page_size )
2411                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2412 #endif
2413                 if( !bt->frame->free ) {
2414                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2415                   ptr = keyptr(bt->frame, idx+1);
2416                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2417                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2418                  }
2419                  if( !bt->frame->lvl )
2420                         cnt += bt->frame->act;
2421                 }
2422                 if( page_no > LEAF_page )
2423                         next = page_no + 1;
2424                 page_no = next;
2425         }
2426         return cnt - 1;
2427 }
2428
2429 typedef struct {
2430         char idx;
2431         char *type;
2432         char *infile;
2433         BtMgr *mgr;
2434         int num;
2435 } ThreadArg;
2436
2437 //  standalone program to index file of keys
2438 //  then list them onto std-out
2439
2440 #ifdef unix
2441 void *index_file (void *arg)
2442 #else
2443 uint __stdcall index_file (void *arg)
2444 #endif
2445 {
2446 int line = 0, found = 0, cnt = 0;
2447 uid next, page_no = LEAF_page;  // start on first page of leaves
2448 unsigned char key[256];
2449 ThreadArg *args = arg;
2450 int ch, len = 0, slot;
2451 BtPageSet set[1];
2452 BtKey ptr;
2453 BtDb *bt;
2454 FILE *in;
2455
2456         bt = bt_open (args->mgr);
2457
2458         switch(args->type[0] | 0x20)
2459         {
2460         case 'a':
2461                 fprintf(stderr, "started latch mgr audit\n");
2462                 cnt = bt_latchaudit (bt);
2463                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2464                 break;
2465
2466         case 'p':
2467                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2468                 if( in = fopen (args->infile, "rb") )
2469                   while( ch = getc(in), ch != EOF )
2470                         if( ch == '\n' )
2471                         {
2472                           line++;
2473
2474                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10) )
2475                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2476                           len = 0;
2477                         }
2478                         else if( len < 255 )
2479                                 key[len++] = ch;
2480                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2481                 break;
2482
2483         case 'w':
2484                 fprintf(stderr, "started indexing for %s\n", args->infile);
2485                 if( in = fopen (args->infile, "rb") )
2486                   while( ch = getc(in), ch != EOF )
2487                         if( ch == '\n' )
2488                         {
2489                           line++;
2490
2491                           if( args->num == 1 )
2492                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2493
2494                           else if( args->num )
2495                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2496
2497                           if( bt_insertkey (bt, key, len, 0, NULL, 0) )
2498                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2499                           len = 0;
2500                         }
2501                         else if( len < 255 )
2502                                 key[len++] = ch;
2503                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2504                 break;
2505
2506         case 'd':
2507                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2508                 if( in = fopen (args->infile, "rb") )
2509                   while( ch = getc(in), ch != EOF )
2510                         if( ch == '\n' )
2511                         {
2512                           line++;
2513                           if( args->num == 1 )
2514                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2515
2516                           else if( args->num )
2517                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2518
2519                           if( bt_deletekey (bt, key, len, 0) )
2520                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2521                           len = 0;
2522                         }
2523                         else if( len < 255 )
2524                                 key[len++] = ch;
2525                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2526                 break;
2527
2528         case 'f':
2529                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2530                 if( in = fopen (args->infile, "rb") )
2531                   while( ch = getc(in), ch != EOF )
2532                         if( ch == '\n' )
2533                         {
2534                           line++;
2535                           if( args->num == 1 )
2536                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2537
2538                           else if( args->num )
2539                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2540
2541                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2542                                 found++;
2543                           else if( bt->err )
2544                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2545                           len = 0;
2546                         }
2547                         else if( len < 255 )
2548                                 key[len++] = ch;
2549                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2550                 break;
2551
2552         case 's':
2553                 fprintf(stderr, "started scanning\n");
2554                 do {
2555                         if( set->pool = bt_pinpool (bt, page_no) )
2556                                 set->page = bt_page (bt, set->pool, page_no);
2557                         else
2558                                 break;
2559                         set->latch = bt_pinlatch (bt, page_no);
2560                         bt_lockpage (BtLockRead, set->latch);
2561                         next = bt_getid (set->page->right);
2562                         cnt += set->page->act;
2563
2564                         for( slot = 0; slot++ < set->page->cnt; )
2565                          if( next || slot < set->page->cnt )
2566                           if( !slotptr(set->page, slot)->dead ) {
2567                                 ptr = keyptr(set->page, slot);
2568                                 fwrite (ptr->key, ptr->len, 1, stdout);
2569                                 fputc ('\n', stdout);
2570                           }
2571
2572                         bt_unlockpage (BtLockRead, set->latch);
2573                         bt_unpinlatch (set->latch);
2574                         bt_unpinpool (set->pool);
2575                 } while( page_no = next );
2576
2577                 cnt--;  // remove stopper key
2578                 fprintf(stderr, " Total keys read %d\n", cnt);
2579                 break;
2580
2581         case 'c':
2582 #ifdef unix
2583                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2584 #endif
2585                 fprintf(stderr, "started counting\n");
2586                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2587                 page_no = LEAF_page;
2588
2589                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2590                 uid off = page_no << bt->mgr->page_bits;
2591 #ifdef unix
2592                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2593 #else
2594                 DWORD amt[1];
2595
2596                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2597
2598                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2599                         return bt->err = BTERR_map;
2600
2601                   if( *amt <  bt->mgr->page_size )
2602                         return bt->err = BTERR_map;
2603 #endif
2604                         if( !bt->frame->free && !bt->frame->lvl )
2605                                 cnt += bt->frame->act;
2606                         if( page_no > LEAF_page )
2607                                 next = page_no + 1;
2608                         page_no = next;
2609                 }
2610                 
2611                 cnt--;  // remove stopper key
2612                 fprintf(stderr, " Total keys read %d\n", cnt);
2613                 break;
2614         }
2615
2616         bt_close (bt);
2617 #ifdef unix
2618         return NULL;
2619 #else
2620         return 0;
2621 #endif
2622 }
2623
2624 typedef struct timeval timer;
2625
2626 int main (int argc, char **argv)
2627 {
2628 int idx, cnt, len, slot, err;
2629 int segsize, bits = 16;
2630 double start, stop;
2631 #ifdef unix
2632 pthread_t *threads;
2633 #else
2634 HANDLE *threads;
2635 #endif
2636 ThreadArg *args;
2637 uint poolsize = 0;
2638 float elapsed;
2639 int num = 0;
2640 char key[1];
2641 BtMgr *mgr;
2642 BtKey ptr;
2643 BtDb *bt;
2644
2645         if( argc < 3 ) {
2646                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2647                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2648                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2649                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2650                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2651                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2652                 exit(0);
2653         }
2654
2655         start = getCpuTime(0);
2656
2657         if( argc > 3 )
2658                 bits = atoi(argv[3]);
2659
2660         if( argc > 4 )
2661                 poolsize = atoi(argv[4]);
2662
2663         if( !poolsize )
2664                 fprintf (stderr, "Warning: no mapped_pool\n");
2665
2666         if( poolsize > 65535 )
2667                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2668
2669         if( argc > 5 )
2670                 segsize = atoi(argv[5]);
2671         else
2672                 segsize = 4;    // 16 pages per mmap segment
2673
2674         if( argc > 6 )
2675                 num = atoi(argv[6]);
2676
2677         cnt = argc - 7;
2678 #ifdef unix
2679         threads = malloc (cnt * sizeof(pthread_t));
2680 #else
2681         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2682 #endif
2683         args = malloc (cnt * sizeof(ThreadArg));
2684
2685         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2686
2687         if( !mgr ) {
2688                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2689                 exit (1);
2690         }
2691
2692         //      fire off threads
2693
2694         for( idx = 0; idx < cnt; idx++ ) {
2695                 args[idx].infile = argv[idx + 7];
2696                 args[idx].type = argv[2];
2697                 args[idx].mgr = mgr;
2698                 args[idx].num = num;
2699                 args[idx].idx = idx;
2700 #ifdef unix
2701                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2702                         fprintf(stderr, "Error creating thread %d\n", err);
2703 #else
2704                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2705 #endif
2706         }
2707
2708         //      wait for termination
2709
2710 #ifdef unix
2711         for( idx = 0; idx < cnt; idx++ )
2712                 pthread_join (threads[idx], NULL);
2713 #else
2714         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2715
2716         for( idx = 0; idx < cnt; idx++ )
2717                 CloseHandle(threads[idx]);
2718
2719 #endif
2720         elapsed = getCpuTime(0) - start;
2721         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2722         elapsed = getCpuTime(1);
2723         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2724         elapsed = getCpuTime(2);
2725         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2726
2727         bt_mgrclose (mgr);
2728 }
2729
2730 #endif  //STANDALONE