1 // jaluta's balanced B-Link tree algorithms
4 // author: karl malbrain, malbrain@cal.berkeley.edu
7 This work, including the source code, documentation
8 and related data, is placed into the public domain.
10 The orginal author is Karl Malbrain.
12 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
13 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
14 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
15 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
16 RESULTING FROM THE USE, MODIFICATION, OR
17 REDISTRIBUTION OF THIS SOFTWARE.
20 // Please see the project home page for documentation
21 // http://code.google.com/p/high-concurrency-btree
23 #define _FILE_OFFSET_BITS 64
24 #define _LARGEFILE64_SOURCE
39 #define WIN32_LEAN_AND_MEAN
50 typedef unsigned long long uid;
53 typedef unsigned long long off64_t;
54 typedef unsigned short ushort;
55 typedef unsigned int uint;
58 #define BT_ro 0x6f72 // ro
59 #define BT_rw 0x7772 // rw
60 #define BT_fl 0x6c66 // fl
62 #define BT_maxbits 24 // maximum page size in bits
63 #define BT_minbits 9 // minimum page size in bits
64 #define BT_minpage (1 << BT_minbits) // minimum page size
66 #define BT_hashsize 512 // size of hash index for page cache
67 #define BT_hashprime 8191 // prime number for hashing
76 // Define the length of the page and key pointers
80 // Page key slot definition.
82 // If BT_maxbits is 16 or less, you can save 4 bytes
83 // for each key stored by making the first two uints
84 // into ushorts. You can also save 4 bytes by removing
85 // the tod field from the key.
88 uint off; // page offset for key start
89 uint tod; // time-stamp for key
90 unsigned char id[BtId]; // id associated with key
93 // The key structure occupies space at the upper end of
94 // each page. It's a length byte followed by up to
99 unsigned char key[255];
102 // The first part of an index page.
103 // It is immediately followed
104 // by the BtSlot array of keys.
107 uint cnt; // count of keys in page
108 uint min; // next key offset
109 unsigned char lvl:3; // level of page
110 unsigned char bits:5; // page size in bits
111 unsigned char fence; // len of fence key at top of page
112 unsigned char right[BtId]; // page number to right
113 BtSlot slots[0]; // page slots
116 // The memory mapping hash table entry
119 BtPage page; // mapped page pointer
120 uid page_no; // mapped page number
121 void *lruprev; // least recently used previous cache block
122 void *lrunext; // lru next cache block
123 void *hashprev; // previous cache block for the same hash idx
124 void *hashnext; // next cache block for the same hash idx
130 // The object structure for Btree access
133 uint page_size; // each page size
134 uint page_bits; // each page size in bits
135 uid parentpage; // current parent page number
136 uid cursorpage; // current cursor page number
137 uid childpage; // current child page number
139 uint mode; // read-write mode
140 uint mapped_io; // use memory mapping
141 BtPage temp; // temporary frame buffer (memory mapped/file IO)
142 BtPage alloc; // frame buffer for alloc page ( page 0 )
143 BtPage cursor; // cached frame for start/next (never mapped)
144 BtPage frame; // spare frame for the page split (never mapped)
145 BtPage parent; // current parent page
146 BtPage child; // current child page
147 BtPage sibling; // current sibling page
148 BtPage sibling2; // current sibling2 page
154 unsigned char *mem; // frame, cursor, page memory buffer
155 int nodecnt; // highest page cache node in use
156 int nodemax; // highest page cache node allocated
157 int hashmask; // number of hash headers in cache - 1
158 BtHash *lrufirst; // lru list head
159 BtHash *lrulast; // lru list tail
160 ushort cache[BT_hashsize]; // hash index for cache
161 BtHash nodes[1]; // page cache follows
176 extern void bt_close (BtDb *bt);
177 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk);
178 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod);
179 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len);
180 extern uid bt_findkey (BtDb *bt, unsigned char *key, uint len);
181 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
182 extern uint bt_nextkey (BtDb *bt, uint slot);
184 // Helper functions to return slot values
186 extern BtKey bt_key (BtDb *bt, uint slot);
187 extern uid bt_uid (BtDb *bt, uint slot);
188 extern uint bt_tod (BtDb *bt, uint slot);
190 // BTree page number constants
194 // The page is allocated from low and hi ends.
195 // The key offsets and row-id's are allocated
196 // from the bottom, while the text of the key
197 // is allocated from the top. When the two
198 // areas meet, the page overflowns.
200 // A key consists of a length byte, two bytes of
201 // index number (0 - 65535), and up to 253 bytes
202 // of key value. Duplicate keys are discarded.
203 // Associated with each key is a 48 bit row-id.
205 // The b-tree root is always located at page 1.
206 // The first leaf page of level zero is always
207 // located on page 2.
209 // The b-tree pages at each level are linked
210 // with next page to right to facilitate
211 // cursors and provide for concurrency.
213 // When to root page overflows, it is split in two and
214 // the tree height is raised by a new root at page
215 // one with two keys.
217 // Groups of pages from the btree are optionally
218 // cached with memory mapping. A hash table is used to keep
219 // track of the cached pages. This behaviour is controlled
220 // by the cache block size parameter to bt_open.
222 // To achieve maximum concurrency one page is locked at a time
223 // as the tree is traversed to find leaf key in question. The right
224 // page numbers are used in cases where the page is being split,
227 // Page 0 is dedicated to lock for new page extensions,
228 // and chains empty pages together for reuse.
230 // Empty nodes are chained together through the ALLOC page and reused.
232 // A special open mode of BT_fl is provided to safely access files on
233 // WIN32 networks. WIN32 network operations should not use memory mapping.
234 // This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
235 // to prevent local caching of network file contents.
237 // Access macros to address slot and key values from the page
239 #define slotptr(page, slot) ((page)->slots + slot - 1)
240 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
242 void bt_putid(unsigned char *dest, uid id)
247 dest[i] = (unsigned char)id, id >>= 8;
250 uid bt_getid(unsigned char *src)
255 for( i = 0; i < BtId; i++ )
256 id <<= 8, id |= *src++;
261 // place requested latch on requested page_no.
262 // the Shared latch is a read lock over segment 0
263 // the Update latch is a write lock over segment 1
264 // the Xclusive latch is a write lock over segment 0 & 1
265 // the Upgrade latch upgrades Update to Xclusive
267 BTERR bt_lockpage(BtDb *bt, uid page_no, BtLock mode)
269 off64_t off = page_no << bt->page_bits;
270 uint len = sizeof(*bt->parent);
273 int flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
274 struct flock lock[1];
281 case BtLockShared: // lock segment 0 w/read lock
285 case BtLockUpdate: // lock segment 1 w/write lock
286 off += sizeof(*bt->parent);
290 case BtLockXclusive:// lock both segments w/write lock
291 len += sizeof(*bt->parent);
295 case BtLockUpgrade: // lock segment 0 w/write lock
301 memset (lock, 0, sizeof(lock));
304 lock->l_type = type ? F_WRLCK : F_RDLCK;
308 if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
309 return bt->err = BTERR_lock;
313 memset (ovl, 0, sizeof(ovl));
314 ovl->OffsetHigh = (uint)(off >> 32);
315 ovl->Offset = (uint)off;
317 // use large offsets to
318 // simulate advisory locking
320 ovl->OffsetHigh |= 0x80000000;
323 flags |= LOCKFILE_EXCLUSIVE_LOCK;
325 if( LockFileEx (bt->idx, flags, 0, len, 0L, ovl) )
328 return bt->err = BTERR_lock;
332 // remove lock on requested page_no.
334 BTERR bt_unlockpage(BtDb *bt, uid page_no, BtLock mode)
336 off64_t off = page_no << bt->page_bits;
337 uint len = sizeof(*bt->parent);
339 struct flock lock[1];
345 case BtLockShared: // unlock segment 0
348 case BtLockUpdate: // unlock segment 1
349 off += sizeof(*bt->parent);
352 case BtLockXclusive:// unlock both segments
353 len += sizeof(*bt->parent);
356 case BtLockUpgrade: // unlock segment 0
361 memset (lock, 0, sizeof(lock));
364 lock->l_type = F_UNLCK;
368 if( fcntl (bt->idx, F_SETLK, lock) < 0 )
369 return bt->err = BTERR_lock;
371 memset (ovl, 0, sizeof(ovl));
372 ovl->OffsetHigh = (uint)(off >> 32);
373 ovl->Offset = (uint)off;
375 // use large offsets to
376 // simulate advisory locking
378 ovl->OffsetHigh |= 0x80000000;
380 if( !UnlockFileEx (bt->idx, 0, len, 0, ovl) )
381 return GetLastError(), bt->err = BTERR_lock;
387 // close and release memory
389 void bt_close (BtDb *bt)
393 // release mapped pages
395 if( hash = bt->lrufirst )
396 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
397 while(hash = hash->lrunext);
404 if( hash = bt->lrufirst )
407 FlushViewOfFile(hash->page, 0);
408 UnmapViewOfFile(hash->page);
409 CloseHandle(hash->hmap);
410 } while(hash = hash->lrunext);
413 VirtualFree (bt->mem, 0, MEM_RELEASE);
414 FlushFileBuffers(bt->idx);
415 CloseHandle(bt->idx);
420 // open/create new btree
421 // call with file_name, BT_openmode, bits in page size (e.g. 16),
422 // size of mapped page cache (e.g. 8192) or zero for no mapping.
424 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax)
426 BtLock lockmode = BtLockXclusive;
427 uint lvl, attr, cacheblk;
435 SYSTEM_INFO sysinfo[1];
439 bt = malloc (sizeof(BtDb) + nodemax * sizeof(BtHash));
440 memset (bt, 0, sizeof(BtDb));
442 switch (mode & 0x7fff)
446 bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
451 bt->idx = open ((char*)name, O_RDONLY);
452 lockmode = BtLockShared;
456 return free(bt), NULL;
459 cacheblk = 4096; // page size for unix
464 bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb) + nodemax * sizeof(BtHash));
465 attr = FILE_ATTRIBUTE_NORMAL;
466 switch (mode & 0x7fff)
469 attr |= FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING;
472 bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
477 bt->idx = CreateFile(name, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, attr, NULL);
478 lockmode = BtLockShared;
481 if( bt->idx == INVALID_HANDLE_VALUE )
482 return GlobalFree(bt), NULL;
484 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
485 GetSystemInfo(sysinfo);
488 cacheblk = sysinfo->dwAllocationGranularity;
493 // determine sanity of page size
495 if( bits > BT_maxbits )
497 else if( bits < BT_minbits )
500 if ( bt_lockpage(bt, ALLOC_page, lockmode) )
501 return bt_close (bt), NULL;
506 // read minimum page size to get root info
508 if( size = lseek (bt->idx, 0L, 2) ) {
509 alloc = malloc (BT_minpage);
510 pread(bt->idx, alloc, BT_minpage, 0);
513 } else if( mode == BT_ro )
514 return bt_close (bt), NULL;
516 size = GetFileSize(bt->idx, amt);
519 alloc = VirtualAlloc(NULL, BT_minpage, MEM_COMMIT, PAGE_READWRITE);
520 if( !ReadFile(bt->idx, (char *)alloc, BT_minpage, amt, NULL) )
521 return bt_close (bt), NULL;
523 VirtualFree (alloc, 0, MEM_RELEASE);
524 } else if( mode == BT_ro )
525 return bt_close (bt), NULL;
528 bt->page_size = 1 << bits;
529 bt->page_bits = bits;
531 bt->nodemax = nodemax;
534 // setup cache mapping
537 if( cacheblk < bt->page_size )
538 cacheblk = bt->page_size;
540 bt->hashmask = (cacheblk >> bits) - 1;
545 bt->mem = malloc (8 *bt->page_size);
547 bt->mem = VirtualAlloc(NULL, 8 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
549 bt->frame = (BtPage)bt->mem;
550 bt->cursor = (BtPage)(bt->mem + bt->page_size);
551 bt->alloc = (BtPage)(bt->mem + 2 * bt->page_size);
552 bt->parent = (BtPage)(bt->mem + 3 * bt->page_size);
553 bt->child = (BtPage)(bt->mem + 4 * bt->page_size);
554 bt->temp = (BtPage)(bt->mem + 5 * bt->page_size);
555 bt->sibling = (BtPage)(bt->mem + 6 * bt->page_size);
556 bt->sibling2 = (BtPage)(bt->mem + 7 * bt->page_size);
559 if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
560 return bt_close (bt), NULL;
565 // initialize an empty b-tree with alloc page & root page
567 memset (bt->alloc, 0, bt->page_size);
568 bt_putid(bt->alloc->right, ROOT_page + 1);
569 bt->alloc->bits = bt->page_bits;
572 if( write (bt->idx, bt->alloc, bt->page_size) < bt->page_size )
573 return bt_close (bt), NULL;
575 if( !WriteFile (bt->idx, (char *)bt->alloc, bt->page_size, amt, NULL) )
576 return bt_close (bt), NULL;
578 if( *amt < bt->page_size )
579 return bt_close (bt), NULL;
584 memset (bt->frame, 0, bt->page_size);
585 bt->frame->bits = bt->page_bits;
587 bt->frame->min = bt->page_size;
589 if( write (bt->idx, bt->frame, bt->page_size) < bt->page_size )
590 return bt_close (bt), NULL;
592 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
593 return bt_close (bt), NULL;
595 if( *amt < bt->page_size )
596 return bt_close (bt), NULL;
599 // create initial empty page area by writing last page of first
600 // cache area (other pages are zeroed by O/S)
602 if( bt->mapped_io && bt->hashmask > 2 ) {
603 memset(bt->frame, 0, bt->page_size);
606 pwrite(bt->idx, bt->frame, bt->page_size, bt->hashmask << bt->page_bits);
608 SetFilePointer (bt->idx, bt->hashmask << bt->page_bits, NULL, FILE_BEGIN);
609 if( !WriteFile (bt->idx, (char *)bt->frame, bt->page_size, amt, NULL) )
610 return bt_close (bt), NULL;
611 if( *amt < bt->page_size )
612 return bt_close (bt), NULL;
616 if( bt_unlockpage(bt, ALLOC_page, lockmode) )
617 return bt_close (bt), NULL;
622 // reset parent/child page pointers
624 void bt_resetpages (BtDb *bt)
629 bt->frame = (BtPage)bt->mem;
630 bt->cursor = (BtPage)(bt->mem + bt->page_size);
631 bt->alloc = (BtPage)(bt->mem + 2 * bt->page_size);
632 bt->parent = (BtPage)(bt->mem + 3 * bt->page_size);
633 bt->child = (BtPage)(bt->mem + 4 * bt->page_size);
634 bt->temp = (BtPage)(bt->mem + 5 * bt->page_size);
635 bt->sibling = (BtPage)(bt->mem + 6 * bt->page_size);
636 bt->sibling2 = (BtPage)(bt->mem + 7 * bt->page_size);
639 // return pointer to high key
640 // or NULL if infinite value
642 BtKey bt_highkey (BtDb *bt, BtPage page)
645 if( bt_getid (page->right) )
646 return keyptr(page, page->cnt);
650 if( bt_getid (page->right) )
651 return ((BtKey)((unsigned char*)(page) + bt->page_size - page->fence));
656 // return pointer to slot key in index page
657 // or NULL if infinite value
659 BtKey bt_slotkey (BtPage page, uint slot)
661 if( slot < page->cnt || bt_getid (page->right) )
662 return keyptr(page, slot);
667 // compare two keys, returning > 0, = 0, or < 0
668 // as the comparison value
670 int keycmp (BtKey key1, unsigned char *key2, uint len2)
672 uint len1 = key1->len;
675 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
686 // Update current page of btree by writing file contents
687 // or flushing mapped area to disk.
689 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
691 off64_t off = page_no << bt->page_bits;
694 if ( !bt->mapped_io )
695 if ( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
696 return bt->err = BTERR_wrt;
699 if ( !bt->mapped_io )
701 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
702 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
703 return GetLastError(), bt->err = BTERR_wrt;
705 if( *amt < bt->page_size )
706 return GetLastError(), bt->err = BTERR_wrt;
708 else if ( bt->mode == BT_fl ) {
709 FlushViewOfFile(page, bt->page_size);
710 FlushFileBuffers(bt->idx);
716 // find page in cache
718 BtHash *bt_findhash(BtDb *bt, uid page_no)
723 // compute cache block first page and hash idx
725 page_no &= ~bt->hashmask;
726 idx = (uint)(page_no * BT_hashprime % BT_hashsize);
729 hash = bt->nodes + bt->cache[idx];
733 do if( hash->page_no == page_no )
735 while(hash = hash->hashnext );
740 // add page cache entry to hash index
742 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
744 uint idx = (uint)((page_no & ~bt->hashmask) * BT_hashprime % BT_hashsize);
747 if( bt->cache[idx] ) {
748 node->hashnext = hash = bt->nodes + bt->cache[idx];
749 hash->hashprev = node;
752 node->hashprev = NULL;
753 bt->cache[idx] = (ushort)(node - bt->nodes);
756 // remove cache entry from hash table
758 void bt_unlinkhash(BtDb *bt, BtHash *node)
760 uint idx = (uint)((node->page_no & ~bt->hashmask) * BT_hashprime % BT_hashsize);
764 if( hash = node->hashprev )
765 hash->hashnext = node->hashnext;
766 else if( hash = node->hashnext )
767 bt->cache[idx] = (ushort)(hash - bt->nodes);
771 if( hash = node->hashnext )
772 hash->hashprev = node->hashprev;
775 // add cache page to lru chain and map pages
777 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
780 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
781 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
784 memset(hash, 0, sizeof(BtHash));
785 hash->page_no = (page_no & ~bt->hashmask);
786 bt_linkhash(bt, hash, page_no);
788 if( node = hash->lrunext = bt->lrufirst )
789 node->lruprev = hash;
796 flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
797 hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
798 if( (long long int)hash->page == -1LL )
799 return bt->err = BTERR_map, (BtPage)NULL;
802 flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
803 hash->hmap = CreateFileMapping(bt->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
805 return bt->err = BTERR_map, NULL;
807 flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
808 hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
810 return bt->err = BTERR_map, NULL;
813 return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
816 // find or place requested page in page-cache
817 // return memory address where page is located.
819 BtPage bt_hashpage(BtDb *bt, uid page_no)
821 BtHash *hash, *node, *next;
824 // find page in cache and move to top of lru list
826 if( hash = bt_findhash(bt, page_no) ) {
827 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
828 // swap node in lru list
829 if( node = hash->lruprev ) {
830 if( next = node->lrunext = hash->lrunext )
831 next->lruprev = node;
835 if( next = hash->lrunext = bt->lrufirst )
836 next->lruprev = hash;
838 return bt->err = BTERR_hash, (BtPage)NULL;
840 hash->lruprev = NULL;
846 // map pages and add to cache entry
848 if( bt->nodecnt < bt->nodemax ) {
849 hash = bt->nodes + ++bt->nodecnt;
850 return bt_linklru(bt, hash, page_no);
853 // hash table is already full, replace last lru entry from the cache
855 if( hash = bt->lrulast ) {
856 // unlink from lru list
857 if( node = bt->lrulast = hash->lruprev )
858 node->lrunext = NULL;
860 return bt->err = BTERR_hash, (BtPage)NULL;
863 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
865 FlushViewOfFile(hash->page, 0);
866 UnmapViewOfFile(hash->page);
867 CloseHandle(hash->hmap);
869 // unlink from hash table
871 bt_unlinkhash(bt, hash);
873 // map and add to cache
875 return bt_linklru(bt, hash, page_no);
878 return bt->err = BTERR_hash, (BtPage)NULL;
881 // map a btree page onto current page
883 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
885 off64_t off = page_no << bt->page_bits;
890 if( bt->mapped_io ) {
892 *page = bt_hashpage(bt, page_no);
896 if ( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
897 return bt->err = BTERR_map;
899 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
901 if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
902 return bt->err = BTERR_map;
904 if( *amt < bt->page_size )
905 return bt->err = BTERR_map;
910 // deallocate a deleted page
911 // place on free chain out of allocator page
912 // page must already be BtLockXclusive and mapped
914 BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no)
916 // lock allocation page
918 if ( bt_lockpage(bt, ALLOC_page, BtLockUpdate) )
921 if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
924 // store chain in second right
925 bt_putid(page->right, bt_getid(bt->alloc[1].right));
926 bt_putid(bt->alloc[1].right, page_no);
928 if( bt_update(bt, bt->alloc, ALLOC_page) )
930 if( bt_update(bt, page, page_no) )
935 if( bt_unlockpage(bt, ALLOC_page, BtLockUpdate) )
941 // allocate a new page and write page into it
943 uid bt_newpage(BtDb *bt, BtPage page)
951 if ( bt_lockpage(bt, ALLOC_page, BtLockUpdate) )
954 if( bt_mappage (bt, &bt->alloc, ALLOC_page) )
957 // use empty chain first
958 // else allocate empty page
960 if( new_page = bt_getid(bt->alloc[1].right) ) {
961 if( bt_mappage (bt, &bt->temp, new_page) )
962 return 0; // don't unlock on error
963 bt_putid(bt->alloc[1].right, bt_getid(bt->temp->right));
966 new_page = bt_getid(bt->alloc->right);
967 bt_putid(bt->alloc->right, new_page+1);
971 if( bt_update(bt, bt->alloc, ALLOC_page) )
972 return 0; // don't unlock on error
976 if ( bt_unlockpage(bt, ALLOC_page, BtLockUpdate) )
979 if( !bt->mapped_io ) {
980 if( bt_update(bt, page, new_page) )
981 return 0; //don't unlock on error
985 if ( bt_unlockpage(bt, ALLOC_page, BtLockWrite) )
992 if ( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
993 return bt->err = BTERR_wrt, 0;
995 // if writing first page of hash block, zero last page in the block
997 if ( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
999 // use temp buffer to write zeros
1000 memset(bt->temp, 0, bt->page_size);
1001 if ( pwrite(bt->idx,bt->temp, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
1002 return bt->err = BTERR_wrt, 0;
1005 // bring new page into page-cache and copy page.
1006 // this will extend the file into the new pages.
1008 if( !(pmap = (char*)bt_hashpage(bt, new_page & ~bt->hashmask)) )
1011 memcpy(pmap+((new_page & bt->hashmask) << bt->page_bits), page, bt->page_size);
1017 // find slot in given page for given key
1019 int bt_findslot (BtPage page, unsigned char *key, uint len)
1021 uint diff, higher = page->cnt, low = 1, slot;
1024 // make last key an infinite fence value
1026 if( !page->lvl || bt_getid (page->right) )
1031 // low is the next candidate, higher is already
1032 // tested as .ge. the given key, loop ends when they meet
1035 while( diff = higher - low ) {
1036 slot = low + ( diff >> 1 );
1037 if( keycmp (keyptr(page, slot), key, len) < 0 )
1040 higher = slot, good++;
1043 // return zero if key is beyond highkey value
1046 return good ? higher : 0;
1049 // split full parent node
1051 BTERR bt_splitparent (BtDb *bt, unsigned char *key, uint len)
1053 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1054 uid parentpage = bt->parentpage, right;
1055 BtPage page = bt->parent;
1059 // upgrade parent latch to Xclusive
1061 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
1064 // split higher half of keys to bt->frame
1066 memset (bt->frame, 0, bt->page_size);
1067 max = (int)page->cnt;
1071 // link right sibling node into new right page
1073 right = bt_getid (page->right);
1074 bt_putid(bt->frame->right, right);
1076 // record higher fence key in new right leaf page
1078 if( bt->frame->fence = page->fence ) {
1079 memcpy ((unsigned char *)bt->frame + bt->page_size - bt->frame->fence, (unsigned char *)(page) + bt->page_size - bt->frame->fence, bt->frame->fence);
1083 while( cnt++ < max ) {
1085 // copy key, but not infinite values
1087 if( cnt < max || !page->lvl || right ) {
1088 ptr = keyptr(page, cnt);
1089 nxt -= ptr->len + 1;
1090 memcpy ((unsigned char *)bt->frame + nxt, ptr, ptr->len + 1);
1095 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1096 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1097 slotptr(bt->frame, idx)->off = nxt;
1100 bt->frame->bits = bt->page_bits;
1101 bt->frame->lvl = page->lvl;
1102 bt->frame->min = nxt;
1103 bt->frame->cnt = idx;
1105 // get new free page and write right frame to it.
1107 if( !(new_page = bt_newpage(bt, bt->frame)) )
1110 // update lower keys to continue in old page
1112 memcpy (bt->frame, page, bt->page_size);
1113 memset (page+1, 0, bt->page_size - sizeof(*page));
1114 nxt = bt->page_size;
1119 // record fence key in left leaf page
1122 ptr = keyptr(bt->frame, max);
1123 nxt -= ptr->len + 1;
1124 memcpy ((unsigned char *)page + nxt, ptr, ptr->len + 1);
1125 page->fence = ptr->len + 1;
1128 // assemble page of smaller keys
1129 // no infinite value to deal with
1131 while( cnt++ < max ) {
1132 ptr = keyptr(bt->frame, cnt);
1133 nxt -= ptr->len + 1;
1134 memcpy ((unsigned char *)page + nxt, ptr, ptr->len + 1);
1135 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1136 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1137 slotptr(page, idx)->off = nxt;
1140 bt_putid(page->right, new_page);
1146 if( bt_update(bt, page, parentpage) )
1149 // decide to move to new right
1150 // node or stay on left node
1152 ptr = bt_highkey (bt, page);
1154 if( keycmp (ptr, key, len) >= 0 )
1155 return bt_unlockpage (bt, parentpage, BtLockUpgrade);
1157 bt->parentpage = new_page;
1159 if( bt_mappage (bt, &bt->parent, new_page) )
1161 if( bt_lockpage (bt, new_page, BtLockUpdate) )
1163 if( bt_unlockpage (bt, parentpage, BtLockUpgrade) )
1166 return bt_unlockpage (bt, parentpage, BtLockUpdate);
1169 // add unlinked node key into parent
1170 // childpage is existing record
1171 // siblingpage is right record
1173 BTERR bt_parentlink (BtDb *bt, BtPage left, uid leftpage, BtKey rightkey, uid rightpage)
1175 BtKey leftkey = bt_highkey (bt, left);
1176 BtPage page = bt->parent;
1179 // upgrade parent latch to exclusive
1181 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
1184 // find the existing right high key in the parent
1185 // and fix the downlink to point to right page
1188 if( !(slot = bt_findslot (page, rightkey->key, rightkey->len)) )
1189 return bt->err = BTERR_struct;
1193 bt_putid(slotptr(page,slot)->id, rightpage);
1195 // calculate next available slot and copy left key onto page
1197 page->min -= leftkey->len + 1; // reset lowest used offset
1198 memcpy ((unsigned char *)page + page->min, leftkey, leftkey->len + 1);
1200 // now insert key into array before slot
1205 *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1207 bt_putid(slotptr(page,slot)->id, leftpage);
1208 slotptr(page, slot)->off = page->min;
1210 if ( bt_update(bt, page, bt->parentpage) )
1213 // downgrade parent page lock to BtLockUpdate
1215 if( bt_unlockpage(bt, bt->parentpage, BtLockUpgrade) )
1221 // remove slot from parent
1223 void bt_removeslot(BtDb *bt, uint slot)
1225 uint nxt = bt->page_size, amt;
1226 BtPage page = bt->parent;
1227 uint cnt = 0, idx = 0;
1228 uint max = page->cnt;
1232 memcpy (bt->frame, page, bt->page_size);
1234 // skip page info and set rest of page to zero
1235 memset (page+1, 0, bt->page_size - sizeof(*page));
1237 // copy fence key onto new page
1238 if( amt = page->fence ) {
1240 memcpy ((unsigned char *)page + nxt, (unsigned char *)(bt->frame) + nxt, amt);
1243 right = bt_getid (page->right);
1245 while( cnt++ < max ) {
1247 // skip key to delete
1252 // copy key, but not infinite value
1254 if( cnt < max || !page->lvl || right ) {
1255 key = keyptr(bt->frame, cnt);
1256 nxt -= key->len + 1;
1257 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1261 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1262 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1263 slotptr(page, idx)->off = nxt;
1269 // unlink sibling node
1271 BTERR bt_parentunlink (BtDb *bt, BtPage child, uid childpage)
1273 BtKey parentkey = bt_highkey (bt, child);
1276 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
1279 // delete child's slot from parent
1281 if( slot = bt_findslot (bt->parent, parentkey->key, parentkey->len) )
1282 bt_removeslot (bt, slot);
1284 return bt->err + BTERR_struct;
1286 // sibling now in the slot
1288 bt_putid(slotptr(bt->parent,slot)->id, childpage);
1290 if ( bt_update(bt, bt->parent, bt->parentpage) )
1293 // unlock parent page completely
1295 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
1298 return bt_unlockpage(bt, bt->parentpage, BtLockUpdate);
1301 // merge right sibling page into child page
1303 BTERR bt_mergepages (BtDb *bt, BtPage *right, uid rightpage)
1305 BtPage *left = &bt->child;
1309 if( bt_lockpage (bt, bt->childpage, BtLockUpgrade) )
1311 if( bt_lockpage (bt, rightpage, BtLockUpgrade) )
1314 // initialize empty frame
1316 memset (bt->frame, 0, bt->page_size);
1317 *bt->frame = **right;
1319 // copy right fence key
1321 if( amt = (*right)->fence )
1322 memcpy ((unsigned char *)bt->frame + bt->page_size - amt, (unsigned char *)(*right) + bt->page_size - amt, amt);
1324 bt->frame->min = bt->page_size - amt;
1326 // copy lowerkey key/values from left page
1328 for( idx = 1; idx <= (*left)->cnt; idx++ ) {
1329 ptr = keyptr(*left, idx);
1330 bt->frame->min -= ptr->len + 1;
1331 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1332 memcpy (slotptr(bt->frame, idx)->id, slotptr(*left, idx)->id, BtId);
1333 slotptr(bt->frame, idx)->tod = slotptr(*left, idx)->tod;
1334 slotptr(bt->frame, idx)->off = bt->frame->min;
1337 bt->frame->cnt = (*left)->cnt;
1339 // copy higherkey key/values from right page
1341 for( idx = 1; idx <= (*right)->cnt; idx++ ) {
1343 // copy key but not infinite value
1345 if( idx < (*right)->cnt || !(*right)->lvl || right ) {
1346 ptr = keyptr(*right, idx);
1347 bt->frame->min -= ptr->len + 1;
1348 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1352 memcpy (slotptr(bt->frame, bt->frame->cnt + idx)->id, slotptr(*right, idx)->id, BtId);
1353 slotptr(bt->frame, bt->frame->cnt + idx)->tod = slotptr(*right, idx)->tod;
1354 slotptr(bt->frame, bt->frame->cnt + idx)->off = bt->frame->min;
1357 bt->frame->cnt += (*right)->cnt;
1358 memcpy (*left, bt->frame, bt->page_size);
1360 if( bt_update (bt, *left, bt->childpage) )
1362 if( bt_freepage (bt, *right, rightpage) )
1364 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1366 if( bt_unlockpage (bt, rightpage, BtLockUpdate) )
1369 return bt_unlockpage (bt, bt->childpage, BtLockUpgrade);
1372 // redistribute right sibling page with child page
1373 // switch child page to containing page
1375 BTERR bt_redistribute (BtDb *bt, BtPage *right, uid rightpage, unsigned char *key, uint len)
1377 uid siblingpage = bt_getid((*right)->right);
1378 BtPage *left = &bt->child;
1379 uint idx, cnt = 0, amt = 0;
1380 uint leftmax, rightmin;
1384 if( bt_lockpage (bt, bt->childpage, BtLockUpgrade) )
1386 if( bt_lockpage (bt, rightpage, BtLockUpgrade) )
1389 // initialize empty frames to contain redistributed pages
1391 memset (bt->frame, 0, bt->page_size); // new left page
1392 memset (bt->temp, 0, bt->page_size); // new right page
1394 *bt->frame = **left;
1395 *bt->temp = **right;
1397 bt->frame->min = bt->page_size;
1398 bt->temp->min = bt->page_size;
1402 // find new left fence index
1403 // and copy left fence key
1405 if( (*left)->cnt > (*right)->cnt ) {
1407 leftmax = (*left)->cnt / 2;
1408 if( !bt->frame->lvl ) {
1409 ptr = keyptr(*left, leftmax);
1410 bt->frame->fence = ptr->len + 1;
1411 bt->frame->min -= ptr->len + 1;
1412 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1415 leftmax = (*left)->cnt;
1416 rightmin = (*right)->cnt / 2;
1417 if( !bt->frame->lvl ) {
1418 ptr = keyptr(*right, rightmin);
1419 bt->frame->fence = ptr->len + 1;
1420 bt->frame->min -= ptr->len + 1;
1421 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1425 // right fence stays the same, if any
1427 if( amt = (*right)->fence ) {
1428 bt->temp->min -= amt;
1429 memcpy ((unsigned char *)bt->temp + bt->temp->min, (unsigned char *)(*right) + bt->temp->min, amt);
1432 // copy first set of lowerkey key/values from left page
1434 for( idx = 1; idx <= leftmax; idx++ ) {
1436 ptr = keyptr(*left, idx);
1437 bt->frame->min -= ptr->len + 1;
1438 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1439 memcpy (slotptr(bt->frame, idx)->id, slotptr(*left, idx)->id, BtId);
1440 slotptr(bt->frame, idx)->tod = slotptr(*left, idx)->tod;
1441 slotptr(bt->frame, idx)->off = bt->frame->min;
1444 // copy remaining left page key/values from right page, if any
1446 for( idx = 1; idx <= rightmin; idx++ ) {
1448 ptr = keyptr(*right, idx);
1449 bt->frame->min -= ptr->len + 1;
1450 memcpy ((unsigned char *)bt->frame + bt->frame->min, ptr, ptr->len + 1);
1451 memcpy (slotptr(bt->frame, bt->frame->cnt)->id, slotptr(*right, idx)->id, BtId);
1452 slotptr(bt->frame, bt->frame->cnt)->tod = slotptr(*right, idx)->tod;
1453 slotptr(bt->frame, bt->frame->cnt)->off = bt->frame->min;
1456 // copy remaining left page key/values into new right page, if any
1458 for( idx = leftmax; idx <= (*left)->cnt; idx++ ) {
1460 ptr = keyptr(*left, idx);
1461 bt->temp->min -= ptr->len + 1;
1462 memcpy ((unsigned char *)bt->temp + bt->temp->min, ptr, ptr->len + 1);
1463 memcpy (slotptr(bt->temp, bt->temp->cnt)->id, slotptr(*left, idx)->id, BtId);
1464 slotptr(bt->temp, bt->temp->cnt)->tod = slotptr(*left, idx)->tod;
1465 slotptr(bt->temp, bt->temp->cnt)->off = bt->temp->min;
1468 // copy rest of higherkey key/values from right page
1470 for( idx = rightmin; idx <= (*right)->cnt; idx++ ) {
1472 // copy key, but not infinite value
1474 if( idx < (*right)->cnt || !(*right)->lvl || siblingpage ) {
1475 ptr = keyptr(*right, idx);
1476 bt->temp->min -= ptr->len + 1;
1477 memcpy ((unsigned char *)bt->temp + bt->temp->min, ptr, ptr->len + 1);
1483 memcpy (slotptr(bt->temp, bt->temp->cnt)->id, slotptr(*right, idx)->id, BtId);
1484 slotptr(bt->temp, bt->temp->cnt)->tod = slotptr(*right, idx)->tod;
1485 slotptr(bt->temp, bt->temp->cnt)->off = bt->temp->min;
1488 memcpy (*left, bt->frame, bt->page_size);
1489 memcpy (*right, bt->temp, bt->page_size);
1491 if( bt_update (bt, *left, bt->childpage) )
1493 if( bt_update (bt, *right, rightpage) )
1496 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1498 if( bt_unlockpage (bt, rightpage, BtLockUpdate) )
1501 ptr = bt_highkey (bt, *left);
1503 // decide which page is the child page
1504 // if leftkey >= our key, go with left
1506 if( keycmp (ptr, key, len) >= 0 ) {
1507 if( bt_unlockpage (bt, bt->childpage, BtLockUpgrade) )
1509 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1511 if( bt_unlockpage (bt, rightpage, BtLockUpdate) )
1514 if( bt_unlockpage (bt, rightpage, BtLockUpgrade) )
1516 if( bt_unlockpage (bt, bt->childpage, BtLockUpgrade) )
1518 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
1523 bt->childpage = rightpage;
1529 // lower the root level by removing the child node
1531 BTERR bt_lowerroot(BtDb *bt)
1533 if( bt_lockpage (bt, ROOT_page, BtLockUpgrade) )
1536 if( bt_lockpage (bt, bt->childpage, BtLockUpgrade) )
1539 memcpy (bt->parent, bt->child, bt->page_size);
1541 if( bt_update(bt, bt->parent, ROOT_page) )
1544 if( bt_freepage(bt, bt->child, bt->childpage) )
1547 if( bt_unlockpage (bt, bt->childpage, BtLockUpgrade) )
1550 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
1553 return bt_unlockpage (bt, ROOT_page, BtLockUpgrade);
1556 // split the root and raise the height of the btree
1557 // return with parent page set to appropriate sibling
1559 BTERR bt_raiseroot(BtDb *bt, uid sibling, unsigned char *key, uint len)
1561 unsigned char lowerkey[256];
1562 uint nxt = bt->page_size;
1563 BtPage root = bt->parent;
1567 // upgrade root page lock to exclusive
1569 if( bt_lockpage (bt, ROOT_page, BtLockUpgrade) )
1572 // Obtain an empty page to use, and copy the current
1573 // root (lower half) contents into it
1575 if( !(new_page = bt_newpage(bt, root)) )
1578 if( bt_lockpage (bt, new_page, BtLockUpdate) )
1581 // save high fence key for left page
1583 ptr = bt_highkey(bt, root);
1584 memcpy (lowerkey, ptr, ptr->len + 1);
1586 // preserve the page info at the bottom
1587 // and set rest to zero to initialize new root page
1589 memset(root+1, 0, bt->page_size - sizeof(*root));
1591 // insert left key in newroot page
1593 nxt -= *lowerkey + 1;
1594 memcpy ((unsigned char *)root + nxt, lowerkey, *lowerkey + 1);
1595 bt_putid(slotptr(root, 1)->id, new_page);
1596 slotptr(root, 1)->off = nxt;
1598 // insert second (infinite) key on newroot page that's never examined
1599 // and increase the root level
1601 bt_putid(slotptr(root, 2)->id, sibling);
1602 bt_putid(root->right, 0);
1604 root->min = nxt; // reset lowest used offset and key count
1609 if( bt_update(bt, root, bt->parentpage) )
1612 if( bt_unlockpage(bt, ROOT_page, BtLockUpgrade) )
1615 if( bt_unlockpage(bt, ROOT_page, BtLockUpdate) )
1618 // decide which root node to continue with
1619 // sibling has upper keys, newpage the lower ones
1621 if( keycmp((BtKey)lowerkey, key, len) < 0 ) {
1622 bt->parentpage = sibling; // go with the upper ones
1624 if( bt_unlockpage (bt, new_page, BtLockUpdate) )
1627 return bt_mappage (bt, &bt->parent, sibling);
1630 bt->parentpage = new_page; // go with the lower ones
1632 if( bt_unlockpage (bt, sibling, BtLockUpdate) )
1635 return bt_mappage (bt, &bt->parent, new_page);
1638 // handle underflowing child node
1640 BTERR bt_repairchild (BtDb *bt, uint parentslot, unsigned char *key, uint len)
1642 BtKey parentkey = bt_slotkey(bt->parent, parentslot);
1643 BtKey fencekey = bt_highkey (bt, bt->child);
1644 BtKey highkey = bt_highkey(bt, bt->parent);
1645 BtKey siblingkey, siblingkey2;
1646 uid siblingpage, siblingpage2;
1651 // high key is never NULL
1652 // fence key is null on right end
1655 if( !highkey || keycmp (fencekey, highkey->key, highkey->len) < 0 ) {
1657 // childpage is not rightmost child of parent page
1659 siblingpage = bt_getid(bt->child->right);
1661 if( bt_lockpage (bt, siblingpage, BtLockUpdate) )
1664 if( bt_mappage (bt, &bt->sibling, siblingpage) )
1667 if( !parentkey || keycmp (fencekey, parentkey->key, parentkey->len) < 0 ) {
1669 // sibling is not linked in parent, so we can merge it
1671 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
1674 // calculate size of merged page by adding single child key to sibling
1676 fencekey = keyptr(bt->child, 1);
1678 if( bt->sibling->min < (bt->sibling->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1679 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1681 return bt_mergepages (bt, &bt->sibling, siblingpage);
1684 // sibling has a key in the parent node
1685 // find its slot in parent
1687 if( siblingkey = bt_highkey (bt, bt->sibling) )
1688 slot = bt_findslot (bt->parent, siblingkey->key, siblingkey->len);
1690 slot = bt->parent->cnt;
1692 parentkey = bt_slotkey (bt->parent, slot);
1693 siblingpage2 = bt_getid(bt->sibling->right);
1695 if( siblingkey && parentkey && keycmp (siblingkey, parentkey->key, parentkey->len) < 0 ) {
1697 // sibling2 is not linked in P, its key is parentkey
1698 // can parent support its insertion?
1699 // if not, split parent node first.
1701 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + parentkey->len + 1) {
1702 if( bt_splitparent (bt, key, len) )
1705 // are we in the correct half of new parent nodes?
1706 // if not, restart the function.
1708 if( slot = bt_findslot (bt->parent, siblingkey->key, siblingkey->len) )
1709 parentkey = keyptr(bt->parent, slot);
1711 return BTERR_restart;
1714 // link right of sibling into parent
1716 if( bt_parentlink (bt, bt->sibling, siblingpage, parentkey, siblingpage2) )
1719 // unlink sibling from parent
1721 if( bt_parentunlink (bt, bt->child, bt->childpage) )
1724 fencekey = keyptr(bt->child, 1);
1726 if( bt->sibling->min < (bt->sibling->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1727 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1729 return bt_mergepages (bt, &bt->sibling, siblingpage);
1732 // unlink sibling from parent and merge
1734 if( bt_parentunlink (bt, bt->child, bt->childpage) )
1737 fencekey = keyptr(bt->child, 1);
1739 if( bt->sibling->min < (bt->sibling->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1740 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1742 return bt_mergepages (bt, &bt->sibling, siblingpage);
1746 // child is rightmost key in the parent,
1747 // work with nodes to left.
1749 siblingpage = bt_getid(slotptr(bt->parent, parentslot - 1)->id);
1750 siblingkey = keyptr(bt->parent, parentslot - 1);
1752 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
1754 if( bt_lockpage (bt, siblingpage, BtLockUpdate) )
1756 if( bt_mappage (bt, &bt->sibling, siblingpage) )
1759 siblingpage2 = bt_getid(bt->sibling->right);
1760 siblingkey2 = bt_highkey (bt, bt->sibling);
1762 if( bt_lockpage (bt, siblingpage2, BtLockUpdate) )
1764 if( bt_mappage (bt, &bt->sibling2, siblingpage2) )
1767 if( keycmp (siblingkey, siblingkey2->key, siblingkey2->len) == 0 ) {
1769 // left sibling right (mapped into sibling2) is our child node
1771 swap = bt->sibling2;
1772 bt->sibling2 = bt->child;
1775 // does child still need to merge/redistribute?
1777 if( bt->child->cnt > 1 ) {
1778 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
1781 return bt_unlockpage (bt, siblingpage, BtLockUpdate);
1785 bt->sibling = bt->child;
1788 bt->childpage = siblingpage;
1789 siblingpage = siblingpage2;
1791 // unlink sibling node from parent and merge with child
1793 if( bt_parentunlink (bt, bt->child, bt->childpage) )
1796 fencekey = keyptr(bt->sibling, 1);
1798 if( bt->child->min < (bt->child->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->sibling) + fencekey->len + 1)
1799 return bt_redistribute (bt, &bt->sibling, siblingpage, key, len);
1801 return bt_mergepages (bt, &bt->sibling, siblingpage);
1804 // currently unlinked sibling2 merges with child
1806 fencekey = bt_highkey (bt, bt->sibling2);
1808 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + fencekey->len + 1)
1809 if( bt_splitparent (bt, key, len) )
1812 if( bt_parentlink (bt, bt->sibling, siblingpage, fencekey, siblingpage2) )
1815 if( bt_unlockpage (bt, siblingpage, BtLockUpdate) )
1817 if( bt_lockpage (bt, bt->childpage, BtLockUpdate) )
1819 if( bt_mappage (bt, &bt->child, bt->childpage) )
1822 if( bt->child->cnt > 1 ) {
1823 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
1825 return bt_unlockpage (bt, siblingpage2, BtLockUpdate);
1828 // unlink child from parent node leaving immediate
1829 // left sibling2 to accept merge/redistribution
1831 if( bt_parentunlink (bt, bt->sibling2, siblingpage2) )
1834 swap = bt->sibling2;
1835 bt->sibling2 = bt->child;
1837 bt->childpage = siblingpage2;
1839 fencekey = keyptr(bt->sibling2, 1);
1841 if( bt->child->min < (bt->child->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->child) + fencekey->len + 1)
1842 return bt_redistribute (bt, &bt->sibling2, siblingpage2, key, len);
1844 return bt_mergepages (bt, &bt->sibling2, siblingpage2);
1847 // find and load leaf page for given key
1848 // leave page BtLockShared, return with key's slot
1850 int bt_loadpageread (BtDb *bt, unsigned char *key, uint len)
1852 uid page_no = ROOT_page, prevpage = 0;
1855 // start at root of btree and drill down
1858 bt->parentpage = page_no;
1860 if( bt_lockpage(bt, bt->parentpage, BtLockShared) )
1864 if( bt_unlockpage(bt, prevpage, BtLockShared) )
1867 // map/obtain page contents
1869 if( bt_mappage (bt, &bt->parent, page_no) )
1872 // find key on page at this level
1873 // return if leaf page
1875 if( (slot = bt_findslot (bt->parent, key, len)) ) {
1876 if( !bt->parent->lvl )
1879 // continue down to next level
1881 page_no = bt_getid(slotptr(bt->parent, slot)->id);
1884 // or slide right into next page
1887 page_no = bt_getid(bt->parent->right);
1889 prevpage = bt->parentpage;
1892 // return EOF on end of right chain
1894 if( bt_unlockpage(bt, bt->parentpage, BtLockShared) )
1897 return 0; // return EOF
1900 // find and load leaf page for given key
1901 // return w/slot # on leaf page
1902 // leave page BtLockUpdate
1904 int bt_loadpageupdate (BtDb *bt, unsigned char *key, uint len)
1906 uid parentpage = 0, nextpage;
1907 BtKey fencekey, parentkey;
1911 // start at root of btree and drill down
1913 if( bt_lockpage(bt, ROOT_page, BtLockUpdate) )
1916 // map/obtain page contents
1918 if( bt_mappage (bt, &bt->parent, ROOT_page) )
1921 bt->parentpage = ROOT_page;
1924 // if root page, check for tree level growth
1925 // by existence of right pointer
1927 if( bt->parentpage == ROOT_page )
1928 if( nextpage = bt_getid(bt->parent->right) ) {
1929 if( bt_lockpage (bt, nextpage, BtLockUpdate) )
1932 if( bt_raiseroot (bt, nextpage, key, len) )
1936 // find key on page at this level
1937 // return if leaf page
1939 slot = bt_findslot (bt->parent, key, len);
1941 if( !bt->parent->lvl )
1944 // lock & map the child
1946 bt->childpage = bt_getid(slotptr(bt->parent, slot)->id);
1948 if( bt_lockpage(bt, bt->childpage, BtLockUpdate) )
1951 if( bt_mappage (bt, &bt->child, bt->childpage) )
1954 // check child for underflow
1956 if( bt->parentpage == ROOT_page )
1957 if( bt->parent->cnt == 1 )
1958 if( !bt_getid(bt->child->right) ) {
1959 if( bt_lowerroot (bt) )
1961 nextpage = ROOT_page;
1965 if( bt->child->cnt == 1 ) {
1966 while( bt_repairchild (bt, slot, key, len) == BTERR_restart );
1971 bt->child = bt->parent;
1973 nextpage = bt->childpage;
1977 fencekey = bt_highkey (bt, bt->child);
1978 parentkey = bt_slotkey(bt->parent, slot);
1980 // if right sibling is not linked,
1981 // fix links in parent node
1984 if( !parentkey || keycmp (fencekey, parentkey->key, parentkey->len) < 0 ) {
1986 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + fencekey->len + 1)
1987 if( bt_splitparent (bt, key, len) ) {
1990 slot = bt_findslot (bt->parent, key, len);
1991 parentkey = bt_slotkey(bt->parent, slot);
1994 // add key to parent for child page
1995 // and fix downlink for childpage
1997 nextpage = bt_getid(bt->child->right);
1999 if( bt_parentlink (bt, bt->child, bt->childpage, parentkey, nextpage) )
2003 // unlock the parent page
2005 if( bt_unlockpage (bt, bt->parentpage, BtLockUpdate) )
2008 // is our key on this child page?
2009 // is fencekey infinite, or .ge. our key
2011 if( !fencekey || keycmp (fencekey, key, len) >= 0 ) {
2013 bt->child = bt->parent;
2015 nextpage = bt->childpage;
2019 // otherwise slide right into next page
2021 nextpage = bt_getid(bt->child->right);
2023 if( bt_lockpage (bt, nextpage, BtLockUpdate) )
2026 if( bt_unlockpage (bt, bt->childpage, BtLockUpdate) )
2029 if( bt_mappage (bt, &bt->parent, nextpage) )
2032 } while( bt->parentpage = nextpage );
2034 // return error on end of right chain
2036 bt->err = BTERR_struct;
2037 return 0; // return error
2040 // find and delete key on leaf page
2042 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
2050 if( slot = bt_loadpageupdate (bt, key, len) )
2051 ptr = keyptr(bt->parent, slot);
2055 // if key is found delete it, otherwise ignore request
2057 if( slot && !keycmp (ptr, key, len) ) {
2058 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
2061 bt_removeslot (bt, slot);
2063 if( bt_update(bt, bt->parent, bt->parentpage) )
2066 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
2070 return bt_unlockpage (bt, bt->parentpage, BtLockUpdate);
2073 // find key in leaf page and return row-id
2074 // or zero if key is not found.
2076 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
2084 if( slot = bt_loadpageread (bt, key, len) )
2085 ptr = keyptr(bt->parent, slot);
2089 // if key exists, return row-id
2090 // otherwise return 0
2092 if( ptr->len == len && !memcmp (ptr->key, key, len) )
2093 id = bt_getid(slotptr(bt->parent,slot)->id);
2097 if ( bt_unlockpage(bt, bt->parentpage, BtLockShared) )
2103 // Insert new key into the btree leaf page
2105 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod)
2110 if( slot = bt_loadpageupdate (bt, key, len) )
2111 ptr = keyptr(bt->parent, slot);
2115 if( bt->parent->lvl )
2117 if( bt->parent->lvl )
2121 // if key already exists, update id and return
2124 if( !keycmp (ptr, key, len) ) {
2125 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
2127 slotptr(bt->parent, slot)->tod = tod;
2128 bt_putid(slotptr(bt->parent,slot)->id, id);
2129 if ( bt_update(bt, bt->parent, bt->parentpage) )
2131 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
2133 return bt_unlockpage(bt, bt->parentpage, BtLockUpdate);
2136 // check if leaf page has enough space
2138 if( bt->parent->min < (bt->parent->cnt + 1) * sizeof(BtSlot) + sizeof(*bt->parent) + len + 1) {
2139 if( bt_splitparent (bt, key, len) )
2142 slot = bt_findslot (bt->parent, key, len);
2145 // calculate next available slot and copy key into page
2147 if( bt_lockpage (bt, bt->parentpage, BtLockUpgrade) )
2150 bt->parent->min -= len + 1; // reset lowest used offset
2151 ((unsigned char *)bt->parent)[bt->parent->min] = len;
2152 memcpy ((unsigned char *)bt->parent + bt->parent->min +1, key, len );
2154 // now insert key into array before slot
2156 idx = ++bt->parent->cnt;
2160 *slotptr(bt->parent, idx) = *slotptr(bt->parent, idx -1), idx--;
2162 bt_putid(slotptr(bt->parent,idx)->id, id);
2163 slotptr(bt->parent, idx)->off = bt->parent->min;
2164 slotptr(bt->parent, idx)->tod = tod;
2166 if ( bt_update(bt, bt->parent, bt->parentpage) )
2169 if( bt_unlockpage (bt, bt->parentpage, BtLockUpgrade) )
2172 return bt_unlockpage(bt, bt->parentpage, BtLockUpdate);
2175 // cache page of keys into cursor and return starting slot for given key
2177 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2181 // cache page for retrieval
2182 if( slot = bt_loadpageread (bt, key, len) )
2183 memcpy (bt->cursor, bt->parent, bt->page_size);
2184 bt->cursorpage = bt->parentpage;
2185 if ( bt_unlockpage(bt, bt->parentpage, BtLockShared) )
2191 // return next slot for cursor page
2192 // or slide cursor right into next page
2194 uint bt_nextkey (BtDb *bt, uint slot)
2199 if( slot++ < bt->cursor->cnt )
2202 right = bt_getid(bt->cursor->right);
2207 bt->cursorpage = right;
2209 if( bt_lockpage(bt, right,BtLockShared) )
2212 if( bt_mappage (bt, &bt->parent, right) )
2215 memcpy (bt->cursor, bt->parent, bt->page_size);
2216 if ( bt_unlockpage(bt, right, BtLockShared) )
2225 BtKey bt_key(BtDb *bt, uint slot)
2227 return keyptr(bt->cursor, slot);
2230 uid bt_uid(BtDb *bt, uint slot)
2232 return bt_getid(slotptr(bt->cursor,slot)->id);
2235 uint bt_tod(BtDb *bt, uint slot)
2237 return slotptr(bt->cursor,slot)->tod;
2242 // standalone program to index file of keys
2243 // then list them onto std-out
2245 int main (int argc, char **argv)
2247 uint slot, found = 0, line = 0, off = 0;
2248 int ch, cnt = 0, bits = 12;
2249 unsigned char key[256];
2250 clock_t done, start;
2260 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_pages start_line_number]", argv[0]);
2268 bits = atoi(argv[4]);
2271 map = atoi(argv[5]);
2274 fprintf (stderr, "Warning: mapped_pool > 65536 pages\n");
2277 off = atoi(argv[6]);
2279 bt = bt_open ((argv[1]), BT_rw, bits, map);
2282 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2286 switch(argv[3][0]| 0x20)
2289 fprintf(stderr, "started indexing for %s\n", argv[2]);
2290 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2291 while( ch = getc(in), ch != EOF )
2295 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2297 if( bt_insertkey (bt, key, len, ++line, *tod) )
2298 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2301 else if( len < 245 )
2303 fprintf(stderr, "finished adding keys, %d \n", line);
2307 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2308 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2309 while( ch = getc(in), ch != EOF )
2313 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2315 if( bt_deletekey (bt, key, len) )
2316 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2319 else if( len < 245 )
2321 fprintf(stderr, "finished deleting keys, %d\n", line);
2325 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2326 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2327 while( ch = getc(in), ch != EOF )
2331 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2333 if( bt_findkey (bt, key, len) )
2336 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2339 else if( len < 245 )
2341 fprintf(stderr, "finished search of %d keys, found %d\n", line, found);
2351 fprintf(stderr, " Time to complete: %.2f seconds\n", (float)(done - start) / CLOCKS_PER_SEC);
2356 fprintf(stderr, "started reading\n");
2358 slot = bt_startkey (bt, key, len);
2361 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
2364 while( slot = bt_nextkey (bt, slot) )
2366 ptr = bt_key(bt, slot);
2367 fwrite (ptr->key, ptr->len, 1, stdout);
2368 fputc ('\n', stdout);
2371 fprintf(stderr, " Total keys read %d\n", cnt);