]> pd.if.org Git - btree/blob - fosterbtreea.c
4a96ff394b8607850fac4f29b4f643b04c670ceb
[btree] / fosterbtreea.c
1 // foster btree version a
2 // 16 DEC 2013
3
4 // author: karl malbrain, malbrain@cal.berkeley.edu
5
6 /*
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
9
10 The orginal author is Karl Malbrain.
11
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
18 */
19
20 // Please see the project home page for documentation
21 // code.google.com/p/high-concurrency-btree
22
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
25
26 #ifdef linux
27 #define _GNU_SOURCE
28 #endif
29
30 #ifdef unix
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <fcntl.h>
35 #include <sys/time.h>
36 #include <sys/mman.h>
37 #include <errno.h>
38 #include <pthread.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #include <process.h>
47 #endif
48
49 #include <memory.h>
50 #include <string.h>
51
52 typedef unsigned long long      uid;
53
54 #ifndef unix
55 typedef unsigned long long      off64_t;
56 typedef unsigned short          ushort;
57 typedef unsigned int            uint;
58 #endif
59
60 #define BT_ro 0x6f72    // ro
61 #define BT_rw 0x7772    // rw
62
63 #define BT_maxbits              24                                      // maximum page size in bits
64 #define BT_minbits              9                                       // minimum page size in bits
65 #define BT_minpage              (1 << BT_minbits)       // minimum page size
66 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
67
68 /*
69 There are five lock types for each node in three independent sets: 
70 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
71 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
72 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
73 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
74 5. (set 3) ParentLock: Exclusive. Have parent adopt/delete maximum foster child from the node.
75 */
76
77 typedef enum{
78         BtLockAccess,
79         BtLockDelete,
80         BtLockRead,
81         BtLockWrite,
82         BtLockParent
83 }BtLock;
84
85 //      Define the length of the page and key pointers
86
87 #define BtId 6
88
89 //      Page key slot definition.
90
91 //      If BT_maxbits is 15 or less, you can save 4 bytes
92 //      for each key stored by making the first two uints
93 //      into ushorts.  You can also save 4 bytes by removing
94 //      the tod field from the key.
95
96 //      Keys are marked dead, but remain on the page until
97 //      it cleanup is called. The fence key (highest key) for
98 //      the page is always present, even after cleanup.
99
100 typedef struct {
101         uint off:BT_maxbits;            // page offset for key start
102         uint dead:1;                            // set for deleted key
103         uint tod;                                       // time-stamp for key
104         unsigned char id[BtId];         // id associated with key
105 } BtSlot;
106
107 //      The key structure occupies space at the upper end of
108 //      each page.  It's a length byte followed by the value
109 //      bytes.
110
111 typedef struct {
112         unsigned char len;
113         unsigned char key[1];
114 } *BtKey;
115
116 //      The first part of an index page.
117 //      It is immediately followed
118 //      by the BtSlot array of keys.
119
120 typedef struct Page {
121         uint cnt;                                       // count of keys in page
122         uint act;                                       // count of active keys
123         uint min;                                       // next key offset
124         uint foster;                            // count of foster children
125         unsigned char bits:6;           // page size in bits
126         unsigned char dirty:1;          // page needs to be cleaned
127         unsigned char kill:1;           // page is being deleted
128         unsigned char lvl;                      // level of page
129         unsigned char right[BtId];      // page number to right
130 } *BtPage;
131
132 //      mode & definition for latch table implementation
133
134 enum {
135         Write = 1,
136         Share = 2
137 } LockMode;
138
139 //      latch table lock structure
140
141 // mode is set for write access
142 // share is count of read accessors
143 // grant write lock when share == 0
144
145 typedef struct {
146         int mode:1;
147         int share:31;
148 } BtLatch;
149
150 typedef struct {
151         BtLatch readwr[1];              // read/write page lock
152         BtLatch access[1];              // Access Intent/Page delete
153         BtLatch parent[1];              // adoption of foster children
154 } BtLatchSet;
155
156 //      The memory mapping hash table buffer manager entry
157
158 typedef struct {
159         unsigned long long int lru;     // number of times accessed
160         uid  basepage;                          // mapped base page number
161         char *map;                                      // mapped memory pointer
162         uint pin;                                       // mapped page pin counter
163         uint slot;                                      // slot index in this array
164         void *hashprev;                         // previous cache block for the same hash idx
165         void *hashnext;                         // next cache block for the same hash idx
166 #ifndef unix
167         HANDLE hmap;
168 #endif
169 //      array of page latch sets, one for each page in map segment
170         BtLatchSet pagelatch[0];
171 } BtHash;
172
173 //      The object structure for Btree access
174
175 typedef struct {
176         uint page_size;                         // page size    
177         uint page_bits;                         // page size in bits    
178         uint seg_bits;                          // seg size in pages in bits
179         uint mode;                                      // read-write mode
180 #ifdef unix
181         int idx;
182 #else
183         HANDLE idx;
184 #endif
185         uint nodecnt;           // highest page cache node in use
186         uint nodemax;           // highest page cache node allocated
187         uint hashmask;          // number of pages in mmap segment
188         uint hashsize;          // size of Hash Table
189         uint evicted;           // last evicted hash slot
190         ushort *cache;          // hash index for memory pool
191         BtLatch *latch;         // latches for hash table slots
192         char *nodes;            // memory pool page hash nodes
193 } BtMgr;
194
195 typedef struct {
196         BtMgr *mgr;                     // buffer manager for thread
197         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
198         BtPage alloc;           // frame buffer for alloc page ( page 0 )
199         BtPage cursor;          // cached frame for start/next (never mapped)
200         BtPage frame;           // spare frame for the page split (never mapped)
201         BtPage zero;            // page frame for zeroes at end of file
202         BtPage page;            // current page
203         uid page_no;            // current page number  
204         uid cursor_page;        // current cursor page number   
205         unsigned char *mem;     // frame, cursor, page memory buffer
206         int err;                        // last error
207 } BtDb;
208
209 typedef enum {
210         BTERR_ok = 0,
211         BTERR_again,
212         BTERR_struct,
213         BTERR_ovflw,
214         BTERR_lock,
215         BTERR_map,
216         BTERR_wrt,
217         BTERR_hash
218 } BTERR;
219
220 // B-Tree functions
221 extern void bt_close (BtDb *bt);
222 extern BtDb *bt_open (BtMgr *mgr);
223 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod);
224 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
225 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
226 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
227 extern uint bt_nextkey   (BtDb *bt, uint slot);
228
229 //      manager functions
230 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint cacheblk, uint segsize, uint hashsize);
231 void bt_mgrclose (BtMgr *mgr);
232
233 //  Helper functions to return cursor slot values
234
235 extern BtKey bt_key (BtDb *bt, uint slot);
236 extern uid bt_uid (BtDb *bt, uint slot);
237 extern uint bt_tod (BtDb *bt, uint slot);
238
239 //  BTree page number constants
240 #define ALLOC_page              0
241 #define ROOT_page               1
242
243 //      Number of levels to create in a new BTree
244
245 #define MIN_lvl                 2
246
247 //  The page is allocated from low and hi ends.
248 //  The key offsets and row-id's are allocated
249 //  from the bottom, while the text of the key
250 //  is allocated from the top.  When the two
251 //  areas meet, the page is split into two.
252
253 //  A key consists of a length byte, two bytes of
254 //  index number (0 - 65534), and up to 253 bytes
255 //  of key value.  Duplicate keys are discarded.
256 //  Associated with each key is a 48 bit row-id.
257
258 //  The b-tree root is always located at page 1.
259 //      The first leaf page of level zero is always
260 //      located on page 2.
261
262 //      When to root page fills, it is split in two and
263 //      the tree height is raised by a new root at page
264 //      one with two keys.
265
266 //      Deleted keys are marked with a dead bit until
267 //      page cleanup The fence key for a node is always
268 //      present, even after deletion and cleanup.
269
270 //  Groups of pages called segments from the btree are
271 //  cached with memory mapping. A hash table is used to keep
272 //  track of the cached segments.  This behaviour is controlled
273 //  by the cache block size parameter to bt_open.
274
275 //  To achieve maximum concurrency one page is locked at a time
276 //  as the tree is traversed to find leaf key in question.
277
278 //      An adoption traversal leaves the parent node locked as the
279 //      tree is traversed to the level in quesiton.
280
281 //  Page 0 is dedicated to lock for new page extensions,
282 //      and chains empty pages together for reuse.
283
284 //      Empty pages are chained together through the ALLOC page and reused.
285
286 //      Access macros to address slot and key values from the page
287
288 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
289 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
290
291 void bt_putid(unsigned char *dest, uid id)
292 {
293 int i = BtId;
294
295         while( i-- )
296                 dest[i] = (unsigned char)id, id >>= 8;
297 }
298
299 uid bt_getid(unsigned char *src)
300 {
301 uid id = 0;
302 int i;
303
304         for( i = 0; i < BtId; i++ )
305                 id <<= 8, id |= *src++; 
306
307         return id;
308 }
309
310 void bt_mgrclose (BtMgr *mgr)
311 {
312 BtHash *hash;
313 uint slot;
314
315         // release mapped pages
316
317         for( slot = 0; slot < mgr->nodemax; slot++ ) {
318                 hash = (BtHash *)(mgr->nodes + slot * (sizeof(BtHash) + (mgr->hashmask + 1) * sizeof(BtLatchSet)));
319                 if( hash->slot )
320 #ifdef unix
321                         munmap (hash->map, (mgr->hashmask+1) << mgr->page_bits);
322 #else
323                 {
324                         FlushViewOfFile(hash->map, 0);
325                         UnmapViewOfFile(hash->map);
326                         CloseHandle(hash->hmap);
327                 }
328 #endif
329         }
330
331 #ifdef unix
332         close (mgr->idx);
333         free (mgr->nodes);
334         free (mgr->cache);
335         free (mgr->latch);
336 #else
337         FlushFileBuffers(mgr->idx);
338         CloseHandle(mgr->idx);
339         GlobalFree (mgr->nodes);
340         GlobalFree (mgr->cache);
341         GlobalFree (mgr->latch);
342 #endif
343 }
344
345 //      close and release memory
346
347 void bt_close (BtDb *bt)
348 {
349 #ifdef unix
350         if ( bt->mem )
351                 free (bt->mem);
352         free (bt);
353 #else
354         if ( bt->mem)
355                 VirtualFree (bt->mem, 0, MEM_RELEASE);
356         GlobalFree (bt);
357 #endif
358 }
359
360 //  open/create new btree buffer manager
361
362 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
363 //              size of mapped page cache (e.g. 8192)
364
365 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint nodemax, uint segsize, uint hashsize)
366 {
367 uint lvl, attr, cacheblk, last;
368 BtPage alloc;
369 int lockmode;
370 off64_t size;
371 uint amt[1];
372 BtMgr* mgr;
373 BtKey key;
374
375 #ifndef unix
376 SYSTEM_INFO sysinfo[1];
377 #endif
378
379         // determine sanity of page size and buffer pool
380
381         if( bits > BT_maxbits )
382                 bits = BT_maxbits;
383         else if( bits < BT_minbits )
384                 bits = BT_minbits;
385
386         if( !nodemax )
387                 return NULL;    // must have buffer pool
388
389 #ifdef unix
390         mgr = calloc (1, sizeof(BtMgr));
391
392         switch (mode & 0x7fff)
393         {
394         case BT_rw:
395                 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
396                 lockmode = 1;
397                 break;
398
399         case BT_ro:
400         default:
401                 mgr->idx = open ((char*)name, O_RDONLY);
402                 lockmode = 0;
403                 break;
404         }
405         if( mgr->idx == -1 )
406                 return free(mgr), NULL;
407         
408         cacheblk = 4096;        // minimum mmap segment size for unix
409
410 #else
411         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
412         attr = FILE_ATTRIBUTE_NORMAL;
413         switch (mode & 0x7fff)
414         {
415         case BT_rw:
416                 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
417                 lockmode = 1;
418                 break;
419
420         case BT_ro:
421         default:
422                 mgr->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
423                 lockmode = 0;
424                 break;
425         }
426         if( mgr->idx == INVALID_HANDLE_VALUE )
427                 return GlobalFree(mgr), NULL;
428
429         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
430         GetSystemInfo(sysinfo);
431         cacheblk = sysinfo->dwAllocationGranularity;
432 #endif
433
434 #ifdef unix
435         alloc = malloc (BT_maxpage);
436         *amt = 0;
437
438         // read minimum page size to get root info
439
440         if( size = lseek (mgr->idx, 0L, 2) ) {
441                 if( pread(mgr->idx, alloc, BT_minpage, 0) == BT_minpage )
442                         bits = alloc->bits;
443                 else
444                         return free(mgr), free(alloc), NULL;
445         } else if( mode == BT_ro )
446                 return bt_mgrclose (mgr), NULL;
447 #else
448         alloc = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
449         size = GetFileSize(mgr->idx, amt);
450
451         if( size || *amt ) {
452                 if( !ReadFile(mgr->idx, (char *)alloc, BT_minpage, amt, NULL) )
453                         return bt_mgrclose (mgr), NULL;
454                 bits = alloc->bits;
455         } else if( mode == BT_ro )
456                 return bt_mgrclose (mgr), NULL;
457 #endif
458
459         mgr->page_size = 1 << bits;
460         mgr->page_bits = bits;
461
462         mgr->nodemax = nodemax;
463         mgr->mode = mode;
464
465         if( cacheblk < mgr->page_size )
466                 cacheblk = mgr->page_size;
467
468         //  mask for partial memmaps
469
470         mgr->hashmask = (cacheblk >> bits) - 1;
471
472         //      see if requested number of pages per memmap is greater
473
474         if( (1 << segsize) > mgr->hashmask )
475                 mgr->hashmask = (1 << segsize) - 1;
476
477         mgr->seg_bits = 0;
478
479         while( (1 << mgr->seg_bits) <= mgr->hashmask )
480                 mgr->seg_bits++;
481
482         mgr->hashsize = hashsize;
483
484 #ifdef unix
485         mgr->nodes = calloc (nodemax, (sizeof(BtHash) + (mgr->hashmask + 1) * sizeof(BtLatchSet)));
486         mgr->cache = calloc (hashsize, sizeof(ushort));
487         mgr->latch = calloc (hashsize, sizeof(BtLatch));
488 #else
489         mgr->nodes = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cacheblk * (sizeof(BtHash) + (mgr->hashmask + 1) * sizeof(BtLatchSet)));
490         mgr->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
491         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtLatch));
492 #endif
493
494         if( size || *amt )
495                 goto mgrxit;
496
497         // initializes an empty b-tree with root page and page of leaves
498
499         memset (alloc, 0, 1 << bits);
500         bt_putid(slotptr(alloc, 2)->id, MIN_lvl+1);
501         alloc->bits = mgr->page_bits;
502
503 #ifdef unix
504         if( write (mgr->idx, alloc, mgr->page_size) < mgr->page_size )
505                 return bt_mgrclose (mgr), NULL;
506 #else
507         if( !WriteFile (mgr->idx, (char *)alloc, mgr->page_size, amt, NULL) )
508                 return bt_mgrclose (mgr), NULL;
509
510         if( *amt < mgr->page_size )
511                 return bt_mgrclose (mgr), NULL;
512 #endif
513
514         memset (alloc, 0, 1 << bits);
515         alloc->bits = mgr->page_bits;
516
517         for( lvl=MIN_lvl; lvl--; ) {
518                 slotptr(alloc, 1)->off = mgr->page_size - 3;
519                 bt_putid(slotptr(alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);           // next(lower) page number
520                 key = keyptr(alloc, 1);
521                 key->len = 2;                   // create stopper key
522                 key->key[0] = 0xff;
523                 key->key[1] = 0xff;
524                 alloc->min = mgr->page_size - 3;
525                 alloc->lvl = lvl;
526                 alloc->cnt = 1;
527                 alloc->act = 1;
528 #ifdef unix
529                 if( write (mgr->idx, alloc, mgr->page_size) < mgr->page_size )
530                         return bt_mgrclose (mgr), NULL;
531 #else
532                 if( !WriteFile (mgr->idx, (char *)alloc, mgr->page_size, amt, NULL) )
533                         return bt_mgrclose (mgr), NULL;
534
535                 if( *amt < mgr->page_size )
536                         return bt_mgrclose (mgr), NULL;
537 #endif
538         }
539
540         // create empty page area by writing last page of first
541         // cache area (other pages are zeroed by O/S)
542
543         if( mgr->hashmask ) {
544                 memset(alloc, 0, mgr->page_size);
545                 last = mgr->hashmask;
546
547                 while( last < MIN_lvl + 1 )
548                         last += mgr->hashmask + 1;
549
550 #ifdef unix
551                 pwrite(mgr->idx, alloc, mgr->page_size, last << mgr->page_bits);
552 #else
553                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
554                 if( !WriteFile (mgr->idx, (char *)alloc, mgr->page_size, amt, NULL) )
555                         return bt_mgrclose (mgr), NULL;
556                 if( *amt < mgr->page_size )
557                         return bt_mgrclose (mgr), NULL;
558 #endif
559         }
560
561 mgrxit:
562 #ifdef unix
563         free (alloc);
564 #else
565         VirtualFree (alloc, 0, MEM_RELEASE);
566 #endif
567         return mgr;
568 }
569
570 //      open BTree access method
571 //      based on buffer manager
572
573 BtDb *bt_open (BtMgr *mgr)
574 {
575 BtDb *bt = malloc (sizeof(*bt));
576
577         memset (bt, 0, sizeof(*bt));
578         bt->mgr = mgr;
579 #ifdef unix
580         bt->mem = malloc (3 *mgr->page_size);
581 #else
582         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
583 #endif
584         bt->frame = (BtPage)bt->mem;
585         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
586         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
587         return bt;
588 }
589
590 //  compare two keys, returning > 0, = 0, or < 0
591 //  as the comparison value
592
593 int keycmp (BtKey key1, unsigned char *key2, uint len2)
594 {
595 uint len1 = key1->len;
596 int ans;
597
598         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
599                 return ans;
600
601         if( len1 > len2 )
602                 return 1;
603         if( len1 < len2 )
604                 return -1;
605
606         return 0;
607 }
608
609 //      Latch Manager
610
611 //      wait until write lock mode is clear
612 //      and add 1 to the share count
613
614 void bt_readlock(BtLatch *latch)
615 {
616   do {
617         //  add one to counter, check write bit
618
619 #ifdef unix
620         if( ~__sync_fetch_and_add((int *)latch, Share) & Write )
621                 return;
622 #else
623         if( ~InterlockedAdd((int *)latch, Share) & Write )
624                 return;
625 #endif
626         //  didn't get latch, reset counter by one
627
628 #ifdef unix
629         __sync_fetch_and_add((int *)latch, -Share);
630 #else
631         InterlockedAdd ((int *)latch, -Share);
632 #endif
633
634         //      and yield
635 #ifdef  unix
636         sched_yield();
637 #else
638         SwitchToThread();
639 #endif
640   } while( 1 );
641 }
642
643 //      wait for other read and write latches to relinquish
644
645 void bt_writelock(BtLatch *latch)
646 {
647 int prev, ours = 0;
648
649   do {
650         //  see if we can get write access
651         //      with no readers
652 #ifdef unix
653         prev = __sync_fetch_and_or((int *)latch, Write);
654 #else
655         prev = InterlockedOr((int *)latch, Write);
656 #endif
657
658         if( ~prev & 1 )
659                 ours++; // it's ours
660
661         if( !(prev >> 1) && ours )
662                 return;
663
664         //      otherwise yield
665
666 #ifdef  unix
667         sched_yield();
668 #else
669         SwitchToThread();
670 #endif
671   } while( 1 );
672 }
673
674 //      try to obtain write lock
675
676 //      return 1 if obtained,
677 //              0 if already write locked
678
679 int bt_writetry(BtLatch *latch)
680 {
681 int prev, ours = 0;
682
683   do {
684         //  see if we can get write access
685         //      with no readers
686 #ifdef unix
687         prev = __sync_fetch_and_or((int *)latch, Write);
688 #else
689         prev = InterlockedOr((int *)latch, Write);
690 #endif
691
692         if( ~prev & 1 )
693                 ours++; // it's ours
694
695         if( !ours )
696                 return 0;
697
698         if( !(prev >> 1) && ours )
699                 return 1;
700
701         //      otherwise yield
702 #ifdef  unix
703         sched_yield();
704 #else
705         SwitchToThread();
706 #endif
707   } while( 1 );
708 }
709
710 //      clear write mode
711
712 void bt_releasewrite(BtLatch *latch)
713 {
714 #ifdef unix
715         __sync_fetch_and_and((int *)latch, ~Write);
716 #else
717         InterlockedAnd ((int *)latch, ~Write);
718 #endif
719 }
720
721 //      decrement reader count
722
723 void bt_releaseread(BtLatch *latch)
724 {
725 #ifdef unix
726         __sync_fetch_and_add((int *)latch, -Share);
727 #else
728         InterlockedAdd((int *)latch, -Share);
729 #endif
730 }
731
732 //      Buffer Pool mgr
733
734 // find segment in cache
735 //      return NULL if not there
736 //      otherwise return node
737
738 BtHash *bt_findhash(BtDb *bt, uid page_no, uint idx)
739 {
740 BtHash *hash;
741 uint slot;
742
743         // compute cache block first page and hash idx 
744
745         if( slot = bt->mgr->cache[idx] ) 
746                 hash = (BtHash *)(bt->mgr->nodes + slot * (sizeof(BtHash) + (bt->mgr->hashmask + 1) * sizeof(BtLatchSet)));
747         else
748                 return NULL;
749
750         page_no &= ~bt->mgr->hashmask;
751
752         while( hash->basepage != page_no )
753           if( hash = hash->hashnext )
754                 continue;
755           else
756                 return NULL;
757
758         return hash;
759 }
760
761 // add segment to hash table
762
763 void bt_linkhash(BtDb *bt, BtHash *hash, uid page_no, int idx)
764 {
765 BtHash *node;
766 uint slot;
767
768         hash->hashprev = hash->hashnext = NULL;
769         hash->basepage = page_no & ~bt->mgr->hashmask;
770         hash->pin = 1;
771         hash->lru = 1;
772
773         if( slot = bt->mgr->cache[idx] ) {
774                 node = (BtHash *)(bt->mgr->nodes + slot * (sizeof(BtHash) + (bt->mgr->hashmask + 1) * sizeof(BtLatchSet)));
775                 hash->hashnext = node;
776                 node->hashprev = hash;
777         }
778
779         bt->mgr->cache[idx] = hash->slot;
780 }
781
782 //      find best segment to evict from buffer pool
783
784 BtHash *bt_findlru (BtDb *bt, uint slot)
785 {
786 unsigned long long int target = ~0LL;
787 BtHash *hash = NULL, *node;
788
789         if( !slot )
790                 return NULL;
791
792         node = (BtHash *)(bt->mgr->nodes + slot * (sizeof(BtHash) + (bt->mgr->hashmask + 1) * sizeof(BtLatchSet)));
793
794         do {
795           if( node->pin )
796                 continue;
797           if( node->lru > target )
798                 continue;
799           target = node->lru;
800           hash = node;
801         } while( node = node->hashnext );
802
803         return hash;
804 }
805
806 //  map new segment to virtual memory
807
808 BTERR bt_mapsegment(BtDb *bt, BtHash *hash, uid page_no)
809 {
810 off64_t off = (page_no & ~bt->mgr->hashmask) << bt->mgr->page_bits;
811 off64_t limit = off + ((bt->mgr->hashmask+1) << bt->mgr->page_bits);
812 int flag;
813
814 #ifdef unix
815         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
816         hash->map = mmap (0, (bt->mgr->hashmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
817         if( hash->map == MAP_FAILED )
818                 return bt->err = BTERR_map;
819 #else
820         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
821         hash->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
822         if( !hash->hmap )
823                 return bt->err = BTERR_map;
824
825         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
826         hash->map = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->hashmask+1) << bt->mgr->page_bits);
827         if( !hash->map )
828                 return bt->err = BTERR_map;
829 #endif
830         return bt->err = 0;
831 }
832
833 //      find or place requested page in segment-cache
834 //      return hash table entry
835
836 BtHash *bt_hashpage(BtDb *bt, uid page_no)
837 {
838 BtHash *hash, *node, *next;
839 uint slot, idx, victim;
840 BtLatchSet *set;
841
842         //      lock hash table chain
843
844         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
845         bt_readlock (&bt->mgr->latch[idx]);
846
847         //      look up in hash table
848
849         if( hash = bt_findhash(bt, page_no, idx) ) {
850 #ifdef unix
851                 __sync_fetch_and_add(&hash->pin, 1);
852 #else
853                 InterlockedIncrement (&hash->pin);
854 #endif
855                 bt_releaseread (&bt->mgr->latch[idx]);
856                 hash->lru++;
857                 return hash;
858         }
859
860         //      upgrade to write lock
861
862         bt_releaseread (&bt->mgr->latch[idx]);
863         bt_writelock (&bt->mgr->latch[idx]);
864
865         // try to find page in cache with write lock
866
867         if( hash = bt_findhash(bt, page_no, idx) ) {
868 #ifdef unix
869                 __sync_fetch_and_add(&hash->pin, 1);
870 #else
871                 InterlockedIncrement (&hash->pin);
872 #endif
873                 bt_releasewrite (&bt->mgr->latch[idx]);
874                 hash->lru++;
875                 return hash;
876         }
877
878         // allocate a new hash node
879         // and add to hash table
880
881 #ifdef unix
882         slot = __sync_fetch_and_add(&bt->mgr->nodecnt, 1);
883 #else
884         slot = InterlockedIncrement (&bt->mgr->nodecnt) - 1;
885 #endif
886
887         if( ++slot < bt->mgr->nodemax ) {
888                 hash = (BtHash *)(bt->mgr->nodes + slot * (sizeof(BtHash) + (bt->mgr->hashmask + 1) * sizeof(BtLatchSet)));
889                 hash->slot = slot;
890
891                 if( bt_mapsegment(bt, hash, page_no) )
892                         return NULL;
893
894                 bt_linkhash(bt, hash, page_no, idx);
895                 bt_releasewrite (&bt->mgr->latch[idx]);
896                 return hash;
897         }
898
899         // hash table is full
900         //      find best cache entry to evict
901
902 #ifdef unix
903         __sync_fetch_and_add(&bt->mgr->nodecnt, -1);
904 #else
905         InterlockedDecrement (&bt->mgr->nodecnt);
906 #endif
907
908         while( 1 ) {
909 #ifdef unix
910                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
911 #else
912                 victim = InterlockedIncrement (&bt->mgr->evicted) - 1;
913 #endif
914                 victim %= bt->mgr->hashsize;
915
916                 // try to get write lock
917                 //      skip entry if not obtained
918
919                 if( !bt_writetry (&bt->mgr->latch[victim]) )
920                         continue;
921
922                 //  if cache entry is empty
923                 //      or no slots are unpinned
924                 //      skip this entry
925
926                 if( !(hash = bt_findlru(bt, bt->mgr->cache[victim])) ) {
927                         bt_releasewrite (&bt->mgr->latch[victim]);
928                         continue;
929                 }
930
931                 // unlink victim hash node from hash table
932
933                 if( node = hash->hashprev )
934                         node->hashnext = hash->hashnext;
935                 else if( node = hash->hashnext )
936                         bt->mgr->cache[victim] = node->slot;
937                 else
938                         bt->mgr->cache[victim] = 0;
939
940                 if( node = hash->hashnext )
941                         node->hashprev = hash->hashprev;
942
943                 //      remove old file mapping
944 #ifdef unix
945                 munmap (hash->map, (bt->mgr->hashmask+1) << bt->mgr->page_bits);
946 #else
947                 FlushViewOfFile(hash->map, 0);
948                 UnmapViewOfFile(hash->map);
949                 CloseHandle(hash->hmap);
950 #endif
951                 hash->map = NULL;
952                 bt_releasewrite (&bt->mgr->latch[victim]);
953
954                 //  create new file mapping
955                 //  and link into hash table
956
957                 if( bt_mapsegment(bt, hash, page_no) )
958                         return NULL;
959
960                 bt_linkhash(bt, hash, page_no, idx);
961                 bt_releasewrite (&bt->mgr->latch[idx]);
962                 return hash;
963         }
964 }
965
966 // place write, read, or parent lock on requested page_no.
967 //      pin to buffer pool
968
969 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode, BtPage *page)
970 {
971 BtLatchSet *set;
972 BtHash *hash;
973 uint subpage;
974
975         //      find/create maping in hash table
976
977         if( hash = bt_hashpage(bt, page_no) )
978                 subpage = (uint)(page_no & bt->mgr->hashmask); // page within mapping
979         else
980                 return bt->err;
981
982         set = hash->pagelatch + subpage;
983
984         switch( mode ) {
985         case BtLockRead:
986                 bt_readlock (set->readwr);
987                 break;
988         case BtLockWrite:
989                 bt_writelock (set->readwr);
990                 break;
991         case BtLockAccess:
992                 bt_readlock (set->access);
993                 break;
994         case BtLockDelete:
995                 bt_writelock (set->access);
996                 break;
997         case BtLockParent:
998                 bt_writelock (set->parent);
999                 break;
1000         default:
1001                 return bt->err = BTERR_lock;
1002         }
1003
1004         if( page )
1005                 *page = (BtPage)(hash->map + (subpage << bt->mgr->page_bits));
1006
1007         return bt->err = 0;
1008 }
1009
1010 // remove write, read, or parent lock on requested page_no.
1011
1012 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
1013 {
1014 uint subpage, idx;
1015 BtLatchSet *set;
1016 BtHash *hash;
1017
1018         //      since page is pinned
1019         //      it should still be in the buffer pool
1020
1021         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1022         bt_readlock (&bt->mgr->latch[idx]);
1023
1024         if( hash = bt_findhash(bt, page_no, idx) )
1025                 subpage = (uint)(page_no & bt->mgr->hashmask);
1026         else
1027                 return bt->err = BTERR_hash;
1028
1029         bt_releaseread (&bt->mgr->latch[idx]);
1030         set = hash->pagelatch + subpage;
1031
1032         switch( mode ) {
1033         case BtLockRead:
1034                 bt_releaseread (set->readwr);
1035                 break;
1036         case BtLockWrite:
1037                 bt_releasewrite (set->readwr);
1038                 break;
1039         case BtLockAccess:
1040                 bt_releaseread (set->access);
1041                 break;
1042         case BtLockDelete:
1043                 bt_releasewrite (set->access);
1044                 break;
1045         case BtLockParent:
1046                 bt_releasewrite (set->parent);
1047                 break;
1048         default:
1049                 return bt->err = BTERR_lock;
1050         }
1051
1052 #ifdef  unix
1053         __sync_fetch_and_add(&hash->pin, -1);
1054 #else
1055         InterlockedDecrement (&hash->pin);
1056 #endif
1057         return bt->err = 0;
1058 }
1059
1060 //      deallocate a deleted page that has no tree pointers
1061 //      place on free chain out of allocator page
1062
1063 BTERR bt_freepage(BtDb *bt, uid page_no)
1064 {
1065         //  obtain delete lock on deleted page
1066
1067         if( bt_lockpage(bt, page_no, BtLockDelete, NULL) )
1068                 return bt->err;
1069
1070         //  obtain write lock on deleted page
1071
1072         if( bt_lockpage(bt, page_no, BtLockWrite, &bt->temp) )
1073                 return bt->err;
1074
1075         //      lock allocation page
1076
1077         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite, &bt->alloc) )
1078                 return bt->err;
1079
1080         //      store chain in first key
1081         bt_putid(slotptr(bt->temp, 1)->id, bt_getid(slotptr(bt->alloc, 1)->id));
1082         bt_putid(slotptr(bt->alloc, 1)->id, page_no);
1083
1084         // unlock page zero 
1085
1086         if( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1087                 return bt->err;
1088
1089         //  remove write lock on deleted node
1090
1091         if( bt_unlockpage(bt, page_no, BtLockWrite) )
1092                 return bt->err;
1093
1094         //  remove delete lock on deleted node
1095
1096         if( bt_unlockpage(bt, page_no, BtLockDelete) )
1097                 return bt->err;
1098
1099         return 0;
1100 }
1101
1102 //      allocate a new page and write page into it
1103
1104 uid bt_newpage(BtDb *bt, BtPage page)
1105 {
1106 uid new_page;
1107 BtPage pmap;
1108 int reuse;
1109
1110         // lock page zero
1111
1112         if ( bt_lockpage(bt, ALLOC_page, BtLockWrite, &bt->alloc) )
1113                 return 0;
1114
1115         // use empty chain first
1116         // else allocate empty page
1117
1118         if( new_page = bt_getid(slotptr(bt->alloc, 1)->id) ) {
1119                 if( bt_lockpage (bt, new_page, BtLockWrite, &bt->temp) )
1120                         return 0;
1121                 bt_putid(slotptr(bt->alloc, 1)->id, bt_getid(slotptr(bt->temp, 1)->id));
1122                 if( bt_unlockpage (bt, new_page, BtLockWrite) )
1123                         return 0;
1124                 reuse = 1;
1125         } else {
1126                 new_page = bt_getid(slotptr(bt->alloc, 2)->id);
1127                 bt_putid(slotptr(bt->alloc, 2)->id, new_page+1);
1128                 reuse = 0;
1129         }
1130 #ifdef unix
1131         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1132                 return bt->err = BTERR_wrt, 0;
1133
1134         // if writing first page of hash block, zero last page in the block
1135
1136         if ( !reuse && bt->mgr->hashmask > 0 && (new_page & bt->mgr->hashmask) == 0 )
1137         {
1138                 // use zero buffer to write zeros
1139                 memset(bt->zero, 0, bt->mgr->page_size);
1140                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->hashmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1141                         return bt->err = BTERR_wrt, 0;
1142         }
1143 #else
1144         //      bring new page into page-cache and copy page.
1145         //      this will extend the file into the new pages.
1146
1147         if( bt_lockpage(bt, new_page, BtLockWrite, &pmap) )
1148                 return 0;
1149
1150         memcpy(pmap, page, bt->mgr->page_size);
1151
1152         if( bt_unlockpage (bt, new_page, BtLockWrite) )
1153                 return 0;
1154 #endif
1155         // unlock page zero 
1156
1157         if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
1158                 return 0;
1159
1160         return new_page;
1161 }
1162
1163 //  find slot in page for given key at a given level
1164
1165 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1166 {
1167 uint diff, higher = bt->page->cnt, low = 1, slot;
1168
1169         //      low is the lowest candidate, higher is already
1170         //      tested as .ge. the given key, loop ends when they meet
1171
1172         while( diff = higher - low ) {
1173                 slot = low + ( diff >> 1 );
1174                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1175                         low = slot + 1;
1176                 else
1177                         higher = slot;
1178         }
1179
1180         return higher;
1181 }
1182
1183 //  find and load page at given level for given key
1184 //      leave page rd or wr locked as requested
1185
1186 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1187 {
1188 uid page_no = ROOT_page, prevpage = 0;
1189 uint drill = 0xff, slot;
1190 uint mode, prevmode;
1191
1192   //  start at root of btree and drill down
1193
1194   do {
1195         // determine lock mode of drill level
1196         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1197
1198         bt->page_no = page_no;
1199
1200         // obtain access lock using lock chaining with Access mode
1201
1202         if( page_no > ROOT_page )
1203           if( bt_lockpage(bt, page_no, BtLockAccess, NULL) )
1204                 return 0;                                                                       
1205
1206         if( prevpage )
1207           if( bt_unlockpage(bt, prevpage, prevmode) )
1208                 return 0;
1209
1210         // obtain read lock using lock chaining
1211         // and pin page contents
1212
1213         if( bt_lockpage(bt, page_no, mode, &bt->page) )
1214                 return 0;                                                                       
1215
1216         if( page_no > ROOT_page )
1217           if( bt_unlockpage(bt, page_no, BtLockAccess) )
1218                 return 0;                                                                       
1219
1220         // re-read and re-lock root after determining actual level of root
1221
1222         if( bt->page_no == ROOT_page )
1223           if( bt->page->lvl != drill) {
1224                 drill = bt->page->lvl;
1225
1226             if( lock == BtLockWrite && drill == lvl )
1227                   if( bt_unlockpage(bt, page_no, mode) )
1228                         return 0;
1229                   else
1230                         continue;
1231           }
1232
1233         //      if page is being deleted,
1234         //      move back to preceeding page
1235
1236         if( bt->page->kill ) {
1237                 page_no = bt_getid (bt->page->right);
1238                 continue;
1239         }
1240
1241         //  find key on page at this level
1242         //  and descend to requested level
1243
1244         slot = bt_findslot (bt, key, len);
1245
1246         //      is this slot a foster child?
1247
1248         if( slot <= bt->page->cnt - bt->page->foster )
1249           if( drill == lvl )
1250                 return slot;
1251           else
1252                 drill--;
1253
1254         while( slotptr(bt->page, slot)->dead )
1255           if( slot++ < bt->page->cnt )
1256                 continue;
1257           else
1258                 return bt->err = BTERR_struct, 0;
1259
1260         //  continue down / right using overlapping locks
1261         //  to protect pages being killed or split.
1262
1263         prevmode = mode;
1264         prevpage = bt->page_no;
1265         page_no = bt_getid(slotptr(bt->page, slot)->id);
1266   } while( page_no );
1267
1268   // return error on end of chain
1269
1270   bt->err = BTERR_struct;
1271   return 0;     // return error
1272 }
1273
1274 //  find and delete key on page by marking delete flag bit
1275 //  when page becomes empty, delete it from the btree
1276
1277 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1278 {
1279 unsigned char leftkey[256], rightkey[256];
1280 uid page_no, right;
1281 uint slot, tod;
1282 BtKey ptr;
1283
1284         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1285                 ptr = keyptr(bt->page, slot);
1286         else
1287                 return bt->err;
1288
1289         // if key is found delete it, otherwise ignore request
1290
1291         if( !keycmp (ptr, key, len) )
1292                 if( slotptr(bt->page, slot)->dead == 0 ) {
1293                         slotptr(bt->page,slot)->dead = 1;
1294                         if( slot < bt->page->cnt )
1295                                 bt->page->dirty = 1;
1296                         bt->page->act--;
1297                 }
1298
1299         // return if page is not empty, or it has no right sibling
1300
1301         right = bt_getid(bt->page->right);
1302         page_no = bt->page_no;
1303
1304         if( !right || bt->page->act )
1305                 return bt_unlockpage(bt, page_no, BtLockWrite);
1306
1307         // obtain Parent lock over write lock
1308
1309         if( bt_lockpage(bt, page_no, BtLockParent, NULL) )
1310                 return bt->err;
1311
1312         // cache copy of key to delete
1313
1314         ptr = keyptr(bt->page, bt->page->cnt);
1315         memcpy(leftkey, ptr, ptr->len + 1);
1316
1317         // lock and map right page
1318
1319         if ( bt_lockpage(bt, right, BtLockWrite, &bt->temp) )
1320                 return bt->err;
1321
1322         // pull contents of next page into current empty page 
1323         memcpy (bt->page, bt->temp, bt->mgr->page_size);
1324
1325         //      cache copy of key to update
1326         ptr = keyptr(bt->temp, bt->temp->cnt);
1327         memcpy(rightkey, ptr, ptr->len + 1);
1328
1329         //  Mark right page as deleted and point it to left page
1330         //      until we can post updates at higher level.
1331
1332         bt_putid(bt->temp->right, page_no);
1333         bt->temp->kill = 1;
1334         bt->temp->cnt = 0;
1335
1336         if( bt_unlockpage(bt, right, BtLockWrite) )
1337                 return bt->err;
1338         if( bt_unlockpage(bt, page_no, BtLockWrite) )
1339                 return bt->err;
1340
1341         //  delete old lower key to consolidated node
1342
1343         if( bt_deletekey (bt, leftkey + 1, *leftkey, lvl + 1) )
1344                 return bt->err;
1345
1346         //  redirect higher key directly to consolidated node
1347
1348         if( slot = bt_loadpage (bt, rightkey+1, *rightkey, lvl+1, BtLockWrite) )
1349                 ptr = keyptr(bt->page, slot);
1350         else
1351                 return bt->err;
1352
1353         // since key already exists, update id
1354
1355         if( keycmp (ptr, rightkey+1, *rightkey) )
1356                 return bt->err = BTERR_struct;
1357
1358         slotptr(bt->page, slot)->dead = 0;
1359         bt_putid(slotptr(bt->page,slot)->id, page_no);
1360         bt_unlockpage(bt, bt->page_no, BtLockWrite);
1361
1362         //      obtain write lock and
1363         //      add right block to free chain
1364
1365         if( bt_freepage (bt, right) )
1366                 return bt->err;
1367
1368         //      remove ParentModify lock
1369
1370         if( bt_unlockpage(bt, page_no, BtLockParent) )
1371                 return bt->err;
1372         
1373         return 0;
1374 }
1375
1376 //      find key in leaf level and return row-id
1377
1378 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1379 {
1380 uint  slot;
1381 BtKey ptr;
1382 uid id;
1383
1384         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1385                 ptr = keyptr(bt->page, slot);
1386         else
1387                 return 0;
1388
1389         // if key exists, return row-id
1390         //      otherwise return 0
1391
1392         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1393                 id = bt_getid(slotptr(bt->page,slot)->id);
1394         else
1395                 id = 0;
1396
1397         if ( bt_unlockpage(bt, bt->page_no, BtLockRead) )
1398                 return 0;
1399
1400         return id;
1401 }
1402
1403 //      check page for space available,
1404 //      clean if necessary and return
1405 //      0 - page needs splitting
1406 //      1 - go ahead
1407
1408 uint bt_cleanpage(BtDb *bt, uint amt)
1409 {
1410 uint nxt = bt->mgr->page_size;
1411 BtPage page = bt->page;
1412 uint cnt = 0, idx = 0;
1413 uint max = page->cnt;
1414 BtKey key;
1415
1416         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1417                 return 1;
1418
1419         //      skip cleanup if nothing to reclaim
1420
1421         if( !page->dirty )
1422                 return 0;
1423
1424         memcpy (bt->frame, page, bt->mgr->page_size);
1425
1426         // skip page info and set rest of page to zero
1427
1428         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1429         page->dirty = 0;
1430         page->act = 0;
1431
1432         // try cleaning up page first
1433
1434         while( cnt++ < max ) {
1435                 // always leave fence key in list
1436                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1437                         continue;
1438
1439                 // copy key
1440                 key = keyptr(bt->frame, cnt);
1441                 nxt -= key->len + 1;
1442                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1443
1444                 // copy slot
1445                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1446                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1447                         page->act++;
1448                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1449                 slotptr(page, idx)->off = nxt;
1450         }
1451         page->min = nxt;
1452         page->cnt = idx;
1453
1454         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1455                 return 1;
1456
1457         return 0;
1458 }
1459
1460 //      add key to page
1461 //      return with page unlocked
1462
1463 BTERR bt_addkeytopage (BtDb *bt, uint slot, unsigned char *key, uint len, uid id, uint tod)
1464 {
1465 BtPage page = bt->page;
1466 uint idx;
1467
1468         // calculate next available slot and copy key into page
1469
1470         page->min -= len + 1;
1471         ((unsigned char *)page)[page->min] = len;
1472         memcpy ((unsigned char *)page + page->min +1, key, len );
1473
1474         for( idx = slot; idx < page->cnt; idx++ )
1475           if( slotptr(page, idx)->dead )
1476                 break;
1477
1478         // now insert key into array before slot
1479         // preserving the fence slot
1480
1481         if( idx == page->cnt )
1482                 idx++, page->cnt++;
1483
1484         page->act++;
1485
1486         while( idx > slot )
1487                 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1488
1489         bt_putid(slotptr(page,slot)->id, id);
1490         slotptr(page, slot)->off = page->min;
1491         slotptr(page, slot)->tod = tod;
1492         slotptr(page, slot)->dead = 0;
1493
1494         return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1495 }
1496
1497 // split the root and raise the height of the btree
1498
1499 BTERR bt_splitroot(BtDb *bt, uid right)
1500 {
1501 uint nxt = bt->mgr->page_size;
1502 unsigned char fencekey[256];
1503 BtPage root = bt->page;
1504 uid new_page;
1505 BtKey key;
1506
1507         //  Obtain an empty page to use, and copy the left page
1508         //  contents into it.  Strip foster child key.
1509         //      Save left fence key.
1510
1511         bt->page->act--;
1512         bt->page->cnt--;
1513         bt->page->foster--;
1514         key = keyptr(bt->page, bt->page->cnt);
1515         memcpy (fencekey, key, key->len + 1);
1516
1517         if( !(new_page = bt_newpage(bt, bt->page)) )
1518                 return bt->err;
1519
1520         // preserve the page info at the bottom
1521         // and set rest to zero
1522
1523         memset (root+1, 0, bt->mgr->page_size - sizeof(*root));
1524
1525         // insert left fence key on newroot page
1526
1527         nxt -= *fencekey + 1;
1528         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
1529         bt_putid(slotptr(root, 1)->id, new_page);
1530         slotptr(root, 1)->off = nxt;
1531         
1532         // insert stopper key on newroot page
1533         // and increase the root height
1534
1535         nxt -= 3;
1536         fencekey[0] = 2;
1537         fencekey[1] = 0xff;
1538         fencekey[2] = 0xff;
1539         memcpy ((unsigned char *)root + nxt, fencekey, *fencekey + 1);
1540         bt_putid(slotptr(root, 2)->id, right);
1541         slotptr(root, 2)->off = nxt;
1542
1543         bt_putid(root->right, 0);
1544         root->min = nxt;                // reset lowest used offset and key count
1545         root->cnt = 2;
1546         root->act = 2;
1547         root->lvl++;
1548
1549         // release root (bt->page)
1550
1551         return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1552 }
1553
1554 //  split already locked full node
1555 //      return unlocked.
1556
1557 BTERR bt_splitpage (BtDb *bt)
1558 {
1559 uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
1560 unsigned char fencekey[256];
1561 uid page_no = bt->page_no;
1562 BtPage page = bt->page;
1563 uint tod = time(NULL);
1564 uint lvl = page->lvl;
1565 uid new_page, right;
1566 BtKey key;
1567
1568         //      initialize frame buffer
1569
1570         memset (bt->frame, 0, bt->mgr->page_size);
1571         max = page->cnt - page->foster;
1572         tod = (uint)time(NULL);
1573         cnt = max / 2;
1574         idx = 0;
1575
1576         //  split higher half of keys to bt->frame
1577         //      leaving foster children in the left node.
1578
1579         while( cnt++ < max ) {
1580                 key = keyptr(page, cnt);
1581                 nxt -= key->len + 1;
1582                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1583                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1584                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1585                 slotptr(bt->frame, idx)->off = nxt;
1586                 bt->frame->act++;
1587         }
1588
1589         // transfer right link node
1590
1591         if( page_no > ROOT_page ) {
1592                 right = bt_getid (page->right);
1593                 bt_putid(bt->frame->right, right);
1594         }
1595
1596         bt->frame->bits = bt->mgr->page_bits;
1597         bt->frame->min = nxt;
1598         bt->frame->cnt = idx;
1599         bt->frame->lvl = lvl;
1600
1601         //      get new free page and write frame to it.
1602
1603         if( !(new_page = bt_newpage(bt, bt->frame)) )
1604                 return bt->err;
1605
1606         //      update lower keys and foster children to continue in old page
1607
1608         memcpy (bt->frame, page, bt->mgr->page_size);
1609         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1610         nxt = bt->mgr->page_size;
1611         page->act = 0;
1612         cnt = 0;
1613         idx = 0;
1614
1615         //  assemble page of smaller keys
1616         //      to remain in the old page
1617
1618         while( cnt++ < max / 2 ) {
1619                 key = keyptr(bt->frame, cnt);
1620                 nxt -= key->len + 1;
1621                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1622                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1623                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1624                 slotptr(page, idx)->off = nxt;
1625                 page->act++;
1626         }
1627
1628         //  assemble old foster child keys
1629         //      add new foster child fence
1630
1631         cnt = bt->frame->cnt - bt->frame->foster - 1;
1632
1633         while( cnt++ < bt->frame->cnt ) {
1634                 key = keyptr(bt->frame, cnt);
1635                 nxt -= key->len + 1;
1636                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1637                 memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1638                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1639                 slotptr(page, idx)->off = nxt;
1640                 page->act++;
1641         }
1642
1643         //      link new right page
1644
1645         bt_putid (page->right, new_page);
1646
1647         //      put new page as smallest foster child key
1648
1649         page->min = nxt;
1650         page->cnt = idx;
1651         cnt = page->cnt - page->foster++;
1652         bt_putid (slotptr(page,cnt)->id, new_page);
1653
1654         // if current page is the root page, split it
1655
1656         if( page_no == ROOT_page )
1657                 return bt_splitroot (bt, new_page);
1658
1659         //  release wr lock on page
1660
1661         if( bt_unlockpage (bt, page_no, BtLockWrite) )
1662                 return bt->err;
1663
1664         // obtain ParentModification lock for current page
1665         //      to fix highest foster child on page
1666
1667         if( bt_lockpage (bt, page_no, BtLockParent, NULL) )
1668                 return bt->err;
1669
1670         if( bt_lockpage (bt, page_no, BtLockRead, &page) )
1671                 return bt->err;
1672
1673         //  get our old fence key
1674
1675         key = keyptr(page, page->cnt);
1676         memcpy (fencekey, key, key->len+1);
1677
1678         if( bt_unlockpage (bt, page_no, BtLockRead) )
1679                 return bt->err;
1680
1681         do {
1682           slot = bt_loadpage (bt, fencekey + 1, *fencekey, lvl + 1, BtLockWrite);
1683
1684           if( !slot )
1685                 return bt->err;
1686
1687           // check if parent page has enough space for largest possible key
1688
1689           if( bt_cleanpage (bt, 256) )
1690                 break;
1691
1692           if( bt_splitpage (bt) )
1693                 return bt->err;
1694         } while( 1 );
1695
1696         //      wait until readers from parent get their locks
1697
1698         if( bt_lockpage (bt, page_no, BtLockDelete, NULL) )
1699                 return bt->err;
1700
1701         if( bt_lockpage (bt, page_no, BtLockWrite, &page) )
1702                 return bt->err;
1703
1704         //      switch parent fence key to foster child
1705
1706         if( slotptr(page, page->cnt)->dead )
1707                 slotptr(bt->page, slot)->dead = 1;
1708         else
1709                 bt_putid (slotptr(bt->page, slot)->id, bt_getid(slotptr(page, page->cnt)->id));
1710
1711         //      remove foster child from our page
1712         //      add our new fence key to parent
1713
1714         page->cnt--;
1715         page->act--;
1716         page->foster--;
1717         page->dirty = 1;
1718         key = keyptr(page, page->cnt);
1719
1720         if( bt_addkeytopage (bt, slot, key->key, key->len, page_no, tod) )
1721                 return bt->err;
1722
1723         if( bt_unlockpage (bt, page_no, BtLockDelete) )
1724                 return bt->err;
1725
1726         if( bt_unlockpage (bt, page_no, BtLockParent) )
1727                 return bt->err;
1728
1729         return bt_unlockpage (bt, page_no, BtLockWrite);
1730 }
1731
1732 //  Insert new key into the btree at leaf level.
1733
1734 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod)
1735 {
1736 uint slot, idx;
1737 BtPage page;
1738 BtKey ptr;
1739
1740         while( 1 ) {
1741                 if( slot = bt_loadpage (bt, key, len, 0, BtLockWrite) )
1742                         ptr = keyptr(bt->page, slot);
1743                 else
1744                 {
1745                         if ( !bt->err )
1746                                 bt->err = BTERR_ovflw;
1747                         return bt->err;
1748                 }
1749
1750                 // if key already exists, update id and return
1751
1752                 page = bt->page;
1753
1754                 if( !keycmp (ptr, key, len) ) {
1755                         slotptr(page, slot)->dead = 0;
1756                         slotptr(page, slot)->tod = tod;
1757                         bt_putid(slotptr(page,slot)->id, id);
1758                         return bt_unlockpage(bt, bt->page_no, BtLockWrite);
1759                 }
1760
1761                 // check if page has enough space
1762
1763                 if( bt_cleanpage (bt, len) )
1764                         break;
1765
1766                 if( bt_splitpage (bt) )
1767                         return bt->err;
1768         }
1769
1770         return bt_addkeytopage (bt, slot, key, len, id, tod);
1771 }
1772
1773 //  cache page of keys into cursor and return starting slot for given key
1774
1775 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
1776 {
1777 uint slot;
1778
1779         // cache page for retrieval
1780         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1781                 memcpy (bt->cursor, bt->page, bt->mgr->page_size);
1782         bt->cursor_page = bt->page_no;
1783         if ( bt_unlockpage(bt, bt->page_no, BtLockRead) )
1784                 return 0;
1785
1786         return slot;
1787 }
1788
1789 //  return next slot for cursor page
1790 //  or slide cursor right into next page
1791
1792 uint bt_nextkey (BtDb *bt, uint slot)
1793 {
1794 BtPage page;
1795 uid right;
1796
1797   do {
1798         right = bt_getid(bt->cursor->right);
1799         while( slot++ < bt->cursor->cnt - bt->cursor->foster )
1800           if( slotptr(bt->cursor,slot)->dead )
1801                 continue;
1802           else if( right || (slot < bt->cursor->cnt - bt->cursor->foster) )
1803                 return slot;
1804           else
1805                 break;
1806
1807         if( !right )
1808                 break;
1809
1810         bt->cursor_page = right;
1811
1812     if( bt_lockpage(bt, right, BtLockRead, &page) )
1813                 return 0;
1814
1815         memcpy (bt->cursor, page, bt->mgr->page_size);
1816
1817         if ( bt_unlockpage(bt, right, BtLockRead) )
1818                 return 0;
1819
1820         slot = 0;
1821   } while( 1 );
1822
1823   return bt->err = 0;
1824 }
1825
1826 BtKey bt_key(BtDb *bt, uint slot)
1827 {
1828         return keyptr(bt->cursor, slot);
1829 }
1830
1831 uid bt_uid(BtDb *bt, uint slot)
1832 {
1833         return bt_getid(slotptr(bt->cursor,slot)->id);
1834 }
1835
1836 uint bt_tod(BtDb *bt, uint slot)
1837 {
1838         return slotptr(bt->cursor,slot)->tod;
1839 }
1840
1841
1842 #ifdef STANDALONE
1843
1844 typedef struct {
1845         char type, num;
1846         char *infile;
1847         BtMgr *mgr;
1848 } ThreadArg;
1849
1850 //  standalone program to index file of keys
1851 //  then list them onto std-out
1852
1853 #ifdef unix
1854 void *index_file (void *arg)
1855 #else
1856 uint __stdcall index_file (void *arg)
1857 #endif
1858 {
1859 int line = 0, found = 0;
1860 unsigned char key[256];
1861 ThreadArg *args = arg;
1862 int ch, len = 0, slot;
1863 time_t tod[1];
1864 BtKey ptr;
1865 BtDb *bt;
1866 FILE *in;
1867
1868         bt = bt_open (args->mgr);
1869         time (tod);
1870
1871         switch(args->type | 0x20)
1872         {
1873         case 'w':
1874                 fprintf(stderr, "started indexing for %s\n", args->infile);
1875                 if( in = fopen (args->infile, "rb") )
1876                   while( ch = getc(in), ch != EOF )
1877                         if( ch == '\n' )
1878                         {
1879                           line++;
1880
1881                           if( args->num )
1882                                 sprintf((char *)key+len, "%.9d", line), len += 9;
1883
1884                           if( bt_insertkey (bt, key, len, line, *tod) )
1885                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
1886                           len = 0;
1887                         }
1888                         else if( len < 255 )
1889                                 key[len++] = ch;
1890                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
1891                 break;
1892
1893         case 'd':
1894                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
1895                 if( in = fopen (args->infile, "rb") )
1896                   while( ch = getc(in), ch != EOF )
1897                         if( ch == '\n' )
1898                         {
1899                           line++;
1900                           if( bt_deletekey (bt, key, len, 0) )
1901                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
1902                           len = 0;
1903                         }
1904                         else if( len < 255 )
1905                                 key[len++] = ch;
1906                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
1907                 break;
1908
1909         case 'f':
1910                 fprintf(stderr, "started finding keys for %s\n", args->infile);
1911                 if( in = fopen (args->infile, "rb") )
1912                   while( ch = getc(in), ch != EOF )
1913                         if( ch == '\n' )
1914                         {
1915                           line++;
1916                           if( bt_findkey (bt, key, len) )
1917                                 found++;
1918                           else if( bt->err )
1919                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
1920                           len = 0;
1921                         }
1922                         else if( len < 255 )
1923                                 key[len++] = ch;
1924                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
1925                 break;
1926
1927         case 's':
1928                 len = key[0] = 0;
1929
1930                 fprintf(stderr, "started reading\n");
1931
1932                 if( slot = bt_startkey (bt, key, len) )
1933                   slot--;
1934                 else
1935                   fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
1936
1937                 while( slot = bt_nextkey (bt, slot) ) {
1938                         ptr = bt_key(bt, slot);
1939                         fwrite (ptr->key, ptr->len, 1, stdout);
1940                         fputc ('\n', stdout);
1941                 }
1942         }
1943
1944         bt_close (bt);
1945 #ifdef unix
1946         return NULL;
1947 #else
1948         return 0;
1949 #endif
1950 }
1951
1952 typedef struct timeval timer;
1953
1954 int main (int argc, char **argv)
1955 {
1956 int idx, cnt, len, slot, err;
1957 int segsize, bits = 16;
1958 #ifdef unix
1959 pthread_t *threads;
1960 timer start, stop;
1961 #else
1962 time_t start[1], stop[1];
1963 HANDLE *threads;
1964 #endif
1965 double real_time;
1966 ThreadArg *args;
1967 uint map = 0;
1968 int num = 0;
1969 char key[1];
1970 BtMgr *mgr;
1971 BtKey ptr;
1972 BtDb *bt;
1973
1974         if( argc < 3 ) {
1975                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
1976                 fprintf (stderr, "  where page_bits is the page size in bits\n");
1977                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
1978                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
1979                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
1980                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
1981                 exit(0);
1982         }
1983
1984 #ifdef unix
1985         gettimeofday(&start, NULL);
1986 #else
1987         time(start);
1988 #endif
1989
1990         if( argc > 3 )
1991                 bits = atoi(argv[3]);
1992
1993         if( argc > 4 )
1994                 map = atoi(argv[4]);
1995
1996         if( map > 65536 )
1997                 fprintf (stderr, "Warning: mapped_pool > 65536 segments\n");
1998
1999         if( argc > 5 )
2000                 segsize = atoi(argv[5]);
2001         else
2002                 segsize = 4;    // 16 pages per mmap segment
2003
2004         if( argc > 6 )
2005                 num = atoi(argv[6]);
2006
2007         cnt = argc - 7;
2008 #ifdef unix
2009         threads = malloc (cnt * sizeof(pthread_t));
2010 #else
2011         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2012 #endif
2013         args = malloc (cnt * sizeof(ThreadArg));
2014
2015         mgr = bt_mgr ((argv[1]), BT_rw, bits, map, segsize, map / 8);
2016
2017         if( !mgr ) {
2018                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2019                 exit (1);
2020         }
2021
2022         //      fire off threads
2023
2024         for( idx = 0; idx < cnt; idx++ ) {
2025                 args[idx].infile = argv[idx + 7];
2026                 args[idx].type = argv[2][0];
2027                 args[idx].mgr = mgr;
2028                 args[idx].num = num;
2029 #ifdef unix
2030                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2031                         fprintf(stderr, "Error creating thread %d\n", err);
2032 #else
2033                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2034 #endif
2035         }
2036
2037         //      wait for termination
2038
2039 #ifdef unix
2040         for( idx = 0; idx < cnt; idx++ )
2041                 pthread_join (threads[idx], NULL);
2042         gettimeofday(&stop, NULL);
2043         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2044 #else
2045         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2046
2047         for( idx = 0; idx < cnt; idx++ )
2048                 CloseHandle(threads[idx]);
2049
2050         time (stop);
2051         real_time = 1000 * (*stop - *start);
2052 #endif
2053         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2054
2055         cnt = 0;
2056         len = key[0] = 0;
2057         bt = bt_open (mgr);
2058
2059         fprintf(stderr, "started reading\n");
2060
2061         if( slot = bt_startkey (bt, key, len) )
2062           slot--;
2063         else
2064           fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2065
2066         while( slot = bt_nextkey (bt, slot) )
2067           cnt++;
2068
2069         fprintf(stderr, " Total keys read %d\n", cnt);
2070
2071         bt_close (bt);
2072         bt_mgrclose (mgr);
2073 }
2074
2075 #endif  //STANDALONE