1 // jaluta's balanced B-Link tree algorithms
4 // author: karl malbrain, malbrain@cal.berkeley.edu
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
10 The original author is Karl Malbrain.
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
20 // Please see the project home page for documentation
21 // http://code.google.com/p/high-concurrency-btree
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
39 #define WIN32_LEAN_AND_MEAN
// Integer aliases used throughout the file. uid carries page numbers and
// key ids (see bt_putid / bt_getid); off64_t carries file offsets.
50 typedef unsigned long long uid;
53 typedef unsigned long long off64_t;
54 typedef unsigned short ushort;
55 typedef unsigned int uint;
// Open-mode selectors for bt_open. Each value is a two-character ASCII tag
// packed into 16 bits, first character in the low byte
// (e.g. 0x6f72 = 'o' << 8 | 'r').
58 #define BT_ro 0x6f72 // ro
59 #define BT_rw 0x7772 // rw
60 #define BT_fl 0x6c66 // fl
// Page-size limits, expressed as log2 of the page size in bytes.
62 #define BT_maxbits 24 // maximum page size in bits
63 #define BT_minbits 9 // minimum page size in bits
64 #define BT_minpage (1 << BT_minbits) // minimum page size
// Page-cache hash parameters: bt_findhash / bt_linkhash compute
// (block page number * BT_hashprime) % BT_hashsize to pick a bucket.
66 #define BT_hashsize 512 // size of hash index for page cache
67 #define BT_hashprime 8191 // prime number for hashing
76 // Define the length of the page and key pointers
80 // Page key slot definition.
82 // If BT_maxbits is 16 or less, you can save 4 bytes
83 // for each key stored by making the first two uints
84 // into ushorts. You can also save 4 bytes by removing
85 // the tod field from the key.
// Fields of one page slot (accessed through slotptr(page, n)):
// where the key text starts inside the page, the key's time-stamp,
// and the BtId-byte id stored with the key (encoded by bt_putid).
88 uint off; // page offset for key start
89 uint tod; // time-stamp for key
90 unsigned char id[BtId]; // id associated with key
93 // The key structure occupies space at the upper end of
94 // each page. It's a length byte followed by up to
// Key text. keycmp reads these bytes together with the key's `len` member;
// copies elsewhere in the file move len + 1 bytes per key.
99 unsigned char key[255];
102 // The first part of an index page.
103 // It is immediately followed
104 // by the BtSlot array of keys.
// Page header fields. Slots are numbered from 1 (slotptr subtracts 1), and
// key text is copied in at decreasing offsets, with `min` tracking the
// lowest offset in use.
107 uint cnt; // count of keys in page
108 uint min; // next key offset
// lvl and bits share one byte: 3 bits allow levels 0-7, 5 bits allow
// page_bits values up to 31 (BT_maxbits is 24).
109 unsigned char lvl:3; // level of page
110 unsigned char bits:5; // page size in bits
// fence is the byte length of a key copy kept in the last `fence` bytes
// of the page (see bt_highkey); 0 means no fence key is stored.
111 unsigned char fence; // len of fence key at top of page
// right holds a bt_putid-encoded page number; a decoded value of 0 is
// treated as "no right sibling" by bt_highkey / bt_slotkey.
112 unsigned char right[BtId]; // page number to right
113 BtSlot slots[0]; // page slots
116 // The memory mapping hash table entry
// One page-cache node. Each node covers a block of (hashmask + 1)
// consecutive pages mapped as a unit; page_no is the first page of the
// block (bt_linklru stores page_no with the hashmask bits cleared).
119 BtPage page; // mapped page pointer
120 uid page_no; // mapped page number
// Links are declared void*; the code assigns BtHash pointers to them
// (see bt_linkhash, bt_linklru, bt_hashpage).
121 void *lruprev; // least recently used previous cache block
122 void *lrunext; // lru next cache block
123 void *hashprev; // previous cache block for the same hash idx
124 void *hashnext; // next cache block for the same hash idx
130 // The object structure for Btree access
// Per-handle state for an open b-tree.
133 uint page_size; // each page size
134 uint page_bits; // each page size in bits
135 uid parentpage; // current parent page number
136 uid cursorpage; // current cursor page number
137 uid childpage; // current child page number
139 uint mode; // read-write mode
140 uint mapped_io; // use memory mapping
// Working page pointers. bt_open and bt_resetpages point these at eight
// consecutive page_size slices of `mem`; bt_mappage may re-point one at a
// cached mapping when mapped_io is set.
141 BtPage temp; // temporary frame buffer (memory mapped/file IO)
142 BtPage alloc; // frame buffer for alloc page ( page 0 )
143 BtPage cursor; // cached frame for start/next (never mapped)
144 BtPage frame; // spare frame for the page split (never mapped)
145 BtPage parent; // current parent page
146 BtPage child; // current child page
147 BtPage sibling; // current sibling page
148 BtPage sibling2; // current sibling2 page
154 unsigned char *mem; // frame, cursor, page memory buffer
// Page cache bookkeeping. Nodes are handed out starting at nodes[1]
// (bt_hashpage pre-increments nodecnt), so a cache[] entry of 0 means
// "empty bucket".
155 int nodecnt; // highest page cache node in use
156 int nodemax; // highest page cache node allocated
157 int hashmask; // number of hash headers in cache - 1
158 BtHash *lrufirst; // lru list head
159 BtHash *lrulast; // lru list tail
160 ushort cache[BT_hashsize]; // hash index for cache
// bt_open allocates sizeof(BtDb) + nodemax * sizeof(BtHash), so this array
// effectively extends to nodes[nodemax].
161 BtHash nodes[1]; // page cache follows
// Public entry points. Keys are passed as a byte pointer plus length.
// bt_open returns NULL on failure; functions typed BTERR return an error
// code that is also recorded in bt->err.
176 extern void bt_close (BtDb *bt);
177 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
178 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod);
179 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len);
180 extern uid bt_findkey (BtDb *bt, unsigned char *key, uint len);
// NOTE(review): bt_startkey / bt_nextkey presumably return a slot number in
// the cursor page, to be passed to the helpers below -- confirm against
// their definitions.
181 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
182 extern uint bt_nextkey (BtDb *bt, uint slot);
184 // Helper functions to return slot values
186 extern BtKey bt_key (BtDb *bt, uint slot);
187 extern uid bt_uid (BtDb *bt, uint slot);
188 extern uint bt_tod (BtDb *bt, uint slot);
190 // BTree page number constants
194 // The page is allocated from low and hi ends.
195 // The key offsets and row-id's are allocated
196 // from the bottom, while the text of the key
197 // is allocated from the top. When the two
198 // areas meet, the page overflows.
200 // A key consists of a length byte, two bytes of
201 // index number (0 - 65535), and up to 253 bytes
202 // of key value. Duplicate keys are discarded.
203 // Associated with each key is a 48 bit row-id.
205 // The b-tree root is always located at page 1.
206 // The first leaf page of level zero is always
207 // located on page 2.
209 // The b-tree pages at each level are linked
210 // with next page to right to facilitate
211 // cursors and provide for concurrency.
213 // When the root page overflows, it is split in two and
214 // the tree height is raised by a new root at page
215 // one with two keys.
217 // Groups of pages from the btree are optionally
218 // cached with memory mapping. A hash table is used to keep
219 // track of the cached pages. This behaviour is controlled
220 // by the cache block size parameter to bt_open.
222 // To achieve maximum concurrency one page is locked at a time
223 // as the tree is traversed to find leaf key in question. The right
224 // page numbers are used in cases where the page is being split,
227 // Page 0 is dedicated to lock for new page extensions,
228 // and chains empty pages together for reuse.
230 // Empty nodes are chained together through the ALLOC page and reused.
232 // A special open mode of BT_fl is provided to safely access files on
233 // WIN32 networks. WIN32 network operations should not use memory mapping.
234 // This WIN32 mode sets FILE_FLAG_NO_BUFFERING and FILE_FLAG_WRITE_THROUGH
235 // to prevent local caching of network file contents.
237 // Access macros to address slot and key values from the page
//	slotptr(page, slot): address of slot number `slot` in `page`.
//	Slots are numbered from 1, so slot n lives at page->slots[n - 1].
//	keyptr(page, slot): address of the key text for that slot, found
//	at byte offset slotptr(page, slot)->off from the start of the page.
//
//	Every macro argument is parenthesised so that expression arguments
//	(e.g. `cond ? a : b`, `i + 1`) expand with the intended precedence.
//	`page` is evaluated more than once; pass side-effect-free expressions.

#define slotptr(page, slot) ((page)->slots + (slot) - 1)
#define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr((page), (slot))->off))
// Encode `id` into the BtId-byte array at `dest`.
// Each step stores the current low-order byte of `id` into dest[i] and
// then shifts `id` right by 8 bits.
// NOTE(review): bt_getid decodes most-significant byte first, so `i`
// presumably runs downward from BtId - 1 here -- confirm.
242 void bt_putid(unsigned char *dest, uid id)
247 dest[i] = (unsigned char)id, id >>= 8;
// Decode a BtId-byte id from `src` into a uid.
// Bytes are consumed in order; each one is shifted into the low end of the
// accumulator, so src[0] ends up as the most significant byte.
250 uid bt_getid(unsigned char *src)
255 for( i = 0; i < BtId; i++ )
256 id <<= 8, id |= *src++;
261 // place requested latch on requested page_no.
262 // the Shared latch is a read lock over segment 0
263 // the Update latch is a write lock over segment 1
264 // the Xclusive latch is a write lock over segment 0 & 1
265 // the Upgrade latch upgrades Update to Xclusive
// Latch page_no by taking a byte-range lock on the index file.
// The range starts at the page's file offset (page_no << page_bits).
// "Segment 0" is the first sizeof(page header) bytes of the page and
// "segment 1" the sizeof(page header) bytes that follow it.
// Returns BTERR_lock (also stored in bt->err) when the lock call fails.
267 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
269 off64_t off = page_no << bt->page_bits;
270 uint len = sizeof(*bt->parent);
// NOTE(review): `flag` is computed from the open mode, but no use of it is
// visible in this function -- possibly a leftover; confirm.
273 int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
274 struct flock lock[1];
// Each mode adjusts off/len to select which segment(s) the lock covers.
281 case BtLockShared: // lock segment 0 w/read lock
285 case BtLockUpdate: // lock segment 1 w/write lock
286 off += sizeof(*bt->parent);
290 case BtLockXclusive:// lock both segments w/write lock
291 len += sizeof(*bt->parent);
295 case BtLockUpgrade: // lock segment 0 w/write lock
301 memset (lock, 0, sizeof(lock));
304 lock->l_type = type ? F_WRLCK : F_RDLCK;
// F_SETLKW waits until the requested range can be locked.
308 if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
309 return bt->err = BTERR_lock;
// Win32 path: the same offset goes into an OVERLAPPED structure.
313 memset (ovl, 0, sizeof(ovl));
314 ovl->OffsetHigh = (uint)(off >> 32);
315 ovl->Offset = (uint)off;
317 // use large offsets to
318 // simulate advisory locking
320 ovl->OffsetHigh |= 0x80000000;
323 flags |= LOCKFILE_EXCLUSIVE_LOCK;
325 if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
328 return bt->err = BTERR_lock;
332 // remove lock on requested page_no.
// Release a latch taken by bt_lockpage with the same page_no and mode.
// The byte range is derived exactly as in bt_lockpage: the page's file
// offset, adjusted per mode to cover segment 0, segment 1, or both.
// Returns BTERR_lock (also stored in bt->err) when the unlock call fails.
334 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
336 off64_t off = page_no << bt->page_bits;
337 uint len = sizeof(*bt->parent);
339 struct flock lock[1];
345 case BtLockShared: // unlock segment 0
348 case BtLockUpdate: // unlock segment 1
349 off += sizeof(*bt->parent);
352 case BtLockXclusive:// unlock both segments
353 len += sizeof(*bt->parent);
356 case BtLockUpgrade: // unlock segment 0
361 memset (lock, 0, sizeof(lock));
364 lock->l_type = F_UNLCK;
// Unlocking uses the non-waiting F_SETLK form.
368 if( fcntl (bt->idx, F_SETLK, lock) < 0 )
369 return bt->err = BTERR_lock;
// Win32 path: same offset encoding as bt_lockpage, including the high bit.
371 memset (ovl, 0, sizeof(ovl));
372 ovl->OffsetHigh = (uint)(off >> 32);
373 ovl->Offset = (uint)off;
375 // use large offsets to
376 // simulate advisory locking
378 ovl->OffsetHigh |= 0x80000000;
380 if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) )
// GetLastError() is evaluated and discarded; only BTERR_lock is returned.
381 return GetLastError(), bt->err = BTERR_lock;
387 // close and release memory
// Tear down an open b-tree handle.
// Walks the LRU list from bt->lrufirst and unmaps every cached block; each
// mapping spans (hashmask + 1) pages.
// NOTE(review): only the Win32 buffer/handle release calls are visible
// below; the POSIX equivalents are presumably analogous -- confirm.
389 void bt_close (BtDb *bt)
393 // release mapped pages
// POSIX: unmap each cached block.
395 if( hash = bt->lrufirst )
396 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
397 while(hash = hash->lrunext);
// Win32: flush and unmap each view, then close its mapping handle.
404 if( hash = bt->lrufirst )
407 FlushViewOfFile(hash->page, 0);
408 UnmapViewOfFile(hash->page);
409 CloseHandle(hash->hmap);
410 } while(hash = hash->lrunext);
// Win32: free the working-page buffer, flush and close the file handle.
413 VirtualFree (bt->mem, 0, MEM_RELEASE);
414 FlushFileBuffers(bt->idx);
415 CloseHandle(bt->idx);
420 // open/create new btree
421 // call with file_name, BT_openmode, bits in page size (e.g. 16),
422 // size of mapped page cache (e.g. 8192) or zero for no mapping.
// Open an existing b-tree file or create a new one.
//   name     file name
//   mode     open mode; the low 15 bits are switched on (BT_ro / BT_rw /
//            BT_fl are the selectors defined at the top of the file)
//   bits     requested page size as a power of two, checked against
//            BT_minbits / BT_maxbits
//   nodemax  number of page-cache nodes to allocate after the BtDb object
// Returns the new handle, or NULL on failure. Most failure paths go
// through bt_close(bt).
424 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
426 BtLock lockmode = BtLockXclusive;
427 uint lvl, attr, cacheblk;
435 SYSTEM_INFO sysinfo[1];
// POSIX: allocate the handle plus nodemax cache nodes and zero the BtDb part.
// NOTE(review): the malloc result is passed straight to memset with no
// NULL check.
439 bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash));
440 memset (bt, 0, sizeof(BtDb));
442 switch (mode & 0x7fff)
// The read-write open creates the file if needed; the read-only open also
// downgrades the ALLOC_page latch to shared.
446 bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
451 bt->idx = open ((char*)name, O_RDONLY);
452 lockmode = BtLockShared;
456 return free(bt), NULL;
459 cacheblk = 4096; // page size for unix
// Win32: zero-initialised allocation and CreateFile with matching modes.
464 bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash));
465 attr = FILE_ATTRIBUTE_NORMAL;
466 switch (mode & 0x7fff)
469 attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
472 bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
477 bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
478 lockmode = BtLockShared;
481 if( bt->idx == INVALID_HANDLE_VALUE )
482 return GlobalFree(bt), NULL;
484 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
485 GetSystemInfo(sysinfo);
488 cacheblk = sysinfo->dwAllocationGranularity;
493 // determine sanity of page size
495 if( bits > BT_maxbits )
497 else if( bits < BT_minbits )
// Hold the ALLOC_page latch while the file is inspected or initialised.
500 if ( bt_lockpage(bt, ALLOC_page, lockmode) )
501 return bt_close (bt), NULL;
506 // read minimum page size to get root info
// POSIX: lseek with whence 2 (SEEK_END) yields the file size; a non-empty
// file has its first BT_minpage bytes read into a scratch buffer.
// NOTE(review): neither the malloc result nor the pread return value is
// checked on the two lines below.
508 if( size = lseek (bt->idx, 0L, 2) ) {
509 alloc = malloc (BT_minpage);
510 pread(bt->idx, alloc, BT_minpage, 0);
// An empty file cannot be opened read-only.
513 } else if( mode == BT_ro )
514 return bt_close (bt), NULL;
// Win32 equivalent of the size probe and header read.
516 size = GetFileSize(bt->idx, amt);
519 alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE);
520 if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) )
521 return bt_close (bt), NULL;
523 VirtualFree (alloc, 0, MEM_RELEASE);
524 } else if( mode == BT_ro )
525 return bt_close (bt), NULL;
528 bt->page_size = 1 << bits;
529 bt->page_bits = bits;
531 bt->nodemax = nodemax;
534 // setup cache mapping
// A cache block is at least one page; hashmask = pages per block - 1.
537 if( cacheblk < bt->page_size )
538 cacheblk = bt->page_size;
540 bt->hashmask = (cacheblk >> bits) - 1;
// One buffer of eight pages backs the working page pointers below.
545 bt->mem = malloc (8 *bt->page_size);
547 bt->mem = VirtualAlloc(NULL, 8 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
549 bt->frame = (BtPage)bt->mem;
550 bt->cursor = (BtPage)(bt->mem + bt->page_size);
551 bt->alloc = (BtPage)(bt->mem + 2 * bt->page_size);
552 bt->parent = (BtPage)(bt->mem + 3 * bt->page_size);
553 bt->child = (BtPage)(bt->mem + 4 * bt->page_size);
554 bt->temp = (BtPage)(bt->mem + 5 * bt->page_size);
555 bt->sibling = (BtPage)(bt->mem + 6 * bt->page_size);
556 bt->sibling2 = (BtPage)(bt->mem + 7 * bt->page_size);
// NOTE(review): this unlock presumably ends the existing-file path (the
// handle is returned without initialising any pages) -- confirm.
559 if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
560 return bt_close (bt), NULL;
565 // initialize an empty b-tree with alloc page & root page
// Allocator page: zeroed, right link set to ROOT_page + 1 (bt_newpage reads
// this field as the next page number to hand out), page size recorded.
567 memset (bt->alloc, 0, bt->page_size);
568 bt_putid(bt->alloc->right, ROOT_page + 1);
569 bt->alloc->bits = bt->page_bits;
// A short write is treated as failure on both platforms.
572 if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size )
573 return bt_close (bt), NULL;
575 if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) )
576 return bt_close (bt), NULL;
578 if( *amt < bt->page_size )
579 return bt_close (bt), NULL;
// Next page written sequentially: zeroed, with min = page_size, i.e. no key
// text stored yet.
584 memset (bt->frame, 0, bt->page_size);
585 bt->frame->bits = bt->page_bits;
587 bt->frame->min = bt->page_size;
589 if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size )
590 return bt_close (bt), NULL;
592 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
593 return bt_close (bt), NULL;
595 if( *amt < bt->page_size )
596 return bt_close (bt), NULL;
599 // create initial empty page area by writing last page of first
600 // cache area (other pages are zeroed by O/S)
602 if( bt->mapped_io && bt->hashmask > 2 ) {
603 memset(bt->frame, 0, bt->page_size);
// NOTE(review): the pwrite result is not checked here, unlike the Win32
// branch.
606 pwrite(bt->idx, bt->frame, bt->page_size, bt->hashmask << bt->page_bits);
608 SetFilePointer (bt->idx, bt->hashmask << bt->page_bits, NULL, FILE_BEGIN);
609 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
610 return bt_close (bt), NULL;
611 if( *amt < bt->page_size )
612 return bt_close (bt), NULL;
// Initialisation finished: drop the ALLOC_page latch.
616 if( bt_unlockpage(bt, ALLOC_page, lockmode) )
617 return bt_close (bt), NULL;
622 // reset parent/child page pointers
// Point the eight working page pointers back at their fixed page_size
// slices of bt->mem, in the same layout bt_open sets up.
// Needed because bt_mappage can re-point a page pointer at a cached mapping.
624 void bt_resetpages (BtDb *bt)
629 bt->frame = (BtPage)bt->mem;
630 bt->cursor = (BtPage)(bt->mem + bt->page_size);
631 bt->alloc = (BtPage)(bt->mem + 2 * bt->page_size);
632 bt->parent = (BtPage)(bt->mem + 3 * bt->page_size);
633 bt->child = (BtPage)(bt->mem + 4 * bt->page_size);
634 bt->temp = (BtPage)(bt->mem + 5 * bt->page_size);
635 bt->sibling = (BtPage)(bt->mem + 6 * bt->page_size);
636 bt->sibling2 = (BtPage)(bt->mem + 7 * bt->page_size);
639 // return pointer to high key
640 // or NULL if infinite value
// High key of `page`, available only when the page has a right sibling
// (decoded page->right is non-zero).
// Two forms are visible: the key of the last slot, and the fence key kept in
// the final page->fence bytes of the page.
// NOTE(review): which form is active is presumably chosen at build time --
// confirm.
642 BtKey bt_highkey (BtDb *bt, BtPage page)
645 if( bt_getid (page->right) )
646 return keyptr(page, page->cnt);
650 if( bt_getid (page->right) )
651 return ((BtKey)((unsigned char*)(page) + bt->page_size - page->fence));
656 // return pointer to slot key in index page
657 // or NULL if infinite value
// Key stored for `slot` in `page`.
// A key is returned when `slot` is below the last slot, or when the page
// has a right sibling (decoded page->right is non-zero). The remaining case
// is the infinite value described in the comment above.
659 BtKey bt_slotkey (BtPage page, uint slot)
661 if( slot < page->cnt || bt_getid (page->right) )
662 return keyptr(page, slot);
667 // compare two keys, returning > 0, = 0, or < 0
668 // as the comparison value
// Compare stored key `key1` with the raw byte string key2 / len2.
// The common prefix (the shorter of the two lengths) is compared with
// memcmp; a non-zero result decides the ordering.
// NOTE(review): equal prefixes are presumably ordered by length -- confirm.
670 int keycmp (BtKey key1, unsigned char *key2, uint len2)
672 uint len1 = key1->len;
675 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
686 // Update current page of btree by writing file contents
687 // or flushing mapped area to disk.
// Persist `page` as page number page_no.
// Without memory mapping the page is written at file offset
// page_no << page_bits; anything other than a full page_size write yields
// BTERR_wrt (also stored in bt->err).
// With mapping on Win32, BT_fl mode flushes the view and the file buffers.
689 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
691 off64_t off = page_no << bt->page_bits;
694 if ( !bt->mapped_io )
695 if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
696 return bt->err = BTERR_wrt;
699 if ( !bt->mapped_io )
// The high half of the 64-bit offset is passed by address as the second
// long of `off`.
701 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
702 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
703 return GetLastError(), bt->err = BTERR_wrt;
705 if( *amt < bt->page_size )
706 return GetLastError(), bt->err = BTERR_wrt;
708 else if ( bt->mode == BT_fl ) {
709 FlushViewOfFile(page, bt->page_size);
710 FlushFileBuffers(bt->idx);
716 // find page in cache
// Look up the cache node for the block that contains page_no.
// page_no is rounded down to the first page of its block (hashmask bits
// cleared) and hashed into a bucket; the bucket chain is then walked through
// hashnext, comparing each node's page_no.
718 BtHash *bt_findhash(BtDb *bt, uid page_no)
723 // compute cache block first page and hash idx
725 page_no &= ~bt->hashmask;
726 idx = (uint)(page_no * BT_hashprime % BT_hashsize);
// cache[idx] is an index into bt->nodes.
729 hash = bt->nodes + bt->cache[idx];
733 do if( hash->page_no == page_no )
735 while(hash = hash->hashnext );
740 // add page cache entry to hash index
// Insert `node` at the head of the hash bucket for page_no's block.
// Any existing head becomes node->hashnext and gets its hashprev set to node;
// the bucket then stores node's index within bt->nodes.
742 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
744 uint idx = (uint)((page_no & ~bt->hashmask) * BT_hashprime % BT_hashsize);
747 if( bt->cache[idx] ) {
748 node->hashnext = hash = bt->nodes + bt->cache[idx];
749 hash->hashprev = node;
752 node->hashprev = NULL;
753 bt->cache[idx] = (ushort)(node - bt->nodes);
756 // remove cache entry from hash table
// Remove `node` from its hash bucket chain.
// The bucket index is recomputed from node->page_no.
758 void bt_unlinkhash(BtDb *bt, BtHash *node)
760 uint idx = (uint)((node->page_no & ~bt->hashmask) * BT_hashprime % BT_hashsize);
// With a predecessor: bypass node. Without one, node was the bucket head, so
// the bucket is re-pointed at node's successor when there is one.
764 if( hash = node->hashprev )
765 hash->hashnext = node->hashnext;
766 else if( hash = node->hashnext )
767 bt->cache[idx] = (ushort)(hash - bt->nodes);
// Fix the successor's back link.
771 if( hash = node->hashnext )
772 hash->hashprev = node->hashprev;
775 // add cache page to lru chain and map pages
// Initialise cache node `hash` for the block containing page_no, enter it in
// the hash index and the LRU list, map the block, and return the address of
// page_no inside that mapping.
// Returns NULL with bt->err = BTERR_map when the mapping fails.
// NOTE(review): the node is linked into the hash index and LRU list before
// the mapping is attempted, so a mapping failure appears to leave it linked.
777 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
// off / limit: file offsets of the first byte of the block and the first
// byte past it.
780 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
781 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
784 memset(hash, 0, sizeof(BtHash));
785 hash->page_no = (page_no & ~bt->hashmask);
786 bt_linkhash(bt, hash, page_no);
// The previous LRU head, if any, becomes this node's successor.
788 if( node = hash->lrunext = bt->lrufirst )
789 node->lruprev = hash;
// POSIX: map the whole block shared; a result of -1 is treated as failure.
796 flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
797 hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
798 if( (long long int)hash->page == -1LL )
799 return bt->err = BTERR_map, (BtPage)NULL;
// Win32: create a mapping object sized up to `limit`, then map a view of the
// block at `off`.
802 flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
803 hash->hmap = CreateFileMapping(bt->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
805 return bt->err = BTERR_map, NULL;
807 flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
808 hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
810 return bt->err = BTERR_map, NULL;
// Address of the requested page = block base + (index within block) pages.
813 return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
816 // find or place requested page in page-cache
817 // return memory address where page is located.
// Return the mapped address of page_no, using the page cache.
// A cached block is moved to the head of the LRU list. Otherwise a free cache
// node is taken if one remains, or the least-recently-used block is unmapped
// and its node reused.
// Returns NULL with bt->err = BTERR_hash when the lists are inconsistent;
// mapping failures from bt_linklru also return NULL.
819 BtPage bt_hashpage(BtDb *bt, uid page_no)
821 BtHash *hash, *node, *next;
824 // find page in cache and move to top of lru list
826 if( hash = bt_findhash(bt, page_no) ) {
827 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
828 // swap node in lru list
// A node with a predecessor is not the head: splice it out of the list.
829 if( node = hash->lruprev ) {
830 if( next = node->lrunext = hash->lrunext )
831 next->lruprev = node;
// Re-insert it in front of the current head.
835 if( next = hash->lrunext = bt->lrufirst )
836 next->lruprev = hash;
838 return bt->err = BTERR_hash, (BtPage)NULL;
840 hash->lruprev = NULL;
846 // map pages and add to cache entry
// Nodes are handed out from index 1 upward.
848 if( bt->nodecnt < bt->nodemax ) {
849 hash = bt->nodes + ++bt->nodecnt;
850 return bt_linklru(bt, hash, page_no);
853 // hash table is already full, replace last lru entry from the cache
855 if( hash = bt->lrulast ) {
856 // unlink from lru list
857 if( node = bt->lrulast = hash->lruprev )
858 node->lrunext = NULL;
860 return bt->err = BTERR_hash, (BtPage)NULL;
// Drop the evicted block's mapping (POSIX, then the Win32 equivalent).
863 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
865 FlushViewOfFile(hash->page, 0);
866 UnmapViewOfFile(hash->page);
867 CloseHandle(hash->hmap);
869 // unlink from hash table
871 bt_unlinkhash(bt, hash);
873 // map and add to cache
875 return bt_linklru(bt, hash, page_no);
878 return bt->err = BTERR_hash, (BtPage)NULL;
881 // map a btree page onto current page
// Make *page refer to the contents of page_no.
// With memory mapping, *page is re-pointed at the cached mapping returned by
// bt_hashpage. Otherwise page_size bytes are read from the file into the
// buffer *page already points at; a short read yields BTERR_map (also stored
// in bt->err).
883 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
885 off64_t off = page_no << bt->page_bits;
890 if( bt->mapped_io ) {
892 *page = bt_hashpage(bt, page_no);
// NOTE(review): compare the `!=` test used for pwrite in bt_update. This `<`
// relies on a -1 result comparing below the unsigned page_size, which
// depends on the width of ssize_t.
896 if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
897 return bt->err = BTERR_map;
899 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
901 if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
902 return bt->err = BTERR_map;
904 if( *amt < bt->page_size )
905 return bt->err = BTERR_map;
910 // deallocate a deleted page
911 // place on free chain out of allocator page
912 // page must already be BtLockXclusive and mapped
// Put page_no on the free chain kept in the allocator page.
// The chain head lives in the `right` field of the second header-sized
// record of page 0 (bt->alloc[1].right). The freed page's own right link
// receives the old head, and the head becomes page_no. Both pages are then
// written back.
// The ALLOC_page Update latch is held for the duration.
914 BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no)
916 // lock allocation page
918 if ( bt_lockpage(bt, ALLOC_page, BtLockUpdate) )
921 if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
924 // store chain in second right
925 bt_putid(page->right, bt_getid(bt->alloc[1].right));
926 bt_putid(bt->alloc[1].right, page_no);
928 if( bt_update(bt, bt->alloc, ALLOC_page) )
930 if( bt_update(bt, page, page_no) )
935 if( bt_unlockpage(bt, ALLOC_page, BtLockUpdate) )
941 // allocate a new page and write page into it
// Allocate a page number and store the contents of `page` in it.
// The free chain (bt->alloc[1].right) is used first; otherwise the next
// never-used page number is taken from bt->alloc->right, which is then
// advanced by one.
// Returns the new page number, or 0 on failure. The failure returns marked
// below leave the ALLOC_page latch held.
943 uid bt_newpage(BtDb *bt, BtPage page)
951 if ( bt_lockpage(bt, ALLOC_page, BtLockUpdate) )
954 if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
957 // use empty chain first
958 // else allocate empty page
// Pop the chain head: the reused page's right link holds the next free page.
960 if( new_page = bt_getid(bt->alloc[1].right) ) {
961 if( bt_mappage (bt, &bt->temp, new_page) )
962 return 0; // don't unlock on error
963 bt_putid(bt->alloc[1].right, bt_getid(bt->temp->right));
966 new_page = bt_getid(bt->alloc->right);
967 bt_putid(bt->alloc->right, new_page+1);
971 if( bt_update(bt, bt->alloc, ALLOC_page) )
972 return 0; // don't unlock on error
976 if ( bt_unlockpage(bt, ALLOC_page, BtLockUpdate) )
// Without memory mapping the page is written directly.
979 if( !bt->mapped_io ) {
980 if( bt_update(bt, page, new_page) )
981 return 0; //don't unlock on error
// Mapped mode: write the page through the file descriptor first.
987 if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
988 return bt->err = BTERR_wrt, 0;
990 // if writing first page of hash block, zero last page in the block
992 if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
994 // use temp buffer to write zeros
995 memset(bt->temp, 0, bt->page_size);
// (new_page | hashmask) is the last page number of the block.
996 if ( pwrite(bt->idx,bt->temp, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
997 return bt->err = BTERR_wrt, 0;
1000 // bring new page into page-cache and copy page.
1001 // this will extend the file into the new pages.
// pmap is the base of the block; the page is copied to its position in it.
1003 if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
1006 memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size);
1012 // find slot in given page for given key
// Binary-search `page` for the first slot whose key is >= key / len.
// Returns that slot number (slots start at 1), or 0 when no candidate was
// ever confirmed, i.e. the key lies beyond the page's high key.
1014 int bt_findslot (BtPage page, unsigned char *key, uint len)
1016 uint diff, higher = page->cnt, low = 1, slot;
1019 // make last key an infinite fence value
// The starting bound depends on whether the last slot holds a real key
// (leaf page, or page with a right sibling) or stands for the infinite value.
1021 if( !page->lvl || bt_getid (page->right) )
1026 // low is the next candidate, higher is already
1027 // tested as .ge. the given key, loop ends when they meet
1030 while( diff = higher - low ) {
1031 slot = low + ( diff >> 1 );
1032 if( keycmp (keyptr(page, slot), key, len) < 0 )
// A probe that is >= key becomes the new upper bound and marks the result
// as valid.
1035 higher = slot, good++;
1038 // return zero if key is beyond highkey value
1041 return good ? higher : 0;
1044 // split full parent node
// Split the current parent page (bt->parent / bt->parentpage) in two.
// The higher keys are assembled in bt->frame and written to a newly
// allocated page; the lower keys are rebuilt in place, with the old page's
// right link pointing at the new page.
// key / len select which of the two pages stays as the current parent
// afterwards: the page whose range covers the key.
// Returns 0 on success, or the BTERR code recorded in bt->err.
1046 BTERR bt_splitparent (BtDb *bt, unsigned char *key, uint len)
1048 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1049 uid parentpage = bt->parentpage, right;
1050 BtPage page = bt->parent;
1054 // upgrade parent latch to Xclusive
1056 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
1059 // split higher half of keys to bt->frame
1061 memset (bt->frame, 0, bt->page_size);
1062 max = (int)page->cnt;
1066 // link right sibling node into new right page
1068 right = bt_getid (page->right);
1069 bt_putid(bt->frame->right, right);
1071 // record higher fence key in new right leaf page
// The fence bytes occupy the very end of the page, so key text in the new
// page starts below them.
1073 if( bt->frame->fence = page->fence ) {
1074 memcpy ((unsigned char *)bt->frame + bt->page_size - bt->frame->fence, (unsigned char *)(page) + bt->page_size - bt->frame->fence, bt->frame->fence);
1078 while( cnt++ < max ) {
1080 // copy key, but not infinite values
// The last slot of a non-leaf page with no right sibling has no key text.
1082 if( cnt < max || !page->lvl || right ) {
1083 ptr = keyptr(page, cnt);
1084 nxt -= ptr->len + 1;
1085 memcpy ((unsigned char *)bt->frame + nxt, ptr, ptr->len + 1);
// The slot itself (id, time-stamp, offset) is carried over.
1090 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1091 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1092 slotptr(bt->frame, idx)->off = nxt;
1095 bt->frame->bits = bt->page_bits;
1096 bt->frame->lvl = page->lvl;
1097 bt->frame->min = nxt;
1098 bt->frame->cnt = idx;
1100 // get new free page and write right frame to it.
1102 if( !(new_page = bt_newpage(bt, bt->frame)) )
1105 // update lower keys to continue in old page
// bt->frame now keeps a copy of the unsplit page; the old page is cleared
// after its header and rebuilt from that copy.
1107 memcpy (bt->frame, page, bt->page_size);
1108 memset (page+1, 0, bt->page_size - sizeof(*page));
1109 nxt = bt->page_size;
1114 // record fence key in left leaf page
1117 ptr = keyptr(bt->frame, max);
1118 nxt -= ptr->len + 1;
1119 memcpy ((unsigned char *)page + nxt, ptr, ptr->len + 1);
1120 page->fence = ptr->len + 1;
1123 // assemble page of smaller keys
1124 // no infinite value to deal with
1126 while( cnt++ < max ) {
1127 ptr = keyptr(bt->frame, cnt);
1128 nxt -= ptr->len + 1;
1129 memcpy ((unsigned char *)page + nxt, ptr, ptr->len + 1);
1130 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1131 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1132 slotptr(page, idx)->off = nxt;
// The old page now has the new page as its right sibling.
1135 bt_putid(page->right, new_page);
1141 if( bt_update(bt, page, parentpage) )
1144 // decide to move to new right
1145 // node or stay on left node
1147 ptr = bt_highkey (bt, page);
// Key within the left page's range: stay, dropping only the upgrade latch.
1149 if( keycmp (ptr, key, len) >= 0 )
1150 return bt_unlockpage (bt, parentpage, BtLockUpgrade);
// Otherwise the new right page becomes the current parent: map and latch it,
// then release both latches on the old page.
1152 bt->parentpage = new_page;
1154 if( bt_mappage (bt, &bt->parent, new_page) )
1156 if( bt_lockpage (bt, new_page, BtLockUpdate) )
1158 if( bt_unlockpage (bt, parentpage, BtLockUpgrade) )
1161 return bt_unlockpage (bt, parentpage, BtLockUpdate);
1164 // add unlinked node key into parent
1165 // childpage is existing record
1166 // siblingpage is right record
// Insert a separator for `left` into the current parent page.
// The parent slot found for rightkey is re-pointed at rightpage, and a new
// slot holding left's high key and leftpage is inserted in front of it.
// Returns BTERR_struct when rightkey is not found in the parent, another
// BTERR code from the latch or write calls, or 0 on success.
1168 BTERR bt_parentlink (BtDb *bt, BtPage left, uid leftpage, BtKey rightkey, uid rightpage)
// NOTE(review): bt_highkey is documented to return NULL for an infinite high
// key. leftkey is dereferenced below, so callers presumably only pass a
// `left` page that has a right sibling -- confirm.
1170 BtKey leftkey = bt_highkey (bt, left);
1171 BtPage page = bt->parent;
1174 // upgrade parent latch to exclusive
1176 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
1179 // find the existing right high key in the parent
1180 // and fix the downlink to point to right page
1183 if( !(slot = bt_findslot (page, rightkey->key, rightkey->len)) )
1184 return bt->err = BTERR_struct;
1188 bt_putid(slotptr(page,slot)->id, rightpage);
1190 // calculate next available slot and copy left key onto page
1192 page->min -= leftkey->len + 1; // reset lowest used offset
1193 memcpy ((unsigned char *)page + page->min, leftkey, leftkey->len + 1);
1195 // now insert key into array before slot
// Slots are shifted up by one position to open a gap at `slot`.
1200 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1202 bt_putid(slotptr(page,slot)->id, leftpage);
1203 slotptr(page, slot)->off = page->min;
1205 if ( bt_update(bt, page, bt->parentpage) )
1208 // downgrade parent page lock to BtLockUpdate
1210 if( bt_unlockpage(bt, bt->parentpage, BtLockUpgrade) )
1216 // remove slot from parent
// Rebuild the current parent page (bt->parent) without slot number `slot`.
// The page is copied to bt->frame, cleared after its header, and then
// re-populated slot by slot from the copy, compacting the key text.
1218 void bt_removeslot(BtDb *bt, uint slot)
1220 uint nxt = bt->page_size, amt;
1221 BtPage page = bt->parent;
1222 uint cnt = 0, idx = 0;
1223 uint max = page->cnt;
1227 memcpy (bt->frame, page, bt->page_size);
1229 // skip page info and set rest of page to zero
1230 memset (page+1, 0, bt->page_size - sizeof(*page));
1232 // copy fence key onto new page
// The fence bytes are copied from the same offset in the saved copy.
1233 if( amt = page->fence ) {
1235 memcpy ((unsigned char *)page + nxt, (unsigned char *)(bt->frame) + nxt, amt);
1238 right = bt_getid (page->right);
1240 while( cnt++ < max ) {
1242 // skip key to delete
1247 // copy key, but not infinite value
// The last slot of a non-leaf page with no right sibling has no key text.
1249 if( cnt < max || !page->lvl || right ) {
1250 key = keyptr(bt->frame, cnt);
1251 nxt -= key->len + 1;
1252 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
// Carry the slot over (id, time-stamp, new key offset).
1256 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1257 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1258 slotptr(page, idx)->off = nxt;
1264 // unlink sibling node
1266 BTERR bt_parentunlink (BtDb *bt, BtPage child, uid childpage)
1268 BtKey parentkey = bt_highkey (bt, child);
1271 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
1274 // delete child's slot from parent
1276 if( slot = bt_findslot (bt->parent, parentkey->key, parentkey->len) )
1277 bt_removeslot (bt, slot);
1279 return bt->err + BTERR_struct;
1281 // sibling now in the slot
1283 bt_putid(slotptr(bt->parent,slot)->id, childpage);
1285 if ( bt_update(bt, bt->parent, bt->parentpage) )
1288 // unlock parent page completely
1290 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
1293 return bt_unlockpage(bt, bt->parentpage, BtLockUpdate);
1296 // merge right sibling page into child page
1298 BTERR bt_mergepages (BtDb *bt, BtPage *right, uid rightpage)
1300 BtPage *left = &bt->child;
1304 if( bt_lockpage (bt, bt->childpage, BtLockUpgrade) )
1306 if( bt_lockpage (bt, rightpage, BtLockUpgrade) )
1309 // initialize empty frame
1311 memset (bt->frame, 0, bt->page_size);
1312 *bt->frame = **right;
1314 // copy right fence key
1316 if( amt = (*right)->fence )
1317 memcpy ((unsigned char *)bt->frame + bt->page_size - amt, (unsigned char *)(*right) + bt->page_size - amt, amt);
1319 bt->frame->min = bt->page_size - amt;
1321 // copy lowerkey key/values from left page
1323 for( idx = 1; idx <= (*left)->cnt; idx++ ) {
1324 ptr = keyptr(*left, idx);
1325 bt->frame->min -= ptr->len + 1;
1326 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1327 memcpy (slotptr(bt->frame, idx)->id, slotptr(*left, idx)->id, BtId);
1328 slotptr(bt->frame, idx)->tod = slotptr(*left, idx)->tod;
1329 slotptr(bt->frame, idx)->off = bt->frame->min;
1332 bt->frame->cnt = (*left)->cnt;
1334 // copy higherkey key/values from right page
1336 for( idx = 1; idx <= (*right)->cnt; idx++ ) {
1338 // copy key but not infinite value
1340 if( idx < (*right)->cnt || !(*right)->lvl || right ) {
1341 ptr = keyptr(*right, idx);
1342 bt->frame->min -= ptr->len + 1;
1343 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1347 memcpy (slotptr(bt->frame, bt->frame->cnt + idx)->id, slotptr(*right, idx)->id, BtId);
1348 slotptr(bt->frame, bt->frame->cnt + idx)->tod = slotptr(*right, idx)->tod;
1349 slotptr(bt->frame, bt->frame->cnt + idx)->off = bt->frame->min;
1352 bt->frame->cnt += (*right)->cnt;
1353 memcpy (*left, bt->frame, bt->page_size);
1355 if( bt_update (bt, *left, bt->childpage) )
1357 if( bt_freepage (bt, *right, rightpage) )
1359 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1361 if( bt_unlockpage (bt, rightpage, BtLockUpdate) )
1364 return bt_unlockpage (bt, bt->childpage, BtLockUpgrade);
1367 // redistribute right sibling page with child page
1368 // switch child page to containing page
//
// Rebuilds the child page (bt->child, the left page) and its right
// sibling inside the scratch pages bt->frame and bt->temp, copies the
// results back over both pages and writes them out with bt_update.
//   right     - pointer to the right sibling page
//   rightpage - page number of the right sibling
//   key, len  - search key; compared with the new left high key to
//               decide which of the two pages is kept as the child
1370 BTERR bt_redistribute (BtDb *bt, BtPage *right, uid rightpage, unsigned char *key, uint len)
1372 uid siblingpage = bt_getid((*right)->right);
1373 BtPage *left = &bt->child;
1374 uint idx, cnt = 0, amt = 0;
1375 uint leftmax, rightmin;
// take BtLockUpgrade on both pages before their contents are rewritten
1379 if( bt_lockpage (bt, bt->childpage, BtLockUpgrade) )
1381 if( bt_lockpage (bt, rightpage, BtLockUpgrade) )
1384 // initialize empty frames to contain redistributed pages
1386 memset (bt->frame, 0, bt->page_size); // new left page
1387 memset (bt->temp, 0, bt->page_size); // new right page
// struct assignment: carries over only the page header of each page
1389 *bt->frame = **left;
1390 *bt->temp = **right;
// key bytes are packed downward from the end of the page,
// so the "lowest used offset" starts at page_size
1392 bt->frame->min = bt->page_size;
1393 bt->temp->min = bt->page_size;
1397 // find new left fence index
1398 // and copy left fence key
// left page holds more keys: split the left page at its midpoint
1400 if( (*left)->cnt > (*right)->cnt ) {
1402 leftmax = (*left)->cnt / 2;
// only level-0 (leaf) pages store fence bytes: record the size in
// fence and place the key (length byte + data) at the top of the page
1403 if( !bt->frame->lvl ) {
1404 ptr = keyptr(*left, leftmax);
1405 bt->frame->fence = ptr->len + 1;
1406 bt->frame->min -= ptr->len + 1;
1407 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
// NOTE(review): presumably the other arm of the test above -- keep
// every left key and move the lower half of the right page over
1410 leftmax = (*left)->cnt;
1411 rightmin = (*right)->cnt / 2;
1412 if( !bt->frame->lvl ) {
1413 ptr = keyptr(*right, rightmin);
1414 bt->frame->fence = ptr->len + 1;
1415 bt->frame->min -= ptr->len + 1;
1416 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1420 // right fence stays the same, if any
// assignment in the condition is intentional: amt is the size in bytes
// of the right page's fence. After the subtraction bt->temp->min equals
// page_size - amt, and the same offset is used to read the old fence.
1422 if( amt = (*right)->fence ) {
1423 bt->temp->min -= amt;
1424 memcpy ((unsigned char *)bt->temp + bt->temp->min, (unsigned char *)(*right) + bt->temp->min, amt);
1427 // copy first set of lowerkey key/values from left page
1429 for( idx = 1; idx <= leftmax; idx++ ) {
1431 ptr = keyptr(*left, idx);
1432 bt->frame->min -= ptr->len + 1;
1433 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1434 memcpy (slotptr(bt->frame, idx)->id, slotptr(*left, idx)->id, BtId);
1435 slotptr(bt->frame, idx)->tod = slotptr(*left, idx)->tod;
1436 slotptr(bt->frame, idx)->off = bt->frame->min;
1439 // copy remaining left page key/values from right page, if any
// i.e. the entries that complete the NEW left page are taken from the
// low end of the old right page and stored at slot bt->frame->cnt
1441 for( idx = 1; idx <= rightmin; idx++ ) {
1443 ptr = keyptr(*right, idx);
1444 bt->frame->min -= ptr->len + 1;
1445 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1446 memcpy (slotptr(bt->frame, bt->frame->cnt)->id, slotptr(*right, idx)->id, BtId);
1447 slotptr(bt->frame, bt->frame->cnt)->tod = slotptr(*right, idx)->tod;
1448 slotptr(bt->frame, bt->frame->cnt)->off = bt->frame->min;
1451 // copy remaining left page key/values into new right page, if any
// NOTE(review): this loop starts at leftmax, the same slot the first
// copy loop ended on (idx <= leftmax) -- confirm the shared boundary
// slot is intended
1453 for( idx = leftmax; idx <= (*left)->cnt; idx++ ) {
1455 ptr = keyptr(*left, idx);
1456 bt->temp->min -= ptr->len + 1;
1457 memcpy ((unsigned char *)bt->temp + bt->temp->min, ptr, ptr->len + 1);
1458 memcpy (slotptr(bt->temp, bt->temp->cnt)->id, slotptr(*left, idx)->id, BtId);
1459 slotptr(bt->temp, bt->temp->cnt)->tod = slotptr(*left, idx)->tod;
1460 slotptr(bt->temp, bt->temp->cnt)->off = bt->temp->min;
1463 // copy rest of higherkey key/values from right page
// NOTE(review): starts at rightmin, which the second copy loop also
// covered (idx <= rightmin) -- confirm
1465 for( idx = rightmin; idx <= (*right)->cnt; idx++ ) {
1467 // copy key, but not infinite value
// key bytes are skipped only for the last slot of a non-leaf page
// that has no right sibling (siblingpage == 0)
1469 if( idx < (*right)->cnt || !(*right)->lvl || siblingpage ) {
1470 ptr = keyptr(*right, idx);
1471 bt->temp->min -= ptr->len + 1;
1472 memcpy ((unsigned char *)bt->temp + bt->temp->min, ptr, ptr->len + 1);
1478 memcpy (slotptr(bt->temp, bt->temp->cnt)->id, slotptr(*right, idx)->id, BtId);
1479 slotptr(bt->temp, bt->temp->cnt)->tod = slotptr(*right, idx)->tod;
1480 slotptr(bt->temp, bt->temp->cnt)->off = bt->temp->min;
// install the rebuilt images over the real pages and write them out
1483 memcpy (*left, bt->frame, bt->page_size);
1484 memcpy (*right, bt->temp, bt->page_size);
1486 if( bt_update (bt, *left, bt->childpage) )
1488 if( bt_update (bt, *right, rightpage) )
1491 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1493 if( bt_unlockpage (bt, rightpage, BtLockUpdate) )
// high key of the rebuilt left page drives the choice below
1496 ptr = bt_highkey (bt, *left);
1498 // decide which page is the child page
1499 // if leftkey >= our key, go with left
1501 if( keycmp (ptr, key, len) >= 0 ) {
1502 if( bt_unlockpage (bt, bt->childpage, BtLockUpgrade) )
1504 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1506 if( bt_unlockpage (bt, rightpage, BtLockUpdate) )
1509 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1511 if( bt_unlockpage (bt, bt->childpage, BtLockUpgrade) )
1513 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
// the right page is now recorded as the child page
1518 bt->childpage = rightpage;
1524 // lower the root level by removing the child node
//
// Copies the child page image (bt->child) over bt->parent, writes it
// out as ROOT_page and then frees the child page.
// bt_loadpageupdate takes BtLockUpdate on the root and on the child
// before calling this; here both are raised to BtLockUpgrade for the
// copy. The child's Upgrade and Update locks are released, and the
// value returned is the result of releasing the root's Upgrade lock
// (the root's BtLockUpdate is not released here).
1526 BTERR bt_lowerroot(BtDb *bt)
1528 if( bt_lockpage (bt, ROOT_page, BtLockUpgrade) )
1531 if( bt_lockpage (bt, bt->childpage, BtLockUpgrade) )
// the whole child page replaces the root contents
1534 memcpy (bt->parent, bt->child, bt->page_size);
1536 if( bt_update(bt, bt->parent, ROOT_page) )
1539 if( bt_freepage(bt, bt->child, bt->childpage) )
1542 if( bt_unlockpage (bt, bt->childpage, BtLockUpgrade) )
1545 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
1548 return bt_unlockpage (bt, ROOT_page, BtLockUpgrade);
1551 // split the root and raise the height of the btree
1552 // return with parent page set to appropriate sibling
//
//   sibling  - page number taken from the root's right pointer
//              (see the caller in bt_loadpageupdate)
//   key, len - search key; selects which of the two pages under the
//              new root becomes bt->parent / bt->parentpage
// The return value is that of the final bt_mappage call.
1554 BTERR bt_raiseroot(BtDb *bt, uid sibling, unsigned char *key, uint len)
// room for a length byte plus a maximum-length (255 byte) key
1556 unsigned char lowerkey[256];
1557 uint nxt = bt->page_size;
1558 BtPage root = bt->parent;
1562 // upgrade root page lock to exclusive
1564 if( bt_lockpage (bt, ROOT_page, BtLockUpgrade) )
1567 // Obtain an empty page to use, and copy the current
1568 // root (lower half) contents into it
1570 if( !(new_page = bt_newpage(bt, root)) )
1573 if( bt_lockpage (bt, new_page, BtLockUpdate) )
1576 // save high fence key for left page
// copied as stored on the page: length byte followed by the key bytes
1578 ptr = bt_highkey(bt, root);
1579 memcpy (lowerkey, ptr, ptr->len + 1);
1581 // preserve the page info at the bottom
1582 // and set rest to zero to initialize new root page
// root+1 points just past the page header structure
1584 memset(root+1, 0, bt->page_size - sizeof(*root));
1586 // insert left key in newroot page
// *lowerkey is the saved length byte; the key goes at the top of the page
1588 nxt -= *lowerkey + 1;
1589 memcpy ((unsigned char *)root + nxt, lowerkey, *lowerkey + 1);
1590 bt_putid(slotptr(root, 1)->id, new_page);
1591 slotptr(root, 1)->off = nxt;
1593 // insert second (infinite) key on newroot page that's never examined
1594 // and increase the root level
// slot 2 only receives the sibling page id; the new root has no
// right pointer
1596 bt_putid(slotptr(root, 2)->id, sibling);
1597 bt_putid(root->right, 0);
1599 root->min = nxt; // reset lowest used offset and key count
// bt->parentpage is ROOT_page when called from bt_loadpageupdate
1604 if( bt_update(bt, root, bt->parentpage) )
1607 if( bt_unlockpage(bt, ROOT_page, BtLockUpgrade) )
1610 if( bt_unlockpage(bt, ROOT_page, BtLockUpdate) )
1613 // decide which root node to continue with
1614 // sibling has upper keys, newpage the lower ones
// saved left high key < search key: the key belongs to the sibling
1616 if( keycmp((BtKey)lowerkey, key, len) < 0 ) {
1617 bt->parentpage = sibling; // go with the upper ones
1619 if( bt_unlockpage (bt, new_page, BtLockUpdate) )
1622 return bt_mappage (bt, &bt->parent, sibling);
1625 bt->parentpage = new_page; // go with the lower ones
1627 if( bt_unlockpage (bt, sibling, BtLockUpdate) )
1630 return bt_mappage (bt, &bt->parent, new_page);
1633 // handle underflowing child node
//
// Called from bt_loadpageupdate when bt->child holds a single key.
// Chooses a neighbour of the child, fixes the parent's links to it
// (bt_parentlink / bt_parentunlink, splitting the parent first when
// it lacks room) and then either merges the two pages or, when the
// merged keys would not fit, redistributes them.
//   parentslot - slot in bt->parent found for the search key
//   key, len   - search key, handed on to bt_splitparent and
//                bt_redistribute
// May return BTERR_restart; the caller repeats the call while that
// value is returned.
1635 BTERR bt_repairchild (BtDb *bt, uint parentslot, unsigned char *key, uint len)
1637 BtKey parentkey = bt_slotkey(bt->parent, parentslot);
1638 BtKey fencekey = bt_highkey (bt, bt->child);
1639 BtKey highkey = bt_highkey(bt, bt->parent);
1640 BtKey siblingkey, siblingkey2;
1641 uid siblingpage, siblingpage2;
1646 // high key is never NULL
1647 // fence key is null on right end
// NOTE(review): the !highkey test below disagrees with "never NULL"
// above, and fencekey (which may be null per the comment) is passed
// to keycmp whenever highkey is set -- confirm both are safe
1650 if( !highkey || keycmp (fencekey, highkey->key, highkey->len) < 0 ) {
1652 // childpage is not rightmost child of parent page
1654 siblingpage = bt_getid(bt->child->right);
1656 if( bt_lockpage (bt, siblingpage, BtLockUpdate) )
1659 if( bt_mappage (bt, &bt->sibling, siblingpage) )
// child's high key is below the parent's key for this slot (or the
// slot has no key): the right sibling has no entry of its own
1662 if( !parentkey || keycmp (fencekey, parentkey->key, parentkey->len) < 0 ) {
1664 // sibling is not linked in parent, so we can merge it
1666 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
1669 // calculate size of merged page by adding single child key to sibling
1671 fencekey = keyptr(bt->child, 1);
// space test: header + (cnt + 1) slots + the child's key bytes must
// stay below the sibling's lowest used key offset; if they do not,
// redistribute instead of merging
1673 if( bt->sibling->min < (bt->sibling->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1674 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1676 return bt_mergepages (bt, &bt->sibling, siblingpage);
1679 // sibling has a key in the parent node
1680 // find its slot in parent
// assignment in the condition is intentional: with no high key on
// the sibling, the parent's last slot is used
1682 if( siblingkey = bt_highkey (bt, bt->sibling) )
1683 slot = bt_findslot (bt->parent, siblingkey->key, siblingkey->len);
1685 slot = bt->parent->cnt;
1687 parentkey = bt_slotkey (bt->parent, slot);
1688 siblingpage2 = bt_getid(bt->sibling->right);
1690 if( siblingkey && parentkey && keycmp (siblingkey, parentkey->key, parentkey->len) < 0 ) {
1692 // sibling2 is not linked in P, its key is parentkey
1693 // can parent support its insertion?
1694 // if not, split parent node first.
1696 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + parentkey->len + 1) {
1697 if( bt_splitparent (bt, key, len) )
1700 // are we in the correct half of new parent nodes?
1701 // if not, restart the function.
1703 if( slot = bt_findslot (bt->parent, siblingkey->key, siblingkey->len) )
1704 parentkey = keyptr(bt->parent, slot);
1706 return BTERR_restart;
1709 // link right of sibling into parent
1711 if( bt_parentlink (bt, bt->sibling, siblingpage, parentkey, siblingpage2) )
1714 // unlink sibling from parent
1716 if( bt_parentunlink (bt, bt->child, bt->childpage) )
1719 fencekey = keyptr(bt->child, 1);
// same space test as above: redistribute when the key will not fit
1721 if( bt->sibling->min < (bt->sibling->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1722 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1724 return bt_mergepages (bt, &bt->sibling, siblingpage);
1727 // unlink sibling from parent and merge
1729 if( bt_parentunlink (bt, bt->child, bt->childpage) )
1732 fencekey = keyptr(bt->child, 1);
1734 if( bt->sibling->min < (bt->sibling->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1735 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1737 return bt_mergepages (bt, &bt->sibling, siblingpage);
1741 // child is rightmost key in the parent,
1742 // work with nodes to left.
// the left neighbour is taken from the preceding parent slot
1744 siblingpage = bt_getid(slotptr(bt->parent, parentslot - 1)->id);
1745 siblingkey = keyptr(bt->parent, parentslot - 1);
// the child's lock is released before the left neighbour is locked
1747 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
1749 if( bt_lockpage (bt, siblingpage, BtLockUpdate) )
1751 if( bt_mappage (bt, &bt->sibling, siblingpage) )
1754 siblingpage2 = bt_getid(bt->sibling->right);
1755 siblingkey2 = bt_highkey (bt, bt->sibling);
1757 if( bt_lockpage (bt, siblingpage2, BtLockUpdate) )
1759 if( bt_mappage (bt, &bt->sibling2, siblingpage2) )
// the parent's key for the left neighbour equals that page's own
// high key
1762 if( keycmp (siblingkey, siblingkey2->key, siblingkey2->len) == 0 ) {
1764 // left sibling right (mapped into sibling2) is our child node
1766 swap = bt->sibling2;
1767 bt->sibling2 = bt->child;
1770 // does child still need to merge/redistribute?
// more than one key now: nothing to repair, release the locks and return
1772 if( bt->child->cnt > 1 ) {
1773 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
1776 return bt_unlockpage (bt, siblingpage, BtLockUpdate);
1780 bt->sibling = bt->child;
// roles are exchanged: the left neighbour becomes the child page and
// the old child page number becomes the sibling
1783 bt->childpage = siblingpage;
1784 siblingpage = siblingpage2;
1786 // unlink sibling node from parent and merge with child
1788 if( bt_parentunlink (bt, bt->child, bt->childpage) )
1791 fencekey = keyptr(bt->sibling, 1);
1793 if( bt->child->min < (bt->child->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1794 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1796 return bt_mergepages (bt, &bt->sibling, siblingpage);
1799 // currently unlinked sibling2 merges with child
1801 fencekey = bt_highkey (bt, bt->sibling2);
// split the parent first when it cannot hold one more slot and key
1803 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + fencekey->len + 1)
1804 if( bt_splitparent (bt, key, len) )
1807 if( bt_parentlink (bt, bt->sibling, siblingpage, fencekey, siblingpage2) )
1810 if( bt_unlockpage (bt, siblingpage, BtLockUpdate) )
// the child is locked and mapped again before its key count is re-read
1812 if( bt_lockpage (bt, bt->childpage, BtLockUpdate) )
1814 if( bt_mappage (bt, &bt->child, bt->childpage) )
1817 if( bt->child->cnt > 1 ) {
1818 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
1820 return bt_unlockpage (bt, siblingpage2, BtLockUpdate);
1823 // unlink child from parent node leaving immediate
1824 // left sibling2 to accept merge/redistribution
1826 if( bt_parentunlink (bt, bt->sibling2, siblingpage2) )
1829 swap = bt->sibling2;
1830 bt->sibling2 = bt->child;
1832 bt->childpage = siblingpage2;
1834 fencekey = keyptr(bt->sibling2, 1);
1836 if( bt->child->min < (bt->child->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->child) + fencekey->len + 1)
1837 return bt_redistribute (bt, &bt->sibling2, siblingpage2, key, len);
1839 return bt_mergepages (bt, &bt->sibling2, siblingpage2);
1842 // find and load leaf page for given key
1843 // leave page BtLockShared, return with key's slot
//
// Walks from ROOT_page toward the leaves. Each page visited becomes
// bt->parent / bt->parentpage. The next page is locked BtLockShared
// before the lock on the previous page is released.
//   key, len - key to locate
// Returns 0 (EOF) when the chain of right pointers ends.
1845 int bt_loadpageread (BtDb *bt, unsigned char *key, uint len)
1847 uid page_no = ROOT_page, prevpage = 0;
1850 // start at root of btree and drill down
1853 bt->parentpage = page_no;
1855 if( bt_lockpage(bt, bt->parentpage, BtLockShared) )
1859 if( bt_unlockpage(bt, prevpage, BtLockShared) )
1862 // map/obtain page contents
1864 if( bt_mappage (bt, &bt->parent, page_no) )
1867 // find key on page at this level
1868 // return if leaf page
// a non-zero slot means the key falls on this page; lvl == 0 is a leaf
1870 if( (slot = bt_findslot (bt->parent, key, len)) ) {
1871 if( !bt->parent->lvl )
1874 // continue down to next level
1876 page_no = bt_getid(slotptr(bt->parent, slot)->id);
1879 // or slide right into next page
1882 page_no = bt_getid(bt->parent->right);
// remembered so its shared lock can be dropped once the next page is locked
1884 prevpage = bt->parentpage;
1887 // return EOF on end of right chain
1889 if( bt_unlockpage(bt, bt->parentpage, BtLockShared) )
1892 return 0; // return EOF
1895 // find and load leaf page for given key
1896 // return w/slot # on leaf page
1897 // leave page BtLockUpdate
//
// Descends from ROOT_page with BtLockUpdate locks and performs
// structure maintenance on the way down: raising the root when it
// has a right sibling, lowering it when its only child has none,
// repairing a child that holds a single key, and linking a child's
// right sibling into the parent when it is missing there.
// The page of interest is left in bt->parent / bt->parentpage.
//   key, len - key to locate
// When the right chain ends, bt->err is set to BTERR_struct and 0
// is returned.
1899 int bt_loadpageupdate (BtDb *bt, unsigned char *key, uint len)
1901 uid parentpage = 0, nextpage;
1902 BtKey fencekey, parentkey;
1906 // start at root of btree and drill down
1908 if( bt_lockpage(bt, ROOT_page, BtLockUpdate) )
1911 // map/obtain page contents
1913 if( bt_mappage (bt, &bt->parent, ROOT_page) )
1916 bt->parentpage = ROOT_page;
1919 // if root page, check for tree level growth
1920 // by existence of right pointer
// assignment in the condition is intentional: a non-zero right
// pointer on the root triggers bt_raiseroot
1922 if( bt->parentpage == ROOT_page )
1923 if( nextpage = bt_getid(bt->parent->right) ) {
1924 if( bt_lockpage (bt, nextpage, BtLockUpdate) )
1927 if( bt_raiseroot (bt, nextpage, key, len) )
1931 // find key on page at this level
1932 // return if leaf page
1934 slot = bt_findslot (bt->parent, key, len);
1936 if( !bt->parent->lvl )
1939 // lock & map the child
1941 bt->childpage = bt_getid(slotptr(bt->parent, slot)->id);
1943 if( bt_lockpage(bt, bt->childpage, BtLockUpdate) )
1946 if( bt_mappage (bt, &bt->child, bt->childpage) )
1949 // check child for underflow
// root with a single key whose child has no right sibling:
// the child can replace the root
1951 if( bt->parentpage == ROOT_page )
1952 if( bt->parent->cnt == 1 )
1953 if( !bt_getid(bt->child->right) ) {
1954 if( bt_lowerroot (bt) )
1956 nextpage = ROOT_page;
// a child holding a single key is repaired; the empty-bodied loop
// repeats the call for as long as it returns BTERR_restart
1960 if( bt->child->cnt == 1 ) {
1961 while( bt_repairchild (bt, slot, key, len) == BTERR_restart );
1966 bt->child = bt->parent;
1968 nextpage = bt->childpage;
1972 fencekey = bt_highkey (bt, bt->child);
1973 parentkey = bt_slotkey(bt->parent, slot);
1975 // if right sibling is not linked,
1976 // fix links in parent node
// detected when the child's high key is below the parent's key for
// the slot, or the slot has no key
1979 if( !parentkey || keycmp (fencekey, parentkey->key, parentkey->len) < 0 ) {
// the parent needs room for one more slot plus the fence key bytes;
// after a split the slot and its key are looked up again
1981 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + fencekey->len + 1)
1982 if( bt_splitparent (bt, key, len) ) {
1985 slot = bt_findslot (bt->parent, key, len);
1986 parentkey = bt_slotkey(bt->parent, slot);
1989 // add key to parent for child page
1990 // and fix downlink for childpage
1992 nextpage = bt_getid(bt->child->right);
1994 if( bt_parentlink (bt, bt->child, bt->childpage, parentkey, nextpage) )
1998 // unlock the parent page
2000 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
2003 // is our key on this child page?
2004 // is fencekey infinite, or .ge. our key
2006 if( !fencekey || keycmp (fencekey, key, len) >= 0 ) {
2008 bt->child = bt->parent;
2010 nextpage = bt->childpage;
2014 // otherwise slide right into next page
// the right neighbour is locked before the current child is unlocked
2016 nextpage = bt_getid(bt->child->right);
2018 if( bt_lockpage (bt, nextpage, BtLockUpdate) )
2021 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
2024 if( bt_mappage (bt, &bt->parent, nextpage) )
// assignment in the condition is intentional: the loop continues
// with nextpage as the new parent page and stops when it is 0
2027 } while( bt->parentpage = nextpage );
2029 // return error on end of right chain
2031 bt->err = BTERR_struct;
2032 return 0; // return error
2035 // find and delete key on leaf page
//
// Locates the key with bt_loadpageupdate, which leaves the page in
// bt->parent / bt->parentpage. If the key at the returned slot
// compares equal, the lock is raised to BtLockUpgrade, the slot is
// removed, the page is written with bt_update and the Upgrade lock
// is released.
//   key, len - key to delete
// The final return is the result of releasing the BtLockUpdate lock.
2037 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
// assignment in the condition is intentional; ptr is only set for a
// non-zero slot
2045 if( slot = bt_loadpageupdate (bt, key, len) )
2046 ptr = keyptr(bt->parent, slot);
2050 // if key is found delete it, otherwise ignore request
2052 if( slot && !keycmp (ptr, key, len) ) {
2053 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
2056 bt_removeslot (bt, slot);
2058 if( bt_update(bt, bt->parent, bt->parentpage) )
2061 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
2065 return bt_unlockpage (bt, bt->parentpage, BtLockUpdate);
2068 // find key in leaf page and return row-id
2069 // or zero if key is not found.
//
// Uses bt_loadpageread, so the page is held BtLockShared in
// bt->parent until the unlock at the end.
//   key, len - key to look up
2071 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
// assignment in the condition is intentional; ptr is only set for a
// non-zero slot
2079 if( slot = bt_loadpageread (bt, key, len) )
2080 ptr = keyptr(bt->parent, slot);
2084 // if key exists, return row-id
2085 // otherwise return 0
// exact match only: equal length and identical key bytes
2087 if( ptr->len == len && !memcmp (ptr->key, key, len) )
2088 id = bt_getid(slotptr(bt->parent,slot)->id);
2092 if ( bt_unlockpage(bt, bt->parentpage, BtLockShared) )
2098 // Insert new key into the btree leaf page
//
// Locates the page with bt_loadpageupdate (left in bt->parent /
// bt->parentpage). An existing key only has its tod and id
// overwritten. A new key is appended to the key area and a slot is
// opened for it in the slot array.
//   key, len - key bytes and their length
//   id       - id stored in the slot for this key
//   tod      - time-stamp stored in the slot
// Both paths end by returning the result of releasing BtLockUpdate.
2100 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod)
// assignment in the condition is intentional; ptr is only set for a
// non-zero slot
2105 if( slot = bt_loadpageupdate (bt, key, len) )
2106 ptr = keyptr(bt->parent, slot);
2110 if( bt->parent->lvl )
2112 if( bt->parent->lvl )
2116 // if key already exists, update id and return
2119 if( !keycmp (ptr, key, len) ) {
2120 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
2122 slotptr(bt->parent, slot)->tod = tod;
2123 bt_putid(slotptr(bt->parent,slot)->id, id);
2124 if ( bt_update(bt, bt->parent, bt->parentpage) )
2126 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
2128 return bt_unlockpage(bt, bt->parentpage, BtLockUpdate);
2131 // check if leaf page has enough space
// header + (cnt + 1) slots + len + 1 key bytes must stay below min,
// the lowest used key offset; otherwise split and look the slot up again
2133 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + len + 1) {
2134 if( bt_splitparent (bt, key, len) )
2137 slot = bt_findslot (bt->parent, key, len);
2140 // calculate next available slot and copy key into page
2142 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
// the key is stored as a length byte followed by the key bytes
2145 bt->parent->min -= len + 1; // reset lowest used offset
2146 ((unsigned char *)bt->parent)[bt->parent->min] = len;
2147 memcpy ((unsigned char *)bt->parent + bt->parent->min +1, key, len );
2149 // now insert key into array before slot
// the count grows by one, then slots are copied one position up
// while idx walks downward (presumably until idx reaches slot -- confirm)
2151 idx = ++bt->parent->cnt;
2155 *slotptr(bt->parent, idx) = *slotptr(bt->parent, idx -1), idx--;
// fill the freed slot with the new entry
2157 bt_putid(slotptr(bt->parent,idx)->id, id);
2158 slotptr(bt->parent, idx)->off = bt->parent->min;
2159 slotptr(bt->parent, idx)->tod = tod;
2161 if ( bt_update(bt, bt->parent, bt->parentpage) )
2164 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
2167 return bt_unlockpage(bt, bt->parentpage, BtLockUpdate);
2170 // cache page of keys into cursor and return starting slot for given key
//
// Copies the page located by bt_loadpageread into bt->cursor,
// records its page number in bt->cursorpage and then releases the
// shared lock, so later cursor reads work on the private copy.
//   key, len - key at which to position the cursor
2172 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2176 // cache page for retrieval
// assignment in the condition is intentional
2177 if( slot = bt_loadpageread (bt, key, len) )
2178 memcpy (bt->cursor, bt->parent, bt->page_size);
2179 bt->cursorpage = bt->parentpage;
2180 if ( bt_unlockpage(bt, bt->parentpage, BtLockShared) )
2186 // return next slot for cursor page
2187 // or slide cursor right into next page
//
//   slot - current slot on the cached cursor page
// When the cached page is exhausted, the page named by its right
// pointer is locked BtLockShared, mapped, copied into bt->cursor and
// unlocked again; bt->cursorpage is updated to that page number.
2189 uint bt_nextkey (BtDb *bt, uint slot)
// post-increment: the current slot is compared with the key count,
// then slot advances by one
2194 if( slot++ < bt->cursor->cnt )
2197 right = bt_getid(bt->cursor->right);
2202 bt->cursorpage = right;
2204 if( bt_lockpage(bt, right,BtLockShared) )
2207 if( bt_mappage (bt, &bt->parent, right) )
// refresh the private cursor copy from the newly mapped page
2210 memcpy (bt->cursor, bt->parent, bt->page_size);
2211 if ( bt_unlockpage(bt, right, BtLockShared) )
// return the key stored at the given slot of the cached cursor page
2220 BtKey bt_key(BtDb *bt, uint slot)
2222 return keyptr(bt->cursor, slot);
// return the id stored in the given slot of the cached cursor page
2225 uid bt_uid(BtDb *bt, uint slot)
2227 return bt_getid(slotptr(bt->cursor,slot)->id);
// return the time-stamp stored in the given slot of the cached cursor page
2230 uint bt_tod(BtDb *bt, uint slot)
2232 return slotptr(bt->cursor,slot)->tod;
2237 // standalone program to index file of keys
2238 // then list them onto std-out
2240 int main (int argc, char **argv)
2242 uint slot, found = 0, line = 0, off = 0;
2243 int ch, cnt = 0, bits = 12;
2244 unsigned char key[256];
2245 clock_t done, start;
2255 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_pages start_line_number]", argv[0]);
2263 bits = atoi(argv[4]);
2266 map = atoi(argv[5]);
2269 fprintf (stderr, "Warning: mapped_pool > 65536 pages\n");
2272 off = atoi(argv[6]);
2274 bt = bt_open ((argv[1]), BT_rw, bits, map);
2277 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2281 switch(argv[3][0]| 0x20)
2284 fprintf(stderr, "started indexing for %s\n", argv[2]);
2285 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2286 while( ch = getc(in), ch != EOF )
2290 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2292 if( bt_insertkey (bt, key, len, ++line, *tod) )
2293 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2296 else if( len < 245 )
2298 fprintf(stderr, "finished adding keys, %d \n", line);
2302 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2303 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2304 while( ch = getc(in), ch != EOF )
2308 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2310 if( bt_deletekey (bt, key, len) )
2311 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2314 else if( len < 245 )
2316 fprintf(stderr, "finished deleting keys, %d\n", line);
2320 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2321 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2322 while( ch = getc(in), ch != EOF )
2326 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2328 if( bt_findkey (bt, key, len) )
2331 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2334 else if( len < 245 )
2336 fprintf(stderr, "finished search of %d keys, found %d\n", line, found);
2346 fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC);
2351 fprintf(stderr, "started reading\n");
2353 slot = bt_startkey (bt, key, len);
2356 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2359 while( slot = bt_nextkey (bt, slot) )
2361 ptr = bt_key(bt, slot);
2362 fwrite (ptr->key, ptr->len, 1, stdout);
2363 fputc ('\n', stdout);
2366 fprintf(stderr, " Total keys read %d\n", cnt);