1 // btree version threadskv10g futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // LSM B-trees for write optimization
11 // larger sized leaf pages than non-leaf
12 // and LSM B-tree find & count operations
16 // author: karl malbrain, malbrain@cal.berkeley.edu
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
22 The original author is Karl Malbrain.
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
56 #define WIN32_LEAN_AND_MEAN
70 typedef unsigned long long uid;
71 typedef unsigned long long logseqno;
74 typedef unsigned long long off64_t;
75 typedef unsigned short ushort;
76 typedef unsigned int uint;
79 #define BT_ro 0x6f72 // ro
80 #define BT_rw 0x7772 // rw
82 #define BT_maxbits 26 // maximum page size in bits
83 #define BT_minbits 9 // minimum page size in bits
84 #define BT_minpage (1 << BT_minbits) // minimum page size
85 #define BT_maxpage (1 << BT_maxbits) // maximum page size
87 // BTree page number constants
88 #define ALLOC_page 0 // allocation page
89 #define ROOT_page 1 // root of the btree
90 #define LEAF_page 2 // first page of leaves
92 // Number of levels to create in a new BTree
97 There are six lock types for each node in four independent sets:
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification.
118 volatile unsigned char xcl[1];
119 volatile unsigned char filler;
120 volatile ushort waiters[1];
126 // definition for reader/writer reentrant lock implementation
132 ushort dup; // re-entrant locks
133 ushort tid; // owner thread-no
134 ushort line; // owner line #
137 // hash table entries
141 uint entry; // Latch table entry at head of chain
144 // latch manager table structure
147 uid page_no; // latch set page number
148 RWLock readwr[1]; // read/write page lock
149 RWLock access[1]; // Access Intent/Page delete
150 RWLock parent[1]; // Posting of fence key in parent
151 RWLock link[1]; // left link update in progress
152 MutexLatch modify[1]; // modify entry lite latch
153 uint split; // right split page atomic insert
154 uint next; // next entry in hash table chain
155 uint prev; // prev entry in hash table chain
156 volatile ushort pin; // number of accessing threads
157 unsigned char dirty; // page in cache is dirty
158 unsigned char leaf; // page in cache is a leaf
161 // Define the length of the page record numbers
165 // Page key slot definition.
167 // Keys are marked dead, but remain on the page until
168 // cleanup is called. The fence key (highest key) for
169 // a leaf page is always present, even after cleanup.
173 // In addition to the Unique keys that occupy slots
174 // there are Librarian and Duplicate key
175 // slots occupying the key slot array.
177 // The Librarian slots are dead keys that
178 // serve as filler, available to add new Unique
179 // or Dup slots that are inserted into the B-tree.
181 // The Duplicate slots have had their key bytes extended
182 // by 6 bytes to contain a binary duplicate key uniqueifier.
193 uint off:BT_maxbits; // page offset for key start
194 uint type:3; // type of slot
195 uint dead:1; // set for deleted slot
198 // The key structure occupies space at the upper end of
199 // each page. It's a length byte followed by the key
203 unsigned char len; // this can be changed to a ushort or uint
204 unsigned char key[0];
207 // the value structure also occupies space at the upper
208 // end of the page. Each key is immediately followed by a value.
211 unsigned char len; // this can be changed to a ushort or uint
212 unsigned char value[0];
215 #define BT_maxkey 255 // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
218 // The first part of an index page.
219 // It is immediately followed
220 // by the BtSlot array of keys.
222 typedef struct BtPage_ {
223 uint cnt; // count of keys in page
224 uint act; // count of active keys
225 uint min; // next key/value offset
226 uint fence; // page fence key offset
227 uint garbage; // page garbage in bytes
228 unsigned char lvl; // level of page, zero = leaf
229 unsigned char free; // page is on the free chain
230 unsigned char kill; // page is being deleted
231 unsigned char nopromote; // page is being constructed
232 uid right, left; // page numbers to right and left
233 logseqno lsn; // log sequence number applied
236 // The loadpage interface object
239 BtPage page; // current page pointer
240 BtLatchSet *latch; // current page latch set
243 // structure for latch manager on ALLOC_page
246 struct BtPage_ alloc[1]; // next page_no in right ptr
247 uid freechain; // head of free page_nos chain
248 uid leafchain; // head of leaf page_nos chain
249 unsigned long long leafpages; // number of active leaf pages
250 unsigned long long upperpages; // number of active upper pages
251 uint redopages; // number of redo pages in file
252 unsigned char leaf_xtra; // leaf page size in xtra bits
253 unsigned char page_bits; // base page size in bits
256 // The object structure for Btree access
259 uint page_size; // base page size
260 uint page_bits; // base page size in bits
261 uint leaf_xtra; // leaf xtra bits
267 BtPageZero *pagezero; // mapped allocation page
268 BtHashEntry *hashtable; // the buffer pool hash table entries
269 BtHashEntry *leaftable; // the buffer pool hash table entries
270 BtLatchSet *latchsets; // mapped latch set from buffer pool
271 BtLatchSet *leafsets; // mapped latch set from buffer pool
272 unsigned char *pagepool; // mapped to the buffer pool pages
273 unsigned char *leafpool; // mapped to the leaf pool pages
274 unsigned char *redobuff; // mapped recovery buffer pointer
275 logseqno lsn, flushlsn; // current & first lsn flushed
276 MutexLatch redo[1]; // redo area lite latch
277 MutexLatch lock[1]; // allocation area lite latch
278 ushort thread_no[1]; // highest thread number issued
279 ushort err_thread; // error thread number
280 uint nlatchpage; // size of buffer pool & latchsets
281 uint latchtotal; // number of page latch entries
282 uint latchhash; // number of latch hash table slots
283 uint latchvictim; // next latch entry to swap out
284 uint nleafpage; // size of leaf pool & leafsets
285 uint leaftotal; // number of leaf latch entries
286 uint leafhash; // number of leaf hash table slots
287 uint leafvictim; // next leaf entry to swap out
288 uint leafpromote; // next leaf entry to promote
289 uint redopage; // page number of redo buffer
290 uint redolast; // last msync size of recovery buff
291 uint redoend; // eof/end element in recovery buff
292 int err; // last error
293 int line; // last error line no
294 int found; // number of keys found by delete
295 int reads, writes; // number of reads and writes
296 int type; // type of LSM tree 0=cache, 1=main
298 HANDLE halloc; // allocation handle
299 HANDLE hpool; // buffer pool handle
301 unsigned char *page0; // memory mapped pagezero of b-tree
305 BtMgr *mgr; // buffer manager for entire process
306 BtMgr *main; // buffer manager for main btree
307 BtPageSet cacheset[1]; // cached page frame for cache btree
308 BtPageSet mainset[1]; // cached page frame for main btree
309 uint cacheslot; // slot number in cacheset
310 uint mainslot; // slot number in mainset
311 ushort phase; // 1 = main btree 0 = cache btree 2 = both
312 ushort thread_no; // thread number
321 // atomic txn structure
324 logseqno reqlsn; // redo log seq no required
325 uint entry:31; // latch table entry number
326 uint reuse:1; // reused previous page
327 uint slot; // slot on page
328 uint src; // source slot
331 // Catastrophic errors
345 #define CLOCK_bit 0x8000
347 // recovery manager entry types
// BTRM_eof is zero, so a zero-filled BtLogHdr (as written by the
// memset calls in the redo functions) reads back as an eof entry.
350 BTRM_eof = 0, // rest of buffer is empty
351 BTRM_add, // add a unique key-value to btree
352 BTRM_dup, // add a duplicate key-value to btree
353 BTRM_del, // delete a key-value from btree
354 BTRM_upd, // update a key with a new value
355 BTRM_new, // allocate a new empty page
356 BTRM_old // reuse an old empty page
359 // recovery manager entry
360 // structure followed by BtKey & BtVal
363 logseqno reqlsn; // log sequence number required
364 logseqno lsn; // log sequence number for entry
365 uint len; // length of entry
366 unsigned char type; // type of entry
367 unsigned char lvl; // level of btree entry pertains to
372 extern void bt_close (BtDb *bt);
373 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
374 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
375 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
376 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
377 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
378 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
379 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
381 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
383 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
384 extern BTERR bt_nextkey (BtDb *bt);
386 extern uint bt_lastkey (BtDb *bt);
387 extern uint bt_prevkey (BtDb *bt);
390 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
391 extern void bt_mgrclose (BtMgr *mgr);
392 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
393 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
395 // atomic transaction functions
396 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no);
397 BTERR bt_promote (BtDb *bt);
399 // The page is allocated from low and hi ends.
400 // The key slots are allocated from the bottom,
401 // while the text and value of the key
402 // are allocated from the top. When the two
403 // areas meet, the page is split into two.
405 // A key consists of a length byte, two bytes of
406 // index number (0 - 65535), and up to 253 bytes
409 // Associated with each key is a value byte string
410 // containing any value desired.
412 // The b-tree root is always located at page 1.
413 // The first leaf page of level zero is always
414 // located on page 2.
416 // The b-tree pages are linked with next
417 // pointers to facilitate enumerators,
418 // and provide for concurrency.
420 // When the root page fills, it is split in two and
421 // the tree height is raised by a new root at page
422 // one with two keys.
424 // Deleted keys are marked with a dead bit until
425 // page cleanup. The fence key for a leaf node is
428 // To achieve maximum concurrency one page is locked at a time
429 // as the tree is traversed to find leaf key in question. The right
430 // page numbers are used in cases where the page is being split,
433 // Page 0 is dedicated to lock for new page extensions,
434 // and chains empty pages together for reuse. It also
435 // contains the latch manager hash table.
437 // The ParentModification lock on a node is obtained to serialize posting
438 // or changing the fence key for a node.
440 // Empty pages are chained together through the ALLOC page and reused.
442 // Access macros to address slot and key values from the page
443 // Page slots use 1 based indexing.
// slotptr: address of the slot-th (1-based) BtSlot in the array that
// begins immediately after the page header, i.e. at (page + 1).
445 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
// keyptr: the BtKey found slotptr(page, slot)->off bytes from the
// start of the page.
446 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
// valptr: the BtVal that starts directly after the key bytes of the
// same slot (key array address plus key length).
447 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
// fenceptr: the BtKey found page->fence bytes from the start of the page.
// NOTE(review): `page` is used unparenthesized in `page+1` and
// `page->fence`; pass a plain identifier rather than an expression.
448 #define fenceptr(page) ((BtKey*)((unsigned char*)(page) + page->fence))
// Serialize the page id `id` into the byte array `dest`.
// Each step stores the low-order byte of id in dest[i] and then
// shifts id right by eight bits.
// NOTE(review): the declaration of i and the loop header are not
// visible in this excerpt; presumably i counts down from BtId-1 so the
// result is most-significant-byte first, matching bt_getid -- confirm.
450 void bt_putid(unsigned char *dest, uid id)
455 dest[i] = (unsigned char)id, id >>= 8;
// Rebuild a page id from the BtId bytes starting at `src`.
// Bytes are consumed in increasing address order; each one is OR-ed
// into the low end after the accumulated id is shifted left by eight
// bits, so the first byte read ends up most significant.
// NOTE(review): the initial value of id and the return statement are
// not visible in this excerpt.
458 uid bt_getid(unsigned char *src)
463 for( i = 0; i < BtId; i++ )
464 id <<= 8, id |= *src++;
469 // lite weight spin lock Latch Manager
// Thin wrapper around the raw futex system call: forwards all six
// arguments unchanged to syscall(SYS_futex, ...) and returns its result.
471 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
473 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
// Acquire the mutex latch, spinning first and then waiting on a futex.
// NOTE(review): several lines (declaration of prev, the outer retry
// loop, closing braces, the condition guarding the waiters decrement)
// are not visible in this excerpt; comments below cover only what is shown.
476 void bt_mutexlock(MutexLatch *latch)
478 uint idx, waited = 0;
// spin phase: at most 100 attempts before falling back to the futex
482 for( idx = 0; idx < 100; idx++ ) {
// atomically set bit 0 of the latch word and keep the previous word in prev
483 *prev->value = __sync_fetch_and_or (latch->value, 1);
// the exclusive byte was clear in the previous word: the latch is now ours
484 if( !*prev->bits->xcl ) {
// NOTE(review): presumably only executed when this thread had
// registered itself as a waiter (waited != 0) -- confirm.
486 __sync_fetch_and_sub (latch->bits->waiters, 1);
// register as a waiter in the shared latch word ...
492 __sync_fetch_and_add (latch->bits->waiters, 1);
// ... and mirror the increment in the local snapshot so it can serve
// as the expected value for the futex wait below
493 *prev->bits->waiters += 1;
// sleep in the kernel only while the latch word still equals the snapshot
497 sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
// Single attempt to take the latch.
// Atomically stores 1 in the exclusive byte and returns non-zero when
// the byte was previously 0 (latch obtained), zero when it was already set.
501 int bt_mutextry(MutexLatch *latch)
503 return !__sync_lock_test_and_set (latch->bits->xcl, 1);
// Release the latch and wake one waiter if any were registered.
// NOTE(review): the declaration of prev is not visible in this excerpt.
506 void bt_releasemutex(MutexLatch *latch)
// atomically clear the low 16 bits of the latch word, keeping the
// previous word in prev.
// NOTE(review): presumably the low 16 bits hold the exclusive and
// filler bytes and the high 16 bits the waiters count (little-endian
// layout) -- confirm against the MutexLatch definition.
510 *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
// waiters were present at release time: wake exactly one of them
512 if( *prev->bits->waiters )
513 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
516 // reader/writer lock implementation
// Take the write side of a reader/writer lock on behalf of thread `tid`.
// NOTE(review): the body of the re-entrant branch and the lines that
// record tid/line after acquisition are not visible in this excerpt.
518 void WriteLock (RWLock *lock, ushort tid, uint line)
// the caller already owns the lock: re-entrant path
// (presumably bumps lock->dup and returns -- confirm)
520 if( lock->tid == tid ) {
// take xcl first, then wrt, and drop xcl once wrt is held
524 bt_mutexlock (lock->xcl);
525 bt_mutexlock (lock->wrt);
526 bt_releasemutex (lock->xcl);
// Give up the write side of a reader/writer lock.
// NOTE(review): the lines between the signature and the release are
// not visible here; presumably they unwind the re-entrant count and
// clear the owner before the wrt mutex is released -- confirm.
532 void WriteRelease (RWLock *lock)
539 bt_releasemutex (lock->wrt);
// Take the read side of a reader/writer lock.
// `tid` is not referenced by any of the lines visible in this excerpt.
542 void ReadLock (RWLock *lock, ushort tid)
// hold xcl while the reader count is adjusted
544 bt_mutexlock (lock->xcl);
// previous count was zero: this is the first reader, so it takes the
// wrt mutex on behalf of all readers
546 if( !__sync_fetch_and_add (&lock->readers, 1) )
547 bt_mutexlock (lock->wrt);
549 bt_releasemutex (lock->xcl);
// Give up the read side of a reader/writer lock.
552 void ReadRelease (RWLock *lock)
// previous count was one: the last reader is leaving, so release the
// wrt mutex that the first reader acquired
554 if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
555 bt_releasemutex (lock->wrt);
558 // recovery manager -- flush dirty pages
// Walk both buffer pools and write pages back to the btree file,
// reporting progress on stderr.
// NOTE(review): local declarations, the conditions guarding the writes,
// the statements under the pin tests and all closing braces are not
// visible in this excerpt. cnt3 is not used by any visible line.
560 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
562 uint cnt3 = 0, cnt2 = 0, cnt = 0;
567 // flush dirty pool pages to the btree
569 fprintf(stderr, "Start flushlsn ");
// non-leaf pool: entries 1 .. latchtotal-1 (entry 0 is never visited)
570 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
// page frame of this entry inside the non-leaf pool
571 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
572 latch = mgr->latchsets + entry;
// hold the entry's modify latch and a read lock while it is examined
573 bt_mutexlock (latch->modify);
574 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
// NOTE(review): presumably guarded by a latch->dirty test -- confirm
577 bt_writepage(mgr, page, latch->page_no, 0);
578 latch->dirty = 0, cnt++;
// any bit other than CLOCK_bit set means the entry is pinned
// (presumably counted in cnt2, which the final message prints as "pinned")
580 if( latch->pin & ~CLOCK_bit )
582 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
583 bt_releasemutex (latch->modify);
// leaf pool: same procedure; frames are page_bits + leaf_xtra bits wide
585 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
586 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
587 latch = mgr->leafsets + entry;
588 bt_mutexlock (latch->modify);
589 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
592 bt_writepage(mgr, page, latch->page_no, 1);
593 latch->dirty = 0, cnt++;
595 if( latch->pin & ~CLOCK_bit )
597 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
598 bt_releasemutex (latch->modify);
600 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
603 // recovery manager -- process current recovery buff on startup
604 // this won't do much if previous session was properly closed.
// Scan the redo buffer and dispatch on the type of each log entry.
// NOTE(review): the loop around the switch, the way `offset` advances,
// the case bodies and the return value are not visible in this excerpt.
606 BTERR bt_recoveryredo (BtMgr *mgr)
// the header at the very start of the redo buffer supplies the lsn
// that is taken as the flushed lsn
613 hdr = (BtLogHdr *)mgr->redobuff;
614 mgr->flushlsn = hdr->lsn;
// header of the entry located `offset` bytes into the redo buffer
617 hdr = (BtLogHdr *)(mgr->redobuff + offset);
618 switch( hdr->type ) {
622 case BTRM_add: // add a unique key-value to btree
624 case BTRM_dup: // add a duplicate key-value to btree
625 case BTRM_del: // delete a key-value from btree
626 case BTRM_upd: // update a key with a new value
627 case BTRM_new: // allocate a new empty page
628 case BTRM_old: // reuse an old empty page
634 // recovery manager -- append new entry to recovery log
635 // flush dirty pages to disk when it overflows.
// Append one key/value entry to the redo log under the mgr->redo latch.
// NOTE(review): local declarations, the reset of the buffer offsets
// after an overflow, the header field assignments other than lsn, the
// updates of redoend/redolast and the return statement are not visible
// in this excerpt.
637 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
// usable bytes in the redo area, keeping room for one trailing BtLogHdr
639 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
// entry size starts with its own header
640 uint amt = sizeof(BtLogHdr);
644 bt_mutexlock (mgr->redo);
// plus the key and value, each with its length prefix structure
647 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
649 // see if new entry fits in buffer
650 // flush and reset if it doesn't
652 if( amt > size - mgr->redoend ) {
653 mgr->flushlsn = mgr->lsn;
// sync from the 4096-byte boundary at or below redolast through
// redoend plus one header; a failure is only reported on stderr
654 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
655 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
// then write the buffer pool pages back to the btree file
658 bt_flushlsn(mgr, thread_no);
661 // fill in new entry & either eof or end block
// the new entry is placed at the current end of the log
663 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
// the entry receives the next log sequence number
668 hdr->lsn = ++mgr->lsn;
// a zero-filled header at redoend marks the end of the log
// (type 0 is BTRM_eof)
672 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
673 memset (eof, 0, sizeof(BtLogHdr));
675 // fill in key and value
// key (with its length byte) directly after the header, value right behind it
678 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
679 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
682 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
683 memset (eof, 0, sizeof(BtLogHdr));
// start of the not-yet-synced region, rounded down to a 4096-byte boundary
686 last = mgr->redolast & ~0xfff;
// sync only once at least 32768 bytes (including the eof header) are pending
689 if( end - last + sizeof(BtLogHdr) >= 32768 )
690 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
691 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
695 bt_releasemutex(mgr->redo);
699 // recovery manager -- append transaction to recovery log
700 // flush dirty pages to disk when it overflows.
// Append every key/value of the transaction page `source` to the redo
// log under the mgr->redo latch.
// NOTE(review): local declarations, the lsn assignment, the case labels
// of the switch, the header field assignments, the updates of
// redoend/redolast and the return statement are not visible in this excerpt.
702 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
// usable bytes in the redo area, keeping room for one trailing BtLogHdr
704 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
705 uint amt = 0, src, type;
712 // determine amount of redo recovery log space required
// src takes the values 1 .. source->cnt (slots are 1-based)
714 for( src = 0; src++ < source->cnt; ) {
715 key = keyptr(source,src);
716 val = valptr(source,src);
// each slot needs its key, its value and one log header
717 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
718 amt += sizeof(BtLogHdr);
721 bt_mutexlock (mgr->redo);
723 // see if new entry fits in buffer
724 // flush and reset if it doesn't
726 if( amt > size - mgr->redoend ) {
727 mgr->flushlsn = mgr->lsn;
// sync from the 4096-byte boundary at or below redolast through
// redoend plus one header; a failure is only reported on stderr
728 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
729 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
// then write the buffer pool pages back to the btree file
732 bt_flushlsn (mgr, thread_no);
735 // assign new lsn to transaction
739 // fill in new entries
741 for( src = 0; src++ < source->cnt; ) {
742 key = keyptr(source, src);
743 val = valptr(source, src);
// the slot type selects the kind of log entry
// (presumably mapped to a BTRM value stored in `type` -- confirm)
745 switch( slotptr(source, src)->type ) {
// amt is reused here as the size of this single entry
757 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
758 amt += sizeof(BtLogHdr);
// the entry is placed at the current end of the log
760 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
766 // fill in key and value
// key (with its length byte) directly after the header, value right behind it
768 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
769 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
// a zero-filled header at redoend marks the end of the log
// (type 0 is BTRM_eof)
774 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
775 memset (eof, 0, sizeof(BtLogHdr));
// start of the not-yet-synced region, rounded down to a 4096-byte boundary
778 last = mgr->redolast & ~0xfff;
// sync only once at least 32768 bytes (including the eof header) are pending
781 if( end - last + sizeof(BtLogHdr) >= 32768 )
782 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
783 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
787 bt_releasemutex(mgr->redo);
791 // read page into buffer pool from permanent location in Btree file
// Read page `page_no` from the btree file into the frame at `page`.
// Returns BTERR_read (also stored in mgr->err) when fewer than
// page_size bytes could be read.
// NOTE(review): the success return is not visible in this excerpt.
793 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
795 uint page_size = mgr->page_size;
// enlarge the transfer by leaf_xtra bits
// NOTE(review): presumably only when `leaf` is non-zero; the guarding
// condition is not visible here -- confirm.
798 page_size <<= mgr->leaf_xtra;
// the file offset is always page_no scaled by the base page size
800 if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
801 return mgr->err = BTERR_read;
// atomic bump of the read statistics counter
803 __sync_fetch_and_add (&mgr->reads, 1);
807 // write page to permanent location in Btree file
// Write the frame at `page` to the location of page `page_no` in the
// btree file. Returns BTERR_wrt (also stored in mgr->err) when fewer
// than page_size bytes could be written.
// NOTE(review): the success return is not visible in this excerpt.
809 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
811 uint page_size = mgr->page_size;
// enlarge the transfer by leaf_xtra bits
// NOTE(review): presumably only when `leaf` is non-zero; the guarding
// condition is not visible here -- confirm.
814 page_size <<= mgr->leaf_xtra;
// the file offset is always page_no scaled by the base page size
816 if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
817 return mgr->err = BTERR_wrt;
// atomic bump of the write statistics counter
819 __sync_fetch_and_add (&mgr->writes, 1);
823 // set CLOCK bit in latch
824 // decrement pin count
// Mark the latch entry as recently used while holding its modify latch.
// NOTE(review): the handling of `dirty` and the pin-count decrement
// announced by the comment above this function are not visible in this
// excerpt; thread_no and line are not used by any visible line.
826 void bt_unpinlatch (BtLatchSet *latch, uint dirty, ushort thread_no, uint line)
828 bt_mutexlock(latch->modify);
// CLOCK_bit lives in the top bit of the pin word
831 latch->pin |= CLOCK_bit;
833 bt_releasemutex(latch->modify);
836 // return the btree cached page address
// Translate a latch entry into the address of its page frame.
// NOTE(review): the test selecting between the two assignments and the
// return statement are not visible in this excerpt.
838 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
// index of the entry within whichever latch array it belongs to
840 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
// leaf frames are page_bits + leaf_xtra bits wide and live in leafpool
844 page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
// non-leaf frames are page_bits wide and live in pagepool
846 page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
851 // return next available leaf entry
852 // and with latch entry locked
// Pick a leaf latch entry that can be reused, advancing the shared
// leafvictim counter.
// NOTE(review): the enclosing loop, the preprocessor lines choosing
// between the two increment forms, the pin test and the return
// statement are not visible in this excerpt.
854 uint bt_availleaf (BtMgr *mgr)
// atomically advance the victim counter (GCC builtin form)
861 entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
// same step using the MSVC intrinsic
863 entry = _InterlockedIncrement (&mgr->leafvictim);
// wrap the counter into the range of leaf latch entries
865 entry %= mgr->leaftotal;
870 latch = mgr->leafsets + entry;
// the entry's modify latch is busy: do not wait for it
// (presumably moves on to the next candidate -- confirm)
872 if( !bt_mutextry(latch->modify) )
875 // return this entry if it is not pinned
880 // if the CLOCK bit is set
// clear the CLOCK bit and let go of the entry
883 latch->pin &= ~CLOCK_bit;
884 bt_releasemutex(latch->modify);
888 // return next available latch entry
889 // and with latch entry locked
// Pick a non-leaf latch entry that can be reused, advancing the shared
// latchvictim counter.
// NOTE(review): the enclosing loop, the preprocessor lines choosing
// between the two increment forms, the pin test and the return
// statement are not visible in this excerpt.
891 uint bt_availnext (BtMgr *mgr)
// atomically advance the victim counter (GCC builtin form)
898 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
// same step using the MSVC intrinsic
900 entry = _InterlockedIncrement (&mgr->latchvictim);
// wrap the counter into the range of non-leaf latch entries
902 entry %= mgr->latchtotal;
907 latch = mgr->latchsets + entry;
// the entry's modify latch is busy: do not wait for it
// (presumably moves on to the next candidate -- confirm)
909 if( !bt_mutextry(latch->modify) )
912 // return this entry if it is not pinned
917 // if the CLOCK bit is set
// clear the CLOCK bit and let go of the entry
920 latch->pin &= ~CLOCK_bit;
921 bt_releasemutex(latch->modify);
925 // pin leaf in leaf buffer pool
926 // return with latchset pinned
// Locate, or load into the leaf pool, the latch entry for leaf page
// `page_no`. Returns NULL on a page write or read error.
// NOTE(review): local declarations, the tests between the visible
// statements (found / dirty / prev / next), the pin increment, the
// retry loop around victim selection, the success returns and all
// closing braces are not visible in this excerpt.
928 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
// hash chain that page_no belongs to
930 uint hashidx = page_no % mgr->leafhash;
935 // try to find our entry
937 bt_mutexlock(mgr->leaftable[hashidx].latch);
// walk the chain from its head entry; entry 0 terminates the chain
939 if( entry = mgr->leaftable[hashidx].entry )
941 latch = mgr->leafsets + entry;
943 if( page_no == latch->page_no )
945 } while( entry = latch->next );
947 // found our entry: increment pin
950 latch = mgr->leafsets + entry;
951 bt_mutexlock(latch->modify);
// mark as recently used
952 latch->pin |= CLOCK_bit;
955 bt_releasemutex(latch->modify);
956 bt_releasemutex(mgr->leaftable[hashidx].latch);
960 // find and reuse unpinned entry
963 entry = bt_availleaf (mgr);
964 latch = mgr->leafsets + entry;
// frame of the victim entry, and the chain its current page hashes to
966 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
967 oldidx = latch->page_no % mgr->leafhash;
969 // skip over this entry if old latch is not available
// the old chain latch is only tried, never waited for
972 if( oldidx != hashidx )
973 if( !bt_mutextry (mgr->leaftable[oldidx].latch) ) {
974 bt_releasemutex(latch->modify);
978 // update permanent page area in btree from buffer pool
979 // no read-lock is required since page is not pinned.
// NOTE(review): this return is the direct statement of the if above,
// so no latch release is visible on this error path -- confirm the
// hash chain and modify latches are not left held.
982 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
983 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
987 // if latch is on a different hash chain
988 // unlink from the old page_no chain
991 if( oldidx != hashidx ) {
// predecessor (or the chain head) now points past this entry
993 mgr->leafsets[latch->prev].next = latch->next;
995 mgr->leaftable[oldidx].entry = latch->next;
// successor now points back at this entry's predecessor
998 mgr->leafsets[latch->next].prev = latch->prev;
1000 bt_releasemutex (mgr->leaftable[oldidx].latch);
// bring the requested page into the victim's frame
// NOTE(review): as above, no latch release is visible before this return.
1003 if( bt_readpage (mgr, page, page_no, 1) )
1004 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1006 // link page as head of hash table chain
1007 // if this is a never before used entry,
1008 // or it was previously on a different
1009 // hash table chain. Otherwise, just
1010 // leave it in its current hash table
1013 if( !latch->page_no || hashidx != oldidx ) {
1014 if( latch->next = mgr->leaftable[hashidx].entry )
1015 mgr->leafsets[latch->next].prev = entry;
1017 mgr->leaftable[hashidx].entry = entry;
1021 // fill in latch structure
// one pin, marked recently used
1023 latch->pin = CLOCK_bit | 1;
1024 latch->page_no = page_no;
1027 bt_releasemutex (latch->modify);
1028 bt_releasemutex (mgr->leaftable[hashidx].latch);
1032 // pin page in non-leaf buffer pool
1033 // return with latchset pinned
// Locate, or load into the non-leaf pool, the latch entry for page
// `page_no`. Returns NULL on a page write or read error.
// NOTE(review): local declarations, the tests between the visible
// statements (found / dirty / prev / next), the pin increment, the
// retry loop around victim selection, the success returns and all
// closing braces are not visible in this excerpt.
1035 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
// hash chain that page_no belongs to
1037 uint hashidx = page_no % mgr->latchhash;
1042 // try to find our entry
1044 bt_mutexlock(mgr->hashtable[hashidx].latch);
// walk the chain from its head entry; entry 0 terminates the chain
1046 if( entry = mgr->hashtable[hashidx].entry ) do
1048 latch = mgr->latchsets + entry;
1050 if( page_no == latch->page_no )
1052 } while( entry = latch->next );
1054 // found our entry: increment pin
1057 latch = mgr->latchsets + entry;
1058 bt_mutexlock(latch->modify);
// mark as recently used
1059 latch->pin |= CLOCK_bit;
1061 bt_releasemutex(latch->modify);
1062 bt_releasemutex(mgr->hashtable[hashidx].latch);
1066 // find and reuse unpinned entry
1069 entry = bt_availnext (mgr);
1070 latch = mgr->latchsets + entry;
// frame of the victim entry, and the chain its current page hashes to
1072 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1073 oldidx = latch->page_no % mgr->latchhash;
1075 // skip over this entry if latch not available
// an entry whose page_no is zero has never been used and is on no chain;
// otherwise the old chain latch is only tried, never waited for
1077 if( latch->page_no )
1078 if( oldidx != hashidx )
1079 if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
1080 bt_releasemutex(latch->modify);
1084 // update permanent page area in btree from buffer pool
1085 // no read-lock is required since page is not pinned.
// NOTE(review): this return is the direct statement of the if above,
// so no latch release is visible on this error path -- confirm the
// hash chain and modify latches are not left held.
1088 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1089 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1093 // if latch is on a different hash chain
1094 // unlink from the old page_no chain
1096 if( latch->page_no )
1097 if( oldidx != hashidx ) {
// predecessor (or the chain head) now points past this entry
1099 mgr->latchsets[latch->prev].next = latch->next;
1101 mgr->hashtable[oldidx].entry = latch->next;
// successor now points back at this entry's predecessor
1104 mgr->latchsets[latch->next].prev = latch->prev;
1106 bt_releasemutex (mgr->hashtable[oldidx].latch);
// bring the requested page into the victim's frame
// NOTE(review): unlike the write-error return above, this return does
// not record mgr->err_thread, and no latch release is visible before it.
1109 if( bt_readpage (mgr, page, page_no, 0) )
1110 return mgr->line = __LINE__, NULL;
1112 // link page as head of hash table chain
1113 // if this is a never before used entry,
1114 // or it was previously on a different
1115 // hash table chain. Otherwise, just
1116 // leave it in its current hash table
1119 if( !latch->page_no || hashidx != oldidx ) {
1120 if( latch->next = mgr->hashtable[hashidx].entry )
1121 mgr->latchsets[latch->next].prev = entry;
1123 mgr->hashtable[hashidx].entry = entry;
1127 // fill in latch structure
// one pin, marked recently used
1129 latch->pin = CLOCK_bit | 1;
1130 latch->page_no = page_no;
1133 bt_releasemutex (latch->modify);
1134 bt_releasemutex (mgr->hashtable[hashidx].latch);
// Shut down a buffer manager: write dirty pool pages back to the btree
// file, reset the redo buffer, and release the mappings and handles.
// NOTE(review): local declarations, the preprocessor lines separating
// the unix and Windows sections, the resets of `num`, the final
// close/free calls and all closing braces are not visible in this
// excerpt. Return values of bt_writepage, fdatasync and munmap are not
// checked by the visible lines.
1138 void bt_mgrclose (BtMgr *mgr)
// label used in the statistics messages below
1140 char *name = mgr->type ? "Main" : "Cache";
1147 // flush previously written dirty pages
1148 // and write recovery buffer to disk
1150 fdatasync (mgr->idx);
// terminate the redo log with a zero-filled (eof) header
1152 if( mgr->redoend ) {
1153 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1154 memset (eof, 0, sizeof(BtLogHdr));
1157 // write remaining dirty pool pages to the btree
// entries 1 .. latchtotal-1 (entry 0 is never visited)
1159 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
1160 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1161 latch = mgr->latchsets + entry;
1163 if( latch->dirty ) {
1164 bt_writepage(mgr, page, latch->page_no, 0);
1165 latch->dirty = 0, num++;
1170 fprintf(stderr, "%s %d non-leaf buffer pool pages flushed\n", name, num);
1174 // write remaining dirty leaf pages to the btree
// leaf frames are page_bits + leaf_xtra bits wide
1176 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
1177 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1178 latch = mgr->leafsets + entry;
1180 if( latch->dirty ) {
1181 bt_writepage(mgr, page, latch->page_no, 1);
1182 latch->dirty = 0, num++;
1187 fprintf(stderr, "%s %d leaf buffer pool pages flushed\n", name, num);
1189 // clear redo recovery buffer on disk.
// the first header of the redo buffer becomes an eof entry that keeps
// only the current lsn; the first 4096 bytes are then synced
1191 if( mgr->pagezero->redopages ) {
1192 eof = (BtLogHdr *)mgr->redobuff;
1193 memset (eof, 0, sizeof(BtLogHdr));
1194 eof->lsn = mgr->lsn;
1195 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1196 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
// drop the mappings: the page0 segment (up to the end of the redo
// pages), both pools, and pagezero
1200 munmap (mgr->page0, (uid)(mgr->redopage + mgr->pagezero->redopages) << mgr->page_bits);
1202 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1203 munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1204 munmap (mgr->pagezero, mgr->page_size);
// Windows counterpart: flush and unmap the views, then close the handles
1206 FlushViewOfFile(mgr->pagezero, 0);
1207 UnmapViewOfFile(mgr->pagezero);
1208 UnmapViewOfFile(mgr->pagepool);
1209 CloseHandle(mgr->halloc);
1210 CloseHandle(mgr->hpool);
1216 FlushFileBuffers(mgr->idx);
1217 CloseHandle(mgr->idx);
1222 // close and release memory
1224 void bt_close (BtDb *bt)
1229 // open/create new btree buffer manager
1231 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1232 // size of page pool (e.g. 300) and leaf pool (e.g. 4000)
1234 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1236 uint lvl, attr, last, slot, idx;
1237 uint nlatchpage, latchhash;
1238 uint nleafpage, leafhash;
1239 unsigned char value[BtId];
1240 int flag, initit = 0;
1241 BtPageZero *pagezero;
1249 // determine sanity of page size and buffer pool
1251 if( leafxtra + pagebits > BT_maxbits )
1252 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1254 if( pagebits < BT_minbits )
1255 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1258 mgr = calloc (1, sizeof(BtMgr));
1260 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1262 if( mgr->idx == -1 ) {
1263 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1264 return free(mgr), NULL;
1267 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1268 attr = FILE_ATTRIBUTE_NORMAL;
1269 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1271 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1272 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1273 return GlobalFree(mgr), NULL;
1278 pagezero = valloc (BT_maxpage);
1281 // read minimum page size to get root info
1282 // to support raw disk partition files
1283 // check if page_bits == 0 on the disk.
1285 if( size = lseek (mgr->idx, 0L, 2) )
1286 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1287 if( pagezero->page_bits ) {
1288 pagebits = pagezero->page_bits;
1289 leafxtra = pagezero->leaf_xtra;
1290 redopages = pagezero->redopages;
1294 return free(mgr), free(pagezero), NULL;
1298 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1299 size = GetFileSize(mgr->idx, amt);
1301 if( size || *amt ) {
1302 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1303 return bt_mgrclose (mgr), NULL;
1304 pagebits = pagezero->page_bits;
1305 leafxtra = pagezero->leaf_xtra;
1306 redopages = pagezero->redopages;
1311 mgr->page_size = 1 << pagebits;
1312 mgr->page_bits = pagebits;
1313 mgr->leaf_xtra = leafxtra;
1315 // calculate number of latch hash table entries
1317 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1319 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1320 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1321 mgr->latchtotal = nodemax;
1323 // calculate number of leaf hash table entries
1325 mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1327 mgr->nleafpage += leafmax << leafxtra; // size of the leaf pool in pages
1328 mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1329 mgr->leaftotal = leafmax;
1331 mgr->redopage = LEAF_page + (1 << leafxtra);
1336 // initialize an empty b-tree with latch page, root page, page of leaves
1337 // and page(s) of latches and page pool cache
1339 ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1340 memset (pagezero, 0, 1 << pagebits);
1341 pagezero->alloc->lvl = MIN_lvl - 1;
1342 pagezero->redopages = redopages;
1343 pagezero->page_bits = mgr->page_bits;
1344 pagezero->leaf_xtra = leafxtra;
1346 pagezero->alloc->right = mgr->redopage + pagezero->redopages;
1347 pagezero->upperpages = 1;
1348 pagezero->leafpages = 1;
1350 // initialize left-most LEAF page in
1351 // alloc->left and count of active leaf pages.
1353 pagezero->alloc->left = LEAF_page;
1355 if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1356 fprintf (stderr, "Unable to create btree page zero\n");
1357 return bt_mgrclose (mgr), NULL;
1360 memset (pagezero, 0, 1 << pagebits);
1362 for( lvl=MIN_lvl; lvl--; ) {
1363 BtSlot *node = slotptr(pagezero->alloc, 1);
1364 node->off = mgr->page_size;
1367 node->off <<= mgr->leaf_xtra;
1369 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1370 node->type = Librarian;
1373 node->off = node[-1].off;
1374 key = keyptr(pagezero->alloc, 2);
1375 key = keyptr(pagezero->alloc, 1);
1376 key->len = 2; // create stopper key
1380 bt_putid(value, MIN_lvl - lvl + 1);
1381 val = valptr(pagezero->alloc, 1);
1382 val->len = lvl ? BtId : 0;
1383 memcpy (val->value, value, val->len);
1385 pagezero->alloc->min = node->off;
1386 pagezero->alloc->lvl = lvl;
1387 pagezero->alloc->cnt = 2;
1388 pagezero->alloc->act = 1;
1390 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1391 fprintf (stderr, "Unable to create btree page\n");
1392 return bt_mgrclose (mgr), NULL;
1400 VirtualFree (pagezero, 0, MEM_RELEASE);
1403 // mlock the first segment of 64K pages
1405 flag = PROT_READ | PROT_WRITE;
1406 mgr->page0 = mmap (0, (uid)(mgr->redopage + redopages) << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1408 if( mgr->page0 == MAP_FAILED ) {
1409 fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
1410 return bt_mgrclose (mgr), NULL;
1413 mgr->pagezero = (BtPageZero *)mgr->page0;
1414 mlock (mgr->pagezero, mgr->page_size);
1416 mgr->redobuff = mgr->page0 + (mgr->redopage << mgr->page_bits);
1417 mlock (mgr->redobuff, redopages << mgr->page_bits);
1419 // allocate pool buffers
1421 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1422 if( mgr->pagepool == MAP_FAILED ) {
1423 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1424 return bt_mgrclose (mgr), NULL;
1427 mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1428 if( mgr->leafpool == MAP_FAILED ) {
1429 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1430 return bt_mgrclose (mgr), NULL;
1433 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1434 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1435 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1437 mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1438 mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1439 mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1441 for( idx = 1; idx < mgr->leaftotal; idx++ ) {
1442 latch = mgr->leafsets + idx;
1449 // open BTree access method
1450 // based on buffer manager
//
// Allocates a zero-filled BtDb handle and gives it a thread number taken
// from the manager's shared counter.
//   mgr  - buffer manager whose thread_no counter is atomically advanced
//   main - second manager; no statement visible in this block references it
// NOTE(review): the statements that store mgr/main in the handle and return
// it are not visible in this block.
1452 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1454 BtDb *bt = malloc (sizeof(*bt));
// NOTE(review): the malloc result is passed to memset without a NULL check.
1456 memset (bt, 0, sizeof(*bt));
// __sync_fetch_and_add yields the value held before the add, hence the + 1.
1460 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
// Alternative assignment using the MSVC intrinsic -- presumably selected by a
// platform guard that is not visible here.
// NOTE(review): _InterlockedIncrement16 is documented with a single pointer
// argument; confirm this two-argument call builds on the Windows path.
1462 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1467 // compare two keys, return > 0, = 0, or < 0
1468 // =0: keys are same
1471 // as the comparison value
//
//   key1 - stored key (length taken from key1->len)
//   key2 - raw key bytes
//   len2 - number of bytes in key2
1473 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1475 uint len1 = key1->len;
// Compare only the bytes both keys have in common (the shorter length);
// the assignment inside the condition is intentional.
// NOTE(review): the declaration of ans and the handling of unequal lengths
// are not visible in this block.
1478 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1489 // place write, read, or parent lock on requested page_no.
//
//   mode      - which lock to take
//   latch     - latch set holding the readwr, access, parent and link locks
//   thread_no - passed to every lock call
//   line      - caller's source line, passed to the WriteLock calls only
// NOTE(review): the dispatch on mode is not visible here, so which BtLock
// value selects which call below must be confirmed against the full source.
1491 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
// shared and exclusive forms on the readwr lock
1495 ReadLock (latch->readwr, thread_no);
1498 WriteLock (latch->readwr, thread_no, line);
// shared and exclusive forms on the access lock
1501 ReadLock (latch->access, thread_no);
1504 WriteLock (latch->access, thread_no, line);
// the parent and link locks are only ever taken exclusively here
1507 WriteLock (latch->parent, thread_no, line);
1510 WriteLock (latch->link, thread_no, line);
1515 // remove write, read, or parent lock on requested page
//
// Counterpart of bt_lockpage: one release call per lock form taken there.
//   mode, latch     - same meaning as in bt_lockpage
//   thread_no, line - accepted for symmetry; none of the release calls
//                     visible below use them
// NOTE(review): the dispatch on mode is not visible in this block.
1517 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1521 ReadRelease (latch->readwr);
1524 WriteRelease (latch->readwr);
1527 ReadRelease (latch->access);
1530 WriteRelease (latch->access);
1533 WriteRelease (latch->parent);
1536 WriteRelease (latch->link);
1541 // allocate a new page
1542 // return with page latched, but unlocked.
//
//   mgr       - buffer manager; mgr->lock guards the page-zero allocation data
//   set       - receives the pinned latch and mapped page of the new page
//   contents  - page image to install; its lvl selects the upper-level or
//               leaf path, and its nopromote bit is modified here
//   thread_no - recorded in mgr->err_thread on failure
// Returns mgr->err (or BTERR_struct) on the failure paths shown.
// NOTE(review): the success return and the declarations of page_no and
// freechain are not visible in this block.
1544 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1546 uint page_size = mgr->page_size, leaf_xtra = 0;
1550 // lock allocation page
1552 bt_mutexlock(mgr->lock);
// Non-zero lvl: use the upper-level free chain and page counter.
1554 if( contents->lvl ) {
1555 freechain = &mgr->pagezero->freechain;
1556 mgr->pagezero->upperpages++;
// Otherwise use the leaf chain and counter; a leaf occupies
// (1 << leaf_xtra) base pages, so the byte count is scaled to match.
1558 freechain = &mgr->pagezero->leafchain;
1559 mgr->pagezero->leafpages++;
1560 leaf_xtra = mgr->leaf_xtra;
1561 page_size <<= leaf_xtra;
1564 // use empty chain first
1565 // else allocate new page
1567 if( page_no = *freechain ) {
1568 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1569 set->page = bt_mappage (mgr, set->latch);
1571 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
// pop the reused page off the chain: its right link names the next free page
1573 *freechain = set->page->right;
1575 // the page is currently nopromote and this
1576 // will keep bt_promote out.
1578 // contents will replace this bit
1579 // and pin will keep bt_promote out
1581 contents->nopromote = 0;
1583 memcpy (set->page, contents, page_size);
1585 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1586 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1588 bt_releasemutex(mgr->lock);
// No free page was available: take the next page number from the end of the
// file and advance the high-water mark by the number of base pages used.
1592 page_no = mgr->pagezero->alloc->right;
1593 mgr->pagezero->alloc->right += 1 << leaf_xtra;
1595 // unlock allocation latch and
1596 // extend file into new page.
1598 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1599 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1601 bt_releasemutex(mgr->lock);
1603 // keep bt_promote out of this page
1604 contents->nopromote = 1;
// The page image is written straight to the file at its byte offset.
// NOTE(review): a short or failed write is only reported on stderr; the
// visible code carries on to pin the page.
1606 if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1607 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1609 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1610 set->page = bt_mappage (mgr, set->latch);
1612 return mgr->err_thread = thread_no, mgr->err;
1614 // now pin will keep bt_promote out
1616 set->page->nopromote = 0;
1620 // find slot in page for given key at a given level
//
//   page - page to search; slots are numbered from 1 to page->cnt
//   key  - key bytes to look for
//   len  - number of bytes in key
// Returns the slot where the search converges, or 0 when no probe established
// an upper bound (the caller then follows the right link).
// NOTE(review): the declaration of good, the stopper-key handling and the
// "low" update of the search loop are not visible in this block.
1622 int bt_findslot (BtPage page, unsigned char *key, uint len)
1624 uint diff, higher = page->cnt, low = 1, slot;
1627 // make stopper key an infinite fence value
1634 // low is the lowest candidate.
1635 // loop ends when they meet
1637 // higher is already
1638 // tested as .ge. the passed key.
// Binary search: probe the midpoint of [low, higher) until the two meet.
1640 while( diff = higher - low ) {
1641 slot = low + ( diff >> 1 );
1642 if( keycmp (keyptr(page, slot), key, len) < 0 )
// probe is >= key: it becomes the new upper bound, and good records that
// an upper bound was actually tested
1645 higher = slot, good++;
1648 // return zero if key is on right link page
1650 return good ? higher : 0;
1653 // find and load page at given level for given key
1654 // leave page rd or wr locked as requested
//
//   mgr       - buffer manager
//   set       - receives the latch and mapped page of the page reached
//   key, len  - key being searched for
//   lvl       - tree level at which to stop
//   lock      - lock mode to hold on the page at level lvl
//   thread_no - recorded in mgr->err_thread on failure
// Every return visible in this block yields 0 after setting mgr->err to
// BTERR_struct.
// NOTE(review): the enclosing loop, the success return and the declarations
// of prev, ptr and val are not visible in this block.
1656 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1658 uid page_no = ROOT_page, prevpage_no = 0;
// drill starts at 0xff and is replaced by the root's real level once the
// root page has been read (see the lvl != drill test below).
1659 uint drill = 0xff, slot;
1660 uint mode, prevmode;
1665 // start at root of btree and drill down
// drill == 0 selects the leaf pool; any other value selects the upper pool
1668 if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1669 set->page = bt_mappage (mgr, set->latch);
// the access lock is taken on every page except the root
1673 if( page_no > ROOT_page )
1674 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1676 // release & unpin parent or left sibling page
1679 bt_unlockpage(prevmode, prev->latch, thread_no, __LINE__);
1680 bt_unpinlatch (prev->latch, 0, thread_no, __LINE__);
1684 // obtain mode lock using lock coupling through AccessLock
1685 // determine lock mode of drill level
// only the target level gets the caller's lock; levels above are read-locked
1687 mode = (drill == lvl) ? lock : BtLockRead;
1688 bt_lockpage(mode, set->latch, thread_no, __LINE__);
1690 // grab our fence key
1692 ptr=fenceptr(set->page);
// a page flagged free must never be reached by a descent
1694 if( set->page->free )
1695 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1697 if( page_no > ROOT_page )
1698 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1700 // re-read and re-lock root after determining actual level of root
// A level mismatch is accepted on the root only; anywhere else it is reported
// as a structure error.
1702 if( set->page->lvl != drill) {
1703 if( set->latch->page_no != ROOT_page )
1704 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1706 drill = set->page->lvl;
// The root was locked with BtLockRead because drill was still 0xff; if the
// root is itself the target level and a different lock was requested, drop
// it here (presumably to retry with the requested mode -- retry not visible).
1708 if( lock != BtLockRead && drill == lvl ) {
1709 bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1710 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
1715 prevpage_no = set->latch->page_no;
1719 // if requested key is beyond our fence,
1720 // slide to the right
1722 if( keycmp (ptr, key, len) < 0 )
1723 if( page_no = set->page->right )
1726 // if page is part of a delete operation,
1727 // slide to the left;
// the left link is read while holding the link lock
1729 if( set->page->kill ) {
1730 bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1731 page_no = set->page->left;
1732 bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1736 // find key on page at this level
1737 // and descend to requested level
1739 if( slot = bt_findslot (set->page, key, len) ) {
1743 // find next non-dead slot -- the fence key if nothing else
1745 while( slotptr(set->page, slot)->dead )
1746 if( slot++ < set->page->cnt )
1749 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1751 val = valptr(set->page, slot);
// the slot's value must be exactly BtId bytes: it holds the child page number
1753 if( val->len == BtId )
1754 page_no = bt_getid(val->value);
1756 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1762 // slide right into next page
1764 page_no = set->page->right;
1768 // return error on end of right chain
1770 mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1771 return 0; // return error
1774 // return page to free list
1775 // page must be delete, link & write locked
1776 // and have no keys pointing to it.
//
//   mgr       - buffer manager; mgr->lock guards the free chains
//   set       - page being released; its delete, write and link locks are
//               dropped and its latch unpinned before returning
//   thread_no - passed through to the unlock and unpin calls
// NOTE(review): the declaration of freechain is not visible in this block.
1778 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1782 // lock allocation page
1784 bt_mutexlock (mgr->lock);
// choose the chain and page counter by level, mirroring bt_newpage
1786 if( set->page->lvl ) {
1787 freechain = &mgr->pagezero->freechain;
1788 mgr->pagezero->upperpages--;
1790 freechain = &mgr->pagezero->leafchain;
1791 mgr->pagezero->leafpages--;
// push the page on the head of the chain: its right link takes the old head
1796 set->page->right = *freechain;
1797 *freechain = set->latch->page_no;
1798 set->page->free = 1;
1800 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1801 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1803 // unlock released page
1804 // and unlock allocation page
1806 bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1807 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1808 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
1809 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1810 bt_releasemutex (mgr->lock);
1813 // a fence key was deleted from an interior level page
1814 // push new fence value upwards
//
//   mgr       - buffer manager
//   set       - page whose last (fence) slot is removed; write-locked on entry
//   lvl       - level of set; the parent is updated at lvl+1
//   thread_no - recorded in mgr->err_thread on failure
// Returns mgr->err when the parent insert or delete fails.
// NOTE(review): the success return and the declaration of ptr are not
// visible in this block.
1816 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1818 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1819 unsigned char value[BtId];
1823 // remove the old fence value
// save the old fence key, then clear the last slot and shrink cnt by one
1825 ptr = fenceptr(set->page);
1826 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1827 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
// the slot that is now last supplies the new fence offset
1828 set->page->fence = slotptr(set->page, set->page->cnt)->off;
1830 // cache new fence value
1832 ptr = fenceptr(set->page);
1833 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// swap the write lock for the parent lock while the parent level is modified
1835 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1836 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1838 // insert new (now smaller) fence key
1840 bt_putid (value, set->latch->page_no);
1841 ptr = (BtKey*)leftkey;
// NOTE(review): both error returns below leave with the parent lock and the
// pin still held, as far as the visible lines show.
1843 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1844 return mgr->err_thread = thread_no, mgr->err;
1846 // now delete old fence key
1848 ptr = (BtKey*)rightkey;
1850 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1851 return mgr->err_thread = thread_no, mgr->err;
1853 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1854 bt_unpinlatch(set->latch, 1, thread_no, __LINE__);
1858 // root has a single child
1859 // collapse a level from the tree
//
//   mgr       - buffer manager
//   root      - root page set; its write lock and pin are released at the end
//   thread_no - recorded in mgr->err_thread on failure
// Returns BTERR_struct when the surviving slot does not hold a page id, or
// mgr->err when the child cannot be pinned.
// NOTE(review): the opening of the do-loop, the success return and the
// declarations of child, idx, val and page_no are not visible in this block.
1861 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1868 // find the child entry and promote as new root contents
// scan slots 1..cnt for the first one not marked dead
1871 for( idx = 0; idx++ < root->page->cnt; )
1872 if( !slotptr(root->page, idx)->dead )
1875 val = valptr(root->page, idx);
1877 if( val->len == BtId )
1878 page_no = bt_getid (valptr(root->page, idx)->value);
1880 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1882 if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1883 child->page = bt_mappage (mgr, child->latch);
1885 return mgr->err_thread = thread_no, mgr->err;
1887 bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1888 bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
// the child's whole page image replaces the root's; the child is then freed
// NOTE(review): bt_freepage also releases BtLockLink on the page, but only
// the delete and write locks are visibly taken on the child here -- confirm.
1890 memcpy (root->page, child->page, mgr->page_size);
1891 bt_freepage (mgr, child, thread_no);
// keep collapsing while the new root is above level 1 with one active key
1893 } while( root->page->lvl > 1 && root->page->act == 1 );
1895 bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1896 bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
1900 // delete a page and manage key
1901 // call with page writelocked
1903 // returns with page unpinned
1904 // from the page pool.
//
// The right sibling's contents are copied over the emptied page, the parent
// keys are redirected, and the right sibling is the page actually freed.
//   mgr       - buffer manager
//   set       - emptied page, write-locked on entry
//   thread_no - passed to all lock, pin and key calls
//   lvl       - level of set; 0 selects the leaf pool and leaf page size
// NOTE(review): several error returns, the success return and the
// declaration of ptr are not visible in this block.
1906 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1908 unsigned char lowerfence[BT_keyarray];
1909 uint page_size = mgr->page_size, kill;
1910 BtPageSet right[1], temp[1];
1911 unsigned char value[BtId];
1912 uid page_no, right2;
// NOTE(review): presumably guarded by a leaf-level test that is not visible.
1916 page_size <<= mgr->leaf_xtra;
1918 // cache original copy of original fence key
1919 // that is going to be deleted.
1921 ptr = fenceptr(set->page);
1922 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1924 // pin and lock our right page
1926 page_no = set->page->right;
1928 if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1929 right->page = bt_mappage (mgr, right->latch);
1933 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
// NOTE(review): this return leaves with right's write lock and pin still
// held, as far as the visible lines show.
1935 if( right->page->kill || set->page->kill )
1936 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1938 // pull contents of right sibling over our empty page
1939 // preserving our left page number, and its right page number.
// the copy overwrites set->page->left, so it is saved first and restored
1941 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
1942 page_no = set->page->left;
1943 memcpy (set->page, right->page, page_size);
1944 set->page->left = page_no;
1945 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
1947 // fix left link from far right page
1949 if( right2 = set->page->right ) {
1950 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
1951 temp->page = bt_mappage (mgr, temp->latch);
1955 bt_lockpage (BtLockAccess, temp->latch, thread_no, __LINE__);
1956 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1957 temp->page->left = set->latch->page_no;
1958 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1959 bt_unlockpage(BtLockAccess, temp->latch, thread_no, __LINE__);
1960 bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
1961 } else if( !lvl ) { // our page is now rightmost leaf
// the page-zero left field is updated under the allocation mutex
1962 bt_mutexlock (mgr->lock);
1963 mgr->pagezero->alloc->left = set->latch->page_no;
1964 bt_releasemutex(mgr->lock);
1967 // mark right page as being deleted and release lock
1969 right->page->kill = 1;
1970 bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1972 // redirect the new higher key directly to our new node
1974 ptr = fenceptr(set->page);
1975 bt_putid (value, set->latch->page_no);
1977 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
1980 // delete our original fence key in parent
1982 ptr = (BtKey *)lowerfence;
1984 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1987 // wait for all access to drain away with delete lock,
1988 // then obtain write lock to right node and free it.
// bt_freepage releases all three of these locks and unpins the page
1990 bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
1991 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1992 bt_lockpage (BtLockLink, right->latch, thread_no, __LINE__);
1993 bt_freepage (mgr, right, thread_no);
1995 // release write lock to our node
1997 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1998 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2002 // find and delete key on page by marking delete flag bit
2003 // if page becomes empty, delete it from the btree
//
//   mgr       - buffer manager
//   key, len  - key to delete
//   lvl       - tree level holding the key
//   thread_no - recorded in mgr->err_thread on failure
// Returns mgr->err when the page cannot be loaded; otherwise the result of
// bt_fixfence, bt_collapseroot or bt_deletepage when one of those follow-up
// steps is needed.
// NOTE(review): the statements that mark the slot dead and adjust act, the
// final return, and the declarations of set, node, ptr and val are not
// visible in this block.
2005 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2007 uint slot, idx, found, fence;
2013 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2014 node = slotptr(set->page, slot);
2015 ptr = keyptr(set->page, slot);
2017 return mgr->err_thread = thread_no, mgr->err;
2019 // if librarian slot, advance to real slot
2021 if( node->type == Librarian ) {
2022 ptr = keyptr(set->page, ++slot);
2023 node = slotptr(set->page, slot);
// remember whether the slot being deleted is the page's last (fence) slot
2026 fence = slot == set->page->cnt;
2028 // delete the key, ignore request if already dead
2030 if( found = !keycmp (ptr, key, len) )
2031 if( found = node->dead == 0 ) {
2032 val = valptr(set->page,slot);
// account for the key and value bytes that are now reclaimable
2033 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2037 // collapse empty slots beneath the fence
2038 // on interior nodes
// while the slot just below the fence is dead, move the fence slot down over
// it and clear the vacated top slot
2041 while( idx = set->page->cnt - 1 )
2042 if( slotptr(set->page, idx)->dead ) {
2043 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2044 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2052 // did we delete a fence key in an upper level?
2054 if( lvl && set->page->act && fence )
2055 return bt_fixfence (mgr, set, lvl, thread_no);
2057 // do we need to collapse root?
2059 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2060 return bt_collapseroot (mgr, set, thread_no);
2062 // delete empty page
2064 if( !set->page->act )
2065 return bt_deletepage (mgr, set, thread_no, set->page->lvl);
// nothing structural to do: release the write lock and the pin
2067 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2068 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2072 // check page for space available,
2073 // clean if necessary and return
2074 // 0 - page needs splitting
2075 // >0 new slot value
//
//   mgr    - buffer manager (supplies page_size and leaf_xtra)
//   set    - page to check and, if needed, compact in place
//   keylen - length of the key about to be inserted
//   slot   - caller's insert slot
//   vallen - length of the value about to be inserted
// NOTE(review): the return statements, the slot remapping and the
// declarations of key and val are not visible in this block.
2077 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2079 uint page_size = mgr->page_size;
2080 BtPage page = set->page, frame;
2081 uint cnt = 0, idx = 0;
2082 uint max = page->cnt;
// leaf pages (lvl == 0) are larger than upper pages by a factor of 1 << leaf_xtra
2087 if( !set->page->lvl )
2088 page_size <<= mgr->leaf_xtra;
// Room test: the lowest used offset must stay above the page header, the
// slot array grown by two entries, and the new key and value records.
2090 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2093 // skip cleanup and proceed to split
2094 // if there's not enough garbage
2097 if( page->garbage < page_size / 5 )
// work from a private copy so the page can be rebuilt in place
// NOTE(review): the malloc result is used without a NULL check, and the
// matching free is not visible in this block.
2100 frame = malloc (page_size);
2101 memcpy (frame, page, page_size);
2103 // skip page info and set rest of page to zero
2105 memset (page+1, 0, page_size - sizeof(*page));
2107 page->min = page_size;
2111 // clean up page first by
2112 // removing dead keys
2114 while( cnt++ < max ) {
// dead slots are tested everywhere except the last slot of a leaf page
2118 if( cnt < max || frame->lvl )
2119 if( slotptr(frame,cnt)->dead )
2122 // copy the value across
2124 val = valptr(frame, cnt);
2125 page->min -= val->len + sizeof(BtVal);
2126 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2128 // copy the key across
2130 key = keyptr(frame, cnt);
2131 page->min -= key->len + sizeof(BtKey);
2132 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2134 // make a librarian slot
// a librarian slot is a dead slot that shares the offset of the key after it
2136 slotptr(page, ++idx)->off = page->min;
2137 slotptr(page, idx)->type = Librarian;
2138 slotptr(page, idx)->dead = 1;
// the real slot for the key copied above
2142 slotptr(page, ++idx)->off = page->min;
2143 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2145 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2149 page->fence = page->min;
2153 // see if page has enough space now, or does it need splitting?
2155 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2161 // split the root and raise the height of the btree
//
// The current root contents move to a newly allocated left page, and the
// root is rebuilt with two keys: the left page's fence and a stopper key
// pointing at the right page.
//   mgr       - buffer manager
//   root      - root page set, write-locked on entry; unlocked and unpinned here
//   right     - latch of the already created right page; unpinned here
//   thread_no - recorded in mgr->err_thread on failure
// Returns mgr->err when the left page cannot be allocated.
// NOTE(review): the success return, the stopper key bytes, the val->len and
// lvl updates, and the declarations of frame, left, page, ptr, val and
// left_page_no are not visible in this block.
2163 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2165 unsigned char leftkey[BT_keyarray];
// nxt is the running "lowest used offset" while the new root is built top-down
2166 uint nxt = mgr->page_size;
2167 unsigned char value[BtId];
// NOTE(review): the malloc result is used without a NULL check, and the
// matching free is not visible in this block.
2174 frame = malloc (mgr->page_size);
2175 memcpy (frame, root->page, mgr->page_size);
2177 // save left page fence key for new root
2179 ptr = fenceptr(root->page);
2180 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2182 // Obtain an empty page to use, and copy the current
2183 // root contents into it, e.g. lower keys
2185 if( bt_newpage(mgr, left, frame, thread_no) )
2186 return mgr->err_thread = thread_no, mgr->err;
2188 left_page_no = left->latch->page_no;
2189 bt_unpinlatch (left->latch, 1, thread_no, __LINE__);
2192 // left link the pages together
2194 page = bt_mappage (mgr, right);
2195 page->left = left_page_no;
2197 // preserve the page info at the bottom
2198 // of higher keys and set rest to zero
2200 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2202 // insert stopper key at top of newroot page
2203 // and increase the root height
// value record for slot 2: the right page's id
2205 nxt -= BtId + sizeof(BtVal);
2206 bt_putid (value, right->page_no);
2207 val = (BtVal *)((unsigned char *)root->page + nxt);
2208 memcpy (val->value, value, BtId);
// key record for slot 2: a 2-byte stopper key
2211 nxt -= 2 + sizeof(BtKey);
2214 slotptr(root->page, 2)->off = nxt;
2215 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2220 // insert lower keys page fence key on newroot page as first key
2222 nxt -= BtId + sizeof(BtVal);
2223 bt_putid (value, left_page_no);
2224 val = (BtVal *)((unsigned char *)root->page + nxt);
2225 memcpy (val->value, value, BtId);
2228 ptr = (BtKey *)leftkey;
2229 nxt -= ptr->len + sizeof(BtKey);
2230 slotptr(root->page, 1)->off = nxt;
2231 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2233 root->page->right = 0;
2234 root->page->min = nxt; // reset lowest used offset and key count
2235 root->page->cnt = 2;
2236 root->page->act = 2;
// record the root's level in page zero
2239 mgr->pagezero->alloc->lvl = root->page->lvl;
2241 // release and unpin root pages
2243 bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2244 bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
2246 bt_unpinlatch (right, 1, thread_no, __LINE__);
2250 // split already locked full node
2252 // return pool entry for new right
2253 // page, pinned & unlocked
//
//   mgr       - buffer manager
//   set       - full page, locked by the caller; rebuilt here with the lower keys
//   thread_no - passed to all lock, pin and allocation calls
//   linkleft  - when non-zero (and set is not the root), the far right page's
//               left link is pointed at the new page
// NOTE(review): the starting position of each copy loop, the act counting,
// the returns and the declarations of frame, key, val, right2 and entry are
// not visible in this block.
2255 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2257 uint page_size = mgr->page_size;
2258 BtPageSet right[1], temp[1];
2259 uint cnt = 0, idx = 0, max;
2260 uint lvl = set->page->lvl;
// leaf pages (lvl == 0) are larger than upper pages by a factor of 1 << leaf_xtra
2268 if( !set->page->lvl )
2269 page_size <<= mgr->leaf_xtra;
2271 // split higher half of keys to frame
// NOTE(review): the malloc result is used without a NULL check, and the
// matching free is not visible in this block.
2273 frame = malloc (page_size);
2274 memset (frame, 0, page_size);
2275 frame->min = page_size;
2276 max = set->page->cnt;
2280 while( cnt++ < max ) {
// dead slots are tested everywhere except the last slot of a leaf page
2281 if( cnt < max || set->page->lvl )
2282 if( slotptr(set->page, cnt)->dead )
// values and keys are packed downward from the top of the frame
2285 val = valptr(set->page, cnt);
2286 frame->min -= val->len + sizeof(BtVal);
2287 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2289 key = keyptr(set->page, cnt);
2290 frame->min -= key->len + sizeof(BtKey);
2291 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2293 // add librarian slot
2295 slotptr(frame, ++idx)->off = frame->min;
2296 slotptr(frame, idx)->type = Librarian;
2297 slotptr(frame, idx)->dead = 1;
// the real slot for the key copied above
2301 slotptr(frame, ++idx)->off = frame->min;
2302 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2304 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2308 frame->fence = frame->min;
// a non-root page hands its right link on to the new right page
2314 if( set->latch->page_no > ROOT_page ) {
2315 right2 = set->page->right;
2316 frame->right = right2;
2319 frame->left = set->latch->page_no;
2322 // get new free page and write higher keys to it.
2324 if( bt_newpage(mgr, right, frame, thread_no) )
2327 // link far right's left pointer to new page
2329 if( linkleft && set->latch->page_no > ROOT_page )
2331 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2332 temp->page = bt_mappage (mgr, temp->latch);
2336 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2337 temp->page->left = right->latch->page_no;
2338 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2339 bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2340 } else if( !lvl ) { // page is rightmost leaf
// the page-zero left field is updated under the allocation mutex
2341 bt_mutexlock (mgr->lock);
2342 mgr->pagezero->alloc->left = right->latch->page_no;
2343 bt_releasemutex(mgr->lock);
2346 // process lower keys
// frame is reused as a snapshot of the original page while it is rebuilt
2348 memcpy (frame, set->page, page_size);
2349 memset (set->page+1, 0, page_size - sizeof(*set->page));
2351 set->page->min = page_size;
2352 set->page->garbage = 0;
2358 // assemble page of smaller keys
2360 while( cnt++ < max ) {
2361 if( slotptr(frame, cnt)->dead )
2363 val = valptr(frame, cnt);
2364 set->page->min -= val->len + sizeof(BtVal);
2365 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2367 key = keyptr(frame, cnt);
2368 set->page->min -= key->len + sizeof(BtKey);
2369 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2371 // add librarian slot
2373 slotptr(set->page, ++idx)->off = set->page->min;
2374 slotptr(set->page, idx)->type = Librarian;
2375 slotptr(set->page, idx)->dead = 1;
2379 slotptr(set->page, ++idx)->off = set->page->min;
2380 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2384 set->page->right = right->latch->page_no;
2385 set->page->fence = set->page->min;
2386 set->page->cnt = idx;
// the result is the new right latch's index within its pool's latch array
2389 entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2393 // fix keys for newly split page
2394 // call with both pages pinned & locked
2395 // return unlocked and unpinned
//
//   mgr       - buffer manager
//   set       - left page of the split (rebuilt with the lower keys)
//   right     - latch of the new right page
//   thread_no - recorded in mgr->err_thread on failure
// A root split is delegated to bt_splitroot. Returns mgr->err when either
// parent insert fails.
// NOTE(review): the success return and the declarations of ptr, page, temp
// and right2 are not visible in this block.
2397 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2399 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2400 unsigned char value[BtId];
2401 uint lvl = set->page->lvl;
2407 // if current page is the root page, split it
2409 if( set->latch->page_no == ROOT_page )
2410 return bt_splitroot (mgr, set, right, thread_no);
// capture both fence keys before the write lock on set is released
2412 ptr = fenceptr(set->page);
2413 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2415 page = bt_mappage (mgr, right);
2417 ptr = fenceptr(page);
2418 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2420 // splice in far right page's left page_no
2422 if( right2 = page->right ) {
2423 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2424 temp->page = bt_mappage (mgr, temp->latch);
2428 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2429 temp->page->left = right->page_no;
2430 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2431 bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2432 } else if( !lvl ) { // right page is far right page
// the page-zero left field is updated under the allocation mutex
2433 bt_mutexlock (mgr->lock);
2434 mgr->pagezero->alloc->left = right->page_no;
2435 bt_releasemutex(mgr->lock);
2437 // insert new fences in their parent pages
// both pages hold their parent lock while the parent level is updated;
// the write lock on the left page is released first
2439 bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2441 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2442 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2444 // insert new fence for reformulated left block of smaller keys
2446 bt_putid (value, set->latch->page_no);
2447 ptr = (BtKey *)leftkey;
// NOTE(review): both error returns below leave with the parent locks and
// pins still held, as far as the visible lines show.
2449 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2450 return mgr->err_thread = thread_no, mgr->err;
2452 // switch fence for right block of larger keys to new right page
2454 bt_putid (value, right->page_no);
2455 ptr = (BtKey *)rightkey;
2457 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2458 return mgr->err_thread = thread_no, mgr->err;
2460 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2461 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2463 bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2464 bt_unpinlatch (right, 1, thread_no, __LINE__);
2468 // install new key and value onto page
2469 // page must already be checked for
//
//   mgr           - buffer manager
//   set           - target page
//   slot          - position before which the key is inserted
//   key, keylen   - key bytes and length
//   value, vallen - value bytes and length
//   type          - slot type for the new entry
// NOTE(review): the return, the ptr->len / val->len stores, the node->type
// and act updates, and the declarations of node, ptr, val and rate are not
// visible in this block.
2472 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2474 uint idx, librarian;
2480 // if previous slot is a librarian slot, use it
2483 if( slotptr(set->page, slot-1)->type == Librarian )
2486 // copy value onto page
// records grow downward: min is lowered first, then the record is written
2488 set->page->min -= vallen + sizeof(BtVal);
2489 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2490 memcpy (val->value, value, vallen);
2493 // copy key onto page
2495 set->page->min -= keylen + sizeof(BtKey);
2496 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2497 memcpy (ptr->key, key, keylen);
2500 // find first empty slot at or above our insert slot
2502 for( idx = slot; idx < set->page->cnt; idx++ )
2503 if( slotptr(set->page, idx)->dead )
2506 // now insert key into array before slot.
2508 // if we're going all the way to the top,
2509 // add as many librarian slots as
2512 if( idx == set->page->cnt ) {
// Slot budget: four fifths of the free offset, less the page header and the
// slot array (cnt is incremented as a side effect of this expression).
// NOTE(review): the sizeof terms make this arithmetic unsigned before the
// result is stored in an int -- confirm behaviour when the page is nearly full.
2513 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2515 librarian = ++idx - slot;
2516 avail /= sizeof(BtSlot);
2521 if( librarian > avail )
2525 rate = (idx - slot) / librarian;
2526 set->page->cnt += librarian;
2531 librarian = 0, rate = 0;
2533 // transfer slots and add librarian slots
// slots are shifted upward from the top, leaving gaps for librarian slots
2535 while( idx > slot ) {
2536 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2538 // add librarian slot per rate
2541 if( (idx - slot)/2 <= librarian * rate ) {
// the librarian slot shares the offset of the slot just above it
2542 node = slotptr(set->page, --idx);
2543 node->off = node[1].off;
2544 node->type = Librarian;
// finally point the insert slot at the key record written above
2556 node = slotptr(set->page, slot);
2557 node->off = set->page->min;
2563 // Insert new key into the btree at given level.
2564 // either add a new key or update/add an existing one
//
//   mgr           - buffer manager
//   key, keylen   - key bytes and length
//   lvl           - tree level to insert at
//   value, vallen - value bytes and length
//   type          - slot type (callers in this file pass Unique or Update)
//   thread_no     - recorded in mgr->err_thread on failure
// Returns mgr->err on the failure paths shown.
// NOTE(review): the branch on type, the continue/break statements of the
// retry loop, the success return and the declarations of set, node, ptr and
// val are not visible in this block.
2566 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2568 uint slot, idx, len, entry;
2574 while( 1 ) { // find the page and slot for the current key
2575 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2576 node = slotptr(set->page, slot);
2577 ptr = keyptr(set->page, slot);
2581 // if librarian slot == found slot, advance to real slot
2583 if( node->type == Librarian ) {
2584 node = slotptr(set->page, ++slot);
2585 ptr = keyptr(set->page, slot);
2588 // if inserting a duplicate key or unique
2589 // key that doesn't exist on the page,
2590 // check for adequate space on the page
2591 // and insert the new key before slot.
2596 if( !keycmp (ptr, key, keylen) )
// bt_cleanpage yields 0 when the page has to be split instead
2599 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2600 if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
// no room: split the page, then fix up the parent keys; entry is an index
// into the latch array of the pool that matches this level
2605 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2606 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2609 return mgr->err_thread = thread_no, mgr->err;
2612 if( keycmp (ptr, key, keylen) )
2618 // if key already exists, update value and return
2620 val = valptr(set->page, slot);
// the new value fits in the old value's space: overwrite in place and count
// the unused tail as garbage
2622 if( val->len >= vallen ) {
2628 set->page->garbage += val->len - vallen;
2630 memcpy (val->value, value, vallen);
2634 // new update value doesn't fit in existing value area
2635 // make sure page has room
// the whole old key and value record becomes garbage
2638 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2645 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2648 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2649 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2652 return mgr->err_thread = thread_no, mgr->err;
2655 // copy key and value onto page and update slot
2657 set->page->min -= vallen + sizeof(BtVal);
2658 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2659 memcpy (val->value, value, vallen);
2662 set->page->min -= keylen + sizeof(BtKey);
2663 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2664 memcpy (ptr->key, key, keylen);
// repoint the existing slot at the freshly written key record
2667 slotptr(set->page,slot)->off = set->page->min;
2670 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2671 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2675 // determine actual page where key is located
2676 // return slot number
// mgr: btree manager; mgr->leafsets is indexed by latch entry number.
// source: page holding the transaction's keys; the key is read from slot locks[idx].src.
// locks: per-key transaction state; idx selects the element for this key.
// set: receives the latch and mapped page that were searched.
// When locks[idx].reuse is set, the starting entry comes from locks[idx-1].entry
// and the entry where the key is found is written back to locks[idx].entry.
// The search follows set->latch->split from page to page; if the chain ends
// without a match, mgr->err is set to BTERR_atomic.
2678 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, BtPageSet *set)
2680 BtKey *key = keyptr(source,locks[idx].src), *ptr;
2681 uint slot = locks[idx].slot;
2684 if( locks[idx].reuse )
2685 entry = locks[idx-1].entry;
2687 entry = locks[idx].entry;
2690 set->latch = mgr->leafsets + entry;
2691 set->page = bt_mappage (mgr, set->latch);
2695 // find where our key is located
2696 // on current page or pages split on
2697 // same page txn operations.
2700 set->latch = mgr->leafsets + entry;
2701 set->page = bt_mappage (mgr, set->latch);
2703 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2704 if( slotptr(set->page, slot)->type == Librarian )
2706 if( locks[idx].reuse )
2707 locks[idx].entry = entry;
2710 } while( entry = set->latch->split );
2712 mgr->line = __LINE__, mgr->err = BTERR_atomic;
// Insert the key/value held in slot locks[idx].src of the source page into
// the page located by bt_atomicpage.
// lsn is stored in the target page's lsn field once bt_insertslot succeeds.
// When bt_cleanpage returns no slot the page is split with bt_splitpage; the
// new latch is write-locked and linked into the split chain, and
// locks[idx].slot is cleared.
// Returns mgr->err if bt_insertslot fails, and BTERR_atomic on the final
// visible return, reached when bt_atomicpage yields no slot.
2716 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn)
2718 BtKey *key = keyptr(source, locks[idx].src);
2719 BtVal *val = valptr(source, locks[idx].src);
2724 while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
2725 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
// the new slot takes the slot type recorded in the source page
2726 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,locks[idx].src)->type) )
2727 return mgr->err_thread = thread_no, mgr->err;
2729 set->page->lsn = lsn;
2735 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2736 latch = mgr->leafsets + entry;
2740 // splice right page into split chain
2743 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2744 latch->split = set->latch->split;
2745 set->latch->split = entry;
2747 // clear slot number for atomic page
2749 locks[idx].slot = 0;
2752 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2755 // perform delete from smaller btree
2756 // insert a delete slot if not found there
// The key to delete is read from slot locks[idx].src of the source page and
// located with bt_atomicpage.
// If the key at the found slot differs, a Delete-type slot with an empty value
// is inserted, splitting the page first when bt_cleanpage returns no slot.
// If the key matches and is not already Delete/dead, the visible code sets the
// slot type to Delete, increments mgr->found atomically and stores lsn in the page.
// The final visible return reports BTERR_struct.
2758 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn)
2760 BtKey *key = keyptr(source, locks[idx].src);
2768 while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
2769 node = slotptr(set->page, slot);
2770 ptr = keyptr(set->page, slot);
2771 val = valptr(set->page, slot);
2773 // if slot is not found on cache btree, insert a delete slot
2774 // otherwise ignore the request.
2776 if( keycmp (ptr, key->key, key->len) )
2778 if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
2779 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
2780 else { // split page before inserting Delete slot
2781 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2782 latch = mgr->leafsets + entry;
2786 // splice right page into split chain
2789 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2790 latch->split = set->latch->split;
2791 set->latch->split = entry;
2793 // clear slot number for atomic page
2795 locks[idx].slot = 0;
2801 // if node is already dead,
2802 // ignore the request.
2804 if( node->type == Delete || node->dead )
2807 // if main LSM btree, delete the slot
2808 // else change to delete type.
2814 node->type = Delete;
2816 __sync_fetch_and_add(&mgr->found, 1);
2817 set->page->lsn = lsn;
2821 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2824 // release master's splits from right to left
// entry: index into mgr->leafsets of the first latch to release.
// The function recurses on latch->split before unlocking its own latch, so
// latches further along the split chain are released first.
// Each latch has its write lock dropped and is then unpinned.
2826 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2828 BtLatchSet *latch = mgr->leafsets + entry;
2831 bt_atomicrelease (mgr, latch->split, thread_no);
2834 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2835 bt_unpinlatch(latch, 1, thread_no, __LINE__);
// Comparison callback handed to qsort_r by bt_atomictxn.
// slot1, slot2: the two slots being compared.
// page: the page the slots belong to; each slot's off field is a byte offset
//       from the start of this page to the slot's BtKey.
// Returns the result of keycmp on the first key against the second key's bytes.
2838 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2840 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2841 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2843 return keycmp (key1, key2->key, key2->len);
2845 // atomic modification of a batch of keys.
// bt: database handle; bt->mgr is the btree the batch is applied to.
// source: page whose slots 1..cnt describe the keys to insert or delete.
// Steps visible here: order the source slots by key, write the batch to the
// redo log when pagezero->redopages is non-zero, apply it with bt_atomicexec,
// then call bt_promote while pagezero->leafpages exceeds
// leaftotal - *thread_no.
// Returns bt->mgr->err when bt_atomicexec or bt_promote fails.
2847 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2849 uint src, idx, slot, samepage, entry, que = 0;
2850 BtKey *key, *ptr, *key2;
2856 // stable sort the list of keys into order to
2857 // prevent deadlocks between threads.
// NOTE(review): both a hand-written insertion sort and a qsort_r call appear
// below; presumably they are alternatives chosen by a preprocessor
// conditional - confirm against the full file.
2859 for( src = 1; src++ < source->cnt; ) {
2860 *temp = *slotptr(source,src);
2861 key = keyptr (source,src);
2863 for( idx = src; --idx; ) {
2864 key2 = keyptr (source,idx);
2865 if( keycmp (key, key2->key, key2->len) < 0 ) {
2866 *slotptr(source,idx+1) = *slotptr(source,idx);
2867 *slotptr(source,idx) = *temp;
2873 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2874 // add entries to redo log
2876 if( bt->mgr->pagezero->redopages )
2877 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2881 // perform the individual actions in the transaction
2883 if( bt_atomicexec (bt->mgr, source, source->cnt, lsn, 0, bt->thread_no) )
2884 return bt->mgr->err;
2886 // if number of active pages
2887 // is greater than the buffer pool
2888 // promote page into larger btree
2891 while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2892 if( bt_promote (bt) )
2893 return bt->mgr->err;
2900 // execute the source list of inserts/deletes
// mgr: btree the operations are applied to.
// source: page whose slots 1..count hold the keys, values and slot types.
// count: number of source slots to process.
// lsn: log sequence number handed to bt_atomicinsert / bt_atomicdelete.
// lsm: not referenced in the lines visible here - TODO confirm its use.
// thread_no: caller's thread number, forwarded to the lock and pin helpers.
//
// Visible phases:
//  1. for each source slot, load and write-lock its leaf page, or mark the
//     slot as reusing the previous page, and record the result in locks[].
//  2. walk locks[] backwards, running each group of same-page operations.
//  3. for each master page, walk its split chain: undo empty splits, post
//     fence keys at level 1, repair left links, then unlock and unpin.
//  4. mark the source slots dead.
2902 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no)
2904 uint slot, src, idx, samepage, entry, outidx;
2905 BtPageSet set[1], prev[1];
2906 unsigned char value[BtId];
// NOTE(review): the calloc result is not checked in the lines visible here
2914 locks = calloc (count, sizeof(AtomicTxn));
2915 memset (set, 0, sizeof(BtPageSet));
2918 // Load the leaf page for each key
2919 // group same page references with reuse bit
2921 for( src = 0; src++ < count; ) {
2922 if( slotptr(source,src)->dead )
2925 key = keyptr(source, src);
2927 // first determine if this modification falls
2928 // on the same page as the previous modification
2929 // note that the far right leaf page is a special case
// ptr is the fence key of the previously loaded page (set below); a page
// with no right sibling always counts as the same page
2931 if( samepage = !!set->page )
2932 samepage = !set->page->right || keycmp (ptr, key->key, key->len) >= 0;
2935 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
2936 ptr = fenceptr(set->page), set->latch->split = 0;
2943 if( slotptr(set->page, slot)->type == Librarian )
// the latch entry number is its index within mgr->leafsets
2946 entry = set->latch - mgr->leafsets;
2947 locks[outidx].reuse = samepage;
2948 locks[outidx].entry = entry;
2949 locks[outidx].slot = slot;
2950 locks[outidx].src = src;
2952 // capture current lsn for master page
2954 locks[outidx++].reqlsn = set->page->lsn;
2957 // insert or delete each key
2958 // process any splits or merges
2959 // run through txn list backwards
2963 for( src = outidx; src--; ) {
2964 if( locks[src].reuse )
2967 // perform the txn operations
2968 // from smaller to larger on
2971 for( idx = src; idx < samepage; idx++ )
2972 switch( slotptr(source,locks[idx].src)->type ) {
2974 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2980 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2985 bt_atomicpage (mgr, source, locks, idx, set);
2989 // after the same page operations have finished,
2990 // process master page for splits or deletion.
2992 latch = prev->latch = mgr->leafsets + locks[src].entry;
2993 prev->page = bt_mappage (mgr, prev->latch);
2996 // pick-up all splits from master page
2997 // each one is already pinned & WriteLocked.
2999 while( entry = prev->latch->split ) {
3000 set->latch = mgr->leafsets + entry;
3001 set->page = bt_mappage (mgr, set->latch);
3003 // delete empty master page by undoing its split
3004 // (this is potentially another empty page)
3005 // note that there are no pointers to it yet
3007 if( !prev->page->act ) {
3008 set->page->left = prev->page->left;
3009 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3010 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3011 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3012 prev->latch->split = set->latch->split;
3013 bt_freepage (mgr, set, thread_no);
3017 // remove empty split page from the split chain
3018 // and return it to the free list. No other
3019 // thread has its page number yet.
3021 if( !set->page->act ) {
3022 prev->page->right = set->page->right;
3023 prev->latch->split = set->latch->split;
3025 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3026 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3027 bt_freepage (mgr, set, thread_no);
3031 // update prev's fence key
// the fence key is posted at level 1 with prev's page number as its value
3033 ptr = fenceptr(prev->page);
3034 bt_putid (value, prev->latch->page_no);
3036 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3039 // splice in the left link into the split page
3041 set->page->left = prev->latch->page_no;
3045 // update left pointer in next right page from last split page
3046 // (if all splits were reversed or none occurred, latch->split == 0)
3048 if( latch->split ) {
3049 // fix left pointer in master's original (now split)
3050 // far right sibling or set rightmost page in page zero
3052 if( right_page_no = prev->page->right ) {
3053 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3054 set->page = bt_mappage (mgr, set->latch);
3058 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3059 set->page->left = prev->latch->page_no;
3060 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3061 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
3062 } else { // prev is rightmost page
// pagezero->alloc->left records the rightmost leaf and is updated under mgr->lock
3063 bt_mutexlock (mgr->lock);
3064 mgr->pagezero->alloc->left = prev->latch->page_no;
3065 bt_releasemutex(mgr->lock);
3068 // switch the original fence key from the
3069 // master page to the last split page.
3071 ptr = fenceptr(prev->page);
3072 bt_putid (value, prev->latch->page_no);
3074 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3077 // unlock and unpin the split pages
3079 bt_atomicrelease (mgr, latch->split, thread_no);
3081 // unlock and unpin the master page
3084 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3085 bt_unpinlatch(latch, 1, thread_no, __LINE__);
3089 // since there are no splits remaining, we're
3090 // finished if master page occupied
3092 if( prev->page->act ) {
3093 bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3094 bt_unpinlatch(prev->latch, 1, thread_no, __LINE__);
3098 // any and all splits were reversed, and the
3099 // master page located in prev is empty, delete it
3101 if( bt_deletepage (mgr, prev, thread_no, 0) )
// final pass over the source slots: each slot is tested for dead and marked dead
3107 for( idx = 0; idx++ < count; ) {
3108 if( slotptr(source,idx)->dead )
3111 slotptr(source,idx)->dead = 1;
3119 // pick & promote a page into the larger btree
// bt: database handle; a leaf page of bt->mgr is moved into bt->main.
// A candidate latch entry is chosen by atomically incrementing
// mgr->leafpromote and reducing it modulo mgr->leaftotal.
// The candidate is tested for: no page number, modify mutex not obtained,
// pinned, no right sibling, and the nopromote/kill flags.
// An accepted page is write-locked, its slots are applied to bt->main with
// bt_atomicexec, and the page is then deleted from bt->mgr.
// Returns bt->main->err when bt_atomicexec fails; the last visible return is
// bt->mgr->err.
3121 BTERR bt_promote (BtDb *bt)
3123 uint entry, slot, idx;
// two forms of the same atomic increment; presumably selected per platform
3131 entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3133 entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3135 entry %= bt->mgr->leaftotal;
3140 set->latch = bt->mgr->leafsets + entry;
3142 // skip this entry if it has never been used
3144 if( !set->latch->page_no )
3147 if( !bt_mutextry(set->latch->modify) )
3150 // skip this entry if it is pinned
3152 if( set->latch->pin & ~CLOCK_bit ) {
3153 bt_releasemutex(set->latch->modify);
3157 set->page = bt_mappage (bt->mgr, set->latch);
3159 // entry has no right sibling
3161 if( !set->page->right ) {
3162 bt_releasemutex(set->latch->modify);
3166 // entry is being killed or constructed
3168 if( set->page->nopromote || set->page->kill ) {
3169 bt_releasemutex(set->latch->modify);
3173 // pin the page for our access
3174 // and leave it locked for the
3175 // duration of the promotion.
3178 bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3179 bt_releasemutex(set->latch->modify);
3181 // transfer slots in our selected page to the main btree
// progress message for every 100th latch entry
3183 if( !(entry % 100) )
// page_no is cast to unsigned long long so the argument matches %llu
// whatever its declared width
3184 fprintf(stderr, "Promote entry %d page %llu, %d keys\n", entry, (unsigned long long)set->latch->page_no, set->page->act);
3186 if( bt_atomicexec (bt->main, set->page, set->page->cnt, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3187 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3188 return bt->main->err;
3191 // now delete the page
3193 if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3194 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3196 return bt->mgr->err;
3200 // find unique key == given key, or first duplicate key in
3201 // leaf level and return number of value bytes
3202 // or (-1) if not found.
// bt: database handle; the cache btree (bt->mgr) is searched first, then the
//     main btree (bt->main).
// key, keylen: key to look up.
// value, valmax: destination buffer and its capacity; at most valmax bytes of
//     the stored value are copied into it.
// The leaf page is read-locked during the lookup and unlocked/unpinned on
// each visible exit path.
3204 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
// type 0 selects bt->mgr, type 1 selects bt->main
3213 for( type = 0; type < 2; type++ )
3214 if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3215 node = slotptr(set->page, slot);
3217 // skip librarian slot place holder
3219 if( node->type == Librarian )
3220 node = slotptr(set->page, ++slot);
3222 ptr = keyptr(set->page, slot);
3224 // not there if we reach the stopper key
3225 // or the key doesn't match what's on the page.
// the last slot of a page with no right sibling is treated as the stopper
3227 if( slot == set->page->cnt )
3228 if( !set->page->right ) {
3229 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3230 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3234 if( keycmp (ptr, key, keylen) ) {
3235 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3236 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3240 // key matches, return >= 0 value bytes copied
3241 // or -1 if not there.
3243 if( node->type == Delete || node->dead ) {
3248 val = valptr (set->page,slot);
// the copy length is limited by the stored value's length
3250 if( valmax > val->len )
3253 memcpy (value, val->value, valmax);
3262 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3263 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3268 // set cursor to highest slot on right-most page
// For both the cache btree (bt->mgr) and the main btree (bt->main) the page
// number is taken from pagezero->alloc->left, the page is pinned and
// read-locked, and the cursor slot is set to the page's slot count.
// Returns the owning manager's err when bt_pinleaf fails.
// NOTE(review): if the main-tree pin fails, the cache page locked above is
// not released in the lines visible here - confirm against the full file.
3270 BTERR bt_lastkey (BtDb *bt)
3272 uid cache_page_no = bt->mgr->pagezero->alloc->left;
3273 uid main_page_no = bt->main->pagezero->alloc->left;
3275 if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
3276 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
3278 return bt->mgr->err;
3280 bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3281 bt->cacheslot = bt->cacheset->page->cnt;
3283 if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
3284 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
3286 return bt->main->err;
3288 bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3289 bt->mainslot = bt->mainset->page->cnt;
3294 // return previous slot on cursor page
// mgr: btree the cursor page belongs to.
// set: cursor page set; replaced with another page when the cursor moves off
//      the current one.
// slot: current slot number on set->page.
// thread_no: caller's thread number, forwarded to the lock and pin helpers.
// When leaving the page, the current page is unlocked and unpinned and the
// page named by its left link is pinned and read-locked; this repeats with
// that page's right link until the link equals the page we started on.
// The slot is then set one past the new page's slot count.
3296 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
// us remembers the starting page number for the loop test below
3298 uid next, us = set->latch->page_no;
3302 if( slotptr(set->page, slot)->dead )
3307 next = set->page->left;
3313 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3314 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3316 if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3317 set->page = bt_mappage (mgr, set->latch);
3321 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3322 next = set->page->right;
3324 } while( next != us );
3326 slot = set->page->cnt + 1;
3330 // advance to previous key
// bt: database handle carrying the cache and main cursors.
// bt->phase selects which cursor(s) step back via bt_prevslot: the cache
// cursor, the main cursor, or both.
// The node/key/value pointers are then refreshed for each cursor whose slot
// is non-zero, and the two keys are compared when both cursors are live.
// The visible returns hand back bt->cacheslot or bt->mainslot.
3332 BTERR bt_prevkey (BtDb *bt)
3336 // first advance last key(s) one previous slot
3339 switch( bt->phase ) {
3341 bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3344 bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3347 bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3348 bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3354 if( bt->cacheslot ) {
3355 bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3356 bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3357 bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3360 if( bt->mainslot ) {
3361 bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3362 bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3363 bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3366 if( bt->mainslot && bt->cacheslot )
3367 cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3368 else if( bt->cacheslot )
3370 else if( bt->mainslot )
3375 // cache key is larger
3379 if( bt->cachenode->type == Delete )
3381 return bt->cacheslot;
3384 // main key is larger
3388 return bt->mainslot;
3395 if( bt->cachenode->type == Delete )
3398 return bt->cacheslot;
3402 // advance to next slot in cache or main btree
3403 // return 0 for EOF/error
// mgr: btree the cursor page belongs to.
// set: cursor page set; replaced with the right sibling when the current page
//      is exhausted.
// slot: current slot number on set->page.
// thread_no: caller's thread number, forwarded to the lock and pin helpers.
3405 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3411 while( slot++ < set->page->cnt )
3412 if( slotptr(set->page, slot)->dead )
// the last slot is only accepted when the page has a right sibling
3414 else if( slot < set->page->cnt || set->page->right )
3419 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3420 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
// NOTE(review): set->page->right is read after the latch was unlocked and
// unpinned above - confirm this ordering is intended
3422 if( page_no = set->page->right )
3423 if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3424 set->page = bt_mappage (mgr, set->latch);
3430 // obtain access lock using lock chaining with Access mode
3432 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3433 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3434 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3439 // advance to next key
// bt: database handle carrying the cache and main cursors.
// bt->phase selects which cursor(s) step forward via bt_nextslot: the cache
// cursor, the main cursor, or both.
// The node/key/value pointers are then refreshed for each cursor whose slot
// is non-zero, and the two keys are compared when both cursors are live.
// The visible returns hand back bt->cacheslot or bt->mainslot.
3441 BTERR bt_nextkey (BtDb *bt)
3445 // first advance last key(s) one next slot
3448 switch( bt->phase ) {
3450 bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3453 bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3456 bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3457 bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3463 if( bt->cacheslot ) {
3464 bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3465 bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3466 bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3469 if( bt->mainslot ) {
3470 bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3471 bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3472 bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3475 if( bt->mainslot && bt->cacheslot )
3476 cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3477 else if( bt->mainslot )
3479 else if( bt->cacheslot )
3484 // main key is larger
3485 // return smaller key
3489 if( bt->cachenode->type == Delete )
3491 return bt->cacheslot;
3494 // cache key is larger
3498 return bt->mainslot;
3505 if( bt->cachenode->type == Delete )
3508 return bt->cacheslot;
3512 // start sweep of keys
// bt: database handle; both the cache cursor and the main cursor are positioned.
// key, len: key to start from, passed to bt_loadpage for each btree.
// Each cursor slot is set to one before the slot bt_loadpage returns, and
// the leaf pages are loaded with a read lock.
// Returns an error code when either load fails. A failure on the main btree
// is copied into bt->mgr (line and err) so callers that inspect bt->mgr see it.
3514 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3521 if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
3522 bt->cacheslot = slot - 1;
3524 return bt->mgr->err;
3528 if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
3529 bt->mainslot = slot - 1;
// this load ran against bt->main, so report the main btree's error rather
// than the cache btree's (which may still be zero)
3531 return bt->mgr->line = bt->main->line, bt->mgr->err = bt->main->err;
3537 // flush cache pages to main btree
// bt: database handle; pages of bt->mgr are applied to bt->main.
// While pagezero->leafpages is non-zero, LEAF_page is pinned and
// write-locked, its slots are applied to the main btree with bt_atomicexec,
// and the page is deleted when it has a right sibling.
// A bt_atomicexec failure copies the main btree's line and err into bt->mgr
// before returning.
// The final visible return reports BTERR_ovflw.
3539 BTERR bt_flushmain (BtDb *bt)
3541 uint count, cnt = 0;
3544 while( bt->mgr->pagezero->leafpages > 0 ) {
3545 if( set->latch = bt_pinleaf (bt->mgr, LEAF_page, bt->thread_no) )
3546 set->page = bt_mappage (bt->mgr, set->latch);
3548 return bt->mgr->err;
3550 bt_lockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__);
3551 count = set->page->cnt;
3553 if( !set->page->right )
// progress message on every 100th pass
3556 if( !(cnt++ % 100) )
3557 fprintf(stderr, "Promote LEAF_page %d with %d keys\n", cnt, set->page->act);
3559 if( bt_atomicexec (bt->main, set->page, count, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3560 return bt->mgr->line = bt->main->line, bt->mgr->err = bt->main->err;
3562 if( set->page->right )
3563 if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3564 return bt->mgr->err;
3568 bt_unlockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__);
3569 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3573 // leaf page count is off
3575 bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__;
3576 return bt->mgr->err = BTERR_ovflw;
// Windows-API variant of getCpuTime.
// type: selects which time is reported; the visible code obtains the system
//       clock, the process user time and the process system time. The
//       mapping of type values to these is not visible here - TODO confirm.
// Returns the selected time in seconds as a double, built from the
// SYSTEMTIME hour, minute, second and millisecond fields.
3582 double getCpuTime(int type)
3585 FILETIME xittime[1];
3586 FILETIME systime[1];
3587 FILETIME usrtime[1];
3588 SYSTEMTIME timeconv[1];
3591 memset (timeconv, 0, sizeof(SYSTEMTIME));
3595 GetSystemTimeAsFileTime (xittime);
3596 FileTimeToSystemTime (xittime, timeconv);
// for the system clock, the day of the week is included as whole days in seconds
3597 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3600 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3601 FileTimeToSystemTime (usrtime, timeconv);
3604 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3605 FileTimeToSystemTime (systime, timeconv);
3609 ans += (double)timeconv->wHour * 3600;
3610 ans += (double)timeconv->wMinute * 60;
3611 ans += (double)timeconv->wSecond;
3612 ans += (double)timeconv->wMilliseconds / 1000;
3617 #include <sys/resource.h>
// POSIX variant of getCpuTime.
// type: selects which time is reported; the visible code returns the
//       gettimeofday clock, the process user time (ru_utime) or the process
//       system time (ru_stime). The mapping of type values to these is not
//       visible here - TODO confirm.
// Returns the selected time in seconds, with microseconds as the fraction.
3619 double getCpuTime(int type)
3621 struct rusage used[1];
3622 struct timeval tv[1];
3626 gettimeofday(tv, NULL);
3627 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3630 getrusage(RUSAGE_SELF, used);
3631 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3634 getrusage(RUSAGE_SELF, used);
3635 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Report suspicious buffer-pool state on stderr.
// mgr: manager whose latchsets and leafsets tables are inspected.
// type: label printed at the start of every message.
// Entry zero of each table is compared against an all-zero BtLatchSet; the
// remaining entries (1 .. total-1) are reported when their modify latch is
// held or their pin count, ignoring CLOCK_bit, is non-zero.
// Page numbers are printed with %llu through an unsigned long long cast so
// the argument always matches the conversion specifier.
3642 void bt_poolaudit (BtMgr *mgr, char *type)
3644 BtLatchSet *latch, test[1];
3647 memset (test, 0, sizeof(test));
3649 if( memcmp (test, mgr->latchsets, sizeof(test)) )
3650 fprintf(stderr, "%s latchset zero overwritten\n", type);
3652 if( memcmp (test, mgr->leafsets, sizeof(test)) )
3653 fprintf(stderr, "%s leafset zero overwritten\n", type);
// entry 0 is skipped: the pre-increment starts the scan at 1
3655 for( entry = 0; ++entry < mgr->latchtotal; ) {
3656 latch = mgr->latchsets + entry;
3659 fprintf(stderr, "%s latchset %d is a leaf on page %llu\n", type, entry, (unsigned long long)latch->page_no);
3661 if( *latch->modify->value )
3662 fprintf(stderr, "%s latchset %d modifylocked for page %llu\n", type, entry, (unsigned long long)latch->page_no);
3664 if( latch->pin & ~CLOCK_bit )
3665 fprintf(stderr, "%s latchset %d pinned %d times for page %llu\n", type, entry, latch->pin & ~CLOCK_bit, (unsigned long long)latch->page_no);
3668 for( entry = 0; ++entry < mgr->leaftotal; ) {
3669 latch = mgr->leafsets + entry;
3672 fprintf(stderr, "%s leafset %d is not a leaf on page %llu\n", type, entry, (unsigned long long)latch->page_no);
3674 if( *latch->modify->value )
3675 fprintf(stderr, "%s leafset %d modifylocked for page %llu\n", type, entry, (unsigned long long)latch->page_no);
3677 if( latch->pin & ~CLOCK_bit )
3678 fprintf(stderr, "%s leafset %d pinned %d times for page %llu\n", type, entry, latch->pin & ~CLOCK_bit, (unsigned long long)latch->page_no);
3691 // standalone program to index file of keys
3692 // then list them onto std-out
// Thread entry point (two signatures appear below; presumably one per
// platform). arg is a ThreadArg whose mgr, main, infile, type, idx and num
// fields are read here.
// The command character is args->type[args->idx], or the last character of
// args->type when idx is past its end. The visible command bodies are: flush
// cache to main, pennysort insert/delete (optionally batched into
// transactions of args->num keys), plain indexing, key lookup, forward scan,
// reverse scan, and per-type slot counting of both btree files.
// Fatal btree errors print a diagnostic and terminate the process with
// exit status 1, matching the error exits used in main.
3695 void *index_file (void *arg)
3697 uint __stdcall index_file (void *arg)
3700 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3701 uid next, page_no = LEAF_page; // start on first page of leaves
3702 int ch, len = 0, slot, type = 0;
3703 unsigned char key[BT_maxkey];
3704 unsigned char buff[65536];
3705 uint nxt = sizeof(buff);
3706 ThreadArg *args = arg;
3717 bt = bt_open (args->mgr, args->main);
// buff doubles as the in-memory source page for transaction batches
3718 page = (BtPage)buff;
3720 if( args->idx < strlen (args->type) )
3721 ch = args->type[args->idx];
3723 ch = args->type[strlen(args->type) - 1];
3728 fprintf(stderr, "started flushing cache to main btree\n");
3731 if( bt_flushmain(bt) )
3732 fprintf(stderr, "Error %d Line: %d thread: %d\n", bt->mgr->err, bt->mgr->line, bt->thread_no), exit(1);
3744 if( type == Delete )
3745 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3747 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3749 if( type == Delete )
3750 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3752 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3754 if( in = fopen (args->infile, "rb") )
3755 while( ch = getc(in), ch != EOF )
// pennysort records: the first 10 bytes are the key, the rest is the value
3761 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3762 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(1);
3768 memcpy (buff + nxt, key + 10, len - 10);
3770 buff[nxt] = len - 10;
3772 memcpy (buff + nxt, key, 10);
3775 slotptr(page,++cnt)->off = nxt;
3776 slotptr(page,cnt)->type = type;
3777 slotptr(page,cnt)->dead = 0;
3780 if( cnt < args->num )
3787 if( bt_atomictxn (bt, page) )
3788 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(1);
3793 else if( len < BT_maxkey )
3795 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3799 fprintf(stderr, "started indexing for %s\n", args->infile);
3800 if( in = fopen (args->infile, "r") )
3801 while( ch = getc(in), ch != EOF )
3806 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3807 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(1);
3810 else if( len < BT_maxkey )
3812 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3816 fprintf(stderr, "started finding keys for %s\n", args->infile);
3817 if( in = fopen (args->infile, "rb") )
3818 while( ch = getc(in), ch != EOF )
3822 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3824 else if( bt->mgr->err )
3825 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(1);
3828 else if( len < BT_maxkey )
3830 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3834 fprintf(stderr, "started forward scan\n");
3835 if( bt_startkey (bt, NULL, 0) )
3836 fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3838 while( bt_nextkey (bt) ) {
// phase 1 means the current key comes from the main btree
3839 if( bt->phase == 1 ) {
3840 len = bt->mainkey->len;
3842 if( bt->mainnode->type == Duplicate )
3845 fwrite (bt->mainkey->key, len, 1, stdout);
3846 fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3848 len = bt->cachekey->len;
3850 if( bt->cachenode->type == Duplicate )
3853 fwrite (bt->cachekey->key, len, 1, stdout);
3854 fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3857 fputc ('\n', stdout);
3861 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3862 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3864 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3865 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3867 fprintf(stderr, " Total keys read %d\n", cnt);
3871 fprintf(stderr, "started reverse scan\n");
3872 if( bt_lastkey (bt) )
3873 fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3875 while( bt_prevkey (bt) ) {
3876 if( bt->phase == 1 ) {
3877 len = bt->mainkey->len;
3879 if( bt->mainnode->type == Duplicate )
3882 fwrite (bt->mainkey->key, len, 1, stdout);
3883 fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3885 len = bt->cachekey->len;
3887 if( bt->cachenode->type == Duplicate )
3890 fwrite (bt->cachekey->key, len, 1, stdout);
3891 fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3894 fputc ('\n', stdout);
3898 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3899 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3901 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3902 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3904 fprintf(stderr, " Total keys read %d\n", cnt);
3908 fprintf(stderr, "started counting LSM cache btree\n");
3909 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3910 memset (counts, 0, sizeof(counts));
3911 page_no = LEAF_page;
// a leaf page spans page_size << leaf_xtra bytes
3912 size = bt->mgr->page_size << bt->mgr->leaf_xtra;
// NOTE(review): the malloc and pread results are not checked in the lines visible here
3913 page = malloc(size);
3916 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3918 while( page_no < bt->mgr->pagezero->alloc->right ) {
3919 pread(bt->mgr->idx, page, size, page_no << bt->mgr->page_bits);
3920 if( !page->lvl && !page->free ) {
3923 for( idx = 0; idx++ < page->cnt; ) {
3924 BtSlot *node = slotptr (page, idx);
3925 counts[node->type][node->dead]++;
// a leaf page occupies 1 << leaf_xtra page numbers; other pages occupy one
3931 page_no += page->lvl ? 1 : (1 << bt->mgr->leaf_xtra);
3935 cachecnt = --cnt; // remove stopper key
3936 counts[Unique][0]--;
3938 fprintf(stderr, " Unique : %d dead: %d\n", counts[Unique][0], counts[Unique][1]);
3939 fprintf(stderr, " Duplicates: %d dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
3940 fprintf(stderr, " Librarian : %d dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
3941 fprintf(stderr, " Deletion : %d dead: %d\n", counts[Delete][0], counts[Delete][1]);
3942 fprintf(stderr, "total cache keys count: %d\n", cachecnt);
3945 fprintf(stderr, "started counting LSM main btree\n");
3946 next = bt->main->redopage + bt->main->pagezero->redopages;
3947 memset (counts, 0, sizeof(counts));
3948 size = bt->main->page_size << bt->main->leaf_xtra;
3949 page = malloc(size);
3950 page_no = LEAF_page;
3954 posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3956 while( page_no < bt->main->pagezero->alloc->right ) {
3957 pread(bt->main->idx, page, size, page_no << bt->main->page_bits);
3958 if( !page->lvl && !page->free ) {
3961 for( idx = 0; idx++ < page->cnt; ) {
3962 BtSlot *node = slotptr (page, idx);
3963 counts[node->type][node->dead]++;
3969 page_no += page->lvl ? 1 : (1 << bt->main->leaf_xtra);
3973 cnt--; // remove stopper key
3974 counts[Unique][0]--;
3976 fprintf(stderr, " Unique : %d dead: %d\n", counts[Unique][0], counts[Unique][1]);
3977 fprintf(stderr, " Duplicates: %d dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
3978 fprintf(stderr, " Librarian : %d dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
3979 fprintf(stderr, " Deletion : %d dead: %d\n", counts[Delete][0], counts[Delete][1]);
3980 fprintf(stderr, "total main keys count : %d\n", cnt);
3981 fprintf(stderr, "Total keys counted : %d\n", cnt + cachecnt);
3994 typedef struct timeval timer;
// Command-line driver for the btree code.
//
// Reads the btree parameters from argv, opens the cache btree manager
// (argv[1]) and the main btree manager (argv[2]) with bt_mgr, runs
// index_file over the source files named from argv[14] onward, then
// writes pool audits, page and I/O counters, and timings to stderr.
//
// NOTE(review): the listing numbers embedded in these lines skip values,
// so some declarations, guards, preprocessor conditionals and braces of
// this function are not visible here. The comments below describe only
// the statements that are visible.
3996 int main (int argc, char **argv)
3998 int idx, cnt, len, slot, err;
4006 uint mainleafpool = 0;
4007 uint mainleafxtra = 0;
// Usage text, written to stderr. The condition that triggers it is not
// visible in this listing.
// NOTE(review): the synopsis names "mainleafbits" and "mainleafpool", but
// the descriptions below have no mainleafbits line and call the last
// option "mainleaf" -- confirm which wording is intended.
4023 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
4024 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
4025 fprintf (stderr, " where main_file is the name of the main btree file\n");
4026 fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort/(c)ount/(m)ainflush, with a one character command for each input src_file. A command can also be given with no input file\n");
4027 fprintf (stderr, " pagebits is the page size in bits for the cache btree\n");
4028 fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n");
4029 fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n");
4030 fprintf (stderr, " leafpool is the number of leaf pages in leaf pool for the cache btree\n");
4031 fprintf (stderr, " txnsize = n to block transactions into n units, or zero for no transactions\n");
4032 fprintf (stderr, " redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
4033 fprintf (stderr, " mainbits is the page size of the main btree in bits\n");
4034 fprintf (stderr, " mainpool is the number of main pages in the main buffer pool\n");
4035 fprintf (stderr, " mainleaf is the number of main pages in the main leaf pool\n");
4036 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// remember the starting getCpuTime(0) value; the difference is printed
// under the "real" label at the end
4040 start = getCpuTime(0);
// Positional numeric options argv[4]..argv[13], each converted with atoi.
// NOTE(review): any argc guards around these assignments are not visible
// in this listing -- confirm they exist before relying on optional args.
4043 bits = atoi(argv[4]);
4046 leafxtra = atoi(argv[5]);
4049 poolsize = atoi(argv[6]);
// NOTE(review): the test guarding this exit is not visible; presumably it
// fires when poolsize is zero -- confirm.
4052 fprintf (stderr, "no page pool\n"), exit(1);
4055 leafpool = atoi(argv[7]);
// argv[8] is the position the usage text calls txnsize
4058 num = atoi(argv[8]);
4061 redopages = atoi(argv[9]);
// refuse a recovery buffer of more than 65535 pages
4063 if( redopages > 65535 )
4064 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
4067 mainbits = atoi(argv[10]);
4070 mainleafxtra = atoi(argv[11]);
4073 mainpool = atoi(argv[12]);
4076 mainleafpool = atoi(argv[13]);
// One thread handle per cnt. The malloc/pthread_t and GlobalAlloc/HANDLE
// statements are alternatives, presumably selected by a platform
// conditional that is not visible here. The assignment of cnt is also not
// visible.
4080 threads = malloc (cnt * sizeof(pthread_t));
4082 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
// cnt + 1 ThreadArg entries
4084 args = malloc ((cnt + 1) * sizeof(ThreadArg));
// open the cache btree; redopages is passed only to this manager
4086 mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
4089 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// open the main btree; the last argument (redopages position) is 0
4095 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
4098 fprintf(stderr, "Index Open Error %s\n", argv[2]);
// Fill one ThreadArg per source file and start a thread running
// index_file on it. Source file names begin at argv[14]; every thread
// receives the same command string argv[3] and the same two managers.
4108 for( idx = 0; idx < cnt; idx++ ) {
4109 args[idx].infile = argv[idx + 14];
4110 args[idx].type = argv[3];
4111 args[idx].main = main;
4112 args[idx].mgr = mgr;
4113 args[idx].num = num;
4114 args[idx].idx = idx;
// pthread_create returns non-zero on failure; the code is only reported
4116 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
4117 fprintf(stderr, "Error creating thread %d\n", err);
// Windows alternative: thread created with a 131072 byte stack size argument
4119 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
// NOTE(review): this path uses argv[idx + 10] while the loop above uses
// argv[idx + 14] for the source file, and the value of idx at this point
// cannot be determined from the visible lines -- confirm the offset.
4123 args[0].infile = argv[idx + 10];
4124 args[0].type = argv[3];
4125 args[0].main = main;
4132 // wait for termination
// pthread form: join every thread in turn
4135 for( idx = 0; idx < cnt; idx++ )
4136 pthread_join (threads[idx], NULL);
// Windows form: wait for all handles at once, then close each handle
4138 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
4140 for( idx = 0; idx < cnt; idx++ )
4141 CloseHandle(threads[idx]);
// audit both buffer pools, labelled "cache" and "main"
4143 bt_poolaudit(mgr, "cache");
4146 bt_poolaudit(main, "main");
// per-manager counters: leaf pages, upper pages, reads, writes, found
4148 fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
4150 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
// Timing report. Each value is printed as whole minutes followed by the
// remaining seconds. getCpuTime(0) is differenced against start for
// "real"; getCpuTime(1) and getCpuTime(2) are printed as "user" and "sys".
4157 elapsed = getCpuTime(0) - start;
4158 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4159 elapsed = getCpuTime(1);
4160 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4161 elapsed = getCpuTime(2);
4162 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
// Function form of fenceptr: returns fenceptr(page) as a BtKey pointer.
// NOTE(review): presumably provided so the accessor can be called as a
// real function (e.g. from a debugger) -- confirm.
4165 BtKey *bt_fence (BtPage page)
4167 return fenceptr(page);
// Function form of keyptr: returns keyptr(page, slot) as a BtKey pointer.
// No range check on slot is performed here.
4170 BtKey *bt_key (BtPage page, uint slot)
4172 return keyptr(page,slot);
// Function form of slotptr: returns slotptr(page, slot) as a BtSlot pointer.
// No range check on slot is performed here.
4175 BtSlot *bt_slot (BtPage page, uint slot)
4177 return slotptr(page,slot);