]> pd.if.org Git - btree/blob - threadskv3.c
bc7db88a63c518a44a96d6f80d168695fee2fac8
[btree] / threadskv3.c
1 // btree version threadskv3 sched_yield version
2 //      with reworked bt_deletekey code
3 //      and phase-fair reader writer lock
4 //      and librarian page split code
5 // 02 SEP 2014
6
7 // author: karl malbrain, malbrain@cal.berkeley.edu
8
9 /*
10 This work, including the source code, documentation
11 and related data, is placed into the public domain.
12
13 The orginal author is Karl Malbrain.
14
15 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
16 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
17 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
18 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
19 RESULTING FROM THE USE, MODIFICATION, OR
20 REDISTRIBUTION OF THIS SOFTWARE.
21 */
22
23 // Please see the project home page for documentation
24 // code.google.com/p/high-concurrency-btree
25
26 #define _FILE_OFFSET_BITS 64
27 #define _LARGEFILE64_SOURCE
28
29 #ifdef linux
30 #define _GNU_SOURCE
31 #endif
32
33 #ifdef unix
34 #include <unistd.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <fcntl.h>
38 #include <sys/time.h>
39 #include <sys/mman.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #else
43 #define WIN32_LEAN_AND_MEAN
44 #include <windows.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <time.h>
48 #include <fcntl.h>
49 #include <process.h>
50 #include <intrin.h>
51 #endif
52
53 #include <memory.h>
54 #include <string.h>
55 #include <stddef.h>
56
57 typedef unsigned long long      uid;
58
59 #ifndef unix
60 typedef unsigned long long      off64_t;
61 typedef unsigned short          ushort;
62 typedef unsigned int            uint;
63 #endif
64
65 #define BT_latchtable   128                                     // number of latch manager slots
66
67 #define BT_ro 0x6f72    // ro
68 #define BT_rw 0x7772    // rw
69
70 #define BT_maxbits              24                                      // maximum page size in bits
71 #define BT_minbits              9                                       // minimum page size in bits
72 #define BT_minpage              (1 << BT_minbits)       // minimum page size
73 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
74
75 /*
76 There are five lock types for each node in three independent sets: 
77 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
78 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
79 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
80 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
81 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
82 */
83
84 typedef enum{
85         BtLockAccess,
86         BtLockDelete,
87         BtLockRead,
88         BtLockWrite,
89         BtLockParent
90 } BtLock;
91
92 //      definition for phase-fair reader/writer lock implementation
93
94 typedef struct {
95         ushort rin[1];
96         ushort rout[1];
97         ushort ticket[1];
98         ushort serving[1];
99 } RWLock;
100
101 #define PHID 0x1
102 #define PRES 0x2
103 #define MASK 0x3
104 #define RINC 0x4
105
106 //      definition for spin latch implementation
107
108 // exclusive is set for write access
109 // share is count of read accessors
110 // grant write lock when share == 0
111
112 volatile typedef struct {
113         ushort exclusive:1;
114         ushort pending:1;
115         ushort share:14;
116 } BtSpinLatch;
117
118 #define XCL 1
119 #define PEND 2
120 #define BOTH 3
121 #define SHARE 4
122
123 //  hash table entries
124
125 typedef struct {
126         BtSpinLatch latch[1];
127         volatile ushort slot;           // Latch table entry at head of chain
128 } BtHashEntry;
129
130 //      latch manager table structure
131
132 typedef struct {
133         RWLock readwr[1];               // read/write page lock
134         RWLock access[1];               // Access Intent/Page delete
135         RWLock parent[1];               // Posting of fence key in parent
136         BtSpinLatch busy[1];            // slot is being moved between chains
137         volatile ushort next;           // next entry in hash table chain
138         volatile ushort prev;           // prev entry in hash table chain
139         volatile ushort pin;            // number of outstanding locks
140         volatile ushort hash;           // hash slot entry is under
141         volatile uid page_no;           // latch set page number
142 } BtLatchSet;
143
144 //      Define the length of the page and key pointers
145
146 #define BtId 6
147
148 //      Page key slot definition.
149
150 //      If BT_maxbits is 15 or less, you can save 2 bytes
151 //      for each key stored by making the two uints
152 //      into ushorts.
153
154 //      Keys are marked dead, but remain on the page until
155 //      it cleanup is called. The fence key (highest key) for
156 //      the page is always present, even after cleanup.
157
158 typedef struct {
159         uint off:BT_maxbits;            // page offset for key start
160         uint librarian:1;                       // set for librarian slot
161         uint dead:1;                            // set for deleted key
162 } BtSlot;
163
164 //      The key structure occupies space at the upper end of
165 //      each page.  It's a length byte followed by the key
166 //      bytes.
167
168 typedef struct {
169         unsigned char len;
170         unsigned char key[1];
171 } *BtKey;
172
173 //      the value structure also occupies space at the upper
174 //      end of the page.
175
176 typedef struct {
177         unsigned char len;
178         unsigned char value[1];
179 } *BtVal;
180
181 //      The first part of an index page.
182 //      It is immediately followed
183 //      by the BtSlot array of keys.
184
185 typedef struct BtPage_ {
186         uint cnt;                                       // count of keys in page
187         uint act;                                       // count of active keys
188         uint min;                                       // next key offset
189         uint garbage;                           // page garbage in bytes
190         unsigned char bits:7;           // page size in bits
191         unsigned char free:1;           // page is on free chain
192         unsigned char lvl:7;            // level of page
193         unsigned char kill:1;           // page is being deleted
194         unsigned char right[BtId];      // page number to right
195 } *BtPage;
196
197 //      The memory mapping pool table buffer manager entry
198
199 typedef struct {
200         uid  basepage;                          // mapped base page number
201         char *map;                                      // mapped memory pointer
202         ushort slot;                            // slot index in this array
203         ushort pin;                                     // mapped page pin counter
204         void *hashprev;                         // previous pool entry for the same hash idx
205         void *hashnext;                         // next pool entry for the same hash idx
206 #ifndef unix
207         HANDLE hmap;                            // Windows memory mapping handle
208 #endif
209 } BtPool;
210
211 #define CLOCK_bit 0x8000                // bit in pool->pin
212
213 //  The loadpage interface object
214
215 typedef struct {
216         uid page_no;            // current page number
217         BtPage page;            // current page pointer
218         BtPool *pool;           // current page pool
219         BtLatchSet *latch;      // current page latch set
220 } BtPageSet;
221
222 //      structure for latch manager on ALLOC_page
223
224 typedef struct {
225         struct BtPage_ alloc[1];        // next page_no in right ptr
226         unsigned char chain[BtId];      // head of free page_nos chain
227         BtSpinLatch lock[1];            // allocation area lite latch
228         ushort latchdeployed;           // highest number of latch entries deployed
229         ushort nlatchpage;                      // number of latch pages at BT_latch
230         ushort latchtotal;                      // number of page latch entries
231         ushort latchhash;                       // number of latch hash table slots
232         ushort latchvictim;                     // next latch entry to examine
233         BtHashEntry table[0];           // the hash table
234 } BtLatchMgr;
235
236 //      The object structure for Btree access
237
238 typedef struct {
239         uint page_size;                         // page size    
240         uint page_bits;                         // page size in bits    
241         uint seg_bits;                          // seg size in pages in bits
242         uint mode;                                      // read-write mode
243 #ifdef unix
244         int idx;
245 #else
246         HANDLE idx;
247 #endif
248         ushort poolcnt;                         // highest page pool node in use
249         ushort poolmax;                         // highest page pool node allocated
250         ushort poolmask;                        // total number of pages in mmap segment - 1
251         ushort hashsize;                        // size of Hash Table for pool entries
252         volatile uint evicted;          // last evicted hash table slot
253         ushort *hash;                           // pool index for hash entries
254         BtSpinLatch *latch;                     // latches for hash table slots
255         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
256         BtLatchSet *latchsets;          // mapped latch set from latch pages
257         BtPool *pool;                           // memory pool page segments
258 #ifndef unix
259         HANDLE halloc;                          // allocation and latch table handle
260 #endif
261 } BtMgr;
262
263 typedef struct {
264         BtMgr *mgr;                     // buffer manager for thread
265         BtPage cursor;          // cached frame for start/next (never mapped)
266         BtPage frame;           // spare frame for the page split (never mapped)
267         uid cursor_page;        // current cursor page number   
268         unsigned char *mem;     // frame, cursor, page memory buffer
269         int found;                      // last delete or insert was found
270         int err;                        // last error
271 } BtDb;
272
273 typedef enum {
274         BTERR_ok = 0,
275         BTERR_struct,
276         BTERR_ovflw,
277         BTERR_lock,
278         BTERR_map,
279         BTERR_wrt,
280         BTERR_hash
281 } BTERR;
282
283 // B-Tree functions
284 extern void bt_close (BtDb *bt);
285 extern BtDb *bt_open (BtMgr *mgr);
286 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
287 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
288 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
289 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
290 extern uint bt_nextkey   (BtDb *bt, uint slot);
291
292 //      manager functions
293 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
294 void bt_mgrclose (BtMgr *mgr);
295
296 //  Helper functions to return slot values
297 //      from the cursor page.
298
299 extern BtKey bt_key (BtDb *bt, uint slot);
300 extern BtVal bt_val (BtDb *bt, uint slot);
301
302 //  BTree page number constants
303 #define ALLOC_page              0       // allocation & latch manager hash table
304 #define ROOT_page               1       // root of the btree
305 #define LEAF_page               2       // first page of leaves
306 #define LATCH_page              3       // pages for latch manager
307
308 //      Number of levels to create in a new BTree
309
310 #define MIN_lvl                 2
311
312 //  The page is allocated from low and hi ends.
313 //  The key slots are allocated from the bottom,
314 //      while the text and value of the key
315 //  are allocated from the top.  When the two
316 //  areas meet, the page is split into two.
317
318 //  A key consists of a length byte, two bytes of
319 //  index number (0 - 65534), and up to 253 bytes
320 //  of key value.  Duplicate keys are discarded.
321 //  Associated with each key is a value byte string
322 //      containing any value desired.
323
324 //  The b-tree root is always located at page 1.
325 //      The first leaf page of level zero is always
326 //      located on page 2.
327
328 //      The b-tree pages are linked with next
329 //      pointers to facilitate enumerators,
330 //      and provide for concurrency.
331
332 //      When to root page fills, it is split in two and
333 //      the tree height is raised by a new root at page
334 //      one with two keys.
335
336 //      Deleted keys are marked with a dead bit until
337 //      page cleanup. The fence key for a leaf node is
338 //      always present
339
340 //  Groups of pages called segments from the btree are optionally
341 //  cached with a memory mapped pool. A hash table is used to keep
342 //  track of the cached segments.  This behaviour is controlled
343 //  by the cache block size parameter to bt_open.
344
345 //  To achieve maximum concurrency one page is locked at a time
346 //  as the tree is traversed to find leaf key in question. The right
347 //  page numbers are used in cases where the page is being split,
348 //      or consolidated.
349
350 //  Page 0 is dedicated to lock for new page extensions,
351 //      and chains empty pages together for reuse. It also
352 //      contains the latch manager hash table.
353
354 //      The ParentModification lock on a node is obtained to serialize posting
355 //      or changing the fence key for a node.
356
357 //      Empty pages are chained together through the ALLOC page and reused.
358
359 //      Access macros to address slot and key values from the page
360 //      Page slots use 1 based indexing.
361
362 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
363 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
364 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
365
366 void bt_putid(unsigned char *dest, uid id)
367 {
368 int i = BtId;
369
370         while( i-- )
371                 dest[i] = (unsigned char)id, id >>= 8;
372 }
373
374 uid bt_getid(unsigned char *src)
375 {
376 uid id = 0;
377 int i;
378
379         for( i = 0; i < BtId; i++ )
380                 id <<= 8, id |= *src++; 
381
382         return id;
383 }
384
385 //      Phase-Fair reader/writer lock implementation
386
387 void WriteLock (RWLock *lock)
388 {
389 ushort w, r, tix;
390
391 #ifdef unix
392         tix = __sync_fetch_and_add (lock->ticket, 1);
393 #else
394         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
395 #endif
396         // wait for our ticket to come up
397
398         while( tix != lock->serving[0] )
399 #ifdef unix
400                 sched_yield();
401 #else
402                 SwitchToThread ();
403 #endif
404
405         w = PRES | (tix & PHID);
406 #ifdef  unix
407         r = __sync_fetch_and_add (lock->rin, w);
408 #else
409         r = _InterlockedExchangeAdd16 (lock->rin, w);
410 #endif
411         while( r != *lock->rout )
412 #ifdef unix
413                 sched_yield();
414 #else
415                 SwitchToThread();
416 #endif
417 }
418
419 void WriteRelease (RWLock *lock)
420 {
421 #ifdef unix
422         __sync_fetch_and_and (lock->rin, ~MASK);
423 #else
424         _InterlockedAnd16 (lock->rin, ~MASK);
425 #endif
426         lock->serving[0]++;
427 }
428
429 void ReadLock (RWLock *lock)
430 {
431 ushort w;
432 #ifdef unix
433         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
434 #else
435         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
436 #endif
437         if( w )
438           while( w == (*lock->rin & MASK) )
439 #ifdef unix
440                 sched_yield ();
441 #else
442                 SwitchToThread ();
443 #endif
444 }
445
446 void ReadRelease (RWLock *lock)
447 {
448 #ifdef unix
449         __sync_fetch_and_add (lock->rout, RINC);
450 #else
451         _InterlockedExchangeAdd16 (lock->rout, RINC);
452 #endif
453 }
454
455 //      Spin Latch Manager
456
457 //      wait until write lock mode is clear
458 //      and add 1 to the share count
459
460 void bt_spinreadlock(BtSpinLatch *latch)
461 {
462 ushort prev;
463
464   do {
465 #ifdef unix
466         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
467 #else
468         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
469 #endif
470         //  see if exclusive request is granted or pending
471
472         if( !(prev & BOTH) )
473                 return;
474 #ifdef unix
475         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
476 #else
477         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
478 #endif
479 #ifdef  unix
480   } while( sched_yield(), 1 );
481 #else
482   } while( SwitchToThread(), 1 );
483 #endif
484 }
485
486 //      wait for other read and write latches to relinquish
487
488 void bt_spinwritelock(BtSpinLatch *latch)
489 {
490 ushort prev;
491
492   do {
493 #ifdef  unix
494         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
495 #else
496         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
497 #endif
498         if( !(prev & XCL) )
499           if( !(prev & ~BOTH) )
500                 return;
501           else
502 #ifdef unix
503                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
504 #else
505                 _InterlockedAnd16((ushort *)latch, ~XCL);
506 #endif
507 #ifdef  unix
508   } while( sched_yield(), 1 );
509 #else
510   } while( SwitchToThread(), 1 );
511 #endif
512 }
513
514 //      try to obtain write lock
515
516 //      return 1 if obtained,
517 //              0 otherwise
518
519 int bt_spinwritetry(BtSpinLatch *latch)
520 {
521 ushort prev;
522
523 #ifdef  unix
524         prev = __sync_fetch_and_or((ushort *)latch, XCL);
525 #else
526         prev = _InterlockedOr16((ushort *)latch, XCL);
527 #endif
528         //      take write access if all bits are clear
529
530         if( !(prev & XCL) )
531           if( !(prev & ~BOTH) )
532                 return 1;
533           else
534 #ifdef unix
535                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
536 #else
537                 _InterlockedAnd16((ushort *)latch, ~XCL);
538 #endif
539         return 0;
540 }
541
542 //      clear write mode
543
544 void bt_spinreleasewrite(BtSpinLatch *latch)
545 {
546 #ifdef unix
547         __sync_fetch_and_and((ushort *)latch, ~BOTH);
548 #else
549         _InterlockedAnd16((ushort *)latch, ~BOTH);
550 #endif
551 }
552
553 //      decrement reader count
554
555 void bt_spinreleaseread(BtSpinLatch *latch)
556 {
557 #ifdef unix
558         __sync_fetch_and_add((ushort *)latch, -SHARE);
559 #else
560         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
561 #endif
562 }
563
564 //      link latch table entry into latch hash table
565
566 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
567 {
568 BtLatchSet *set = bt->mgr->latchsets + victim;
569
570         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
571                 bt->mgr->latchsets[set->next].prev = victim;
572
573         bt->mgr->latchmgr->table[hashidx].slot = victim;
574         set->page_no = page_no;
575         set->hash = hashidx;
576         set->prev = 0;
577 }
578
579 //      release latch pin
580
581 void bt_unpinlatch (BtLatchSet *set)
582 {
583 #ifdef unix
584         __sync_fetch_and_add(&set->pin, -1);
585 #else
586         _InterlockedDecrement16 (&set->pin);
587 #endif
588 }
589
590 //      find existing latchset or inspire new one
591 //      return with latchset pinned
592
593 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
594 {
595 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
596 ushort slot, avail = 0, victim, idx;
597 BtLatchSet *set;
598
599         //  obtain read lock on hash table entry
600
601         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
602
603         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
604         {
605                 set = bt->mgr->latchsets + slot;
606                 if( page_no == set->page_no )
607                         break;
608         } while( slot = set->next );
609
610         if( slot ) {
611 #ifdef unix
612                 __sync_fetch_and_add(&set->pin, 1);
613 #else
614                 _InterlockedIncrement16 (&set->pin);
615 #endif
616         }
617
618     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
619
620         if( slot )
621                 return set;
622
623   //  try again, this time with write lock
624
625   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
626
627   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
628   {
629         set = bt->mgr->latchsets + slot;
630         if( page_no == set->page_no )
631                 break;
632         if( !set->pin && !avail )
633                 avail = slot;
634   } while( slot = set->next );
635
636   //  found our entry, or take over an unpinned one
637
638   if( slot || (slot = avail) ) {
639         set = bt->mgr->latchsets + slot;
640 #ifdef unix
641         __sync_fetch_and_add(&set->pin, 1);
642 #else
643         _InterlockedIncrement16 (&set->pin);
644 #endif
645         set->page_no = page_no;
646         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
647         return set;
648   }
649
650         //  see if there are any unused entries
651 #ifdef unix
652         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
653 #else
654         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
655 #endif
656
657         if( victim < bt->mgr->latchmgr->latchtotal ) {
658                 set = bt->mgr->latchsets + victim;
659 #ifdef unix
660                 __sync_fetch_and_add(&set->pin, 1);
661 #else
662                 _InterlockedIncrement16 (&set->pin);
663 #endif
664                 bt_latchlink (bt, hashidx, victim, page_no);
665                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
666                 return set;
667         }
668
669 #ifdef unix
670         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
671 #else
672         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
673 #endif
674   //  find and reuse previous lock entry
675
676   while( 1 ) {
677 #ifdef unix
678         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
679 #else
680         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
681 #endif
682         //      we don't use slot zero
683
684         if( victim %= bt->mgr->latchmgr->latchtotal )
685                 set = bt->mgr->latchsets + victim;
686         else
687                 continue;
688
689         //      take control of our slot
690         //      from other threads
691
692         if( set->pin || !bt_spinwritetry (set->busy) )
693                 continue;
694
695         idx = set->hash;
696
697         // try to get write lock on hash chain
698         //      skip entry if not obtained
699         //      or has outstanding locks
700
701         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
702                 bt_spinreleasewrite (set->busy);
703                 continue;
704         }
705
706         if( set->pin ) {
707                 bt_spinreleasewrite (set->busy);
708                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
709                 continue;
710         }
711
712         //  unlink our available victim from its hash chain
713
714         if( set->prev )
715                 bt->mgr->latchsets[set->prev].next = set->next;
716         else
717                 bt->mgr->latchmgr->table[idx].slot = set->next;
718
719         if( set->next )
720                 bt->mgr->latchsets[set->next].prev = set->prev;
721
722         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
723 #ifdef unix
724         __sync_fetch_and_add(&set->pin, 1);
725 #else
726         _InterlockedIncrement16 (&set->pin);
727 #endif
728         bt_latchlink (bt, hashidx, victim, page_no);
729         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
730         bt_spinreleasewrite (set->busy);
731         return set;
732   }
733 }
734
735 void bt_mgrclose (BtMgr *mgr)
736 {
737 BtPool *pool;
738 uint slot;
739
740         // release mapped pages
741         //      note that slot zero is never used
742
743         for( slot = 1; slot < mgr->poolmax; slot++ ) {
744                 pool = mgr->pool + slot;
745                 if( pool->slot )
746 #ifdef unix
747                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
748 #else
749                 {
750                         FlushViewOfFile(pool->map, 0);
751                         UnmapViewOfFile(pool->map);
752                         CloseHandle(pool->hmap);
753                 }
754 #endif
755         }
756
757 #ifdef unix
758         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
759         munmap (mgr->latchmgr, 2 * mgr->page_size);
760 #else
761         FlushViewOfFile(mgr->latchmgr, 0);
762         UnmapViewOfFile(mgr->latchmgr);
763         CloseHandle(mgr->halloc);
764 #endif
765 #ifdef unix
766         close (mgr->idx);
767         free (mgr->pool);
768         free (mgr->hash);
769         free ((void *)mgr->latch);
770         free (mgr);
771 #else
772         FlushFileBuffers(mgr->idx);
773         CloseHandle(mgr->idx);
774         GlobalFree (mgr->pool);
775         GlobalFree (mgr->hash);
776         GlobalFree ((void *)mgr->latch);
777         GlobalFree (mgr);
778 #endif
779 }
780
781 //      close and release memory
782
783 void bt_close (BtDb *bt)
784 {
785 #ifdef unix
786         if( bt->mem )
787                 free (bt->mem);
788 #else
789         if( bt->mem)
790                 VirtualFree (bt->mem, 0, MEM_RELEASE);
791 #endif
792         free (bt);
793 }
794
795 //  open/create new btree buffer manager
796
797 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
798 //              size of mapped page pool (e.g. 8192)
799
800 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
801 {
802 uint lvl, attr, cacheblk, last, slot, idx;
803 uint nlatchpage, latchhash;
804 unsigned char value[BtId];
805 BtLatchMgr *latchmgr;
806 off64_t size;
807 uint amt[1];
808 BtMgr* mgr;
809 BtKey key;
810 BtVal val;
811 int flag;
812
813 #ifndef unix
814 SYSTEM_INFO sysinfo[1];
815 #endif
816
817         // determine sanity of page size and buffer pool
818
819         if( bits > BT_maxbits )
820                 bits = BT_maxbits;
821         else if( bits < BT_minbits )
822                 bits = BT_minbits;
823
824         if( !poolmax )
825                 return NULL;    // must have buffer pool
826
827 #ifdef unix
828         mgr = calloc (1, sizeof(BtMgr));
829
830         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
831
832         if( mgr->idx == -1 )
833                 return free(mgr), NULL;
834         
835         cacheblk = 4096;        // minimum mmap segment size for unix
836
837 #else
838         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
839         attr = FILE_ATTRIBUTE_NORMAL;
840         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
841
842         if( mgr->idx == INVALID_HANDLE_VALUE )
843                 return GlobalFree(mgr), NULL;
844
845         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
846         GetSystemInfo(sysinfo);
847         cacheblk = sysinfo->dwAllocationGranularity;
848 #endif
849
850 #ifdef unix
851         latchmgr = malloc (BT_maxpage);
852         *amt = 0;
853
854         // read minimum page size to get root info
855
856         if( size = lseek (mgr->idx, 0L, 2) ) {
857                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
858                         bits = latchmgr->alloc->bits;
859                 else
860                         return free(mgr), free(latchmgr), NULL;
861         } else if( mode == BT_ro )
862                 return free(latchmgr), bt_mgrclose (mgr), NULL;
863 #else
864         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
865         size = GetFileSize(mgr->idx, amt);
866
867         if( size || *amt ) {
868                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
869                         return bt_mgrclose (mgr), NULL;
870                 bits = latchmgr->alloc->bits;
871         } else if( mode == BT_ro )
872                 return bt_mgrclose (mgr), NULL;
873 #endif
874
875         mgr->page_size = 1 << bits;
876         mgr->page_bits = bits;
877
878         mgr->poolmax = poolmax;
879         mgr->mode = mode;
880
881         if( cacheblk < mgr->page_size )
882                 cacheblk = mgr->page_size;
883
884         //  mask for partial memmaps
885
886         mgr->poolmask = (cacheblk >> bits) - 1;
887
888         //      see if requested size of pages per memmap is greater
889
890         if( (1 << segsize) > mgr->poolmask )
891                 mgr->poolmask = (1 << segsize) - 1;
892
893         mgr->seg_bits = 0;
894
895         while( (1 << mgr->seg_bits) <= mgr->poolmask )
896                 mgr->seg_bits++;
897
898         mgr->hashsize = hashsize;
899
900 #ifdef unix
901         mgr->pool = calloc (poolmax, sizeof(BtPool));
902         mgr->hash = calloc (hashsize, sizeof(ushort));
903         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
904 #else
905         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
906         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
907         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
908 #endif
909
910         if( size || *amt )
911                 goto mgrlatch;
912
913         // initialize an empty b-tree with latch page, root page, page of leaves
914         // and page(s) of latches
915
916         memset (latchmgr, 0, 1 << bits);
917         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
918         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
919         latchmgr->alloc->bits = mgr->page_bits;
920
921         latchmgr->nlatchpage = nlatchpage;
922         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
923
924         //  initialize latch manager
925
926         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
927
928         //      size of hash table = total number of latchsets
929
930         if( latchhash > latchmgr->latchtotal )
931                 latchhash = latchmgr->latchtotal;
932
933         latchmgr->latchhash = latchhash;
934
935 #ifdef unix
936         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
937                 return bt_mgrclose (mgr), NULL;
938 #else
939         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
940                 return bt_mgrclose (mgr), NULL;
941
942         if( *amt < mgr->page_size )
943                 return bt_mgrclose (mgr), NULL;
944 #endif
945
946         memset (latchmgr, 0, 1 << bits);
947         latchmgr->alloc->bits = mgr->page_bits;
948
949         for( lvl=MIN_lvl; lvl--; ) {
950                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
951                 key = keyptr(latchmgr->alloc, 1);
952                 key->len = 2;           // create stopper key
953                 key->key[0] = 0xff;
954                 key->key[1] = 0xff;
955
956                 bt_putid(value, MIN_lvl - lvl + 1);
957                 val = valptr(latchmgr->alloc, 1);
958                 val->len = lvl ? BtId : 0;
959                 memcpy (val->value, value, val->len);
960
961                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
962                 latchmgr->alloc->lvl = lvl;
963                 latchmgr->alloc->cnt = 1;
964                 latchmgr->alloc->act = 1;
965 #ifdef unix
966                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
967                         return bt_mgrclose (mgr), NULL;
968 #else
969                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
970                         return bt_mgrclose (mgr), NULL;
971
972                 if( *amt < mgr->page_size )
973                         return bt_mgrclose (mgr), NULL;
974 #endif
975         }
976
977         // clear out latch manager locks
978         //      and rest of pages to round out segment
979
980         memset(latchmgr, 0, mgr->page_size);
981         last = MIN_lvl + 1;
982
983         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
984 #ifdef unix
985                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
986 #else
987                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
988                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
989                         return bt_mgrclose (mgr), NULL;
990                 if( *amt < mgr->page_size )
991                         return bt_mgrclose (mgr), NULL;
992 #endif
993                 last++;
994         }
995
996 mgrlatch:
997 #ifdef unix
998         // mlock the root page and the latchmgr page
999
1000         flag = PROT_READ | PROT_WRITE;
1001         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1002         if( mgr->latchmgr == MAP_FAILED )
1003                 return bt_mgrclose (mgr), NULL;
1004         mlock (mgr->latchmgr, 2 * mgr->page_size);
1005
1006         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1007         if( mgr->latchsets == MAP_FAILED )
1008                 return bt_mgrclose (mgr), NULL;
1009         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1010 #else
1011         flag = PAGE_READWRITE;
1012         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1013         if( !mgr->halloc )
1014                 return bt_mgrclose (mgr), NULL;
1015
1016         flag = FILE_MAP_WRITE;
1017         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1018         if( !mgr->latchmgr )
1019                 return GetLastError(), bt_mgrclose (mgr), NULL;
1020
1021         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1022 #endif
1023
1024 #ifdef unix
1025         free (latchmgr);
1026 #else
1027         VirtualFree (latchmgr, 0, MEM_RELEASE);
1028 #endif
1029         return mgr;
1030 }
1031
1032 //      open BTree access method
1033 //      based on buffer manager
1034
1035 BtDb *bt_open (BtMgr *mgr)
1036 {
1037 BtDb *bt = malloc (sizeof(*bt));
1038
1039         memset (bt, 0, sizeof(*bt));
1040         bt->mgr = mgr;
1041 #ifdef unix
1042         bt->mem = malloc (2 *mgr->page_size);
1043 #else
1044         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1045 #endif
1046         bt->frame = (BtPage)bt->mem;
1047         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1048         return bt;
1049 }
1050
1051 //  compare two keys, returning > 0, = 0, or < 0
1052 //  as the comparison value
1053
1054 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1055 {
1056 uint len1 = key1->len;
1057 int ans;
1058
1059         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1060                 return ans;
1061
1062         if( len1 > len2 )
1063                 return 1;
1064         if( len1 < len2 )
1065                 return -1;
1066
1067         return 0;
1068 }
1069
1070 //      Buffer Pool mgr
1071
1072 // find segment in pool
1073 // must be called with hashslot idx locked
1074 //      return NULL if not there
1075 //      otherwise return node
1076
1077 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1078 {
1079 BtPool *pool;
1080 uint slot;
1081
1082         // compute start of hash chain in pool
1083
1084         if( slot = bt->mgr->hash[idx] ) 
1085                 pool = bt->mgr->pool + slot;
1086         else
1087                 return NULL;
1088
1089         page_no &= ~bt->mgr->poolmask;
1090
1091         while( pool->basepage != page_no )
1092           if( pool = pool->hashnext )
1093                 continue;
1094           else
1095                 return NULL;
1096
1097         return pool;
1098 }
1099
1100 // add segment to hash table
1101
1102 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1103 {
1104 BtPool *node;
1105 uint slot;
1106
1107         pool->hashprev = pool->hashnext = NULL;
1108         pool->basepage = page_no & ~bt->mgr->poolmask;
1109         pool->pin = CLOCK_bit + 1;
1110
1111         if( slot = bt->mgr->hash[idx] ) {
1112                 node = bt->mgr->pool + slot;
1113                 pool->hashnext = node;
1114                 node->hashprev = pool;
1115         }
1116
1117         bt->mgr->hash[idx] = pool->slot;
1118 }
1119
1120 //  map new buffer pool segment to virtual memory
1121
1122 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1123 {
1124 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1125 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1126 int flag;
1127
1128 #ifdef unix
1129         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1130         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1131         if( pool->map == MAP_FAILED )
1132                 return bt->err = BTERR_map;
1133
1134 #else
1135         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1136         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1137         if( !pool->hmap )
1138                 return bt->err = BTERR_map;
1139
1140         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1141         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1142         if( !pool->map )
1143                 return bt->err = BTERR_map;
1144 #endif
1145         return bt->err = 0;
1146 }
1147
1148 //      calculate page within pool
1149
1150 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1151 {
1152 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1153 BtPage page;
1154
1155         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1156 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1157         return page;
1158 }
1159
1160 //  release pool pin
1161
1162 void bt_unpinpool (BtPool *pool)
1163 {
1164 #ifdef unix
1165         __sync_fetch_and_add(&pool->pin, -1);
1166 #else
1167         _InterlockedDecrement16 (&pool->pin);
1168 #endif
1169 }
1170
1171 //      find or place requested page in segment-pool
1172 //      return pool table entry, incrementing pin
1173
1174 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1175 {
1176 uint slot, hashidx, idx, victim;
1177 BtPool *pool, *node, *next;
1178
1179         //      lock hash table chain
1180
1181         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1182         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1183
1184         //      look up in hash table
1185
1186         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1187 #ifdef unix
1188                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1189                 __sync_fetch_and_add(&pool->pin, 1);
1190 #else
1191                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1192                 _InterlockedIncrement16 (&pool->pin);
1193 #endif
1194                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1195                 return pool;
1196         }
1197
1198         // allocate a new pool node
1199         // and add to hash table
1200
1201 #ifdef unix
1202         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1203 #else
1204         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1205 #endif
1206
1207         if( ++slot < bt->mgr->poolmax ) {
1208                 pool = bt->mgr->pool + slot;
1209                 pool->slot = slot;
1210
1211                 if( bt_mapsegment(bt, pool, page_no) )
1212                         return NULL;
1213
1214                 bt_linkhash(bt, pool, page_no, hashidx);
1215                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1216                 return pool;
1217         }
1218
1219         // pool table is full
1220         //      find best pool entry to evict
1221
1222 #ifdef unix
1223         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1224 #else
1225         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1226 #endif
1227
1228         while( 1 ) {
1229 #ifdef unix
1230                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1231 #else
1232                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1233 #endif
1234                 victim %= bt->mgr->poolmax;
1235                 pool = bt->mgr->pool + victim;
1236                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1237
1238                 if( !victim )
1239                         continue;
1240
1241                 // try to get write lock
1242                 //      skip entry if not obtained
1243
1244                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1245                         continue;
1246
1247                 //      skip this entry if
1248                 //      page is pinned
1249                 //  or clock bit is set
1250
1251                 if( pool->pin ) {
1252 #ifdef unix
1253                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1254 #else
1255                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1256 #endif
1257                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1258                         continue;
1259                 }
1260
1261                 // unlink victim pool node from hash table
1262
1263                 if( node = pool->hashprev )
1264                         node->hashnext = pool->hashnext;
1265                 else if( node = pool->hashnext )
1266                         bt->mgr->hash[idx] = node->slot;
1267                 else
1268                         bt->mgr->hash[idx] = 0;
1269
1270                 if( node = pool->hashnext )
1271                         node->hashprev = pool->hashprev;
1272
1273                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1274
1275                 //      remove old file mapping
1276 #ifdef unix
1277                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1278 #else
1279 //              FlushViewOfFile(pool->map, 0);
1280                 UnmapViewOfFile(pool->map);
1281                 CloseHandle(pool->hmap);
1282 #endif
1283                 pool->map = NULL;
1284
1285                 //  create new pool mapping
1286                 //  and link into hash table
1287
1288                 if( bt_mapsegment(bt, pool, page_no) )
1289                         return NULL;
1290
1291                 bt_linkhash(bt, pool, page_no, hashidx);
1292                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1293                 return pool;
1294         }
1295 }
1296
1297 // place write, read, or parent lock on requested page_no.
1298
1299 void bt_lockpage(BtLock mode, BtLatchSet *set)
1300 {
1301         switch( mode ) {
1302         case BtLockRead:
1303                 ReadLock (set->readwr);
1304                 break;
1305         case BtLockWrite:
1306                 WriteLock (set->readwr);
1307                 break;
1308         case BtLockAccess:
1309                 ReadLock (set->access);
1310                 break;
1311         case BtLockDelete:
1312                 WriteLock (set->access);
1313                 break;
1314         case BtLockParent:
1315                 WriteLock (set->parent);
1316                 break;
1317         }
1318 }
1319
1320 // remove write, read, or parent lock on requested page
1321
1322 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1323 {
1324         switch( mode ) {
1325         case BtLockRead:
1326                 ReadRelease (set->readwr);
1327                 break;
1328         case BtLockWrite:
1329                 WriteRelease (set->readwr);
1330                 break;
1331         case BtLockAccess:
1332                 ReadRelease (set->access);
1333                 break;
1334         case BtLockDelete:
1335                 WriteRelease (set->access);
1336                 break;
1337         case BtLockParent:
1338                 WriteRelease (set->parent);
1339                 break;
1340         }
1341 }
1342
1343 //      allocate a new page and write page into it
1344
1345 uid bt_newpage(BtDb *bt, BtPage page)
1346 {
1347 BtPageSet set[1];
1348 uid new_page;
1349 int blk;
1350
1351         //      lock allocation page
1352
1353         bt_spinwritelock(bt->mgr->latchmgr->lock);
1354
1355         // use empty chain first
1356         // else allocate empty page
1357
1358         if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1359                 if( set->pool = bt_pinpool (bt, new_page) )
1360                         set->page = bt_page (bt, set->pool, new_page);
1361                 else
1362                         return 0;
1363
1364                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1365                 bt_unpinpool (set->pool);
1366         } else {
1367                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1368                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1369 #ifdef unix
1370                 // if writing first page of pool block, set file length thru last page
1371
1372                 if( (new_page & bt->mgr->poolmask) == 0 )
1373                         ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1374 #endif
1375         }
1376 #ifdef unix
1377         // unlock allocation latch
1378
1379         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1380 #endif
1381
1382         //      bring new page into pool and copy page.
1383         //      this will extend the file into the new pages on WIN32.
1384
1385         if( set->pool = bt_pinpool (bt, new_page) )
1386                 set->page = bt_page (bt, set->pool, new_page);
1387         else
1388                 return 0;
1389
1390         memcpy(set->page, page, bt->mgr->page_size);
1391         bt_unpinpool (set->pool);
1392
1393 #ifndef unix
1394         // unlock allocation latch
1395
1396         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1397 #endif
1398         return new_page;
1399 }
1400
1401 //  find slot in page for given key at a given level
1402
1403 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1404 {
1405 uint diff, higher = set->page->cnt, low = 1, slot;
1406 uint good = 0;
1407
1408         //        make stopper key an infinite fence value
1409
1410         if( bt_getid (set->page->right) )
1411                 higher++;
1412         else
1413                 good++;
1414
1415         //      low is the lowest candidate.
1416         //  loop ends when they meet
1417
1418         //  higher is already
1419         //      tested as .ge. the passed key.
1420
1421         while( diff = higher - low ) {
1422                 slot = low + ( diff >> 1 );
1423                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1424                         low = slot + 1;
1425                 else
1426                         higher = slot, good++;
1427         }
1428
1429         //      return zero if key is on right link page
1430
1431         return good ? higher : 0;
1432 }
1433
1434 //  find and load page at given level for given key
1435 //      leave page rd or wr locked as requested
1436
1437 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1438 {
1439 uid page_no = ROOT_page, prevpage = 0;
1440 uint drill = 0xff, slot;
1441 BtLatchSet *prevlatch;
1442 uint mode, prevmode;
1443 BtPool *prevpool;
1444
1445   //  start at root of btree and drill down
1446
1447   do {
1448         // determine lock mode of drill level
1449         mode = (drill == lvl) ? lock : BtLockRead; 
1450
1451         set->latch = bt_pinlatch (bt, page_no);
1452         set->page_no = page_no;
1453
1454         // pin page contents
1455
1456         if( set->pool = bt_pinpool (bt, page_no) )
1457                 set->page = bt_page (bt, set->pool, page_no);
1458         else
1459                 return 0;
1460
1461         // obtain access lock using lock chaining with Access mode
1462
1463         if( page_no > ROOT_page )
1464           bt_lockpage(BtLockAccess, set->latch);
1465
1466         //      release & unpin parent page
1467
1468         if( prevpage ) {
1469           bt_unlockpage(prevmode, prevlatch);
1470           bt_unpinlatch (prevlatch);
1471           bt_unpinpool (prevpool);
1472           prevpage = 0;
1473         }
1474
1475         // obtain read lock using lock chaining
1476
1477         bt_lockpage(mode, set->latch);
1478
1479         if( set->page->free )
1480                 return bt->err = BTERR_struct, 0;
1481
1482         if( page_no > ROOT_page )
1483           bt_unlockpage(BtLockAccess, set->latch);
1484
1485         // re-read and re-lock root after determining actual level of root
1486
1487         if( set->page->lvl != drill) {
1488                 if( set->page_no != ROOT_page )
1489                         return bt->err = BTERR_struct, 0;
1490                         
1491                 drill = set->page->lvl;
1492
1493                 if( lock != BtLockRead && drill == lvl ) {
1494                   bt_unlockpage(mode, set->latch);
1495                   bt_unpinlatch (set->latch);
1496                   bt_unpinpool (set->pool);
1497                   continue;
1498                 }
1499         }
1500
1501         prevpage = set->page_no;
1502         prevlatch = set->latch;
1503         prevpool = set->pool;
1504         prevmode = mode;
1505
1506         //  find key on page at this level
1507         //  and descend to requested level
1508
1509         if( !set->page->kill )
1510          if( slot = bt_findslot (set, key, len) ) {
1511           if( drill == lvl )
1512                 return slot;
1513
1514           while( slotptr(set->page, slot)->dead )
1515                 if( slot++ < set->page->cnt )
1516                         continue;
1517                 else
1518                         goto slideright;
1519
1520           page_no = bt_getid(valptr(set->page, slot)->value);
1521           drill--;
1522           continue;
1523          }
1524
1525         //  or slide right into next page
1526
1527 slideright:
1528         page_no = bt_getid(set->page->right);
1529
1530   } while( page_no );
1531
1532   // return error on end of right chain
1533
1534   bt->err = BTERR_struct;
1535   return 0;     // return error
1536 }
1537
1538 //      return page to free list
1539 //      page must be delete & write locked
1540
1541 void bt_freepage (BtDb *bt, BtPageSet *set)
1542 {
1543         //      lock allocation page
1544
1545         bt_spinwritelock (bt->mgr->latchmgr->lock);
1546
1547         //      store chain
1548         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1549         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1550         set->page->free = 1;
1551
1552         // unlock released page
1553
1554         bt_unlockpage (BtLockDelete, set->latch);
1555         bt_unlockpage (BtLockWrite, set->latch);
1556         bt_unpinlatch (set->latch);
1557         bt_unpinpool (set->pool);
1558
1559         // unlock allocation page
1560
1561         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1562 }
1563
1564 //      a fence key was deleted from a page
1565 //      push new fence value upwards
1566
1567 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1568 {
1569 unsigned char leftkey[256], rightkey[256];
1570 unsigned char value[BtId];
1571 uid page_no;
1572 BtKey ptr;
1573 BtVal val;
1574
1575         //      remove the old fence value
1576
1577         ptr = keyptr(set->page, set->page->cnt);
1578         memcpy (rightkey, ptr, ptr->len + 1);
1579         set->page->garbage += ptr->len + val->len + 2;
1580         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1581
1582         ptr = keyptr(set->page, set->page->cnt);
1583         memcpy (leftkey, ptr, ptr->len + 1);
1584         page_no = set->page_no;
1585
1586         bt_lockpage (BtLockParent, set->latch);
1587         bt_unlockpage (BtLockWrite, set->latch);
1588
1589         //      insert new (now smaller) fence key
1590
1591         bt_putid (value, page_no);
1592
1593         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
1594           return bt->err;
1595
1596         //      now delete old fence key
1597
1598         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1599                 return bt->err;
1600
1601         bt_unlockpage (BtLockParent, set->latch);
1602         bt_unpinlatch(set->latch);
1603         bt_unpinpool (set->pool);
1604         return 0;
1605 }
1606
1607 //      root has a single child
1608 //      collapse a level from the tree
1609
1610 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1611 {
1612 BtPageSet child[1];
1613 uint idx;
1614
1615   // find the child entry and promote as new root contents
1616
1617   do {
1618         for( idx = 0; idx++ < root->page->cnt; )
1619           if( !slotptr(root->page, idx)->dead )
1620                 break;
1621
1622         child->page_no = bt_getid (valptr(root->page, idx)->value);
1623
1624         child->latch = bt_pinlatch (bt, child->page_no);
1625         bt_lockpage (BtLockDelete, child->latch);
1626         bt_lockpage (BtLockWrite, child->latch);
1627
1628         if( child->pool = bt_pinpool (bt, child->page_no) )
1629                 child->page = bt_page (bt, child->pool, child->page_no);
1630         else
1631                 return bt->err;
1632
1633         memcpy (root->page, child->page, bt->mgr->page_size);
1634         bt_freepage (bt, child);
1635
1636   } while( root->page->lvl > 1 && root->page->act == 1 );
1637
1638   bt_unlockpage (BtLockWrite, root->latch);
1639   bt_unpinlatch (root->latch);
1640   bt_unpinpool (root->pool);
1641   return 0;
1642 }
1643
1644 //  find and delete key on page by marking delete flag bit
1645 //  if page becomes empty, delete it from the btree
1646
1647 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1648 {
1649 unsigned char lowerfence[256], higherfence[256];
1650 uint slot, idx, dirty = 0, fence, found;
1651 BtPageSet set[1], right[1];
1652 unsigned char value[BtId];
1653 BtKey ptr, tst;
1654 BtVal val;
1655
1656         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1657                 ptr = keyptr(set->page, slot);
1658         else
1659                 return bt->err;
1660
1661         // if librarian slot, advance to real slot
1662
1663         if( slotptr(set->page, slot)->librarian )
1664                 ptr = keyptr(set->page, ++slot);
1665
1666         //      are we deleting a fence slot?
1667
1668         fence = slot == set->page->cnt;
1669
1670         // if key is found delete it, otherwise ignore request
1671
1672         if( found = !keycmp (ptr, key, len) )
1673           if( found = slotptr(set->page, slot)->dead == 0 ) {
1674                 val = valptr(set->page,slot);
1675                 dirty = slotptr(set->page, slot)->dead = 1;
1676                 set->page->garbage += ptr->len + val->len + 2;
1677                 set->page->act--;
1678
1679                 // collapse empty slots
1680
1681                 while( idx = set->page->cnt - 1 )
1682                   if( slotptr(set->page, idx)->dead ) {
1683                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1684                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1685                   } else
1686                         break;
1687           }
1688
1689         //      did we delete a fence key in an upper level?
1690
1691         if( dirty && lvl && set->page->act && fence )
1692           if( bt_fixfence (bt, set, lvl) )
1693                 return bt->err;
1694           else
1695                 return bt->found = found, 0;
1696
1697         //      is this a collapsed root?
1698
1699         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1700           if( bt_collapseroot (bt, set) )
1701                 return bt->err;
1702           else
1703                 return bt->found = found, 0;
1704
1705         //      return if page is not empty
1706
1707         if( set->page->act ) {
1708                 bt_unlockpage(BtLockWrite, set->latch);
1709                 bt_unpinlatch (set->latch);
1710                 bt_unpinpool (set->pool);
1711                 return bt->found = found, 0;
1712         }
1713
1714         //      cache copy of fence key
1715         //      to post in parent
1716
1717         ptr = keyptr(set->page, set->page->cnt);
1718         memcpy (lowerfence, ptr, ptr->len + 1);
1719
1720         //      obtain lock on right page
1721
1722         right->page_no = bt_getid(set->page->right);
1723         right->latch = bt_pinlatch (bt, right->page_no);
1724         bt_lockpage (BtLockWrite, right->latch);
1725
1726         // pin page contents
1727
1728         if( right->pool = bt_pinpool (bt, right->page_no) )
1729                 right->page = bt_page (bt, right->pool, right->page_no);
1730         else
1731                 return 0;
1732
1733         if( right->page->kill )
1734                 return bt->err = BTERR_struct;
1735
1736         // pull contents of right peer into our empty page
1737
1738         memcpy (set->page, right->page, bt->mgr->page_size);
1739
1740         // cache copy of key to update
1741
1742         ptr = keyptr(right->page, right->page->cnt);
1743         memcpy (higherfence, ptr, ptr->len + 1);
1744
1745         // mark right page deleted and point it to left page
1746         //      until we can post parent updates
1747
1748         bt_putid (right->page->right, set->page_no);
1749         right->page->kill = 1;
1750
1751         bt_lockpage (BtLockParent, right->latch);
1752         bt_unlockpage (BtLockWrite, right->latch);
1753
1754         bt_lockpage (BtLockParent, set->latch);
1755         bt_unlockpage (BtLockWrite, set->latch);
1756
1757         // redirect higher key directly to our new node contents
1758
1759         bt_putid (value, set->page_no);
1760
1761         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId) )
1762           return bt->err;
1763
1764         //      delete old lower key to our node
1765
1766         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1767           return bt->err;
1768
1769         //      obtain delete and write locks to right node
1770
1771         bt_unlockpage (BtLockParent, right->latch);
1772         bt_lockpage (BtLockDelete, right->latch);
1773         bt_lockpage (BtLockWrite, right->latch);
1774         bt_freepage (bt, right);
1775
1776         bt_unlockpage (BtLockParent, set->latch);
1777         bt_unpinlatch (set->latch);
1778         bt_unpinpool (set->pool);
1779         bt->found = found;
1780         return 0;
1781 }
1782
1783 //      find key in leaf level and return number of value bytes
1784 //      or (-1) if not found
1785
1786 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1787 {
1788 BtPageSet set[1];
1789 uint  slot;
1790 BtKey ptr;
1791 BtVal val;
1792 int ret;
1793
1794         if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1795                 ptr = keyptr(set->page, slot);
1796         else
1797                 return 0;
1798
1799         //      skip librarian slot place holder
1800
1801         if( slotptr(set->page, slot)->librarian )
1802                 ptr = keyptr(set->page, ++slot);
1803
1804         // if key exists, return >= 0 value bytes copied
1805         //      otherwise return (-1)
1806
1807         if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
1808                 val = valptr (set->page,slot);
1809                 if( valmax > val->len )
1810                         valmax = val->len;
1811                 memcpy (value, val->value, valmax);
1812                 ret = valmax;
1813         } else
1814                 ret = -1;
1815
1816         bt_unlockpage (BtLockRead, set->latch);
1817         bt_unpinlatch (set->latch);
1818         bt_unpinpool (set->pool);
1819         return ret;
1820 }
1821
1822 //      check page for space available,
1823 //      clean if necessary and return
1824 //      0 - page needs splitting
1825 //      >0  new slot value
1826
1827 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1828 {
1829 uint nxt = bt->mgr->page_size;
1830 uint cnt = 0, idx = 0;
1831 uint max = page->cnt;
1832 uint newslot = max;
1833 BtKey key;
1834 BtVal val;
1835
1836         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1837                 return slot;
1838
1839         //      skip cleanup and proceed to split
1840         //      if there's not enough garbage
1841
1842         if( page->garbage + page->min < 2 * page->act * sizeof(BtSlot) + sizeof(*page) + nxt / 3 )
1843                 return 0;
1844
1845         memcpy (bt->frame, page, bt->mgr->page_size);
1846
1847         // skip page info and set rest of page to zero
1848
1849         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1850         page->garbage = 0;
1851         page->act = 0;
1852
1853         // clean up page first by
1854         // removing deleted keys
1855
1856         while( cnt++ < max ) {
1857                 if( cnt == slot )
1858                         newslot = idx + 2;
1859                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1860                         continue;
1861
1862                 // copy the key across
1863
1864                 key = keyptr(bt->frame, cnt);
1865                 nxt -= key->len + 1;
1866                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1867
1868                 // copy the value across
1869
1870                 val = valptr(bt->frame, cnt);
1871                 nxt -= val->len + 1;
1872                 ((unsigned char *)page)[nxt] = val->len;
1873                 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1874
1875                 // make a librarian slot
1876
1877                 slotptr(page, ++idx)->off = nxt;
1878                 slotptr(page, idx)->librarian = 1;
1879                 slotptr(page, idx)->dead = 1;
1880
1881                 // set up the slot
1882
1883                 slotptr(page, ++idx)->off = nxt;
1884
1885                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1886                         page->act++;
1887         }
1888
1889         page->min = nxt;
1890         page->cnt = idx;
1891
1892         //      see if page has enough space now, or does it need splitting?
1893
1894         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1895                 return newslot;
1896
1897         return 0;
1898 }
1899
1900 // split the root and raise the height of the btree
1901
1902 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1903 {
1904 uint nxt = bt->mgr->page_size;
1905 unsigned char value[BtId];
1906 uid left;
1907
1908         //  Obtain an empty page to use, and copy the current
1909         //  root contents into it, e.g. lower keys
1910
1911         if( !(left = bt_newpage(bt, root->page)) )
1912                 return bt->err;
1913
1914         // preserve the page info at the bottom
1915         // of higher keys and set rest to zero
1916
1917         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1918
1919         // insert lower keys page fence key on newroot page as first key
1920
1921         nxt -= BtId + 1;
1922         bt_putid (value, left);
1923         ((unsigned char *)root->page)[nxt] = BtId;
1924         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
1925
1926         nxt -= *leftkey + 1;
1927         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1928         slotptr(root->page, 1)->off = nxt;
1929         
1930         // insert stopper key on newroot page
1931         // and increase the root height
1932
1933         nxt -= 3 + BtId + 1;
1934         ((unsigned char *)root->page)[nxt] = 2;
1935         ((unsigned char *)root->page)[nxt+1] = 0xff;
1936         ((unsigned char *)root->page)[nxt+2] = 0xff;
1937
1938         bt_putid (value, page_no2);
1939         ((unsigned char *)root->page)[nxt+3] = BtId;
1940         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
1941         slotptr(root->page, 2)->off = nxt;
1942
1943         bt_putid(root->page->right, 0);
1944         root->page->min = nxt;          // reset lowest used offset and key count
1945         root->page->cnt = 2;
1946         root->page->act = 2;
1947         root->page->lvl++;
1948
1949         // release and unpin root
1950
1951         bt_unlockpage(BtLockWrite, root->latch);
1952         bt_unpinlatch (root->latch);
1953         bt_unpinpool (root->pool);
1954         return 0;
1955 }
1956
1957 //  split already locked full node
1958 //      return unlocked.
1959
1960 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1961 {
1962 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1963 unsigned char fencekey[256], rightkey[256];
1964 unsigned char value[BtId];
1965 uint lvl = set->page->lvl;
1966 BtPageSet right[1];
1967 uint prev;
1968 BtKey key;
1969 BtVal val;
1970
1971         //  split higher half of keys to bt->frame
1972
1973         memset (bt->frame, 0, bt->mgr->page_size);
1974         max = set->page->cnt;
1975         cnt = max / 2;
1976         idx = 0;
1977
1978         while( cnt++ < max ) {
1979                 if( slotptr(set->page, cnt)->dead && cnt < max )
1980                         continue;
1981                 val = valptr(set->page, cnt);
1982                 nxt -= val->len + 1;
1983                 ((unsigned char *)bt->frame)[nxt] = val->len;
1984                 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
1985
1986                 key = keyptr(set->page, cnt);
1987                 nxt -= key->len + 1;
1988                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1989
1990                 //      add librarian slot
1991
1992                 slotptr(bt->frame, ++idx)->off = nxt;
1993                 slotptr(bt->frame, idx)->librarian = 1;
1994                 slotptr(bt->frame, idx)->dead = 1;
1995
1996                 //  add actual slot
1997
1998                 slotptr(bt->frame, ++idx)->off = nxt;
1999
2000                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2001                         bt->frame->act++;
2002         }
2003
2004         // remember existing fence key for new page to the right
2005
2006         memcpy (rightkey, key, key->len + 1);
2007
2008         bt->frame->bits = bt->mgr->page_bits;
2009         bt->frame->min = nxt;
2010         bt->frame->cnt = idx;
2011         bt->frame->lvl = lvl;
2012
2013         // link right node
2014
2015         if( set->page_no > ROOT_page )
2016                 memcpy (bt->frame->right, set->page->right, BtId);
2017
2018         //      get new free page and write higher keys to it.
2019
2020         if( !(right->page_no = bt_newpage(bt, bt->frame)) )
2021                 return bt->err;
2022
2023         //      update lower keys to continue in old page
2024
2025         memcpy (bt->frame, set->page, bt->mgr->page_size);
2026         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2027         nxt = bt->mgr->page_size;
2028         set->page->garbage = 0;
2029         set->page->act = 0;
2030         max /= 2;
2031         cnt = 0;
2032         idx = 0;
2033
2034         if( slotptr(bt->frame, max)->librarian )
2035                 max--;
2036
2037         //  assemble page of smaller keys
2038
2039         while( cnt++ < max ) {
2040                 if( slotptr(bt->frame, cnt)->dead )
2041                         continue;
2042                 val = valptr(bt->frame, cnt);
2043                 nxt -= val->len + 1;
2044                 ((unsigned char *)set->page)[nxt] = val->len;
2045                 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2046
2047                 key = keyptr(bt->frame, cnt);
2048                 nxt -= key->len + 1;
2049                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2050
2051                 //      add librarian slot
2052
2053                 slotptr(set->page, ++idx)->off = nxt;
2054                 slotptr(set->page, idx)->librarian = 1;
2055                 slotptr(set->page, idx)->dead = 1;
2056
2057                 //      add actual slot
2058
2059                 slotptr(set->page, ++idx)->off = nxt;
2060                 set->page->act++;
2061         }
2062
2063         // remember fence key for smaller page
2064
2065         memcpy(fencekey, key, key->len + 1);
2066
2067         bt_putid(set->page->right, right->page_no);
2068         set->page->min = nxt;
2069         set->page->cnt = idx;
2070
2071         // if current page is the root page, split it
2072
2073         if( set->page_no == ROOT_page )
2074                 return bt_splitroot (bt, set, fencekey, right->page_no);
2075
2076         // insert new fences in their parent pages
2077
2078         right->latch = bt_pinlatch (bt, right->page_no);
2079         bt_lockpage (BtLockParent, right->latch);
2080
2081         bt_lockpage (BtLockParent, set->latch);
2082         bt_unlockpage (BtLockWrite, set->latch);
2083
2084         // insert new fence for reformulated left block of smaller keys
2085
2086         bt_putid (value, set->page_no);
2087
2088         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId) )
2089                 return bt->err;
2090
2091         // switch fence for right block of larger keys to new right page
2092
2093         bt_putid (value, right->page_no);
2094
2095         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId) )
2096                 return bt->err;
2097
2098         bt_unlockpage (BtLockParent, set->latch);
2099         bt_unpinlatch (set->latch);
2100         bt_unpinpool (set->pool);
2101
2102         bt_unlockpage (BtLockParent, right->latch);
2103         bt_unpinlatch (right->latch);
2104         return 0;
2105 }
2106 //  Insert new key into the btree at given level.
2107
2108 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
2109 {
2110 BtPageSet set[1];
2111 uint slot, idx;
2112 uint reuse;
2113 BtKey ptr;
2114 BtKey tst;
2115 BtVal val;
2116
2117         while( 1 ) {
2118                 if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
2119                         ptr = keyptr(set->page, slot);
2120                 else {
2121                         if( !bt->err )
2122                                 bt->err = BTERR_ovflw;
2123                         return bt->err;
2124                 }
2125
2126                 // if librarian slot == found slot, advance to real slot
2127
2128                 if( slotptr(set->page, slot)->librarian )
2129                   if( !keycmp (ptr, key, keylen) )
2130                         ptr = keyptr(set->page, ++slot);
2131
2132                 // if key already exists, update value and return
2133
2134                 if( reuse = !keycmp (ptr, key, keylen) )
2135                   if( val = valptr(set->page, slot), val->len >= vallen ) {
2136                         if( slotptr(set->page, slot)->dead )
2137                                 set->page->act++;
2138                         set->page->garbage += val->len - vallen;
2139                         slotptr(set->page, slot)->dead = 0;
2140                         val->len = vallen;
2141                         memcpy (val->value, value, vallen);
2142                         bt_unlockpage(BtLockWrite, set->latch);
2143                         bt_unpinlatch (set->latch);
2144                         bt_unpinpool (set->pool);
2145                         return 0;
2146                   } else {
2147                         if( !slotptr(set->page, slot)->dead ) {
2148                                 set->page->garbage += val->len + ptr->len + 2;
2149                                 set->page->act--;
2150                         }
2151                         slotptr(set->page, slot)->dead = 1;
2152                   }
2153
2154                 //      if found slot > desired slot and previous slot
2155                 //      is a librarian slot, use it
2156
2157                 if( !reuse && slot > 1 )
2158                   if( slotptr(set->page, slot-1)->librarian )
2159                         slot--;
2160
2161                 // check if page has enough space
2162
2163                 if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
2164                         break;
2165
2166                 if( bt_splitpage (bt, set) )
2167                         return bt->err;
2168         }
2169
2170         // calculate next available slot and copy key into page
2171
2172         set->page->min -= vallen + 1; // reset lowest used offset
2173         ((unsigned char *)set->page)[set->page->min] = vallen;
2174         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2175
2176         set->page->min -= keylen + 1; // reset lowest used offset
2177         ((unsigned char *)set->page)[set->page->min] = keylen;
2178         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2179
2180         for( idx = slot; idx < set->page->cnt; idx++ )
2181           if( slotptr(set->page, idx)->dead )
2182                 break;
2183
2184         // now insert key into array before slot
2185
2186         if( !reuse && idx == set->page->cnt )
2187                 idx++, set->page->cnt++;
2188
2189         set->page->act++;
2190
2191         while( idx > slot )
2192                 *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2193
2194         slotptr(set->page, slot)->off = set->page->min;
2195         slotptr(set->page, slot)->librarian = 0;
2196         slotptr(set->page, slot)->dead = 0;
2197
2198         bt_unlockpage (BtLockWrite, set->latch);
2199         bt_unpinlatch (set->latch);
2200         bt_unpinpool (set->pool);
2201         return 0;
2202 }
2203
2204 //  cache page of keys into cursor and return starting slot for given key
2205
2206 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2207 {
2208 BtPageSet set[1];
2209 uint slot;
2210
2211         // cache page for retrieval
2212
2213         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2214           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2215         else
2216           return 0;
2217
2218         bt->cursor_page = set->page_no;
2219
2220         bt_unlockpage(BtLockRead, set->latch);
2221         bt_unpinlatch (set->latch);
2222         bt_unpinpool (set->pool);
2223         return slot;
2224 }
2225
2226 //  return next slot for cursor page
2227 //  or slide cursor right into next page
2228
2229 uint bt_nextkey (BtDb *bt, uint slot)
2230 {
2231 BtPageSet set[1];
2232 uid right;
2233
2234   do {
2235         right = bt_getid(bt->cursor->right);
2236
2237         while( slot++ < bt->cursor->cnt )
2238           if( slotptr(bt->cursor,slot)->dead )
2239                 continue;
2240           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2241                 return slot;
2242           else
2243                 break;
2244
2245         if( !right )
2246                 break;
2247
2248         bt->cursor_page = right;
2249
2250         if( set->pool = bt_pinpool (bt, right) )
2251                 set->page = bt_page (bt, set->pool, right);
2252         else
2253                 return 0;
2254
2255         set->latch = bt_pinlatch (bt, right);
2256     bt_lockpage(BtLockRead, set->latch);
2257
2258         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2259
2260         bt_unlockpage(BtLockRead, set->latch);
2261         bt_unpinlatch (set->latch);
2262         bt_unpinpool (set->pool);
2263         slot = 0;
2264
2265   } while( 1 );
2266
2267   return bt->err = 0;
2268 }
2269
2270 BtKey bt_key(BtDb *bt, uint slot)
2271 {
2272         return keyptr(bt->cursor, slot);
2273 }
2274
2275 BtVal bt_val(BtDb *bt, uint slot)
2276 {
2277         return valptr(bt->cursor,slot);
2278 }
2279
2280 #ifdef STANDALONE
2281
2282 #ifndef unix
2283 double getCpuTime(int type)
2284 {
2285 FILETIME crtime[1];
2286 FILETIME xittime[1];
2287 FILETIME systime[1];
2288 FILETIME usrtime[1];
2289 SYSTEMTIME timeconv[1];
2290 double ans = 0;
2291
2292         memset (timeconv, 0, sizeof(SYSTEMTIME));
2293
2294         switch( type ) {
2295         case 0:
2296                 GetSystemTimeAsFileTime (xittime);
2297                 FileTimeToSystemTime (xittime, timeconv);
2298                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2299                 break;
2300         case 1:
2301                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2302                 FileTimeToSystemTime (usrtime, timeconv);
2303                 break;
2304         case 2:
2305                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2306                 FileTimeToSystemTime (systime, timeconv);
2307                 break;
2308         }
2309
2310         ans += (double)timeconv->wHour * 3600;
2311         ans += (double)timeconv->wMinute * 60;
2312         ans += (double)timeconv->wSecond;
2313         ans += (double)timeconv->wMilliseconds / 1000;
2314         return ans;
2315 }
2316 #else
2317 #include <time.h>
2318 #include <sys/resource.h>
2319
2320 double getCpuTime(int type)
2321 {
2322 struct rusage used[1];
2323 struct timeval tv[1];
2324
2325         switch( type ) {
2326         case 0:
2327                 gettimeofday(tv, NULL);
2328                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2329
2330         case 1:
2331                 getrusage(RUSAGE_SELF, used);
2332                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2333
2334         case 2:
2335                 getrusage(RUSAGE_SELF, used);
2336                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2337         }
2338
2339         return 0;
2340 }
2341 #endif
2342
2343 uint bt_latchaudit (BtDb *bt)
2344 {
2345 ushort idx, hashidx;
2346 uid next, page_no;
2347 BtLatchSet *latch;
2348 uint cnt = 0;
2349 BtKey ptr;
2350
2351 #ifdef unix
2352         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2353 #endif
2354         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2355                 fprintf(stderr, "Alloc page locked\n");
2356         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2357
2358         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2359                 latch = bt->mgr->latchsets + idx;
2360                 if( *latch->readwr->rin & MASK )
2361                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2362                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2363
2364                 if( *latch->access->rin & MASK )
2365                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2366                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2367
2368                 if( *latch->parent->rin & MASK )
2369                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2370                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2371
2372                 if( latch->pin ) {
2373                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2374                         latch->pin = 0;
2375                 }
2376         }
2377
2378         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2379           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2380                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2381
2382           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2383
2384           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2385                 latch = bt->mgr->latchsets + idx;
2386                 if( *(ushort *)latch->busy )
2387                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2388                 *(ushort *)latch->busy = 0;
2389                 if( latch->pin )
2390                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2391           } while( idx = latch->next );
2392         }
2393
2394         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2395         page_no = LEAF_page;
2396
2397         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2398         off64_t off = page_no << bt->mgr->page_bits;
2399 #ifdef unix
2400                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2401 #else
2402                 DWORD amt[1];
2403
2404                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2405
2406                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2407                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2408
2409                   if( *amt <  bt->mgr->page_size )
2410                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2411 #endif
2412                 if( !bt->frame->free ) {
2413                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2414                   ptr = keyptr(bt->frame, idx+1);
2415                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2416                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2417                  }
2418                  if( !bt->frame->lvl )
2419                         cnt += bt->frame->act;
2420                 }
2421                 if( page_no > LEAF_page )
2422                         next = page_no + 1;
2423                 page_no = next;
2424         }
2425         return cnt - 1;
2426 }
2427
2428 typedef struct {
2429         char idx;
2430         char *type;
2431         char *infile;
2432         BtMgr *mgr;
2433         int num;
2434 } ThreadArg;
2435
2436 //  standalone program to index file of keys
2437 //  then list them onto std-out
2438
2439 #ifdef unix
2440 void *index_file (void *arg)
2441 #else
2442 uint __stdcall index_file (void *arg)
2443 #endif
2444 {
2445 int line = 0, found = 0, cnt = 0;
2446 uid next, page_no = LEAF_page;  // start on first page of leaves
2447 unsigned char key[256];
2448 ThreadArg *args = arg;
2449 int ch, len = 0, slot;
2450 BtPageSet set[1];
2451 BtKey ptr;
2452 BtDb *bt;
2453 FILE *in;
2454
2455         bt = bt_open (args->mgr);
2456
2457         switch(args->type[0] | 0x20)
2458         {
2459         case 'a':
2460                 fprintf(stderr, "started latch mgr audit\n");
2461                 cnt = bt_latchaudit (bt);
2462                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2463                 break;
2464
2465         case 'p':
2466                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2467                 if( in = fopen (args->infile, "rb") )
2468                   while( ch = getc(in), ch != EOF )
2469                         if( ch == '\n' )
2470                         {
2471                           line++;
2472
2473                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10) )
2474                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2475                           len = 0;
2476                         }
2477                         else if( len < 255 )
2478                                 key[len++] = ch;
2479                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2480                 break;
2481
2482         case 'w':
2483                 fprintf(stderr, "started indexing for %s\n", args->infile);
2484                 if( in = fopen (args->infile, "rb") )
2485                   while( ch = getc(in), ch != EOF )
2486                         if( ch == '\n' )
2487                         {
2488                           line++;
2489
2490                           if( args->num == 1 )
2491                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2492
2493                           else if( args->num )
2494                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2495
2496                           if( bt_insertkey (bt, key, len, 0, NULL, 0) )
2497                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2498                           len = 0;
2499                         }
2500                         else if( len < 255 )
2501                                 key[len++] = ch;
2502                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2503                 break;
2504
2505         case 'd':
2506                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2507                 if( in = fopen (args->infile, "rb") )
2508                   while( ch = getc(in), ch != EOF )
2509                         if( ch == '\n' )
2510                         {
2511                           line++;
2512                           if( args->num == 1 )
2513                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2514
2515                           else if( args->num )
2516                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2517
2518                           if( bt_deletekey (bt, key, len, 0) )
2519                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2520                           len = 0;
2521                         }
2522                         else if( len < 255 )
2523                                 key[len++] = ch;
2524                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2525                 break;
2526
2527         case 'f':
2528                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2529                 if( in = fopen (args->infile, "rb") )
2530                   while( ch = getc(in), ch != EOF )
2531                         if( ch == '\n' )
2532                         {
2533                           line++;
2534                           if( args->num == 1 )
2535                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2536
2537                           else if( args->num )
2538                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2539
2540                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2541                                 found++;
2542                           else if( bt->err )
2543                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2544                           len = 0;
2545                         }
2546                         else if( len < 255 )
2547                                 key[len++] = ch;
2548                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2549                 break;
2550
2551         case 's':
2552                 fprintf(stderr, "started scanning\n");
2553                 do {
2554                         if( set->pool = bt_pinpool (bt, page_no) )
2555                                 set->page = bt_page (bt, set->pool, page_no);
2556                         else
2557                                 break;
2558                         set->latch = bt_pinlatch (bt, page_no);
2559                         bt_lockpage (BtLockRead, set->latch);
2560                         next = bt_getid (set->page->right);
2561                         cnt += set->page->act;
2562
2563                         for( slot = 0; slot++ < set->page->cnt; )
2564                          if( next || slot < set->page->cnt )
2565                           if( !slotptr(set->page, slot)->dead ) {
2566                                 ptr = keyptr(set->page, slot);
2567                                 fwrite (ptr->key, ptr->len, 1, stdout);
2568                                 fputc ('\n', stdout);
2569                           }
2570
2571                         bt_unlockpage (BtLockRead, set->latch);
2572                         bt_unpinlatch (set->latch);
2573                         bt_unpinpool (set->pool);
2574                 } while( page_no = next );
2575
2576                 cnt--;  // remove stopper key
2577                 fprintf(stderr, " Total keys read %d\n", cnt);
2578                 break;
2579
2580         case 'c':
2581 #ifdef unix
2582                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2583 #endif
2584                 fprintf(stderr, "started counting\n");
2585                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2586                 page_no = LEAF_page;
2587
2588                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2589                 uid off = page_no << bt->mgr->page_bits;
2590 #ifdef unix
2591                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2592 #else
2593                 DWORD amt[1];
2594
2595                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2596
2597                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2598                         return bt->err = BTERR_map;
2599
2600                   if( *amt <  bt->mgr->page_size )
2601                         return bt->err = BTERR_map;
2602 #endif
2603                         if( !bt->frame->free && !bt->frame->lvl )
2604                                 cnt += bt->frame->act;
2605                         if( page_no > LEAF_page )
2606                                 next = page_no + 1;
2607                         page_no = next;
2608                 }
2609                 
2610                 cnt--;  // remove stopper key
2611                 fprintf(stderr, " Total keys read %d\n", cnt);
2612                 break;
2613         }
2614
2615         bt_close (bt);
2616 #ifdef unix
2617         return NULL;
2618 #else
2619         return 0;
2620 #endif
2621 }
2622
2623 typedef struct timeval timer;
2624
2625 int main (int argc, char **argv)
2626 {
2627 int idx, cnt, len, slot, err;
2628 int segsize, bits = 16;
2629 double start, stop;
2630 #ifdef unix
2631 pthread_t *threads;
2632 #else
2633 HANDLE *threads;
2634 #endif
2635 ThreadArg *args;
2636 uint poolsize = 0;
2637 float elapsed;
2638 int num = 0;
2639 char key[1];
2640 BtMgr *mgr;
2641 BtKey ptr;
2642 BtDb *bt;
2643
2644         if( argc < 3 ) {
2645                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2646                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2647                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2648                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2649                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2650                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2651                 exit(0);
2652         }
2653
2654         start = getCpuTime(0);
2655
2656         if( argc > 3 )
2657                 bits = atoi(argv[3]);
2658
2659         if( argc > 4 )
2660                 poolsize = atoi(argv[4]);
2661
2662         if( !poolsize )
2663                 fprintf (stderr, "Warning: no mapped_pool\n");
2664
2665         if( poolsize > 65535 )
2666                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2667
2668         if( argc > 5 )
2669                 segsize = atoi(argv[5]);
2670         else
2671                 segsize = 4;    // 16 pages per mmap segment
2672
2673         if( argc > 6 )
2674                 num = atoi(argv[6]);
2675
2676         cnt = argc - 7;
2677 #ifdef unix
2678         threads = malloc (cnt * sizeof(pthread_t));
2679 #else
2680         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2681 #endif
2682         args = malloc (cnt * sizeof(ThreadArg));
2683
2684         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2685
2686         if( !mgr ) {
2687                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2688                 exit (1);
2689         }
2690
2691         //      fire off threads
2692
2693         for( idx = 0; idx < cnt; idx++ ) {
2694                 args[idx].infile = argv[idx + 7];
2695                 args[idx].type = argv[2];
2696                 args[idx].mgr = mgr;
2697                 args[idx].num = num;
2698                 args[idx].idx = idx;
2699 #ifdef unix
2700                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2701                         fprintf(stderr, "Error creating thread %d\n", err);
2702 #else
2703                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2704 #endif
2705         }
2706
2707         //      wait for termination
2708
2709 #ifdef unix
2710         for( idx = 0; idx < cnt; idx++ )
2711                 pthread_join (threads[idx], NULL);
2712 #else
2713         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2714
2715         for( idx = 0; idx < cnt; idx++ )
2716                 CloseHandle(threads[idx]);
2717
2718 #endif
2719         elapsed = getCpuTime(0) - start;
2720         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2721         elapsed = getCpuTime(1);
2722         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2723         elapsed = getCpuTime(2);
2724         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2725
2726         bt_mgrclose (mgr);
2727 }
2728
2729 #endif  //STANDALONE