]> pd.if.org Git - btree/blob - fosterbtreef.c
foster btree final code release
[btree] / fosterbtreef.c
1 // foster btree version f2
2 // 18 JAN 2014
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <fcntl.h>
35 #include <sys/time.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #include <pthread.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <process.h>
47 #include <intrin.h>
48 #endif
49
50 #include <memory.h>
51 #include <string.h>
52
53 typedef unsigned long long      uid;
54
55 #ifndef unix
56 typedef unsigned long long      off64_t;
57 typedef unsigned short          ushort;
58 typedef unsigned int            uint;
59 #endif
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63
64 #define BT_latchtable   128                                     // number of latch manager slots
65
66 #define BT_maxbits              24                                      // maximum page size in bits
67 #define BT_minbits              9                                       // minimum page size in bits
68 #define BT_minpage              (1 << BT_minbits)       // minimum page size
69 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
70
71 /*
72 There are five lock types for each node in three independent sets: 
73 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
74 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
75 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
76 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
77 5. (set 3) ParentLock: Exclusive. Have parent adopt/delete maximum foster child from the node.
78 */
79
80 typedef enum{
81         BtLockAccess,
82         BtLockDelete,
83         BtLockRead,
84         BtLockWrite,
85         BtLockParent
86 }BtLock;
87
88 //      Define the length of the page and key pointers
89
90 #define BtId 6
91
92 //      Page key slot definition.
93
94 //      If BT_maxbits is 15 or less, you can save 4 bytes
95 //      for each key stored by making the first two uints
96 //      into ushorts.  You can also save 4 bytes by removing
97 //      the tod field from the key.
98
99 //      Keys are marked dead, but remain on the page until
100 //      it cleanup is called. The fence key (highest key) for
101 //      the page is always present, even after cleanup.
102
103 typedef struct {
104         uint off:BT_maxbits;            // page offset for key start
105         uint dead:1;                            // set for deleted key
106         uint tod;                                       // time-stamp for key
107         unsigned char id[BtId];         // id associated with key
108 } BtSlot;
109
110 //      The key structure occupies space at the upper end of
111 //      each page.  It's a length byte followed by the value
112 //      bytes.
113
114 typedef struct {
115         unsigned char len;
116         unsigned char key[1];
117 } *BtKey;
118
119 //      The first part of an index page.
120 //      It is immediately followed
121 //      by the BtSlot array of keys.
122
123 typedef struct Page {
124         volatile uint cnt;                      // count of keys in page
125         volatile uint act;                      // count of active keys
126         volatile uint min;                      // next key offset
127         volatile uint foster;           // count of foster children
128         unsigned char bits;                     // page size in bits
129         unsigned char lvl:7;            // level of page
130         unsigned char dirty:1;          // page needs to be cleaned
131         unsigned char right[BtId];      // page number to right
132 } *BtPage;
133
134 //      mode & definition for hash latch implementation
135
136 enum {
137         Mutex = 1,
138         Write = 2,
139         Pending = 4,
140         Share = 8
141 } LockMode;
142
143 // mutex locks the other fields
144 // exclusive is set for write access
145 // share is count of read accessors
146
147 typedef struct {
148         volatile ushort mutex:1;
149         volatile ushort exclusive:1;
150         volatile ushort pending:1;
151         volatile ushort share:13;
152 } BtSpinLatch;
153
154 //  hash table entries
155
156 typedef struct {
157         BtSpinLatch latch[1];
158         volatile ushort slot;           // Latch table entry at head of chain
159 } BtHashEntry;
160
161 //      latch manager table structure
162
163 typedef struct {
164         BtSpinLatch readwr[1];          // read/write page lock
165         BtSpinLatch access[1];          // Access Intent/Page delete
166         BtSpinLatch parent[1];          // adoption of foster children
167         BtSpinLatch busy[1];            // slot is being moved between chains
168         volatile ushort next;           // next entry in hash table chain
169         volatile ushort prev;           // prev entry in hash table chain
170         volatile ushort pin;            // number of outstanding locks
171         volatile ushort hash;           // hash slot entry is under
172         volatile uid page_no;           // latch set page number
173 } BtLatchSet;
174
175 //      The memory mapping pool table buffer manager entry
176
177 typedef struct {
178         unsigned long long int lru;     // number of times accessed
179         uid  basepage;                          // mapped base page number
180         char *map;                                      // mapped memory pointer
181         ushort pin;                                     // mapped page pin counter
182         ushort slot;                            // slot index in this array
183         void *hashprev;                         // previous pool entry for the same hash idx
184         void *hashnext;                         // next pool entry for the same hash idx
185 #ifndef unix
186         HANDLE hmap;                            // Windows memory mapping handle
187 #endif
188 } BtPool;
189
190 //      structure for latch manager on ALLOC_page
191
192 typedef struct {
193         struct Page alloc[2];           // next & free page_nos in right ptr
194         BtSpinLatch lock[1];            // allocation area lite latch
195         ushort latchdeployed;           // highest number of latch entries deployed
196         ushort nlatchpage;                      // number of latch pages at BT_latch
197         ushort latchtotal;                      // number of page latch entries
198         ushort latchhash;                       // number of latch hash table slots
199         ushort latchvictim;                     // next latch entry to examine
200         BtHashEntry table[0];           // the hash table
201 } BtLatchMgr;
202
203 //      The object structure for Btree access
204
205 typedef struct {
206         uint page_size;                         // page size    
207         uint page_bits;                         // page size in bits    
208         uint seg_bits;                          // seg size in pages in bits
209         uint mode;                                      // read-write mode
210 #ifdef unix
211         int idx;
212         char *pooladvise;                       // bit maps for pool page advisements
213 #else
214         HANDLE idx;
215 #endif
216         ushort poolcnt;                         // highest page pool node in use
217         ushort poolmax;                         // highest page pool node allocated
218         ushort poolmask;                        // total number of pages in mmap segment - 1
219         ushort hashsize;                        // size of Hash Table for pool entries
220         ushort evicted;                         // last evicted hash table slot
221         ushort *hash;                           // hash table of pool entries
222         BtPool *pool;                           // memory pool page segments
223         BtSpinLatch *latch;                     // latches for pool hash slots
224         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
225         BtLatchSet *latchsets;          // mapped latch set from latch pages
226 #ifndef unix
227         HANDLE halloc;                          // allocation and latch table handle
228 #endif
229 } BtMgr;
230
231 typedef struct {
232         BtMgr *mgr;                     // buffer manager for thread
233         BtPage cursor;          // cached frame for start/next (never mapped)
234         BtPage frame;           // spare frame for the page split (never mapped)
235         BtPage zero;            // page frame for zeroes at end of file
236         BtPage page;            // current page
237         uid page_no;            // current page number  
238         uid cursor_page;        // current cursor page number   
239         BtLatchSet *set;        // current page latch set
240         BtPool *pool;           // current page pool
241         unsigned char *mem;     // frame, cursor, page memory buffer
242         int found;                      // last delete was found
243         int err;                        // last error
244 } BtDb;
245
246 typedef enum {
247         BTERR_ok = 0,
248         BTERR_struct,
249         BTERR_ovflw,
250         BTERR_lock,
251         BTERR_map,
252         BTERR_wrt,
253         BTERR_hash,
254         BTERR_latch
255 } BTERR;
256
257 // B-Tree functions
258 extern void bt_close (BtDb *bt);
259 extern BtDb *bt_open (BtMgr *mgr);
260 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
261 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
262 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
263 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
264 extern uint bt_nextkey   (BtDb *bt, uint slot);
265
266 //      manager functions
267 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
268 void bt_mgrclose (BtMgr *mgr);
269
270 //  Helper functions to return cursor slot values
271
272 extern BtKey bt_key (BtDb *bt, uint slot);
273 extern uid bt_uid (BtDb *bt, uint slot);
274 extern uint bt_tod (BtDb *bt, uint slot);
275
276 //  BTree page number constants
277 #define ALLOC_page              0       // allocation & lock manager hash table
278 #define ROOT_page               1       // root of the btree
279 #define LEAF_page               2       // first page of leaves
280 #define LATCH_page              3       // pages for lock manager
281
282 //      Number of levels to create in a new BTree
283
284 #define MIN_lvl                 2
285
286 //  The page is allocated from low and hi ends.
287 //  The key offsets and row-id's are allocated
288 //  from the bottom, while the text of the key
289 //  is allocated from the top.  When the two
290 //  areas meet, the page is split into two.
291
292 //  A key consists of a length byte, two bytes of
293 //  index number (0 - 65534), and up to 253 bytes
294 //  of key value.  Duplicate keys are discarded.
295 //  Associated with each key is a 48 bit row-id.
296
297 //  The b-tree root is always located at page 1.
298 //      The first leaf page of level zero is always
299 //      located on page 2.
300
301 //      When to root page fills, it is split in two and
302 //      the tree height is raised by a new root at page
303 //      one with two keys.
304
305 //      Deleted keys are marked with a dead bit until
306 //      page cleanup The fence key for a node is always
307 //      present, even after deletion and cleanup.
308
309 //  Groups of pages called segments from the btree are
310 //  cached with memory mapping. A hash table is used to keep
311 //  track of the cached segments.  This behaviour is controlled
312 //  by the cache block size parameter to bt_open.
313
314 //  To achieve maximum concurrency one page is locked at a time
315 //  as the tree is traversed to find leaf key in question.
316
317 //      An adoption traversal leaves the parent node locked as the
318 //      tree is traversed to the level in quesiton.
319
320 //  Page 0 is dedicated to lock for new page extensions,
321 //      and chains empty pages together for reuse.
322
323 //      Empty pages are chained together through the ALLOC page and reused.
324
325 //      Access macros to address slot and key values from the page
326
327 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
328 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
329
330 void bt_putid(unsigned char *dest, uid id)
331 {
332 int i = BtId;
333
334         while( i-- )
335                 dest[i] = (unsigned char)id, id >>= 8;
336 }
337
338 uid bt_getid(unsigned char *src)
339 {
340 uid id = 0;
341 int i;
342
343         for( i = 0; i < BtId; i++ )
344                 id <<= 8, id |= *src++; 
345
346         return id;
347 }
348
349 //      wait until write lock mode is clear
350 //      and add 1 to the share count
351
352 void bt_spinreadlock(BtSpinLatch *latch)
353 {
354 ushort prev;
355
356   do {
357 #ifdef unix
358         while( __sync_fetch_and_or((ushort *)latch, Mutex) & Mutex )
359                 sched_yield();
360 #else
361         while( _InterlockedOr16((ushort *)latch, Mutex) & Mutex )
362                 SwitchToThread();
363 #endif
364
365         //  see if exclusive request is granted or pending
366
367         if( prev = !(latch->exclusive | latch->pending) )
368 #ifdef unix
369                 __sync_fetch_and_add((ushort *)latch, Share);
370 #else
371                 _InterlockedExchangeAdd16 ((ushort *)latch, Share);
372 #endif
373
374 #ifdef unix
375         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
376 #else
377         _InterlockedAnd16((ushort *)latch, ~Mutex);
378 #endif
379         if( prev )
380                 return;
381 #ifdef  unix
382   } while( sched_yield(), 1 );
383 #else
384   } while( SwitchToThread(), 1 );
385 #endif
386 }
387
388 //      wait for other read and write latches to relinquish
389
390 void bt_spinwritelock(BtSpinLatch *latch)
391 {
392   do {
393 #ifdef  unix
394         while( __sync_fetch_and_or((ushort *)latch, Mutex | Pending) & Mutex )
395                 sched_yield();
396 #else
397         while( _InterlockedOr16((ushort *)latch, Mutex | Pending) & Mutex )
398                 SwitchToThread();
399 #endif
400         if( !(latch->share | latch->exclusive) ) {
401 #ifdef unix
402                 __sync_fetch_and_or((ushort *)latch, Write);
403                 __sync_fetch_and_and ((ushort *)latch, ~(Mutex | Pending));
404 #else
405                 _InterlockedOr16((ushort *)latch, Write);
406                 _InterlockedAnd16((ushort *)latch, ~(Mutex | Pending));
407 #endif
408                 return;
409         }
410
411 #ifdef unix
412         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
413         sched_yield();
414 #else
415         _InterlockedAnd16((ushort *)latch, ~Mutex);
416         SwitchToThread();
417 #endif
418   } while( 1 );
419 }
420
421 //      try to obtain write lock
422
423 //      return 1 if obtained,
424 //              0 otherwise
425
426 int bt_spinwritetry(BtSpinLatch *latch)
427 {
428 ushort prev;
429
430 #ifdef unix
431         if( prev = __sync_fetch_and_or((ushort *)latch, Mutex), prev & Mutex )
432                 return 0;
433 #else
434         if( prev = _InterlockedOr16((ushort *)latch, Mutex), prev & Mutex )
435                 return 0;
436 #endif
437         //      take write access if all bits are clear
438
439         if( !prev )
440 #ifdef unix
441                 __sync_fetch_and_or ((ushort *)latch, Write);
442 #else
443                 _InterlockedOr16((ushort *)latch, Write);
444 #endif
445
446 #ifdef unix
447         __sync_fetch_and_and ((ushort *)latch, ~Mutex);
448 #else
449         _InterlockedAnd16((ushort *)latch, ~Mutex);
450 #endif
451         return !prev;
452 }
453
454 //      clear write mode
455
456 void bt_spinreleasewrite(BtSpinLatch *latch)
457 {
458 #ifdef unix
459         __sync_fetch_and_and ((ushort *)latch, ~Write);
460 #else
461         _InterlockedAnd16((ushort *)latch, ~Write);
462 #endif
463 }
464
465 //      decrement reader count
466
467 void bt_spinreleaseread(BtSpinLatch *latch)
468 {
469 #ifdef unix
470         __sync_fetch_and_add((ushort *)latch, -Share);
471 #else
472         _InterlockedExchangeAdd16 ((ushort *)latch, -Share);
473 #endif
474 }
475
476 //      link latch table entry into latch hash table
477
478 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
479 {
480 BtLatchSet *set = bt->mgr->latchsets + victim;
481
482         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
483                 bt->mgr->latchsets[set->next].prev = victim;
484
485         bt->mgr->latchmgr->table[hashidx].slot = victim;
486         set->page_no = page_no;
487         set->hash = hashidx;
488         set->prev = 0;
489 }
490
491 //      release latch pin
492
493 void bt_unpinlatch (BtLatchSet *set)
494 {
495 #ifdef unix
496         __sync_fetch_and_add(&set->pin, -1);
497 #else
498         _InterlockedDecrement16 (&set->pin);
499 #endif
500 }
501
502 //      find existing latchset or inspire new one
503 //      return with latchset pinned
504
505 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
506 {
507 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
508 ushort slot, avail = 0, victim, idx;
509 BtLatchSet *set;
510
511         //  obtain read lock on hash table entry
512
513         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
514
515         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
516         {
517                 set = bt->mgr->latchsets + slot;
518                 if( page_no == set->page_no )
519                         break;
520         } while( slot = set->next );
521
522         if( slot ) {
523 #ifdef unix
524                 __sync_fetch_and_add(&set->pin, 1);
525 #else
526                 _InterlockedIncrement16 (&set->pin);
527 #endif
528         }
529
530     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
531
532         if( slot )
533                 return set;
534
535   //  try again, this time with write lock
536
537   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
538
539   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
540   {
541         set = bt->mgr->latchsets + slot;
542         if( page_no == set->page_no )
543                 break;
544         if( !set->pin && !avail )
545                 avail = slot;
546   } while( slot = set->next );
547
548   //  found our entry, or take over an unpinned one
549
550   if( slot || (slot = avail) ) {
551         set = bt->mgr->latchsets + slot;
552 #ifdef unix
553         __sync_fetch_and_add(&set->pin, 1);
554 #else
555         _InterlockedIncrement16 (&set->pin);
556 #endif
557         set->page_no = page_no;
558         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
559         return set;
560   }
561
562         //  see if there are any unused entries
563 #ifdef unix
564         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
565 #else
566         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
567 #endif
568
569         if( victim < bt->mgr->latchmgr->latchtotal ) {
570                 set = bt->mgr->latchsets + victim;
571 #ifdef unix
572                 __sync_fetch_and_add(&set->pin, 1);
573 #else
574                 _InterlockedIncrement16 (&set->pin);
575 #endif
576                 bt_latchlink (bt, hashidx, victim, page_no);
577                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
578                 return set;
579         }
580
581 #ifdef unix
582         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
583 #else
584         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
585 #endif
586   //  find and reuse previous lock entry
587
588   while( 1 ) {
589 #ifdef unix
590         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
591 #else
592         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
593 #endif
594         //      we don't use slot zero
595
596         if( victim %= bt->mgr->latchmgr->latchtotal )
597                 set = bt->mgr->latchsets + victim;
598         else
599                 continue;
600
601         //      take control of our slot
602         //      from other threads
603
604         if( set->pin || !bt_spinwritetry (set->busy) )
605                 continue;
606
607         idx = set->hash;
608
609         // try to get write lock on hash chain
610         //      skip entry if not obtained
611         //      or has outstanding locks
612
613         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
614                 bt_spinreleasewrite (set->busy);
615                 continue;
616         }
617
618         if( set->pin ) {
619                 bt_spinreleasewrite (set->busy);
620                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
621                 continue;
622         }
623
624         //  unlink our available victim from its hash chain
625
626         if( set->prev )
627                 bt->mgr->latchsets[set->prev].next = set->next;
628         else
629                 bt->mgr->latchmgr->table[idx].slot = set->next;
630
631         if( set->next )
632                 bt->mgr->latchsets[set->next].prev = set->prev;
633
634         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
635 #ifdef unix
636         __sync_fetch_and_add(&set->pin, 1);
637 #else
638         _InterlockedIncrement16 (&set->pin);
639 #endif
640         bt_latchlink (bt, hashidx, victim, page_no);
641         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
642         bt_spinreleasewrite (set->busy);
643         return set;
644   }
645 }
646
647 void bt_mgrclose (BtMgr *mgr)
648 {
649 BtPool *pool;
650 uint slot;
651
652         // release mapped pages
653         //      note that slot zero is never used
654
655         for( slot = 1; slot < mgr->poolmax; slot++ ) {
656                 pool = mgr->pool + slot;
657                 if( pool->slot )
658 #ifdef unix
659                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
660 #else
661                 {
662                         FlushViewOfFile(pool->map, 0);
663                         UnmapViewOfFile(pool->map);
664                         CloseHandle(pool->hmap);
665                 }
666 #endif
667         }
668
669 #ifdef unix
670         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
671         munmap (mgr->latchmgr, mgr->page_size);
672 #else
673         FlushViewOfFile(mgr->latchmgr, 0);
674         UnmapViewOfFile(mgr->latchmgr);
675         CloseHandle(mgr->halloc);
676 #endif
677 #ifdef unix
678         close (mgr->idx);
679         free (mgr->pool);
680         free (mgr->hash);
681         free (mgr->latch);
682         free (mgr->pooladvise);
683         free (mgr);
684 #else
685         FlushFileBuffers(mgr->idx);
686         CloseHandle(mgr->idx);
687         GlobalFree (mgr->pool);
688         GlobalFree (mgr->hash);
689         GlobalFree (mgr->latch);
690         GlobalFree (mgr);
691 #endif
692 }
693
694 //      close and release memory
695
696 void bt_close (BtDb *bt)
697 {
698 #ifdef unix
699         if ( bt->mem )
700                 free (bt->mem);
701 #else
702         if ( bt->mem)
703                 VirtualFree (bt->mem, 0, MEM_RELEASE);
704 #endif
705         free (bt);
706 }
707
708 //  open/create new btree buffer manager
709
710 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
711 //              size of mapped page pool (e.g. 8192)
712
713 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
714 {
715 uint lvl, attr, cacheblk, last, slot, idx;
716 uint nlatchpage, latchhash;
717 BtLatchMgr *latchmgr;
718 off64_t size;
719 uint amt[1];
720 BtMgr* mgr;
721 BtKey key;
722 int flag;
723
724 #ifndef unix
725 SYSTEM_INFO sysinfo[1];
726 #endif
727
728         // determine sanity of page size and buffer pool
729
730         if( bits > BT_maxbits )
731                 bits = BT_maxbits;
732         else if( bits < BT_minbits )
733                 bits = BT_minbits;
734
735         if( !poolmax )
736                 return NULL;    // must have buffer pool
737
738 #ifdef unix
739         mgr = calloc (1, sizeof(BtMgr));
740
741         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
742
743         if( mgr->idx == -1 )
744                 return free(mgr), NULL;
745         
746         cacheblk = 4096;        // minimum mmap segment size for unix
747
748 #else
749         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
750         attr = FILE_ATTRIBUTE_NORMAL;
751         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
752
753         if( mgr->idx == INVALID_HANDLE_VALUE )
754                 return GlobalFree(mgr), NULL;
755
756         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
757         GetSystemInfo(sysinfo);
758         cacheblk = sysinfo->dwAllocationGranularity;
759 #endif
760
761 #ifdef unix
762         latchmgr = malloc (BT_maxpage);
763         *amt = 0;
764
765         // read minimum page size to get root info
766
767         if( size = lseek (mgr->idx, 0L, 2) ) {
768                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
769                         bits = latchmgr->alloc->bits;
770                 else
771                         return free(mgr), free(latchmgr), NULL;
772         } else if( mode == BT_ro )
773                 return free(latchmgr), bt_mgrclose (mgr), NULL;
774 #else
775         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
776         size = GetFileSize(mgr->idx, amt);
777
778         if( size || *amt ) {
779                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
780                         return bt_mgrclose (mgr), NULL;
781                 bits = latchmgr->alloc->bits;
782         } else if( mode == BT_ro )
783                 return bt_mgrclose (mgr), NULL;
784 #endif
785
786         mgr->page_size = 1 << bits;
787         mgr->page_bits = bits;
788
789         mgr->poolmax = poolmax;
790         mgr->mode = mode;
791
792         if( cacheblk < mgr->page_size )
793                 cacheblk = mgr->page_size;
794
795         //  mask for partial memmaps
796
797         mgr->poolmask = (cacheblk >> bits) - 1;
798
799         //      see if requested size of pages per memmap is greater
800
801         if( (1 << segsize) > mgr->poolmask )
802                 mgr->poolmask = (1 << segsize) - 1;
803
804         mgr->seg_bits = 0;
805
806         while( (1 << mgr->seg_bits) <= mgr->poolmask )
807                 mgr->seg_bits++;
808
809         mgr->hashsize = hashsize;
810
811 #ifdef unix
812         mgr->pool = calloc (poolmax, sizeof(BtPool));
813         mgr->hash = calloc (hashsize, sizeof(ushort));
814         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
815         mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
816 #else
817         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
818         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
819         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
820 #endif
821
822         if( size || *amt )
823                 goto mgrlatch;
824
825         // initialize an empty b-tree with latch page, root page, page of leaves
826         // and page(s) of latches
827
828         memset (latchmgr, 0, 1 << bits);
829         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
830         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
831         latchmgr->alloc->bits = mgr->page_bits;
832
833         latchmgr->nlatchpage = nlatchpage;
834         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
835
836         //  initialize latch manager
837
838         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
839
840         //      size of hash table = total number of latchsets
841
842         if( latchhash > latchmgr->latchtotal )
843                 latchhash = latchmgr->latchtotal;
844
845         latchmgr->latchhash = latchhash;
846
847 #ifdef unix
848         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
849                 return bt_mgrclose (mgr), NULL;
850 #else
851         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
852                 return bt_mgrclose (mgr), NULL;
853
854         if( *amt < mgr->page_size )
855                 return bt_mgrclose (mgr), NULL;
856 #endif
857
858         memset (latchmgr, 0, 1 << bits);
859         latchmgr->alloc->bits = mgr->page_bits;
860
861         for( lvl=MIN_lvl; lvl--; ) {
862                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
863                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
864                 key = keyptr(latchmgr->alloc, 1);
865                 key->len = 2;                   // create stopper key
866                 key->key[0] = 0xff;
867                 key->key[1] = 0xff;
868                 latchmgr->alloc->min = mgr->page_size - 3;
869                 latchmgr->alloc->lvl = lvl;
870                 latchmgr->alloc->cnt = 1;
871                 latchmgr->alloc->act = 1;
872 #ifdef unix
873                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
874                         return bt_mgrclose (mgr), NULL;
875 #else
876                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
877                         return bt_mgrclose (mgr), NULL;
878
879                 if( *amt < mgr->page_size )
880                         return bt_mgrclose (mgr), NULL;
881 #endif
882         }
883
884         // clear out latch manager locks
885         //      and rest of pages to round out segment
886
887         memset(latchmgr, 0, mgr->page_size);
888         last = MIN_lvl + 1;
889
890         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
891 #ifdef unix
892                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
893 #else
894                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
895                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
896                         return bt_mgrclose (mgr), NULL;
897                 if( *amt < mgr->page_size )
898                         return bt_mgrclose (mgr), NULL;
899 #endif
900                 last++;
901         }
902
903 mgrlatch:
904 #ifdef unix
905         flag = PROT_READ | PROT_WRITE;
906         mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
907         if( mgr->latchmgr == MAP_FAILED )
908                 return bt_mgrclose (mgr), NULL;
909         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
910         if( mgr->latchsets == MAP_FAILED )
911                 return bt_mgrclose (mgr), NULL;
912 #else
913         flag = PAGE_READWRITE;
914         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
915         if( !mgr->halloc )
916                 return bt_mgrclose (mgr), NULL;
917
918         flag = FILE_MAP_WRITE;
919         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
920         if( !mgr->latchmgr )
921                 return GetLastError(), bt_mgrclose (mgr), NULL;
922
923         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
924 #endif
925
926 #ifdef unix
927         free (latchmgr);
928 #else
929         VirtualFree (latchmgr, 0, MEM_RELEASE);
930 #endif
931         return mgr;
932 }
933
934 //      open BTree access method
935 //      based on buffer manager
936
937 BtDb *bt_open (BtMgr *mgr)
938 {
939 BtDb *bt = malloc (sizeof(*bt));
940
941         memset (bt, 0, sizeof(*bt));
942         bt->mgr = mgr;
943 #ifdef unix
944         bt->mem = malloc (3 *mgr->page_size);
945 #else
946         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
947 #endif
948         bt->frame = (BtPage)bt->mem;
949         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
950         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
951
952         memset(bt->zero, 0, mgr->page_size);
953         return bt;
954 }
955
956 //  compare two keys, returning > 0, = 0, or < 0
957 //  as the comparison value
958
959 int keycmp (BtKey key1, unsigned char *key2, uint len2)
960 {
961 uint len1 = key1->len;
962 int ans;
963
964         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
965                 return ans;
966
967         if( len1 > len2 )
968                 return 1;
969         if( len1 < len2 )
970                 return -1;
971
972         return 0;
973 }
974
975 //      Buffer Pool mgr
976
977 // find segment in pool
978 // must be called with hashslot idx locked
979 //      return NULL if not there
980 //      otherwise return node
981
982 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
983 {
984 BtPool *pool;
985 uint slot;
986
987         // compute start of hash chain in pool
988
989         if( slot = bt->mgr->hash[idx] ) 
990                 pool = bt->mgr->pool + slot;
991         else
992                 return NULL;
993
994         page_no &= ~bt->mgr->poolmask;
995
996         while( pool->basepage != page_no )
997           if( pool = pool->hashnext )
998                 continue;
999           else
1000                 return NULL;
1001
1002         return pool;
1003 }
1004
1005 // add segment to hash table
1006
1007 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1008 {
1009 BtPool *node;
1010 uint slot;
1011
1012         pool->hashprev = pool->hashnext = NULL;
1013         pool->basepage = page_no & ~bt->mgr->poolmask;
1014         pool->lru = 1;
1015
1016         if( slot = bt->mgr->hash[idx] ) {
1017                 node = bt->mgr->pool + slot;
1018                 pool->hashnext = node;
1019                 node->hashprev = pool;
1020         }
1021
1022         bt->mgr->hash[idx] = pool->slot;
1023 }
1024
1025 //      find best segment to evict from buffer pool
1026
1027 BtPool *bt_findlru (BtDb *bt, uint hashslot)
1028 {
1029 unsigned long long int target = ~0LL;
1030 BtPool *pool = NULL, *node;
1031
1032         if( !hashslot )
1033                 return NULL;
1034
1035         node = bt->mgr->pool + hashslot;
1036
1037         //      scan pool entries under hash table slot
1038
1039         do {
1040           if( node->pin )
1041                 continue;
1042           if( node->lru > target )
1043                 continue;
1044           target = node->lru;
1045           pool = node;
1046         } while( node = node->hashnext );
1047
1048         return pool;
1049 }
1050
1051 //  map new buffer pool segment to virtual memory
1052
1053 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1054 {
1055 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1056 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1057 int flag;
1058
1059 #ifdef unix
1060         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1061         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1062         if( pool->map == MAP_FAILED )
1063                 return bt->err = BTERR_map;
1064         // clear out madvise issued bits
1065         memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
1066 #else
1067         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1068         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1069         if( !pool->hmap )
1070                 return bt->err = BTERR_map;
1071
1072         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1073         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1074         if( !pool->map )
1075                 return bt->err = BTERR_map;
1076 #endif
1077         return bt->err = 0;
1078 }
1079
1080 //      calculate page within pool
1081
1082 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1083 {
1084 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1085 BtPage page;
1086
1087         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1088 #ifdef unix
1089         {
1090         uint idx = subpage / 8;
1091         uint bit = subpage % 8;
1092
1093                 if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
1094                   madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1095                   (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
1096                 }
1097         }
1098 #endif
1099         return page;
1100 }
1101
1102 //  release pool pin
1103
1104 void bt_unpinpool (BtPool *pool)
1105 {
1106 #ifdef unix
1107         __sync_fetch_and_add(&pool->pin, -1);
1108 #else
1109         _InterlockedDecrement16 (&pool->pin);
1110 #endif
1111 }
1112
1113 //      find or place requested page in segment-pool
1114 //      return pool table entry, incrementing pin
1115
1116 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1117 {
1118 BtPool *pool, *node, *next;
1119 uint slot, idx, victim;
1120 BtLatchSet *set;
1121
1122         //      lock hash table chain
1123
1124         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1125         bt_spinreadlock (&bt->mgr->latch[idx]);
1126
1127         //      look up in hash table
1128
1129         if( pool = bt_findpool(bt, page_no, idx) ) {
1130 #ifdef unix
1131                 __sync_fetch_and_add(&pool->pin, 1);
1132 #else
1133                 _InterlockedIncrement16 (&pool->pin);
1134 #endif
1135                 bt_spinreleaseread (&bt->mgr->latch[idx]);
1136                 pool->lru++;
1137                 return pool;
1138         }
1139
1140         //      upgrade to write lock
1141
1142         bt_spinreleaseread (&bt->mgr->latch[idx]);
1143         bt_spinwritelock (&bt->mgr->latch[idx]);
1144
1145         // try to find page in pool with write lock
1146
1147         if( pool = bt_findpool(bt, page_no, idx) ) {
1148 #ifdef unix
1149                 __sync_fetch_and_add(&pool->pin, 1);
1150 #else
1151                 _InterlockedIncrement16 (&pool->pin);
1152 #endif
1153                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1154                 pool->lru++;
1155                 return pool;
1156         }
1157
1158         // allocate a new pool node
1159         // and add to hash table
1160
1161 #ifdef unix
1162         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1163 #else
1164         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1165 #endif
1166
1167         if( ++slot < bt->mgr->poolmax ) {
1168                 pool = bt->mgr->pool + slot;
1169                 pool->slot = slot;
1170
1171                 if( bt_mapsegment(bt, pool, page_no) )
1172                         return NULL;
1173
1174                 bt_linkhash(bt, pool, page_no, idx);
1175 #ifdef unix
1176                 __sync_fetch_and_add(&pool->pin, 1);
1177 #else
1178                 _InterlockedIncrement16 (&pool->pin);
1179 #endif
1180                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1181                 return pool;
1182         }
1183
1184         // pool table is full
1185         //      find best pool entry to evict
1186
1187 #ifdef unix
1188         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1189 #else
1190         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1191 #endif
1192
1193         while( 1 ) {
1194 #ifdef unix
1195                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1196 #else
1197                 victim = _InterlockedIncrement16 (&bt->mgr->evicted) - 1;
1198 #endif
1199                 victim %= bt->mgr->hashsize;
1200
1201                 // try to get write lock
1202                 //      skip entry if not obtained
1203
1204                 if( !bt_spinwritetry (&bt->mgr->latch[victim]) )
1205                         continue;
1206
1207                 //  if cache entry is empty
1208                 //      or no slots are unpinned
1209                 //      skip this entry
1210
1211                 if( !(pool = bt_findlru(bt, bt->mgr->hash[victim])) ) {
1212                         bt_spinreleasewrite (&bt->mgr->latch[victim]);
1213                         continue;
1214                 }
1215
1216                 // unlink victim pool node from hash table
1217
1218                 if( node = pool->hashprev )
1219                         node->hashnext = pool->hashnext;
1220                 else if( node = pool->hashnext )
1221                         bt->mgr->hash[victim] = node->slot;
1222                 else
1223                         bt->mgr->hash[victim] = 0;
1224
1225                 if( node = pool->hashnext )
1226                         node->hashprev = pool->hashprev;
1227
1228                 bt_spinreleasewrite (&bt->mgr->latch[victim]);
1229
1230                 //      remove old file mapping
1231 #ifdef unix
1232                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1233 #else
1234                 FlushViewOfFile(pool->map, 0);
1235                 UnmapViewOfFile(pool->map);
1236                 CloseHandle(pool->hmap);
1237 #endif
1238                 pool->map = NULL;
1239
1240                 //  create new pool mapping
1241                 //  and link into hash table
1242
1243                 if( bt_mapsegment(bt, pool, page_no) )
1244                         return NULL;
1245
1246                 bt_linkhash(bt, pool, page_no, idx);
1247 #ifdef unix
1248                 __sync_fetch_and_add(&pool->pin, 1);
1249 #else
1250                 _InterlockedIncrement16 (&pool->pin);
1251 #endif
1252                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1253                 return pool;
1254         }
1255 }
1256
1257 // place write, read, or parent lock on requested page_no.
1258 //      pin to buffer pool and return latchset pointer
1259
1260 void bt_lockpage(BtLock mode, BtLatchSet *set)
1261 {
1262         switch( mode ) {
1263         case BtLockRead:
1264                 bt_spinreadlock (set->readwr);
1265                 break;
1266         case BtLockWrite:
1267                 bt_spinwritelock (set->readwr);
1268                 break;
1269         case BtLockAccess:
1270                 bt_spinreadlock (set->access);
1271                 break;
1272         case BtLockDelete:
1273                 bt_spinwritelock (set->access);
1274                 break;
1275         case BtLockParent:
1276                 bt_spinwritelock (set->parent);
1277                 break;
1278         }
1279 }
1280
1281 // remove write, read, or parent lock on requested page_no.
1282
1283 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1284 {
1285         switch( mode ) {
1286         case BtLockRead:
1287                 bt_spinreleaseread (set->readwr);
1288                 break;
1289         case BtLockWrite:
1290                 bt_spinreleasewrite (set->readwr);
1291                 break;
1292         case BtLockAccess:
1293                 bt_spinreleaseread (set->access);
1294                 break;
1295         case BtLockDelete:
1296                 bt_spinreleasewrite (set->access);
1297                 break;
1298         case BtLockParent:
1299                 bt_spinreleasewrite (set->parent);
1300                 break;
1301         }
1302 }
1303
1304 //      allocate a new page and write page into it
1305
1306 uid bt_newpage(BtDb *bt, BtPage page)
1307 {
1308 BtLatchSet *set;
1309 BtPool *pool;
1310 uid new_page;
1311 BtPage pmap;
1312 int reuse;
1313
1314         //      lock allocation page
1315
1316         bt_spinwritelock(bt->mgr->latchmgr->lock);
1317
1318         // use empty chain first
1319         // else allocate empty page
1320
1321         if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
1322                 if( pool = bt_pinpool (bt, new_page) )
1323                         pmap = bt_page (bt, pool, new_page);
1324                 else
1325                         return 0;
1326                 bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
1327                 bt_unpinpool (pool);
1328                 reuse = 1;
1329         } else {
1330                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1331                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1332                 reuse = 0;
1333         }
1334 #ifdef unix
1335         // if writing first page of pool block, zero last page in the block
1336
1337         if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
1338         {
1339                 // use zero buffer to write zeros
1340                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1341                         return bt->err = BTERR_wrt, 0;
1342         }
1343
1344         // unlock allocation latch
1345
1346         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1347
1348         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1349                 return bt->err = BTERR_wrt, 0;
1350
1351 #else
1352         // unlock allocation latch
1353
1354         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1355
1356         //      bring new page into pool and copy page.
1357         //      this will extend the file into the new pages.
1358         //      NB -- no latch required
1359
1360         if( pool = bt_pinpool (bt, new_page) )
1361                 pmap = bt_page (bt, pool, new_page);
1362         else
1363                 return 0;
1364
1365         memcpy(pmap, page, bt->mgr->page_size);
1366         bt_unpinpool (pool);
1367 #endif
1368         return new_page;
1369 }
1370
1371 //  find slot in page for given key at a given level
1372
1373 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1374 {
1375 uint diff, higher = bt->page->cnt, low = 1, slot;
1376
1377         //      low is the lowest candidate, higher is already
1378         //      tested as .ge. the given key, loop ends when they meet
1379
1380         while( diff = higher - low ) {
1381                 slot = low + ( diff >> 1 );
1382                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1383                         low = slot + 1;
1384                 else
1385                         higher = slot;
1386         }
1387
1388         return higher;
1389 }
1390
1391 //  find and load page at given level for given key
1392 //      leave page rd or wr locked as requested
1393
1394 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
1395 {
1396 uid page_no = ROOT_page, prevpage = 0;
1397 BtLatchSet *set, *prevset;
1398 uint drill = 0xff, slot;
1399 uint mode, prevmode;
1400 BtPool *prevpool;
1401
1402   //  start at root of btree and drill down
1403
1404   do {
1405         // determine lock mode of drill level
1406         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1407
1408         //      obtain latch set for this page
1409
1410         bt->set = bt_pinlatch (bt, page_no);
1411         bt->page_no = page_no;
1412
1413         // pin page contents
1414
1415         if( bt->pool = bt_pinpool (bt, page_no) )
1416                 bt->page = bt_page (bt, bt->pool, page_no);
1417         else
1418                 return 0;
1419
1420         // obtain access lock using lock chaining with Access mode
1421
1422         if( page_no > ROOT_page )
1423           bt_lockpage(BtLockAccess, bt->set);
1424
1425         //  now unlock and unpin our (possibly foster) parent
1426
1427         if( prevpage ) {
1428           bt_unlockpage(prevmode, prevset);
1429           bt_unpinlatch (prevset);
1430           bt_unpinpool (prevpool);
1431           prevpage = 0;
1432         }
1433
1434         // obtain read lock using lock chaining
1435
1436         bt_lockpage(mode, bt->set);
1437
1438         if( page_no > ROOT_page )
1439           bt_unlockpage(BtLockAccess, bt->set);
1440
1441         // re-read and re-lock root after determining actual level of root
1442
1443         if( page_no == ROOT_page )
1444           if( bt->page->lvl != drill) {
1445                 drill = bt->page->lvl;
1446
1447             if( lock == BtLockWrite && drill == lvl ) {
1448                   bt_unlockpage(mode, bt->set);
1449                   bt_unpinlatch (bt->set);
1450                   bt_unpinpool (bt->pool);
1451                   continue;
1452                 }
1453           }
1454
1455         prevpage = bt->page_no;
1456         prevpool = bt->pool;
1457         prevset = bt->set;
1458         prevmode = mode;
1459
1460         //  find key on page at this level
1461         //  and either descend to requested level
1462         //      or return key slot
1463
1464         slot = bt_findslot (bt, key, len);
1465
1466         //      is this slot < foster child area
1467         //      on the requested level?
1468
1469         //      if so, return actual slot even if dead
1470
1471         if( slot <= bt->page->cnt - bt->page->foster )
1472           if( drill == lvl )
1473                 return slot;
1474
1475         //      find next active slot
1476
1477         //      note: foster children are never dead
1478         //      nor fence keys for interiour nodes
1479
1480         while( slotptr(bt->page, slot)->dead )
1481           if( slot++ < bt->page->cnt )
1482                 continue;
1483           else
1484                 return bt->err = BTERR_struct, 0;       // last key shouldn't be deleted
1485
1486         //      is this slot < foster child area
1487         //      if so, drill to next level
1488
1489         if( slot <= bt->page->cnt - bt->page->foster )
1490                 drill--;
1491
1492         //  continue right onto foster child
1493         //      or down to next level.
1494
1495         page_no = bt_getid(slotptr(bt->page, slot)->id);
1496
1497   } while( page_no );
1498
1499   // return error on end of chain
1500
1501   bt->err = BTERR_struct;
1502   return 0;     // return error
1503 }
1504
1505 //  find and delete key on page by marking delete flag bit
1506 //  when leaf page becomes empty, delete it from the btree
1507
1508 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
1509 {
1510 unsigned char leftkey[256];
1511 BtLatchSet *rset, *set;
1512 BtPool *pool, *rpool;
1513 BtPage rpage, page;
1514 uid page_no, right;
1515 uint slot, tod;
1516 BtKey ptr;
1517
1518         if( slot = bt_loadpage (bt, key, len, 0, BtLockWrite) )
1519                 ptr = keyptr(bt->page, slot);
1520         else
1521                 return bt->err;
1522
1523         // if key is found delete it, otherwise ignore request
1524         // note that fence keys of interiour nodes are not deleted.
1525
1526         if( bt->found = !keycmp (ptr, key, len) )
1527                 if( bt->found = slotptr(bt->page, slot)->dead == 0 ) {
1528                         slotptr(bt->page,slot)->dead = 1;
1529                         if( slot < bt->page->cnt )
1530                                 bt->page->dirty = 1;
1531                         bt->page->act--;
1532                 }
1533
1534         page_no = bt->page_no;
1535         pool = bt->pool;
1536         page = bt->page;
1537         set = bt->set;
1538
1539         // return if page is not empty or not found
1540
1541         if( page->act || !bt->found ) {
1542                 bt_unlockpage(BtLockWrite, set);
1543                 bt_unpinlatch (set);
1544                 bt_unpinpool (pool);
1545                 return bt->err;
1546         }
1547
1548         // cache copy of fence key of empty node
1549
1550         ptr = keyptr(page, page->cnt);
1551         memcpy(leftkey, ptr, ptr->len + 1);
1552
1553         //      release write lock on empty node
1554         //      obtain Parent lock
1555
1556         bt_unlockpage(BtLockWrite, set);
1557         bt_lockpage(BtLockParent, set);
1558
1559         //      load and lock parent to see
1560         //  if delete of empty node is OK
1561         //      ie, not a fence key of parent
1562
1563         while( 1 ) {
1564           if( slot = bt_loadpage (bt, leftkey+1, *leftkey, 1, BtLockWrite) )
1565                 ptr = keyptr(bt->page, slot);
1566           else
1567                 return bt->err;
1568
1569           // does parent level contain our fence key yet?
1570           // and is it free of foster children?
1571
1572           if( !bt->page->foster )
1573            if( !keycmp (ptr, leftkey+1, *leftkey) )
1574                 break;
1575
1576           bt_unlockpage(BtLockWrite, bt->set);
1577           bt_unpinlatch (bt->set);
1578           bt_unpinpool (bt->pool);
1579 #ifdef unix
1580           sched_yield();
1581 #else
1582           SwitchToThread();
1583 #endif
1584         }
1585
1586         //      find our left fence key
1587
1588         while( slotptr(bt->page, slot)->dead )
1589           if( slot++ < bt->page->cnt )
1590                 continue;
1591           else
1592                 return bt->err = BTERR_struct;  // last key shouldn't be deleted
1593
1594         //      now we have both parent and child
1595
1596         bt_lockpage(BtLockDelete, set);
1597         bt_lockpage(BtLockWrite, set);
1598
1599         // return if page has no right sibling within parent
1600         //       or if empty node is no longer empty
1601
1602         if( page->act || slot == bt->page->cnt ) {
1603                 // unpin parent
1604                 bt_unlockpage(BtLockWrite, bt->set);
1605                 bt_unpinlatch (bt->set);
1606                 bt_unpinpool (bt->pool);
1607                 // unpin empty node
1608                 bt_unlockpage(BtLockParent, set);
1609                 bt_unlockpage(BtLockDelete, set);
1610                 bt_unlockpage(BtLockWrite, set);
1611                 bt_unpinlatch (set);
1612                 bt_unpinpool (pool);
1613                 return bt->err;
1614         }
1615
1616         // lock and map our right page
1617         // note that it cannot be our foster child
1618         // since the our node is empty
1619
1620         right = bt_getid(page->right);
1621
1622         if( rpool = bt_pinpool (bt, right) )
1623                 rpage = bt_page (bt, rpool, right);
1624         else
1625                 return bt->err;
1626
1627         rset = bt_pinlatch (bt, right);
1628         bt_lockpage(BtLockWrite, rset);
1629         bt_lockpage(BtLockDelete, rset);
1630
1631         // pull contents of right page into empty page 
1632
1633         memcpy (page, rpage, bt->mgr->page_size);
1634
1635         //      delete left parent slot for old empty page
1636         //      and redirect right parent slot to it
1637
1638         bt->page->act--;
1639         bt->page->dirty = 1;
1640         slotptr(bt->page, slot)->dead = 1;
1641
1642         while( slot++ < bt->page->cnt )
1643           if( !slotptr(bt->page, slot)->dead )
1644                 break;
1645
1646         bt_putid(slotptr(bt->page,slot)->id, page_no);
1647
1648         // release parent level lock
1649         //      and our empty node lock
1650
1651         bt_unlockpage(BtLockWrite, set);
1652         bt_unlockpage(BtLockWrite, bt->set);
1653         bt_unpinlatch (bt->set);
1654         bt_unpinpool (bt->pool);
1655
1656         //      add killed right block to free chain
1657         //      lock latch mgr
1658
1659         bt_spinwritelock(bt->mgr->latchmgr->lock);
1660
1661         //      store free chain in allocation page second right
1662         bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1663         bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
1664
1665         // unlock latch mgr and right page
1666
1667         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1668
1669         bt_unlockpage(BtLockWrite, rset);
1670         bt_unlockpage(BtLockDelete, rset);
1671         bt_unpinlatch (rset);
1672         bt_unpinpool (rpool);
1673
1674         //      remove ParentModify lock
1675
1676         bt_unlockpage(BtLockParent, set);
1677         bt_unlockpage(BtLockDelete, set);
1678         bt_unpinlatch (set);
1679         bt_unpinpool (pool);
1680         return 0;
1681
1682
1683 //      find key in leaf level and return row-id
1684
1685 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1686 {
1687 uint  slot;
1688 BtKey ptr;
1689 uid id;
1690
1691         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1692                 ptr = keyptr(bt->page, slot);
1693         else
1694                 return 0;
1695
1696         // if key exists, return row-id
1697         //      otherwise return 0
1698
1699         if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
1700                 id = bt_getid(slotptr(bt->page,slot)->id);
1701         else
1702                 id = 0;
1703
1704         bt_unlockpage (BtLockRead, bt->set);
1705         bt_unpinlatch (bt->set);
1706         bt_unpinpool (bt->pool);
1707         return id;
1708 }
1709
1710 //      check page for space available,
1711 //      clean if necessary and return
1712 //      0 - page needs splitting
1713 //      >0  new slot value
1714
1715 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1716 {
1717 uint nxt = bt->mgr->page_size;
1718 BtPage page = bt->page;
1719 uint cnt = 0, idx = 0;
1720 uint max = page->cnt;
1721 uint newslot;
1722 BtKey key;
1723
1724         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1725                 return slot;
1726
1727         //      skip cleanup if nothing to reclaim
1728
1729         if( !page->dirty )
1730                 return 0;
1731
1732         memcpy (bt->frame, page, bt->mgr->page_size);
1733
1734         // skip page info and set rest of page to zero
1735
1736         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1737         page->dirty = 0;
1738         page->act = 0;
1739
1740         // try cleaning up page first
1741
1742         // always leave fence key in the array
1743         // otherwise, remove deleted key
1744
1745         // note: foster children are never dead
1746         //      nor are fence keys for interiour nodes
1747
1748         while( cnt++ < max ) {
1749                 if( cnt == slot )
1750                         newslot = idx + 1;
1751                 else if( cnt < max && slotptr(bt->frame,cnt)->dead )
1752                         continue;
1753
1754                 // copy key
1755
1756                 key = keyptr(bt->frame, cnt);
1757                 nxt -= key->len + 1;
1758                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1759
1760                 // copy slot
1761                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1762                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1763                         page->act++;
1764                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1765                 slotptr(page, idx)->off = nxt;
1766         }
1767
1768         page->min = nxt;
1769         page->cnt = idx;
1770
1771         //      see if page has enough space now, or does it need splitting?
1772
1773         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1774                 return newslot;
1775
1776         return 0;
1777 }
1778
1779 //      add key to current page
1780 //      page must already be writelocked
1781
1782 void bt_addkeytopage (BtDb *bt, uint slot, unsigned char *key, uint len, uid id, uint tod)
1783 {
1784 BtPage page = bt->page;
1785 uint idx;
1786
1787         // find next available dead slot and copy key onto page
1788         // note that foster children on the page are never dead
1789
1790         // look for next hole, but stay back from the fence key
1791
1792         for( idx = slot; idx < page->cnt; idx++ )
1793           if( slotptr(page, idx)->dead )
1794                 break;
1795
1796         if( idx == page->cnt )
1797                 idx++, page->cnt++;
1798
1799         page->act++;
1800
1801         // now insert key into array before slot
1802
1803         while( idx > slot )
1804                 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1805
1806         page->min -= len + 1;
1807         ((unsigned char *)page)[page->min] = len;
1808         memcpy ((unsigned char *)page + page->min +1, key, len );
1809
1810         bt_putid(slotptr(page,slot)->id, id);
1811         slotptr(page, slot)->off = page->min;
1812         slotptr(page, slot)->tod = tod;
1813         slotptr(page, slot)->dead = 0;
1814 }
1815
1816 // split the root and raise the height of the btree
1817 //      call with current page locked and page no of foster child
1818 //      return with current page (root) unlocked
1819
1820 BTERR bt_splitroot(BtDb *bt, uid right)
1821 {
1822 uint nxt = bt->mgr->page_size;
1823 unsigned char fencekey[256];
1824 BtPage root = bt->page;
1825 uid new_page;
1826 BtKey key;
1827
1828         //  Obtain an empty page to use, and copy the left page
1829         //  contents into it from the root.  Strip foster child key.
1830         //      (it's the stopper key)
1831
1832         memset (slotptr(root, root->cnt), 0, sizeof(BtSlot));
1833         root->dirty = 1;
1834         root->foster--;
1835         root->act--;
1836         root->cnt--;
1837
1838         //      Save left fence key.
1839
1840         key = keyptr(root, root->cnt);
1841         memcpy (fencekey, key, key->len + 1);
1842
1843         //  copy the lower keys into a new left page
1844
1845         if( !(new_page = bt_newpage(bt, root)) )
1846                 return bt->err;
1847
1848         // preserve the page info at the bottom
1849         // and set rest of the root to zero
1850
1851         memset (root+1, 0, bt->mgr->page_size - sizeof(*root));
1852
1853         // insert left fence key on empty newroot page
1854
1855         nxt -= *fencekey + 1;
1856         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
1857         bt_putid(slotptr(root, 1)->id, new_page);
1858         slotptr(root, 1)->off = nxt;
1859         
1860         // insert stopper key on newroot page
1861         // and increase the root height
1862
1863         nxt -= 3;
1864         fencekey[0] = 2;
1865         fencekey[1] = 0xff;
1866         fencekey[2] = 0xff;
1867         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
1868         bt_putid(slotptr(root, 2)->id, right);
1869         slotptr(root, 2)->off = nxt;
1870
1871         bt_putid(root->right, 0);
1872         root->min = nxt;                // reset lowest used offset and key count
1873         root->cnt = 2;
1874         root->act = 2;
1875         root->lvl++;
1876
1877         // release and unpin root (bt->page)
1878
1879         bt_unlockpage(BtLockWrite, bt->set);
1880         bt_unpinlatch (bt->set);
1881         bt_unpinpool (bt->pool);
1882         return 0;
1883 }
1884
1885 //  split already locked full node
1886 //      in current page variables
1887 //      return unlocked and unpinned.
1888
1889 BTERR bt_splitpage (BtDb *bt)
1890 {
1891 uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
1892 unsigned char fencekey[256];
1893 uid page_no = bt->page_no;
1894 BtLatchSet *set = bt->set;
1895 BtPool *pool = bt->pool;
1896 BtPage page = bt->page;
1897 uint tod = time(NULL);
1898 uint lvl = page->lvl;
1899 uid new_page, right;
1900 BtKey key;
1901
1902         //      initialize frame buffer for right node
1903
1904         memset (bt->frame, 0, bt->mgr->page_size);
1905         max = page->cnt - page->foster;
1906         tod = (uint)time(NULL);
1907         cnt = max / 2;
1908         idx = 0;
1909
1910         //  split higher half of keys to bt->frame
1911         //      leaving old foster children in the left node,
1912         //      and adding a new foster child there.
1913
1914         while( cnt++ < max ) {
1915                 key = keyptr(page, cnt);
1916                 nxt -= key->len + 1;
1917                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1918                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1919                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1920                         bt->frame->act++;
1921                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1922                 slotptr(bt->frame, idx)->off = nxt;
1923         }
1924
1925         // transfer right link node to new right node
1926
1927         if( page_no > ROOT_page ) {
1928                 right = bt_getid (page->right);
1929                 bt_putid(bt->frame->right, right);
1930         }
1931
1932         bt->frame->bits = bt->mgr->page_bits;
1933         bt->frame->min = nxt;
1934         bt->frame->cnt = idx;
1935         bt->frame->lvl = lvl;
1936
1937         //      get new free page and write right frame to it.
1938
1939         if( !(new_page = bt_newpage(bt, bt->frame)) )
1940                 return bt->err;
1941
1942         //      remember fence key for new right page to add
1943         //      as foster child to the left node
1944
1945         key = keyptr(bt->frame, idx);
1946         memcpy (fencekey, key, key->len + 1);
1947
1948         //      update lower keys and foster children to continue in old page
1949
1950         memcpy (bt->frame, page, bt->mgr->page_size);
1951         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1952         nxt = bt->mgr->page_size;
1953         page->dirty = 0;
1954         page->act = 0;
1955         cnt = 0;
1956         idx = 0;
1957
1958         //  assemble page of smaller keys
1959         //      to remain in the old page
1960
1961         while( cnt++ < max / 2 ) {
1962                 key = keyptr(bt->frame, cnt);
1963                 nxt -= key->len + 1;
1964                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1965                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1966                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1967                         page->act++;
1968                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1969                 slotptr(page, idx)->off = nxt;
1970         }
1971
1972         //      insert new foster child for right page in queue
1973         //      before any of the current foster children
1974
1975         nxt -= *fencekey + 1;
1976         memcpy ((unsigned char *)page + nxt, fencekey, *fencekey + 1);
1977
1978         bt_putid (slotptr(page,++idx)->id, new_page);
1979         slotptr(page, idx)->tod = tod;
1980         slotptr(page, idx)->off = nxt;
1981         page->foster++;
1982         page->act++;
1983
1984         //  continue with old foster child keys
1985         //      note that none will be dead
1986
1987         cnt = bt->frame->cnt - bt->frame->foster;
1988
1989         while( cnt++ < bt->frame->cnt ) {
1990                 key = keyptr(bt->frame, cnt);
1991                 nxt -= key->len + 1;
1992                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1993                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1994                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1995                 slotptr(page, idx)->off = nxt;
1996                 page->act++;
1997         }
1998
1999         page->min = nxt;
2000         page->cnt = idx;
2001
2002         //      link new right page
2003
2004         bt_putid (page->right, new_page);
2005
2006         // if current page is the root page, split it
2007
2008         if( page_no == ROOT_page )
2009                 return bt_splitroot (bt, new_page);
2010
2011         //  release wr lock on our page
2012
2013         bt_unlockpage (BtLockWrite, set);
2014
2015         //  obtain ParentModification lock for current page
2016         //      to fix new fence key and oldest foster child on page
2017
2018         bt_lockpage (BtLockParent, set);
2019
2020         //  get our new fence key to insert in parent node
2021
2022         bt_lockpage (BtLockRead, set);
2023
2024         key = keyptr(page, page->cnt-1);
2025         memcpy (fencekey, key, key->len+1);
2026
2027         bt_unlockpage (BtLockRead, set);
2028
2029         if( bt_insertkey (bt, fencekey + 1, *fencekey, page_no, tod, lvl + 1) )
2030                 return bt->err;
2031
2032         //      lock our page for writing
2033
2034         bt_lockpage (BtLockRead, set);
2035
2036         //      switch old parent key from us to our oldest foster child
2037
2038         key = keyptr(page, page->cnt);
2039         memcpy (fencekey, key, key->len+1);
2040
2041         new_page = bt_getid (slotptr(page, page->cnt)->id);
2042         bt_unlockpage (BtLockRead, set);
2043
2044         if( bt_insertkey (bt, fencekey + 1, *fencekey, new_page, tod, lvl + 1) )
2045                 return bt->err;
2046
2047         //      now that it has its own parent pointer,
2048         //      remove oldest foster child from our page
2049
2050         bt_lockpage (BtLockWrite, set);
2051         memset (slotptr(page, page->cnt), 0, sizeof(BtSlot));
2052         page->dirty = 1;
2053         page->foster--;
2054         page->cnt--;
2055         page->act--;
2056
2057         //      unlock and unpin
2058
2059         bt_unlockpage (BtLockWrite, set);
2060         bt_unlockpage (BtLockParent, set);
2061         bt_unpinlatch (set);
2062         bt_unpinpool (pool);
2063         return 0;
2064 }
2065
2066 //  Insert new key into the btree at leaf level.
2067
2068 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
2069 {
2070 uint slot, idx;
2071 BtPage page;
2072 BtKey ptr;
2073
2074         while( 1 ) {
2075                 if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
2076                         ptr = keyptr(bt->page, slot);
2077                 else
2078                 {
2079                         if ( !bt->err )
2080                                 bt->err = BTERR_ovflw;
2081                         return bt->err;
2082                 }
2083
2084                 // if key already exists, update id and return
2085
2086                 page = bt->page;
2087
2088                 if( !keycmp (ptr, key, len) ) {
2089                         if( slotptr(page, slot)->dead )
2090                                 page->act++;
2091                         slotptr(page, slot)->dead = 0;
2092                         slotptr(page, slot)->tod = tod;
2093                         bt_putid(slotptr(page,slot)->id, id);
2094                         bt_unlockpage(BtLockWrite, bt->set);
2095                         bt_unpinlatch (bt->set);
2096                         bt_unpinpool (bt->pool);
2097                         return bt->err;
2098                 }
2099
2100                 // check if page has enough space
2101
2102                 if( slot = bt_cleanpage (bt, len, slot) )
2103                         break;
2104
2105                 if( bt_splitpage (bt) )
2106                         return bt->err;
2107         }
2108
2109         bt_addkeytopage (bt, slot, key, len, id, tod);
2110
2111         bt_unlockpage (BtLockWrite, bt->set);
2112         bt_unpinlatch (bt->set);
2113         bt_unpinpool (bt->pool);
2114         return 0;
2115 }
2116
2117 //  cache page of keys into cursor and return starting slot for given key
2118
2119 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2120 {
2121 uint slot;
2122
2123         // cache page for retrieval
2124         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2125                 memcpy (bt->cursor, bt->page, bt->mgr->page_size);
2126
2127         bt->cursor_page = bt->page_no;
2128
2129         bt_unlockpage(BtLockRead, bt->set);
2130         bt_unpinlatch (bt->set);
2131         bt_unpinpool (bt->pool);
2132         return slot;
2133 }
2134
2135 //  return next slot for cursor page
2136 //  or slide cursor right into next page
2137
2138 uint bt_nextkey (BtDb *bt, uint slot)
2139 {
2140 BtLatchSet *set;
2141 BtPool *pool;
2142 BtPage page;
2143 uid right;
2144
2145   do {
2146         right = bt_getid(bt->cursor->right);
2147         while( slot++ < bt->cursor->cnt - bt->cursor->foster )
2148           if( slotptr(bt->cursor,slot)->dead )
2149                 continue;
2150           else if( right || (slot < bt->cursor->cnt - bt->cursor->foster) )
2151                 return slot;
2152           else
2153                 break;
2154
2155         if( !right )
2156                 break;
2157
2158         bt->cursor_page = right;
2159         if( pool = bt_pinpool (bt, right) )
2160                 page = bt_page (bt, pool, right);
2161         else
2162                 return 0;
2163
2164         set = bt_pinlatch (bt, right);
2165     bt_lockpage(BtLockRead, set);
2166
2167         memcpy (bt->cursor, page, bt->mgr->page_size);
2168
2169         bt_unlockpage(BtLockRead, set);
2170         bt_unpinlatch (set);
2171         bt_unpinpool (pool);
2172         slot = 0;
2173   } while( 1 );
2174
2175   return bt->err = 0;
2176 }
2177
2178 BtKey bt_key(BtDb *bt, uint slot)
2179 {
2180         return keyptr(bt->cursor, slot);
2181 }
2182
2183 uid bt_uid(BtDb *bt, uint slot)
2184 {
2185         return bt_getid(slotptr(bt->cursor,slot)->id);
2186 }
2187
2188 uint bt_tod(BtDb *bt, uint slot)
2189 {
2190         return slotptr(bt->cursor,slot)->tod;
2191 }
2192
2193
2194 #ifdef STANDALONE
2195
2196 void bt_latchaudit (BtDb *bt)
2197 {
2198 ushort idx, hashidx;
2199 BtLatchSet *set;
2200 BtPool *pool;
2201 BtPage page;
2202 uid page_no;
2203
2204 #ifdef unix
2205         for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
2206                 set = bt->mgr->latchsets + idx;
2207                 if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) {
2208                         fprintf(stderr, "latchset %d locked for page %6x\n", idx, set->page_no);
2209                         *(ushort *)set->readwr = 0;
2210                         *(ushort *)set->access = 0;
2211                         *(ushort *)set->parent = 0;
2212                 }
2213                 if( set->pin ) {
2214                         fprintf(stderr, "latchset %d pinned\n", idx);
2215                         set->pin = 0;
2216                 }
2217         }
2218
2219         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2220           if( *(ushort *)bt->mgr->latchmgr->table[hashidx].latch )
2221                 fprintf(stderr, "latchmgr locked\n");
2222           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2223                 set = bt->mgr->latchsets + idx;
2224                 if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
2225                         fprintf(stderr, "latchset %d locked\n", idx);
2226                 if( set->hash != hashidx )
2227                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
2228                 if( set->pin )
2229                         fprintf(stderr, "latchset %d pinned\n", idx);
2230           } while( idx = set->next );
2231         }
2232         page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
2233
2234         while( page_no ) {
2235                 fprintf(stderr, "free: %.6x\n", (uint)page_no);
2236                 pool = bt_pinpool (bt, page_no);
2237                 page = bt_page (bt, pool, page_no);
2238             page_no = bt_getid(page->right);
2239                 bt_unpinpool (pool);
2240         }
2241 #endif
2242 }
2243
2244 typedef struct {
2245         char type, idx;
2246         char *infile;
2247         BtMgr *mgr;
2248         int num;
2249 } ThreadArg;
2250
2251 //  standalone program to index file of keys
2252 //  then list them onto std-out
2253
2254 #ifdef unix
2255 void *index_file (void *arg)
2256 #else
2257 uint __stdcall index_file (void *arg)
2258 #endif
2259 {
2260 int line = 0, found = 0, cnt = 0;
2261 uid next, page_no = LEAF_page;  // start on first page of leaves
2262 unsigned char key[256];
2263 ThreadArg *args = arg;
2264 int ch, len = 0, slot;
2265 BtLatchSet *set;
2266 time_t tod[1];
2267 BtPool *pool;
2268 BtPage page;
2269 BtKey ptr;
2270 BtDb *bt;
2271 FILE *in;
2272
2273         bt = bt_open (args->mgr);
2274         time (tod);
2275
2276         switch(args->type | 0x20)
2277         {
2278         case 'a':
2279                 fprintf(stderr, "started latch mgr audit\n");
2280                 bt_latchaudit (bt);
2281                 fprintf(stderr, "finished latch mgr audit\n");
2282                 break;
2283         case 'w':
2284                 fprintf(stderr, "started indexing for %s\n", args->infile);
2285                 if( in = fopen (args->infile, "rb") )
2286                   while( ch = getc(in), ch != EOF )
2287                         if( ch == '\n' )
2288                         {
2289                           line++;
2290
2291                           if( args->num == 1 )
2292                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2293
2294                           else if( args->num )
2295                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2296
2297                           if( bt_insertkey (bt, key, len, line, *tod, 0) )
2298                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2299                           len = 0;
2300                         }
2301                         else if( len < 255 )
2302                                 key[len++] = ch;
2303                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2304                 break;
2305
2306         case 'd':
2307                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2308                 if( in = fopen (args->infile, "rb") )
2309                   while( ch = getc(in), ch != EOF )
2310                         if( ch == '\n' )
2311                         {
2312                           line++;
2313                           if( args->num == 1 )
2314                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2315
2316                           else if( args->num )
2317                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2318
2319                           if( bt_deletekey (bt, key, len) )
2320                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2321                           len = 0;
2322                         }
2323                         else if( len < 255 )
2324                                 key[len++] = ch;
2325                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2326                 break;
2327
2328         case 'f':
2329                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2330                 if( in = fopen (args->infile, "rb") )
2331                   while( ch = getc(in), ch != EOF )
2332                         if( ch == '\n' )
2333                         {
2334                           line++;
2335                           if( args->num == 1 )
2336                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2337
2338                           else if( args->num )
2339                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2340
2341                           if( bt_findkey (bt, key, len) )
2342                                 found++;
2343                           else if( bt->err )
2344                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2345                           len = 0;
2346                         }
2347                         else if( len < 255 )
2348                                 key[len++] = ch;
2349                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2350                 break;
2351
2352         case 's':
2353                 len = key[0] = 0;
2354
2355                 fprintf(stderr, "started reading\n");
2356
2357                 if( slot = bt_startkey (bt, key, len) )
2358                   slot--;
2359                 else
2360                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2361
2362                 while( slot = bt_nextkey (bt, slot) ) {
2363                         ptr = bt_key(bt, slot);
2364                         fwrite (ptr->key, ptr->len, 1, stdout);
2365                         fputc ('\n', stdout);
2366                 }
2367
2368                 break;
2369
2370         case 'c':
2371                 fprintf(stderr, "started reading\n");
2372
2373                 do {
2374                         if( pool = bt_pinpool (bt, page_no) )
2375                                 page = bt_page (bt, pool, page_no);
2376                         else
2377                                 break;
2378                         set = bt_pinlatch (bt, page_no);
2379                         bt_lockpage (BtLockRead, set);
2380                         cnt += page->act;
2381                         next = bt_getid (page->right);
2382                         bt_unlockpage (BtLockRead, set);
2383                         bt_unpinlatch (set);
2384                         bt_unpinpool (pool);
2385                 } while( page_no = next );
2386
2387                 cnt--;  // remove stopper key
2388                 fprintf(stderr, " Total keys read %d\n", cnt);
2389                 break;
2390         }
2391
2392         bt_close (bt);
2393 #ifdef unix
2394         return NULL;
2395 #else
2396         return 0;
2397 #endif
2398 }
2399
2400 typedef struct timeval timer;
2401
2402 int main (int argc, char **argv)
2403 {
2404 int idx, cnt, len, slot, err;
2405 int segsize, bits = 16;
2406 #ifdef unix
2407 pthread_t *threads;
2408 timer start, stop;
2409 #else
2410 time_t start[1], stop[1];
2411 HANDLE *threads;
2412 #endif
2413 double real_time;
2414 ThreadArg *args;
2415 uint poolsize = 0;
2416 int num = 0;
2417 char key[1];
2418 BtMgr *mgr;
2419 BtKey ptr;
2420 BtDb *bt;
2421
2422         if( argc < 3 ) {
2423                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2424                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2425                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2426                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2427                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2428                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2429                 exit(0);
2430         }
2431
2432 #ifdef unix
2433         gettimeofday(&start, NULL);
2434 #else
2435         time(start);
2436 #endif
2437
2438         if( argc > 3 )
2439                 bits = atoi(argv[3]);
2440
2441         if( argc > 4 )
2442                 poolsize = atoi(argv[4]);
2443
2444         if( !poolsize )
2445                 fprintf (stderr, "Warning: no mapped_pool\n");
2446
2447         if( poolsize > 65535 )
2448                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2449
2450         if( argc > 5 )
2451                 segsize = atoi(argv[5]);
2452         else
2453                 segsize = 4;    // 16 pages per mmap segment
2454
2455         if( argc > 6 )
2456                 num = atoi(argv[6]);
2457
2458         cnt = argc - 7;
2459 #ifdef unix
2460         threads = malloc (cnt * sizeof(pthread_t));
2461 #else
2462         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2463 #endif
2464         args = malloc (cnt * sizeof(ThreadArg));
2465
2466         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2467
2468         if( !mgr ) {
2469                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2470                 exit (1);
2471         }
2472
2473         //      fire off threads
2474
2475         for( idx = 0; idx < cnt; idx++ ) {
2476                 args[idx].infile = argv[idx + 7];
2477                 args[idx].type = argv[2][0];
2478                 args[idx].mgr = mgr;
2479                 args[idx].num = num;
2480                 args[idx].idx = idx;
2481 #ifdef unix
2482                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2483                         fprintf(stderr, "Error creating thread %d\n", err);
2484 #else
2485                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2486 #endif
2487         }
2488
2489         //      wait for termination
2490
2491 #ifdef unix
2492         for( idx = 0; idx < cnt; idx++ )
2493                 pthread_join (threads[idx], NULL);
2494         gettimeofday(&stop, NULL);
2495         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2496 #else
2497         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2498
2499         for( idx = 0; idx < cnt; idx++ )
2500                 CloseHandle(threads[idx]);
2501
2502         time (stop);
2503         real_time = 1000 * (*stop - *start);
2504 #endif
2505         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2506         bt_mgrclose (mgr);
2507 }
2508
2509 #endif  //STANDALONE