1 // jaluta's balanced B-Link tree algorithms
4 // author: karl malbrain, malbrain@cal.berkeley.edu
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
10 The original author is Karl Malbrain.
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
20 // Please see the project home page for documentation
21 // http://code.google.com/p/high-concurrency-btree
// Feature-test macros requesting 64-bit file offsets from the C library.
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
// NOTE(review): the platform conditionals and #include lines that surround
// the following definitions are not visible in this excerpt.
39 #define WIN32_LEAN_AND_MEAN
// uid carries page numbers and row ids throughout the file.
50 typedef unsigned long long uid;
53 typedef unsigned long long off64_t;
54 typedef unsigned short ushort;
55 typedef unsigned int uint;
// Open modes accepted by bt_open(), which masks the mode with 0x7fff
// before switching on it.
58 #define BT_ro 0x6f72 // ro
59 #define BT_rw 0x7772 // rw
60 #define BT_fl 0x6c66 // fl
// Page size limits, expressed as a power of two.
62 #define BT_maxbits 24 // maximum page size in bits
63 #define BT_minbits 9 // minimum page size in bits
64 #define BT_minpage (1 << BT_minbits) // minimum page size
// Page-cache hash parameters: a cache block's bucket is computed as
// (first page number of the block * BT_hashprime) % BT_hashsize.
66 #define BT_hashsize 512 // size of hash index for page cache
67 #define BT_hashprime 8191 // prime number for hashing
76 // Define the length of the page and key pointers
// NOTE(review): the definition this comment introduces (presumably BtId, the
// byte length of a stored id used below) is not visible in this excerpt.
80 // Page key slot definition.
82 // If BT_maxbits is 16 or less, you can save 4 bytes
83 // for each key stored by making the first two uints
84 // into ushorts. You can also save 4 bytes by removing
85 // the tod field from the key.
// NOTE(review): the struct header and trailer lines are not visible here;
// these members are reached through slotptr() on the page's BtSlot array.
88 uint off; // page offset for key start
89 uint tod; // time-stamp for key
// id is written with bt_putid() and read back with bt_getid().
90 unsigned char id[BtId]; // id associated with key
93 // The key structure occupies space at the upper end of
94 // each page. It's a length byte followed by up to
// NOTE(review): the rest of this comment, the struct header and the length
// member are not visible in this excerpt; keycmp() reads the length as
// key1->len and the bytes as key1->key.
99 unsigned char key[255];
102 // The first part of an index page.
103 // It is immediately followed
104 // by the BtSlot array of keys.
// NOTE(review): the struct header and trailer lines are not visible here;
// the members are used through BtPage pointers (e.g. bt->frame->cnt).
107 uint cnt; // count of keys in page
108 uint min; // next key offset
// lvl and bits are bit-fields of 3 and 5 bits respectively.
109 unsigned char lvl:3; // level of page
110 unsigned char bits:5; // page size in bits
111 unsigned char fence; // len of fence key at top of page
112 unsigned char right[BtId]; // page number to right
// NOTE(review): a zero-length array is a compiler extension; the standard
// C99 spelling of a trailing variable-length member is `slots[]`.
113 BtSlot slots[0]; // page slots
116 // The memory mapping hash table entry
// One entry describes one mapped cache block. Entries are linked into an LRU
// list (lruprev/lrunext) and into a per-bucket hash chain (hashprev/hashnext).
// NOTE(review): the struct header/trailer and the `hmap` member that
// bt_close()/bt_linklru() use on Windows are not visible in this excerpt.
119 BtPage page; // mapped page pointer
// page_no holds the first page number of the block (low hashmask bits cleared).
120 uid page_no; // mapped page number
121 void *lruprev; // least recently used previous cache block
122 void *lrunext; // lru next cache block
123 void *hashprev; // previous cache block for the same hash idx
124 void *hashnext; // next cache block for the same hash idx
130 // The object structure for Btree access
// NOTE(review): the struct header/trailer and some members used elsewhere
// in the file (bt->idx, bt->err) are not visible in this excerpt.
133 uint page_size; // each page size
134 uint page_bits; // each page size in bits
135 uid parentpage; // current parent page number
136 uid cursorpage; // current cursor page number
137 uid childpage; // current child page number
139 uint mode; // read-write mode
140 uint mapped_io; // use memory mapping
// The eight BtPage pointers below are initialised by bt_open() to consecutive
// page_size slices of `mem`.
141 BtPage temp; // temporary frame buffer (memory mapped/file IO)
142 BtPage alloc; // frame buffer for alloc page ( page 0 )
143 BtPage cursor; // cached frame for start/next (never mapped)
144 BtPage frame; // spare frame for the page split (never mapped)
145 BtPage parent; // current parent page
146 BtPage child; // current child page
147 BtPage sibling; // current sibling page
148 BtPage sibling2; // current sibling2 page
154 unsigned char *mem; // frame, cursor, page memory buffer
155 int nodecnt; // highest page cache node in use
156 int nodemax; // highest page cache node allocated
// hashmask is (pages per cache block) - 1; it is used to mask page numbers.
157 int hashmask; // number of hash headers in cache - 1
158 BtHash *lrufirst; // lru list head
159 BtHash *lrulast; // lru list tail
// cache[] stores indexes into nodes[]; a value of 0 marks an empty bucket,
// so nodes[0] is never handed out (bt_hashpage pre-increments nodecnt).
160 ushort cache[BT_hashsize]; // hash index for cache
// bt_open() allocates nodemax additional BtHash entries after this one.
161 BtHash nodes[1]; // page cache follows
// Public entry points. Of these, only bt_close() and bt_open() are defined in
// the visible part of this file.
176 extern void bt_close (BtDb *bt);
// NOTE(review): the last parameter is named cacheblk here but nodemax in the
// definition, where it sizes the page-cache node array.
177 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
178 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod);
179 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len);
180 extern uid bt_findkey (BtDb *bt, unsigned char *key, uint len);
181 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
182 extern uint bt_nextkey (BtDb *bt, uint slot);
184 // Helper functions to return slot values
186 extern BtKey bt_key (BtDb *bt, uint slot);
187 extern uid bt_uid (BtDb *bt, uint slot);
188 extern uint bt_tod (BtDb *bt, uint slot);
190 // BTree page number constants
194 // The page is allocated from low and hi ends.
195 // The key offsets and row-id's are allocated
196 // from the bottom, while the text of the key
197 // is allocated from the top. When the two
198 // areas meet, the page overflows.
200 // A key consists of a length byte, two bytes of
201 // index number (0 - 65535), and up to 253 bytes
202 // of key value. Duplicate keys are discarded.
203 // Associated with each key is a 48 bit row-id.
205 // The b-tree root is always located at page 1.
206 // The first leaf page of level zero is always
207 // located on page 2.
209 // The b-tree pages at each level are linked
210 // with next page to right to facilitate
211 // cursors and provide for concurrency.
213 // When the root page overflows, it is split in two and
214 // the tree height is raised by a new root at page
215 // one with two keys.
217 // Groups of pages from the btree are optionally
218 // cached with memory mapping. A hash table is used to keep
219 // track of the cached pages. This behaviour is controlled
220 // by the cache block size parameter to bt_open.
222 // To achieve maximum concurrency one page is locked at a time
223 // as the tree is traversed to find leaf key in question. The right
224 // page numbers are used in cases where the page is being split,
227 // Page 0 is dedicated to lock for new page extensions,
228 // and chains empty pages together for reuse.
230 // Empty nodes are chained together through the ALLOC page and reused.
232 // A special open mode of BT_fl is provided to safely access files on
233 // WIN32 networks. WIN32 network operations should not use memory mapping.
234 // This WIN32 mode sets FILE_FLAG_NO_BUFFERING and FILE_FLAG_WRITE_THROUGH
235 // to prevent local caching of network file contents.
// Access macros to address slot and key values from the page.
// Slots are numbered from 1, so slot N is element N - 1 of the page's slot array.
// keyptr() adds the byte offset stored in the slot to the page base address.
// Every macro argument is parenthesized so that an expression argument
// (e.g. `a | b` or `c ? x : y`) is evaluated as a unit.
#define slotptr(page, slot) ((page)->slots + (slot) - 1)
#define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr((page), (slot))->off))
// Store the id `id` into the byte array `dest`.
// Each step writes the low byte of id to dest[i] and shifts id right by 8 bits.
// NOTE(review): the declaration of `i` and the loop that drives it are not
// visible in this excerpt; bt_getid() reads the most significant byte first,
// so `i` presumably counts down from BtId.
242 void bt_putid(unsigned char *dest, uid id)
247 dest[i] = (unsigned char)id, id >>= 8;
// Reassemble an id from the BtId bytes at `src`: each step shifts the
// accumulated value left by 8 bits and ORs in the next byte, so the first
// byte ends up most significant.
// NOTE(review): the declaration/initialisation of `id` and the return
// statement are not visible in this excerpt.
250 uid bt_getid(unsigned char *src)
255 for( i = 0; i < BtId; i++ )
256 id <<= 8, id |= *src++;
261 // place requested latch on requested page_no.
262 // the Shared latch is a read lock over segment 0
263 // the Update latch is a write lock over segment 1
264 // the Xclusive latch is a write lock over segment 0 & 1
265 // the Upgrade latch upgrades Update to Xclusive
// Latches are byte-range file locks that start at the page's file offset
// (page_no << page_bits); one segment is sizeof(*bt->parent) bytes long.
// Sets bt->err to BTERR_lock and returns it when the lock call fails.
// NOTE(review): the platform conditionals, the switch header, the
// declarations of `type`, `flags` and `ovl`, and the success return are not
// visible in this excerpt.
267 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
269 off64_t off = page_no << bt->page_bits;
270 uint len = sizeof(*bt->parent);
// NOTE(review): `flag` is built from mmap protection bits but no visible
// line of this function reads it - confirm whether it is dead.
273 int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
274 struct flock lock[1];
281 case BtLockShared: // lock segment 0 w/read lock
285 case BtLockUpdate: // lock segment 1 w/write lock
// segment 1 starts one segment length past the page offset
286 off += sizeof(*bt->parent);
290 case BtLockXclusive:// lock both segments w/write lock
// doubling len makes the range cover segments 0 and 1
291 len += sizeof(*bt->parent);
295 case BtLockUpgrade: // lock segment 0 w/write lock
301 memset (lock, 0, sizeof(lock));
304 lock->l_type = type ? F_WRLCK : F_RDLCK;
// F_SETLKW waits until the requested range can be locked
308 if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
309 return bt->err = BTERR_lock;
313 memset (ovl, 0, sizeof(ovl));
314 ovl->OffsetHigh = (uint)(off >> 32);
315 ovl->Offset = (uint)off;
317 // use large offsets to
318 // simulate advisory locking
320 ovl->OffsetHigh |= 0x80000000;
323 flags |= LOCKFILE_EXCLUSIVE_LOCK;
325 if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
328 return bt->err = BTERR_lock;
332 // remove lock on requested page_no.
// Releases the byte range that bt_lockpage() computes for the same mode:
// the offset is page_no << page_bits and one segment is sizeof(*bt->parent).
// Sets bt->err to BTERR_lock and returns it when the unlock call fails.
// NOTE(review): the platform conditionals, the switch header, the `ovl`
// declaration and the success return are not visible in this excerpt.
334 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
336 off64_t off = page_no << bt->page_bits;
337 uint len = sizeof(*bt->parent);
339 struct flock lock[1];
345 case BtLockShared: // unlock segment 0
348 case BtLockUpdate: // unlock segment 1
349 off += sizeof(*bt->parent);
352 case BtLockXclusive:// unlock both segments
353 len += sizeof(*bt->parent);
356 case BtLockUpgrade: // unlock segment 0
361 memset (lock, 0, sizeof(lock));
364 lock->l_type = F_UNLCK;
368 if( fcntl (bt->idx, F_SETLK, lock) < 0 )
369 return bt->err = BTERR_lock;
371 memset (ovl, 0, sizeof(ovl));
372 ovl->OffsetHigh = (uint)(off >> 32);
373 ovl->Offset = (uint)off;
375 // use large offsets to
376 // simulate advisory locking
378 ovl->OffsetHigh |= 0x80000000;
380 if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) )
// the GetLastError() result is discarded by the comma operator
381 return GetLastError(), bt->err = BTERR_lock;
387 // close and release memory
// Walks the LRU list from bt->lrufirst and unmaps every cached block; each
// block spans (hashmask + 1) pages. The assignments inside the if/while
// conditions are intentional (loop until the link is NULL).
// NOTE(review): the platform conditionals and the unix-side release of
// bt->mem, the file descriptor and bt itself are not visible in this excerpt.
389 void bt_close (BtDb *bt)
393 // release mapped pages
395 if( hash = bt->lrufirst )
396 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
397 while(hash = hash->lrunext);
404 if( hash = bt->lrufirst )
// Windows: flush and unmap the view, then close the mapping handle
407 FlushViewOfFile(hash->page, 0);
408 UnmapViewOfFile(hash->page);
409 CloseHandle(hash->hmap);
410 } while(hash = hash->lrunext);
413 VirtualFree (bt->mem, 0, MEM_RELEASE);
414 FlushFileBuffers(bt->idx);
415 CloseHandle(bt->idx);
420 // open/create new btree
421 // call with file_name, BT_openmode, bits in page size (e.g. 16),
422 // size of mapped page cache (e.g. 8192) or zero for no mapping.
// Returns the new BtDb handle, or NULL after releasing it when the file
// cannot be opened, locked, read or initialised.
// NOTE(review): the unix/Windows conditional directives, several declarations
// (bt, alloc, size, amt) and a number of statements are not visible in this
// excerpt, so the two platform variants appear interleaved below.
424 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
426 BtLock lockmode = BtLockXclusive;
427 uint lvl, attr, cacheblk;
435 SYSTEM_INFO sysinfo[1];
// The allocation reserves nodemax BtHash entries beyond the BtDb header.
// NOTE(review): no visible check that malloc succeeded before the memset.
439 bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash));
440 memset (bt, 0, sizeof(BtDb));
442 switch (mode & 0x7fff)
446 bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
// read-only opens take a shared latch on the alloc page instead of an exclusive one
451 bt->idx = open ((char*)name, O_RDONLY);
452 lockmode = BtLockShared;
456 return free(bt), NULL;
459 cacheblk = 4096; // page size for unix
// NOTE(review): no visible check that GlobalAlloc succeeded.
464 bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash));
465 attr = FILE_ATTRIBUTE_NORMAL;
466 switch (mode & 0x7fff)
469 attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
472 bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
477 bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
478 lockmode = BtLockShared;
481 if( bt->idx == INVALID_HANDLE_VALUE )
482 return GlobalFree(bt), NULL;
484 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
485 GetSystemInfo(sysinfo);
488 cacheblk = sysinfo->dwAllocationGranularity;
493 // determine sanity of page size
// NOTE(review): the statements guarded by these two tests are not visible;
// presumably bits is clamped to the BT_minbits..BT_maxbits range.
495 if( bits > BT_maxbits )
497 else if( bits < BT_minbits )
500 if ( bt_lockpage(bt, ALLOC_page, lockmode) )
501 return bt_close (bt), NULL;
506 // read minimum page size to get root info
// lseek with whence 2 (SEEK_END) yields the file size; a non-empty file has
// its first BT_minpage bytes read into a scratch buffer.
// NOTE(review): the pread result is not checked on any visible line.
508 if( size = lseek (bt->idx, 0L, 2) ) {
509 alloc = malloc (BT_minpage);
510 pread(bt->idx, alloc, BT_minpage, 0);
// an empty file cannot be opened read-only
513 } else if( mode == BT_ro )
514 return bt_close (bt), NULL;
516 size = GetFileSize(bt->idx, amt);
519 alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE);
520 if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) )
521 return bt_close (bt), NULL;
523 VirtualFree (alloc, 0, MEM_RELEASE);
524 } else if( mode == BT_ro )
525 return bt_close (bt), NULL;
528 bt->page_size = 1 << bits;
529 bt->page_bits = bits;
531 bt->nodemax = nodemax;
534 // setup cache mapping
// a cache block is never smaller than one page; hashmask is pages-per-block - 1
537 if( cacheblk < bt->page_size )
538 cacheblk = bt->page_size;
540 bt->hashmask = (cacheblk >> bits) - 1;
// one buffer of eight pages backs the eight working frames
// NOTE(review): no visible check that this allocation succeeded.
545 bt->mem = malloc (8 *bt->page_size);
547 bt->mem = VirtualAlloc(NULL, 8 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
549 bt->frame = (BtPage)bt->mem;
550 bt->cursor = (BtPage)(bt->mem + bt->page_size);
551 bt->alloc = (BtPage)(bt->mem + 2 * bt->page_size);
552 bt->parent = (BtPage)(bt->mem + 3 * bt->page_size);
553 bt->child = (BtPage)(bt->mem + 4 * bt->page_size);
554 bt->temp = (BtPage)(bt->mem + 5 * bt->page_size);
555 bt->sibling = (BtPage)(bt->mem + 6 * bt->page_size);
556 bt->sibling2 = (BtPage)(bt->mem + 7 * bt->page_size);
// NOTE(review): this unlock presumably belongs to the existing-file path that
// returns before the initialisation below; the enclosing condition is not visible.
559 if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
560 return bt_close (bt), NULL;
565 // initialize an empty b-tree with alloc page & root page
// the alloc page's right link records the next page number to allocate
567 memset (bt->alloc, 0, bt->page_size);
568 bt_putid(bt->alloc->right, ROOT_page + 1);
569 bt->alloc->bits = bt->page_bits;
572 if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size )
573 return bt_close (bt), NULL;
575 if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) )
576 return bt_close (bt), NULL;
578 if( *amt < bt->page_size )
579 return bt_close (bt), NULL;
// an empty page has min == page_size: no key bytes allocated from the top yet
584 memset (bt->frame, 0, bt->page_size);
585 bt->frame->bits = bt->page_bits;
587 bt->frame->min = bt->page_size;
589 if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size )
590 return bt_close (bt), NULL;
592 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
593 return bt_close (bt), NULL;
595 if( *amt < bt->page_size )
596 return bt_close (bt), NULL;
599 // create initial empty page area by writing last page of first
600 // cache area (other pages are zeroed by O/S)
602 if( bt->mapped_io && bt->hashmask > 2 ) {
603 memset(bt->frame, 0, bt->page_size);
// NOTE(review): the pwrite result is not checked on any visible line.
606 pwrite(bt->idx, bt->frame, bt->page_size, bt->hashmask << bt->page_bits);
608 SetFilePointer (bt->idx, bt->hashmask << bt->page_bits, NULL, FILE_BEGIN);
609 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
610 return bt_close (bt), NULL;
611 if( *amt < bt->page_size )
612 return bt_close (bt), NULL;
616 if( bt_unlockpage(bt, ALLOC_page, lockmode) )
617 return bt_close (bt), NULL;
622 // reset parent/child page pointers
// Points the eight working frames back at consecutive page_size slices of
// bt->mem, in the same order bt_open() uses.
// NOTE(review): any condition guarding these assignments is not visible in
// this excerpt.
624 void bt_resetpages (BtDb *bt)
629 bt->frame = (BtPage)bt->mem;
630 bt->cursor = (BtPage)(bt->mem + bt->page_size);
631 bt->alloc = (BtPage)(bt->mem + 2 * bt->page_size);
632 bt->parent = (BtPage)(bt->mem + 3 * bt->page_size);
633 bt->child = (BtPage)(bt->mem + 4 * bt->page_size);
634 bt->temp = (BtPage)(bt->mem + 5 * bt->page_size);
635 bt->sibling = (BtPage)(bt->mem + 6 * bt->page_size);
636 bt->sibling2 = (BtPage)(bt->mem + 7 * bt->page_size);
639 // return pointer to high key
640 // or NULL if infinite value
// Two forms are visible: the key of the last slot (page->cnt), and the fence
// key stored in the top page->fence bytes of the page. Both apply only when
// the page has a right sibling.
// NOTE(review): the condition that chooses between the two forms and the
// NULL return are not visible in this excerpt.
642 BtKey bt_highkey (BtDb *bt, BtPage page)
645 if( bt_getid (page->right) )
646 return keyptr(page, page->cnt);
650 if( bt_getid (page->right) )
651 return ((BtKey)((unsigned char*)(page) + bt->page_size - page->fence));
656 // return pointer to slot key in index page
657 // or NULL if infinite value
// The key is returned for every slot before the last one, and for the last
// slot when the page has a right sibling.
// NOTE(review): the fall-through return is not visible in this excerpt.
659 BtKey bt_slotkey (BtPage page, uint slot)
661 if( slot < page->cnt || bt_getid (page->right) )
662 return keyptr(page, slot);
667 // compare two keys, returning > 0, = 0, or < 0
668 // as the comparison value
// key1 is a stored key (length byte plus bytes); key2/len2 is a raw byte
// string. The common prefix (the shorter of the two lengths) is compared
// with memcmp first.
// NOTE(review): the declaration of `ans` and the handling of equal prefixes
// with different lengths are not visible in this excerpt.
670 int keycmp (BtKey key1, unsigned char *key2, uint len2)
672 uint len1 = key1->len;
675 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
686 // Update current page of btree by writing file contents
687 // or flushing mapped area to disk.
// Without mapped I/O the page is written at file offset page_no << page_bits;
// a short or failed write sets bt->err to BTERR_wrt and returns it.
// NOTE(review): the platform conditionals, the `amt` declaration and the
// success return are not visible in this excerpt.
689 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
691 off64_t off = page_no << bt->page_bits;
694 if ( !bt->mapped_io )
695 if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
696 return bt->err = BTERR_wrt;
699 if ( !bt->mapped_io )
// NOTE(review): (long*)(&off)+1 addresses the second 32-bit half of off as the
// high part of the file position, which assumes a little-endian layout - confirm.
701 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
702 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
703 return GetLastError(), bt->err = BTERR_wrt;
705 if( *amt < bt->page_size )
706 return GetLastError(), bt->err = BTERR_wrt;
// with mapped I/O in BT_fl mode, push the view and the file buffers to disk
708 else if ( bt->mode == BT_fl ) {
709 FlushViewOfFile(page, bt->page_size);
710 FlushFileBuffers(bt->idx);
716 // find page in cache
// Looks up the cache block containing page_no: the page number is rounded
// down to the block's first page, hashed into bt->cache[], and the bucket's
// chain is followed through hashnext until a matching page_no is found.
// NOTE(review): the declarations, the empty-bucket test and the return
// statements are not visible in this excerpt.
718 BtHash *bt_findhash(BtDb *bt, uid page_no)
723 // compute cache block first page and hash idx
725 page_no &= ~bt->hashmask;
726 idx = (uint)(page_no * BT_hashprime % BT_hashsize);
729 hash = bt->nodes + bt->cache[idx];
733 do if( hash->page_no == page_no )
735 while(hash = hash->hashnext );
740 // add page cache entry to hash index
// Inserts `node` at the head of its bucket's chain. The bucket is selected
// from the first page number of the block that contains page_no.
// NOTE(review): the `hash` declaration and the branch taken for an empty
// bucket are not visible in this excerpt.
742 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
744 uint idx = (uint)((page_no & ~bt->hashmask) * BT_hashprime % BT_hashsize);
// a non-zero bucket value is the index of the current chain head
747 if( bt->cache[idx] ) {
748 node->hashnext = hash = bt->nodes + bt->cache[idx];
749 hash->hashprev = node;
752 node->hashprev = NULL;
// the bucket stores the node's index truncated to ushort
753 bt->cache[idx] = (ushort)(node - bt->nodes);
756 // remove cache entry from hash table
// Unlinks `node` from its bucket chain: the predecessor (or the bucket head
// when there is none) is pointed at the successor, and the successor's
// back link is pointed at the predecessor.
// NOTE(review): the `hash` declaration and the case where the node is the
// only entry in its bucket are not visible in this excerpt.
758 void bt_unlinkhash(BtDb *bt, BtHash *node)
760 uint idx = (uint)((node->page_no & ~bt->hashmask) * BT_hashprime % BT_hashsize);
764 if( hash = node->hashprev )
765 hash->hashnext = node->hashnext;
766 else if( hash = node->hashnext )
767 bt->cache[idx] = (ushort)(hash - bt->nodes);
771 if( hash = node->hashnext )
772 hash->hashprev = node->hashprev;
775 // add cache page to lru chain and map pages
// Initialises `hash` for the cache block containing page_no, links it into
// the hash index and at the head of the LRU list, maps the whole block
// ((hashmask + 1) pages starting at `off`) and returns the address of the
// requested page inside the mapping. On a mapping failure bt->err is set to
// BTERR_map and NULL is returned.
// NOTE(review): the platform conditionals, the `node`/`flag` declarations,
// the updates of bt->lrufirst/lrulast and the Windows failure tests are not
// visible in this excerpt.
777 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
780 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
// limit is the file offset just past the end of the block
781 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
784 memset(hash, 0, sizeof(BtHash));
785 hash->page_no = (page_no & ~bt->hashmask);
786 bt_linkhash(bt, hash, page_no);
788 if( node = hash->lrunext = bt->lrufirst )
789 node->lruprev = hash;
796 flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
797 hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
// mmap reports failure as (void *)-1
798 if( (long long int)hash->page == -1LL )
799 return bt->err = BTERR_map, (BtPage)NULL;
802 flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
803 hash->hmap = CreateFileMapping(bt->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
805 return bt->err = BTERR_map, NULL;
807 flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
808 hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
810 return bt->err = BTERR_map, NULL;
// page_no & hashmask is the page's index within the block
813 return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
816 // find or place requested page in page-cache
817 // return memory address where page is located.
// Three cases are visible: the block is already cached (it is moved to the
// head of the LRU list), a free cache node is available (it is mapped), or
// the cache is full (the LRU tail is unmapped and its node reused).
// Failures set bt->err to BTERR_hash and return NULL.
// NOTE(review): the platform conditionals, the `page` declaration and several
// else branches and list-head updates are not visible in this excerpt.
819 BtPage bt_hashpage(BtDb *bt, uid page_no)
821 BtHash *hash, *node, *next;
824 // find page in cache and move to top of lru list
826 if( hash = bt_findhash(bt, page_no) ) {
827 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
828 // swap node in lru list
// a node with no predecessor is already the list head
829 if( node = hash->lruprev ) {
830 if( next = node->lrunext = hash->lrunext )
831 next->lruprev = node;
835 if( next = hash->lrunext = bt->lrufirst )
836 next->lruprev = hash;
838 return bt->err = BTERR_hash, (BtPage)NULL;
840 hash->lruprev = NULL;
846 // map pages and add to cache entry
// nodecnt is incremented before use, so nodes[0] is never handed out
848 if( bt->nodecnt < bt->nodemax ) {
849 hash = bt->nodes + ++bt->nodecnt;
850 return bt_linklru(bt, hash, page_no);
853 // hash table is already full, replace last lru entry from the cache
855 if( hash = bt->lrulast ) {
856 // unlink from lru list
857 if( node = bt->lrulast = hash->lruprev )
858 node->lrunext = NULL;
860 return bt->err = BTERR_hash, (BtPage)NULL;
863 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
865 FlushViewOfFile(hash->page, 0);
866 UnmapViewOfFile(hash->page);
867 CloseHandle(hash->hmap);
869 // unlink from hash table
871 bt_unlinkhash(bt, hash);
873 // map and add to cache
875 return bt_linklru(bt, hash, page_no);
878 return bt->err = BTERR_hash, (BtPage)NULL;
881 // map a btree page onto current page
// With mapped I/O, *page is replaced by the address of the page inside the
// page cache. Otherwise page_size bytes are read from file offset
// page_no << page_bits into the buffer *page already points to; a short or
// failed read sets bt->err to BTERR_map and returns it.
// NOTE(review): the platform conditionals, the `amt` declaration, the check
// of the bt_hashpage result and the success return are not visible in this
// excerpt.
883 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
885 off64_t off = page_no << bt->page_bits;
890 if( bt->mapped_io ) {
892 *page = bt_hashpage(bt, page_no);
896 if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
897 return bt->err = BTERR_map;
899 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
901 if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
902 return bt->err = BTERR_map;
904 if( *amt < bt->page_size )
905 return bt->err = BTERR_map;
910 // deallocate a deleted page
911 // place on free chain out of allocator page
912 // page must already be BtLockXclusive and mapped
// The free chain is a singly linked list headed at bt->alloc[1].right (the
// right field of the second header-sized slot of the alloc page); the freed
// page is pushed on the front and both pages are written back.
// NOTE(review): the statements executed when any of these calls fail, and
// the final return, are not visible in this excerpt.
914 BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no)
916 // lock allocation page
918 if ( bt_lockpage(bt, ALLOC_page, BtLockUpdate) )
921 if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
924 // store chain in second right
// the freed page's right link takes the old chain head, then becomes the head
925 bt_putid(page->right, bt_getid(bt->alloc[1].right));
926 bt_putid(bt->alloc[1].right, page_no);
928 if( bt_update(bt, bt->alloc, ALLOC_page) )
930 if( bt_update(bt, page, page_no) )
935 if( bt_unlockpage(bt, ALLOC_page, BtLockUpdate) )
941 // allocate a new page and write page into it
// A page number is taken from the free chain (bt->alloc[1].right) when that
// is non-zero, otherwise from the next-page counter in bt->alloc->right,
// which is then incremented. The contents of `page` are written to the
// chosen page. Visible failure paths return 0.
// NOTE(review): the declarations (new_page, reuse, pmap), the platform
// conditionals, the setting of `reuse` and the success return are not
// visible in this excerpt.
943 uid bt_newpage(BtDb *bt, BtPage page)
951 if ( bt_lockpage(bt, ALLOC_page, BtLockUpdate) )
954 if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
957 // use empty chain first
958 // else allocate empty page
960 if( new_page = bt_getid(bt->alloc[1].right) ) {
961 if( bt_mappage (bt, &bt->temp, new_page) )
962 return 0; // don't unlock on error
// pop the chain: the head becomes the reused page's right link
963 bt_putid(bt->alloc[1].right, bt_getid(bt->temp->right));
966 new_page = bt_getid(bt->alloc->right);
967 bt_putid(bt->alloc->right, new_page+1);
971 if( bt_update(bt, bt->alloc, ALLOC_page) )
972 return 0; // don't unlock on error
976 if ( bt_unlockpage(bt, ALLOC_page, BtLockUpdate) )
979 if( !bt->mapped_io ) {
980 if( bt_update(bt, page, new_page) )
981 return 0; //don't unlock on error
// NOTE(review): the alloc page was latched with BtLockUpdate and a
// BtLockUpdate unlock is already visible above; BtLockWrite is not used
// anywhere else in the visible code - confirm this unlock is intended.
982 if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
989 if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
990 return bt->err = BTERR_wrt, 0;
992 // if writing first page of hash block, zero last page in the block
994 if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
996 // use temp buffer to write zeros
997 memset(bt->temp, 0, bt->page_size);
// new_page | hashmask is the last page number of the block
998 if ( pwrite(bt->idx,bt->temp, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
999 return bt->err = BTERR_wrt, 0;
1002 // bring new page into page-cache and copy page.
1003 // this will extend the file into the new pages.
1005 if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
1008 memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size);
1014 // find slot in given page for given key
// Binary search over slots 1..page->cnt. `higher` always names a slot whose
// key compared >= the search key (or the initial upper bound); `good`
// records that at least one such comparison happened.
// Returns the slot number, or 0 when no tested key was >= the search key.
// NOTE(review): the `good` declaration, the statement guarded by the
// infinite-fence test and the branch that advances `low` are not visible in
// this excerpt.
1016 int bt_findslot (BtPage page, unsigned char *key, uint len)
1018 uint diff, higher = page->cnt, low = 1, slot;
1021 // make last key an infinite fence value
1023 if( !page->lvl || bt_getid (page->right) )
1028 // low is the next candidate, higher is already
1029 // tested as .ge. the given key, loop ends when they meet
1032 while( diff = higher - low ) {
1033 slot = low + ( diff >> 1 );
1034 if( keycmp (keyptr(page, slot), key, len) < 0 )
1037 higher = slot, good++;
1040 // return zero if key is beyond highkey value
1043 return good ? higher : 0;
1046 // split full parent node
// Splits bt->parent: the upper keys are rebuilt in bt->frame and written to
// a newly allocated page, the lower keys are rebuilt in place, and the old
// page's right link is pointed at the new page. Afterwards bt->parent and
// bt->parentpage refer to whichever half covers `key`.
// NOTE(review): the declarations of `ptr` and `new_page`, the statements that
// choose the split point (initial cnt/max for each loop), the error returns
// and several braces/else lines are not visible in this excerpt.
1048 BTERR bt_splitparent (BtDb *bt, unsigned char *key, uint len)
1050 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1051 uid parentpage = bt->parentpage, right;
1052 BtPage page = bt->parent;
1056 // upgrade parent latch to Xclusive
1058 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
1061 // split higher half of keys to bt->frame
1063 memset (bt->frame, 0, bt->page_size);
1064 max = (int)page->cnt;
1068 // link right sibling node into new right page
1070 right = bt_getid (page->right);
1071 bt_putid(bt->frame->right, right);
1073 // record higher fence key in new right leaf page
// the fence key occupies the top `fence` bytes of the page
1075 if( bt->frame->fence = page->fence ) {
1076 memcpy ((unsigned char *)bt->frame + bt->page_size - bt->frame->fence, (unsigned char *)(page) + bt->page_size - bt->frame->fence, bt->frame->fence);
1080 while( cnt++ < max ) {
1082 // copy key, but not infinite values
// the last key of a non-leaf page with no right sibling is the infinite value
1084 if( cnt < max || !page->lvl || right ) {
1085 ptr = keyptr(page, cnt);
// key text is packed downward from the top of the page
1086 nxt -= ptr->len + 1;
1087 memcpy ((unsigned char *)bt->frame + nxt, ptr, ptr->len + 1);
1092 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1093 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1094 slotptr(bt->frame, idx)->off = nxt;
1097 bt->frame->bits = bt->page_bits;
1098 bt->frame->lvl = page->lvl;
1099 bt->frame->min = nxt;
1100 bt->frame->cnt = idx;
1102 // get new free page and write right frame to it.
1104 if( !(new_page = bt_newpage(bt, bt->frame)) )
1107 // update lower keys to continue in old page
// keep a copy of the old page in bt->frame, then clear everything after the header
1109 memcpy (bt->frame, page, bt->page_size);
1110 memset (page+1, 0, bt->page_size - sizeof(*page));
1111 nxt = bt->page_size;
1116 // record fence key in left leaf page
1119 ptr = keyptr(bt->frame, max);
1120 nxt -= ptr->len + 1;
1121 memcpy ((unsigned char *)page + nxt, ptr, ptr->len + 1);
1122 page->fence = ptr->len + 1;
1125 // assemble page of smaller keys
1126 // no infinite value to deal with
1128 while( cnt++ < max ) {
1129 ptr = keyptr(bt->frame, cnt);
1130 nxt -= ptr->len + 1;
1131 memcpy ((unsigned char *)page + nxt, ptr, ptr->len + 1);
1132 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1133 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1134 slotptr(page, idx)->off = nxt;
1137 bt_putid(page->right, new_page);
1143 if( bt_update(bt, page, parentpage) )
1146 // decide to move to new right
1147 // node or stay on left node
1149 ptr = bt_highkey (bt, page);
// stay on the left page when its high key is >= the search key
1151 if( keycmp (ptr, key, len) >= 0 )
1152 return bt_unlockpage (bt, parentpage, BtLockUpgrade);
// otherwise switch to the new right page and release the old one completely
1154 bt->parentpage = new_page;
1156 if( bt_mappage (bt, &bt->parent, new_page) )
1158 if( bt_lockpage (bt, new_page, BtLockUpdate) )
1160 if( bt_unlockpage (bt, parentpage, BtLockUpgrade) )
1163 return bt_unlockpage (bt, parentpage, BtLockUpdate);
1166 // add unlinked node key into parent
1167 // childpage is existing record
1168 // siblingpage is right record
// In bt->parent, the slot found for rightkey is re-pointed at rightpage and
// a new slot carrying the left page's high key and leftpage is inserted
// before it. Sets bt->err to BTERR_struct when rightkey has no slot.
// NOTE(review): the `slot`/`idx` declarations, the loop header that shifts
// the slots, the update of page->cnt, the error returns and the final return
// are not visible in this excerpt.
1170 BTERR bt_parentlink (BtDb *bt, BtPage left, uid leftpage, BtKey rightkey, uid rightpage)
1172 BtKey leftkey = bt_highkey (bt, left);
1173 BtPage page = bt->parent;
1176 // upgrade parent latch to exclusive
1178 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
1181 // find the existing right high key in the parent
1182 // and fix the downlink to point to right page
1185 if( !(slot = bt_findslot (page, rightkey->key, rightkey->len)) )
1186 return bt->err = BTERR_struct;
1190 bt_putid(slotptr(page,slot)->id, rightpage);
1192 // calculate next available slot and copy left key onto page
1194 page->min -= leftkey->len + 1; // reset lowest used offset
1195 memcpy ((unsigned char *)page + page->min, leftkey, leftkey->len + 1);
1197 // now insert key into array before slot
// each step copies slot idx-1 into slot idx, opening a gap at `slot`
1202 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1204 bt_putid(slotptr(page,slot)->id, leftpage);
1205 slotptr(page, slot)->off = page->min;
1207 if ( bt_update(bt, page, bt->parentpage) )
1210 // downgrade parent page lock to BtLockUpdate
1212 if( bt_unlockpage(bt, bt->parentpage, BtLockUpgrade) )
1218 // remove slot from parent
// Rebuilds bt->parent without one slot: the page is copied to bt->frame, the
// area after the header is cleared, the fence key is copied back, and the
// surviving keys and slots are repacked from the copy.
// NOTE(review): the `right`/`key` declarations, the statement that positions
// `nxt` below the fence, the test that skips `slot` and the final updates of
// page->min and page->cnt are not visible in this excerpt.
1220 void bt_removeslot(BtDb *bt, uint slot)
1222 uint nxt = bt->page_size, amt;
1223 BtPage page = bt->parent;
1224 uint cnt = 0, idx = 0;
1225 uint max = page->cnt;
1229 memcpy (bt->frame, page, bt->page_size);
1231 // skip page info and set rest of page to zero
1232 memset (page+1, 0, bt->page_size - sizeof(*page));
1234 // copy fence key onto new page
1235 if( amt = page->fence ) {
1237 memcpy ((unsigned char *)page + nxt, (unsigned char *)(bt->frame) + nxt, amt);
1240 right = bt_getid (page->right);
1242 while( cnt++ < max ) {
1244 // skip key to delete
1249 // copy key, but not infinite value
1251 if( cnt < max || !page->lvl || right ) {
1252 key = keyptr(bt->frame, cnt);
1253 nxt -= key->len + 1;
1254 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1258 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1259 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1260 slotptr(page, idx)->off = nxt;
1266 // unlink sibling node
// Removes from bt->parent the slot whose key is the high key of `child`,
// then stores childpage in the slot that now occupies that position, writes
// the parent back and releases the parent latch completely.
// When the key has no slot in the parent, bt->err is set to BTERR_struct and
// returned (previously the code returned bt->err + BTERR_struct without
// recording the error in bt->err).
// NOTE(review): the `slot` declaration, the `else` line and the error
// returns after the lock/update/unlock calls are not visible in this excerpt.
1268 BTERR bt_parentunlink (BtDb *bt, BtPage child, uid childpage)
1270 BtKey parentkey = bt_highkey (bt, child);
1273 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
1276 // delete child's slot from parent
1278 if( slot = bt_findslot (bt->parent, parentkey->key, parentkey->len) )
1279 bt_removeslot (bt, slot);
1281 return bt->err = BTERR_struct;
1283 // sibling now in the slot
1285 bt_putid(slotptr(bt->parent,slot)->id, childpage);
1287 if ( bt_update(bt, bt->parent, bt->parentpage) )
1290 // unlock parent page completely
1292 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
1295 return bt_unlockpage(bt, bt->parentpage, BtLockUpdate);
1298 // merge right sibling page into child page
// Builds the merged page in bt->frame: the header and fence key come from
// the right page, followed by every key/slot of the left page (bt->child)
// and then every key/slot of the right page. The result replaces the left
// page, the right page is put on the free chain, and the latches on both
// pages are released.
// The infinite-value test reads the right page's own right link; the previous
// code tested the `right` parameter pointer, which is never NULL, so the test
// was always true.
// NOTE(review): the `idx`/`amt`/`ptr` declarations, the error returns after
// the lock/update/free/unlock calls and some brace lines are not visible in
// this excerpt.
1300 BTERR bt_mergepages (BtDb *bt, BtPage *right, uid rightpage)
1302 BtPage *left = &bt->child;
1306 if( bt_lockpage (bt, bt->childpage, BtLockUpgrade) )
1308 if( bt_lockpage (bt, rightpage, BtLockUpgrade) )
1311 // initialize empty frame
1313 memset (bt->frame, 0, bt->page_size);
1314 *bt->frame = **right;
1316 // copy right fence key
1318 if( amt = (*right)->fence )
1319 memcpy ((unsigned char *)bt->frame + bt->page_size - amt, (unsigned char *)(*right) + bt->page_size - amt, amt);
1321 bt->frame->min = bt->page_size - amt;
1323 // copy lowerkey key/values from left page
1325 for( idx = 1; idx <= (*left)->cnt; idx++ ) {
1326 ptr = keyptr(*left, idx);
1327 bt->frame->min -= ptr->len + 1;
1328 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1329 memcpy (slotptr(bt->frame, idx)->id, slotptr(*left, idx)->id, BtId);
1330 slotptr(bt->frame, idx)->tod = slotptr(*left, idx)->tod;
1331 slotptr(bt->frame, idx)->off = bt->frame->min;
1334 bt->frame->cnt = (*left)->cnt;
1336 // copy higherkey key/values from right page
1338 for( idx = 1; idx <= (*right)->cnt; idx++ ) {
1340 // copy key but not infinite value
1342 if( idx < (*right)->cnt || !(*right)->lvl || bt_getid ((*right)->right) ) {
1343 ptr = keyptr(*right, idx);
1344 bt->frame->min -= ptr->len + 1;
1345 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
// right-page slots are appended after the left-page slots
1349 memcpy (slotptr(bt->frame, bt->frame->cnt + idx)->id, slotptr(*right, idx)->id, BtId);
1350 slotptr(bt->frame, bt->frame->cnt + idx)->tod = slotptr(*right, idx)->tod;
1351 slotptr(bt->frame, bt->frame->cnt + idx)->off = bt->frame->min;
1354 bt->frame->cnt += (*right)->cnt;
1355 memcpy (*left, bt->frame, bt->page_size);
1357 if( bt_update (bt, *left, bt->childpage) )
1359 if( bt_freepage (bt, *right, rightpage) )
1361 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1363 if( bt_unlockpage (bt, rightpage, BtLockUpdate) )
1366 return bt_unlockpage (bt, bt->childpage, BtLockUpgrade);
1369 // redistribute right sibling page with child page
1370 // switch child page to containing page
//
// bt        - tree handle; the left page of the pair is bt->child / bt->childpage
// right     - address of the mapped right sibling page
// rightpage - page number of that right sibling
// key, len  - search key compared with the rebuilt left page's high key
//             to decide which of the two pages ends up as bt->childpage
//
// Both new page images are assembled in scratch buffers (bt->frame for
// the left page, bt->temp for the right page), copied back over the
// mapped pages, and each page is then handed to bt_update.
1372 BTERR bt_redistribute (BtDb *bt, BtPage *right, uid rightpage, unsigned char *key, uint len)
// page number held in the right sibling's own right link
1374 uid siblingpage = bt_getid((*right)->right);
1375 BtPage *left = &bt->child;
1376 uint idx, cnt = 0, amt = 0;
1377 uint leftmax, rightmin;
// request BtLockUpgrade on both pages before rebuilding them
1381 if( bt_lockpage (bt, bt->childpage, BtLockUpgrade) )
1383 if( bt_lockpage (bt, rightpage, BtLockUpgrade) )
1386 // initialize empty frames to contain redistributed pages
1388 memset (bt->frame, 0, bt->page_size); // new left page
1389 memset (bt->temp, 0, bt->page_size); // new right page
// carry the existing page headers over into the scratch images
1391 *bt->frame = **left;
1392 *bt->temp = **right;
// min == page_size: no key bytes stored yet, keys are added downward from the top
1394 bt->frame->min = bt->page_size;
1395 bt->temp->min = bt->page_size;
1399 // find new left fence index
1400 // and copy left fence key
// left page holds more keys: split point is the middle of the left page
1402 if( (*left)->cnt > (*right)->cnt ) {
1404 leftmax = (*left)->cnt / 2;
// the fence key is only copied when the page level is zero
1405 if( !bt->frame->lvl ) {
1406 ptr = keyptr(*left, leftmax);
// fence records the stored size of the fence key (length byte + key bytes)
1407 bt->frame->fence = ptr->len + 1;
1408 bt->frame->min -= ptr->len + 1;
1409 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
// presumably the alternative branch: the whole left page is kept and the
// split point is the middle of the right page -- TODO confirm branch structure
1412 leftmax = (*left)->cnt;
1413 rightmin = (*right)->cnt / 2;
1414 if( !bt->frame->lvl ) {
1415 ptr = keyptr(*right, rightmin);
1416 bt->frame->fence = ptr->len + 1;
1417 bt->frame->min -= ptr->len + 1;
1418 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1422 // right fence stays the same, if any
// amt receives the right page's fence size; zero means no fence to copy
1424 if( amt = (*right)->fence ) {
1425 bt->temp->min -= amt;
// source and destination use the same offset (bt->temp->min) within their pages
1426 memcpy ((unsigned char *)bt->temp + bt->temp->min, (unsigned char *)(*right) + bt->temp->min, amt);
1429 // copy first set of lowerkey key/values from left page
// slots 1..leftmax keep their slot numbers in the new left page
1431 for( idx = 1; idx <= leftmax; idx++ ) {
1433 ptr = keyptr(*left, idx);
1434 bt->frame->min -= ptr->len + 1;
1435 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1436 memcpy (slotptr(bt->frame, idx)->id, slotptr(*left, idx)->id, BtId);
1437 slotptr(bt->frame, idx)->tod = slotptr(*left, idx)->tod;
1438 slotptr(bt->frame, idx)->off = bt->frame->min;
1441 // copy remaining left page key/values from right page, if any
// keys taken from the right page are stored at slot index bt->frame->cnt
1443 for( idx = 1; idx <= rightmin; idx++ ) {
1445 ptr = keyptr(*right, idx);
1446 bt->frame->min -= ptr->len + 1;
1447 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1448 memcpy (slotptr(bt->frame, bt->frame->cnt)->id, slotptr(*right, idx)->id, BtId);
1449 slotptr(bt->frame, bt->frame->cnt)->tod = slotptr(*right, idx)->tod;
1450 slotptr(bt->frame, bt->frame->cnt)->off = bt->frame->min;
1453 // copy remaining left page key/values into new right page, if any
// destination slot index in the new right page is bt->temp->cnt
1455 for( idx = leftmax; idx <= (*left)->cnt; idx++ ) {
1457 ptr = keyptr(*left, idx);
1458 bt->temp->min -= ptr->len + 1;
1459 memcpy ((unsigned char *)bt->temp + bt->temp->min, ptr, ptr->len + 1);
1460 memcpy (slotptr(bt->temp, bt->temp->cnt)->id, slotptr(*left, idx)->id, BtId);
1461 slotptr(bt->temp, bt->temp->cnt)->tod = slotptr(*left, idx)->tod;
1462 slotptr(bt->temp, bt->temp->cnt)->off = bt->temp->min;
1465 // copy rest of higherkey key/values from right page
1467 for( idx = rightmin; idx <= (*right)->cnt; idx++ ) {
1469 // copy key, but not infinite value
// key bytes are skipped only for the last slot of a non-leaf page
// whose right link (siblingpage) is zero
1471 if( idx < (*right)->cnt || !(*right)->lvl || siblingpage ) {
1472 ptr = keyptr(*right, idx);
1473 bt->temp->min -= ptr->len + 1;
1474 memcpy ((unsigned char *)bt->temp + bt->temp->min, ptr, ptr->len + 1);
1480 memcpy (slotptr(bt->temp, bt->temp->cnt)->id, slotptr(*right, idx)->id, BtId);
1481 slotptr(bt->temp, bt->temp->cnt)->tod = slotptr(*right, idx)->tod;
1482 slotptr(bt->temp, bt->temp->cnt)->off = bt->temp->min;
// install the rebuilt images over the mapped pages and pass both to bt_update
1485 memcpy (*left, bt->frame, bt->page_size);
1486 memcpy (*right, bt->temp, bt->page_size);
1488 if( bt_update (bt, *left, bt->childpage) )
1490 if( bt_update (bt, *right, rightpage) )
// NOTE(review): rightpage locks are released here and again in the branches
// below -- confirm which of these unlock sequences is actually active
1493 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1495 if( bt_unlockpage (bt, rightpage, BtLockUpdate) )
// high key of the rebuilt left page, compared with the caller's key below
1498 ptr = bt_highkey (bt, *left);
1500 // decide which page is the child page
1501 // if leftkey >= our key, go with left
1503 if( keycmp (ptr, key, len) >= 0 ) {
// staying on the left page: drop its upgrade lock and release the right page
1504 if( bt_unlockpage (bt, bt->childpage, BtLockUpgrade) )
1506 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1508 if( bt_unlockpage (bt, rightpage, BtLockUpdate) )
// presumably the other branch: drop the right page's upgrade lock and
// release the old child page entirely -- TODO confirm branch structure
1511 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1513 if( bt_unlockpage (bt, bt->childpage, BtLockUpgrade) )
1515 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
// the right page becomes the current child page
1520 bt->childpage = rightpage;
1526 // lower the root level by removing the child node
//
// Requests BtLockUpgrade on ROOT_page and on bt->childpage, copies the
// child's page image (bt->child) over the root image (bt->parent), passes
// the result to bt_update as ROOT_page and hands the child page to
// bt_freepage. The value returned on the last line is the result of
// releasing the root's upgrade lock.
1528 BTERR bt_lowerroot(BtDb *bt)
1530 if( bt_lockpage (bt, ROOT_page, BtLockUpgrade) )
1533 if( bt_lockpage (bt, bt->childpage, BtLockUpgrade) )
// the whole child page image replaces the root page image
1536 memcpy (bt->parent, bt->child, bt->page_size);
1538 if( bt_update(bt, bt->parent, ROOT_page) )
// the former child page is no longer needed
1541 if( bt_freepage(bt, bt->child, bt->childpage) )
// release both lock modes held on the old child page
1544 if( bt_unlockpage (bt, bt->childpage, BtLockUpgrade) )
1547 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
1550 return bt_unlockpage (bt, ROOT_page, BtLockUpgrade);
1553 // split the root and raise the height of the btree
1554 // return with parent page set to appropriate sibling
//
// bt       - tree handle; bt->parent is used as the root page image
// sibling  - page number stored in slot 2 of the rebuilt root
//            (holds the upper keys, per the comment further down)
// key, len - search key used to choose which page becomes bt->parentpage
//
// Returns the result of bt_mappage for the chosen page on the two
// visible return paths.
1556 BTERR bt_raiseroot(BtDb *bt, uid sibling, unsigned char *key, uint len)
// scratch copy of the old root's high key (length byte + up to 255 key bytes)
1558 unsigned char lowerkey[256];
// nxt tracks the lowest used key offset in the rebuilt root, starting at page end
1559 uint nxt = bt->page_size;
1560 BtPage root = bt->parent;
1564 // upgrade root page lock to exclusive
1566 if( bt_lockpage (bt, ROOT_page, BtLockUpgrade) )
1569 // Obtain an empty page to use, and copy the current
1570 // root (lower half) contents into it
// a zero page number from bt_newpage is treated as failure
1572 if( !(new_page = bt_newpage(bt, root)) )
1575 if( bt_lockpage (bt, new_page, BtLockUpdate) )
1578 // save high fence key for left page
1580 ptr = bt_highkey(bt, root);
1581 memcpy (lowerkey, ptr, ptr->len + 1);
1583 // preserve the page info at the bottom
1584 // and set rest to zero to initialize new root page
// root+1 points just past the page header, so only the header survives
1586 memset(root+1, 0, bt->page_size - sizeof(*root));
1588 // insert left key in newroot page
// *lowerkey is the length byte; the stored key occupies length + 1 bytes
1590 nxt -= *lowerkey + 1;
1591 memcpy ((unsigned char *)root + nxt, lowerkey, *lowerkey + 1);
// slot 1 points at the new page that received the old root contents
1592 bt_putid(slotptr(root, 1)->id, new_page);
1593 slotptr(root, 1)->off = nxt;
1595 // insert second (infinite) key on newroot page that's never examined
1596 // and increase the root level
1598 bt_putid(slotptr(root, 2)->id, sibling);
// the rebuilt root has no right link
1599 bt_putid(root->right, 0);
1601 root->min = nxt; // reset lowest used offset and key count
1606 if( bt_update(bt, root, bt->parentpage) )
// release both lock modes held on the root
1609 if( bt_unlockpage(bt, ROOT_page, BtLockUpgrade) )
1612 if( bt_unlockpage(bt, ROOT_page, BtLockUpdate) )
1615 // decide which root node to continue with
1616 // sibling has upper keys, newpage the lower ones
// saved high key sorts below the search key: continue in the sibling
1618 if( keycmp((BtKey)lowerkey, key, len) < 0 ) {
1619 bt->parentpage = sibling; // go with the upper ones
1621 if( bt_unlockpage (bt, new_page, BtLockUpdate) )
1624 return bt_mappage (bt, &bt->parent, sibling);
1627 bt->parentpage = new_page; // go with the lower ones
1629 if( bt_unlockpage (bt, sibling, BtLockUpdate) )
1632 return bt_mappage (bt, &bt->parent, new_page);
1635 // handle underflowing child node
//
// bt         - tree handle; works on bt->parent, bt->child and the
//              bt->sibling / bt->sibling2 page mappings
// parentslot - slot in bt->parent looked up for the child; slot
//              parentslot - 1 is used to find the left neighbour when
//              the child is the rightmost entry of the parent
// key, len   - search key forwarded to bt_splitparent and bt_redistribute
//
// Every visible path ends by returning bt_redistribute, bt_mergepages or
// a bt_unlockpage result; BTERR_restart is returned from the path that
// follows a parent split (see the comment at that point).
1637 BTERR bt_repairchild (BtDb *bt, uint parentslot, unsigned char *key, uint len)
1639 BtKey parentkey = bt_slotkey(bt->parent, parentslot);
1640 BtKey fencekey = bt_highkey (bt, bt->child);
1641 BtKey highkey = bt_highkey(bt, bt->parent);
1642 BtKey siblingkey, siblingkey2;
1643 uid siblingpage, siblingpage2;
1648 // high key is never NULL
1649 // fence key is null on right end
1652 if( !highkey || keycmp (fencekey, highkey->key, highkey->len) < 0 ) {
1654 // childpage is not rightmost child of parent page
// right neighbour is reached through the child's right link
1656 siblingpage = bt_getid(bt->child->right);
1658 if( bt_lockpage (bt, siblingpage, BtLockUpdate) )
1661 if( bt_mappage (bt, &bt->sibling, siblingpage) )
1664 if( !parentkey || keycmp (fencekey, parentkey->key, parentkey->len) < 0 ) {
1666 // sibling is not linked in parent, so we can merge it
// parent is not modified on this path, so its lock is released first
1668 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
1671 // calculate size of merged page by adding single child key to sibling
1673 fencekey = keyptr(bt->child, 1);
// redistribute when the sibling has no room for one more slot plus this
// key; otherwise merge the two pages
1675 if( bt->sibling->min < (bt->sibling->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1676 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1678 return bt_mergepages (bt, &bt->sibling, siblingpage);
1681 // sibling has a key in the parent node
1682 // find its slot in parent
// slot comes from the sibling's high key when it has one; the assignment of
// bt->parent->cnt is presumably the fallback -- TODO confirm
1684 if( siblingkey = bt_highkey (bt, bt->sibling) )
1685 slot = bt_findslot (bt->parent, siblingkey->key, siblingkey->len);
1687 slot = bt->parent->cnt;
1689 parentkey = bt_slotkey (bt->parent, slot);
1690 siblingpage2 = bt_getid(bt->sibling->right);
1692 if( siblingkey && parentkey && keycmp (siblingkey, parentkey->key, parentkey->len) < 0 ) {
1694 // sibling2 is not linked in P, its key is parentkey
1695 // can parent support its insertion?
1696 // if not, split parent node first.
1698 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + parentkey->len + 1) {
1699 if( bt_splitparent (bt, key, len) )
1702 // are we in the correct half of new parent nodes?
1703 // if not, restart the function.
// a zero slot from bt_findslot means the sibling key is not on this parent page
1705 if( slot = bt_findslot (bt->parent, siblingkey->key, siblingkey->len) )
1706 parentkey = keyptr(bt->parent, slot);
1708 return BTERR_restart;
1711 // link right of sibling into parent
1713 if( bt_parentlink (bt, bt->sibling, siblingpage, parentkey, siblingpage2) )
1716 // unlink sibling from parent
1718 if( bt_parentunlink (bt, bt->child, bt->childpage) )
1721 fencekey = keyptr(bt->child, 1);
// same room check as above: redistribute if it does not fit, else merge
1723 if( bt->sibling->min < (bt->sibling->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1724 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1726 return bt_mergepages (bt, &bt->sibling, siblingpage);
1729 // unlink sibling from parent and merge
1731 if( bt_parentunlink (bt, bt->child, bt->childpage) )
1734 fencekey = keyptr(bt->child, 1);
1736 if( bt->sibling->min < (bt->sibling->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1737 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1739 return bt_mergepages (bt, &bt->sibling, siblingpage);
1743 // child is rightmost key in the parent,
1744 // work with nodes to left.
// left neighbour's page number and key come from the preceding parent slot
1746 siblingpage = bt_getid(slotptr(bt->parent, parentslot - 1)->id);
1747 siblingkey = keyptr(bt->parent, parentslot - 1);
// the child lock is released before the left neighbour is locked
1749 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
1751 if( bt_lockpage (bt, siblingpage, BtLockUpdate) )
1753 if( bt_mappage (bt, &bt->sibling, siblingpage) )
// sibling2 is whatever page the left neighbour's right link points to
1756 siblingpage2 = bt_getid(bt->sibling->right);
1757 siblingkey2 = bt_highkey (bt, bt->sibling);
1759 if( bt_lockpage (bt, siblingpage2, BtLockUpdate) )
1761 if( bt_mappage (bt, &bt->sibling2, siblingpage2) )
// parent's key for the left neighbour equals the neighbour's own high key
1764 if( keycmp (siblingkey, siblingkey2->key, siblingkey2->len) == 0 ) {
1766 // left sibling right (mapped into sibling2) is our child node
// exchange page mappings so the freshly mapped page is treated as the child
1768 swap = bt->sibling2;
1769 bt->sibling2 = bt->child;
1772 // does child still need to merge/redistribute?
// more than one key on the page: nothing to repair, release locks and return
1774 if( bt->child->cnt > 1 ) {
1775 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
1778 return bt_unlockpage (bt, siblingpage, BtLockUpdate);
// roles are exchanged: the left neighbour becomes the child page and the
// old child is treated as its right sibling
1782 bt->sibling = bt->child;
1785 bt->childpage = siblingpage;
1786 siblingpage = siblingpage2;
1788 // unlink sibling node from parent and merge with child
1790 if( bt_parentunlink (bt, bt->child, bt->childpage) )
1793 fencekey = keyptr(bt->sibling, 1);
// room check is made against the (new) child page this time
1795 if( bt->child->min < (bt->child->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1796 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1798 return bt_mergepages (bt, &bt->sibling, siblingpage);
1801 // currently unlinked sibling2 merges with child
1803 fencekey = bt_highkey (bt, bt->sibling2);
// split the parent first if it cannot take one more slot plus this key
1805 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + fencekey->len + 1)
1806 if( bt_splitparent (bt, key, len) )
1809 if( bt_parentlink (bt, bt->sibling, siblingpage, fencekey, siblingpage2) )
// left neighbour is finished with; the child page is locked and mapped again
1812 if( bt_unlockpage (bt, siblingpage, BtLockUpdate) )
1814 if( bt_lockpage (bt, bt->childpage, BtLockUpdate) )
1816 if( bt_mappage (bt, &bt->child, bt->childpage) )
// child holds more than one key after re-reading it: nothing left to repair
1819 if( bt->child->cnt > 1 ) {
1820 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
1822 return bt_unlockpage (bt, siblingpage2, BtLockUpdate);
1825 // unlink child from parent node leaving immediate
1826 // left sibling2 to accept merge/redistribution
1828 if( bt_parentunlink (bt, bt->sibling2, siblingpage2) )
// exchange mappings so sibling2's page takes the child role
1831 swap = bt->sibling2;
1832 bt->sibling2 = bt->child;
1834 bt->childpage = siblingpage2;
1836 fencekey = keyptr(bt->sibling2, 1);
1838 if( bt->child->min < (bt->child->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->child) + fencekey->len + 1)
1839 return bt_redistribute (bt, &bt->sibling2, siblingpage2, key, len);
1841 return bt_mergepages (bt, &bt->sibling2, siblingpage2);
1844 // find and load leaf page for given key
1845 // leave page BtLockShared, return with key's slot
//
// bt       - tree handle; the page being examined is bt->parent / bt->parentpage
// key, len - key to search for
//
// Returns 0 (EOF) when the chain of right links runs out.
1847 int bt_loadpageread (BtDb *bt, unsigned char *key, uint len)
// prevpage remembers the page examined on the previous step
1849 uid page_no = ROOT_page, prevpage = 0;
1852 // start at root of btree and drill down
1855 bt->parentpage = page_no;
// lock the page about to be examined; the previous page is unlocked right after
1857 if( bt_lockpage(bt, bt->parentpage, BtLockShared) )
1861 if( bt_unlockpage(bt, prevpage, BtLockShared) )
1864 // map/obtain page contents
1866 if( bt_mappage (bt, &bt->parent, page_no) )
1869 // find key on page at this level
1870 // return if leaf page
// a nonzero slot means the key belongs on this page
1872 if( (slot = bt_findslot (bt->parent, key, len)) ) {
// level zero is the leaf level
1873 if( !bt->parent->lvl )
1876 // continue down to next level
1878 page_no = bt_getid(slotptr(bt->parent, slot)->id);
1881 // or slide right into next page
1884 page_no = bt_getid(bt->parent->right);
1886 prevpage = bt->parentpage;
1889 // return EOF on end of right chain
1891 if( bt_unlockpage(bt, bt->parentpage, BtLockShared) )
1894 return 0; // return EOF
1897 // find and load leaf page for given key
1898 // return w/slot # on leaf page
1899 // leave page BtLockUpdate
//
// bt       - tree handle; the descent keeps the current page in bt->parent
//            and the page below it in bt->child
// key, len - key to search for
//
// On the way down the visible code also raises the root when the root has
// a right link, lowers it when the root has a single key and the child has
// no right link, calls bt_repairchild for a child holding a single key, and
// links an unlinked right sibling into the parent.
// Sets bt->err = BTERR_struct and returns 0 when the loop runs out of pages.
1901 int bt_loadpageupdate (BtDb *bt, unsigned char *key, uint len)
1903 uid parentpage = 0, nextpage;
1904 BtKey fencekey, parentkey;
1908 // start at root of btree and drill down
1910 if( bt_lockpage(bt, ROOT_page, BtLockUpdate) )
1913 // map/obtain page contents
1915 if( bt_mappage (bt, &bt->parent, ROOT_page) )
1918 bt->parentpage = ROOT_page;
1921 // if root page, check for tree level growth
1922 // by existence of right pointer
1924 if( bt->parentpage == ROOT_page )
// a nonzero right link on the root triggers bt_raiseroot with that page
1925 if( nextpage = bt_getid(bt->parent->right) ) {
1926 if( bt_lockpage (bt, nextpage, BtLockUpdate) )
1929 if( bt_raiseroot (bt, nextpage, key, len) )
1933 // find key on page at this level
1934 // return if leaf page
1936 slot = bt_findslot (bt->parent, key, len);
// level zero is the leaf level
1938 if( !bt->parent->lvl )
1941 // lock & map the child
1943 bt->childpage = bt_getid(slotptr(bt->parent, slot)->id);
1945 if( bt_lockpage(bt, bt->childpage, BtLockUpdate) )
1948 if( bt_mappage (bt, &bt->child, bt->childpage) )
1951 // check child for underflow
// root with a single key whose child has no right link: collapse one level
1953 if( bt->parentpage == ROOT_page )
1954 if( bt->parent->cnt == 1 )
1955 if( !bt_getid(bt->child->right) ) {
1956 if( bt_lowerroot (bt) )
// continue the descent from the root again
1958 nextpage = ROOT_page;
// a child holding a single key is repaired; the call is repeated for as
// long as it reports BTERR_restart
1962 if( bt->child->cnt == 1 ) {
1963 while( bt_repairchild (bt, slot, key, len) == BTERR_restart );
1968 bt->child = bt->parent;
1970 nextpage = bt->childpage;
1974 fencekey = bt_highkey (bt, bt->child);
1975 parentkey = bt_slotkey(bt->parent, slot);
1977 // if right sibling is not linked,
1978 // fix links in parent node
1981 if( !parentkey || keycmp (fencekey, parentkey->key, parentkey->len) < 0 ) {
// split the parent first if it cannot take one more slot plus the fence key
1983 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + fencekey->len + 1)
1984 if( bt_splitparent (bt, key, len) ) {
// slot and parent key are looked up again on the current parent page
1987 slot = bt_findslot (bt->parent, key, len);
1988 parentkey = bt_slotkey(bt->parent, slot);
1991 // add key to parent for child page
1992 // and fix downlink for childpage
1994 nextpage = bt_getid(bt->child->right);
1996 if( bt_parentlink (bt, bt->child, bt->childpage, parentkey, nextpage) )
2000 // unlock the parent page
2002 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
2005 // is our key on this child page?
2006 // is fencekey infinite, or .ge. our key
2008 if( !fencekey || keycmp (fencekey, key, len) >= 0 ) {
2010 bt->child = bt->parent;
// the child page is the next page to examine
2012 nextpage = bt->childpage;
2016 // otherwise slide right into next page
2018 nextpage = bt_getid(bt->child->right);
// the next page is locked before the current child is released
2020 if( bt_lockpage (bt, nextpage, BtLockUpdate) )
2023 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
2026 if( bt_mappage (bt, &bt->parent, nextpage) )
// the assignment doubles as the loop test: a zero page number ends the loop
2029 } while( bt->parentpage = nextpage );
2031 // return error on end of right chain
2033 bt->err = BTERR_struct;
2034 return 0; // return error
2037 // find and delete key on leaf page
//
// bt       - tree handle
// key, len - key to delete
//
// Locates the page with bt_loadpageupdate (page in bt->parent), removes the
// slot only when the key found there compares equal to the requested key,
// and finishes by releasing the BtLockUpdate held on the page.
2039 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
2047 if( slot = bt_loadpageupdate (bt, key, len) )
2048 ptr = keyptr(bt->parent, slot);
2052 // if key is found delete it, otherwise ignore request
// keycmp returning zero means the stored key equals the requested key
2054 if( slot && !keycmp (ptr, key, len) ) {
// the page is modified only while BtLockUpgrade is held
2055 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
2058 bt_removeslot (bt, slot);
2060 if( bt_update(bt, bt->parent, bt->parentpage) )
2063 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
2067 return bt_unlockpage (bt, bt->parentpage, BtLockUpdate);
2070 // find key in leaf page and return row-id
2071 // or zero if key is not found.
//
// bt       - tree handle
// key, len - key to look up
//
// The page is located with bt_loadpageread (page in bt->parent) and its
// BtLockShared lock is released before returning.
2073 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
2081 if( slot = bt_loadpageread (bt, key, len) )
2082 ptr = keyptr(bt->parent, slot);
2086 // if key exists, return row-id
2087 // otherwise return 0
// exact match only: same length and identical key bytes
2089 if( ptr->len == len && !memcmp (ptr->key, key, len) )
2090 id = bt_getid(slotptr(bt->parent,slot)->id);
2094 if ( bt_unlockpage(bt, bt->parentpage, BtLockShared) )
2100 // Insert new key into the btree leaf page
//
// bt       - tree handle
// key, len - key bytes and key length to store
// id       - row-id written into the key's slot
// tod      - time-stamp written into the key's slot
//
// If the key found by bt_loadpageupdate compares equal, only its slot's
// tod and id are rewritten. Otherwise the page is split first when it lacks
// room, and the key is then added to the page. Both visible return paths
// return the result of releasing the page's BtLockUpdate.
2102 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod)
2107 if( slot = bt_loadpageupdate (bt, key, len) )
2108 ptr = keyptr(bt->parent, slot);
// NOTE(review): the same level test appears twice in a row; presumably the
// two copies sit in alternative build sections -- confirm
2112 if( bt->parent->lvl )
2114 if( bt->parent->lvl )
2118 // if key already exists, update id and return
2121 if( !keycmp (ptr, key, len) ) {
2122 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
2124 slotptr(bt->parent, slot)->tod = tod;
2125 bt_putid(slotptr(bt->parent,slot)->id, id);
2126 if ( bt_update(bt, bt->parent, bt->parentpage) )
2128 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
2130 return bt_unlockpage(bt, bt->parentpage, BtLockUpdate);
2133 // check if leaf page has enough space
// room needed: one more slot, the page header, and the key with its length byte
2135 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + len + 1) {
2136 if( bt_splitparent (bt, key, len) )
// the slot is looked up again on the current page
2139 slot = bt_findslot (bt->parent, key, len);
2142 // calculate next available slot and copy key into page
2144 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
2147 bt->parent->min -= len + 1; // reset lowest used offset
// stored form is one length byte followed by the key bytes
2148 ((unsigned char *)bt->parent)[bt->parent->min] = len;
2149 memcpy ((unsigned char *)bt->parent + bt->parent->min +1, key, len );
2151 // now insert key into array before slot
// idx starts at the new last slot (the key count is incremented here)
2153 idx = ++bt->parent->cnt;
// each step copies slot idx-1 into slot idx and then decrements idx
2157 *slotptr(bt->parent, idx) = *slotptr(bt->parent, idx -1), idx--;
// fill in the slot for the new key
2159 bt_putid(slotptr(bt->parent,idx)->id, id);
2160 slotptr(bt->parent, idx)->off = bt->parent->min;
2161 slotptr(bt->parent, idx)->tod = tod;
2163 if ( bt_update(bt, bt->parent, bt->parentpage) )
2166 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
2169 return bt_unlockpage(bt, bt->parentpage, BtLockUpdate);
2172 // cache page of keys into cursor and return starting slot for given key
//
// bt       - tree handle; the page image is copied into bt->cursor and its
//            page number recorded in bt->cursorpage
// key, len - key at which the scan should start
//
// The page is found with bt_loadpageread and its BtLockShared lock is
// released once the copy has been taken.
2174 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2178 // cache page for retrieval
2179 if( slot = bt_loadpageread (bt, key, len) )
2180 memcpy (bt->cursor, bt->parent, bt->page_size);
2181 bt->cursorpage = bt->parentpage;
2182 if ( bt_unlockpage(bt, bt->parentpage, BtLockShared) )
2188 // return next slot for cursor page
2189 // or slide cursor right into next page
//
// bt   - tree handle; bt->cursor holds the cached page image
// slot - slot number last returned for the cursor page
2191 uint bt_nextkey (BtDb *bt, uint slot)
// compares the old slot value with the key count, then advances slot
2196 if( slot++ < bt->cursor->cnt )
// cached page exhausted: follow its right link
2199 right = bt_getid(bt->cursor->right);
2204 bt->cursorpage = right;
// lock the next page just long enough to copy it into the cursor
2206 if( bt_lockpage(bt, right,BtLockShared) )
2209 if( bt_mappage (bt, &bt->parent, right) )
2212 memcpy (bt->cursor, bt->parent, bt->page_size);
2213 if ( bt_unlockpage(bt, right, BtLockShared) )
// return a pointer to the key stored at the given slot of the cached
// cursor page (bt->cursor)
2222 BtKey bt_key(BtDb *bt, uint slot)
2224 return keyptr(bt->cursor, slot);
// return the id decoded from the given slot of the cached cursor page
// (bt->cursor)
2227 uid bt_uid(BtDb *bt, uint slot)
2229 return bt_getid(slotptr(bt->cursor,slot)->id);
// return the time-stamp (tod) held in the given slot of the cached cursor
// page (bt->cursor)
2232 uint bt_tod(BtDb *bt, uint slot)
2234 return slotptr(bt->cursor,slot)->tod;
2239 // standalone program to index file of keys
2240 // then list them onto std-out
//
// Arguments, per the usage message and the argv accesses below:
//   argv[1] index file, argv[2] source file of keys,
//   argv[3] operation letter (Read/Write/Scan/Delete/Find),
//   argv[4] page bits, argv[5] mapped pool pages,
//   argv[6] starting line number (optional trailing three)
// Progress and error messages go to stderr; keys read back are written
// to stdout, one per line.
2242 int main (int argc, char **argv)
2244 uint slot, found = 0, line = 0, off = 0;
// bits defaults to 12 unless overridden from argv[4]
2245 int ch, cnt = 0, bits = 12;
2246 unsigned char key[256];
2247 clock_t done, start;
2257 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_pages start_line_number]", argv[0]);
// optional numeric arguments
2265 bits = atoi(argv[4]);
2268 map = atoi(argv[5]);
2271 fprintf (stderr, "Warning: mapped_pool > 65536 pages\n");
2274 off = atoi(argv[6]);
// the index is always opened in BT_rw mode
2276 bt = bt_open ((argv[1]), BT_rw, bits, map);
2279 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// OR-ing with 0x20 folds an ASCII letter to lower case
2283 switch(argv[3][0]| 0x20)
2286 fprintf(stderr, "started indexing for %s\n", argv[2]);
2287 if( argc > 2 && (in = fopen (argv[2], "rb")) )
// the source file is consumed one character at a time
2288 while( ch = getc(in), ch != EOF )
// append line + off as nine zero-padded digits to the key
2292 sprintf((char *)key+len, "%.9d", line + off), len += 9;
// the id stored with each key is its 1-based line number
2294 if( bt_insertkey (bt, key, len, ++line, *tod) )
2295 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
// presumably bounds key growth so the nine-digit suffix still fits -- TODO confirm
2298 else if( len < 245 )
2300 fprintf(stderr, "finished adding keys, %d \n", line);
2304 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2305 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2306 while( ch = getc(in), ch != EOF )
2310 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2312 if( bt_deletekey (bt, key, len) )
2313 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2316 else if( len < 245 )
2318 fprintf(stderr, "finished deleting keys, %d\n", line);
2322 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2323 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2324 while( ch = getc(in), ch != EOF )
2328 sprintf((char *)key+len, "%.9d", line + off), len += 9;
// a nonzero row-id from bt_findkey means the key was found
2330 if( bt_findkey (bt, key, len) )
2333 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2336 else if( len < 245 )
2338 fprintf(stderr, "finished search of %d keys, found %d\n", line, found);
// elapsed time is derived from two clock_t readings
2348 fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC);
2353 fprintf(stderr, "started reading\n");
// position the cursor, then walk every following slot
2355 slot = bt_startkey (bt, key, len);
2358 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
// bt_nextkey returning zero ends the scan
2361 while( slot = bt_nextkey (bt, slot) )
2363 ptr = bt_key(bt, slot);
// key bytes are written raw, followed by a newline
2364 fwrite (ptr->key, ptr->len, 1, stdout);
2365 fputc ('\n', stdout);
2368 fprintf(stderr, " Total keys read %d\n", cnt);