1 // btree version threadskv10g futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // LSM B-trees for write optimization
11 // larger sized leaf pages than non-leaf
12 // and LSM B-tree find & count operations
16 // author: karl malbrain, malbrain@cal.berkeley.edu
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
22 The original author is Karl Malbrain.
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
56 #define WIN32_LEAN_AND_MEAN
70 typedef unsigned long long uid;
71 typedef unsigned long long logseqno;
74 typedef unsigned long long off64_t;
75 typedef unsigned short ushort;
76 typedef unsigned int uint;
79 #define BT_ro 0x6f72 // ro
80 #define BT_rw 0x7772 // rw
82 #define BT_maxbits 26 // maximum page size in bits
83 #define BT_minbits 9 // minimum page size in bits
84 #define BT_minpage (1 << BT_minbits) // minimum page size
85 #define BT_maxpage (1 << BT_maxbits) // maximum page size
87 // BTree page number constants
88 #define ALLOC_page 0 // allocation page
89 #define ROOT_page 1 // root of the btree
90 #define LEAF_page 2 // first page of leaves
92 // Number of levels to create in a new BTree
97 There are six lock types for each node in four independent sets:
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification.
118 volatile unsigned char xcl[1];
119 volatile unsigned char filler;
120 volatile ushort waiters[1];
126 // definition for reader/writer reentrant lock implementation
132 ushort dup; // re-entrant locks
133 ushort tid; // owner thread-no
134 ushort line; // owner line #
137 // hash table entries
141 uint entry; // Latch table entry at head of chain
144 // latch manager table structure
147 uid page_no; // latch set page number
148 RWLock readwr[1]; // read/write page lock
149 RWLock access[1]; // Access Intent/Page delete
150 RWLock parent[1]; // Posting of fence key in parent
151 RWLock link[1]; // left link update in progress
152 MutexLatch modify[1]; // modify entry lite latch
153 uint split; // right split page atomic insert
154 uint next; // next entry in hash table chain
155 uint prev; // prev entry in hash table chain
156 volatile ushort pin; // number of accessing threads
157 unsigned char dirty; // page in cache is dirty
158 unsigned char leaf; // page in cache is a leaf
161 // Define the length of the page record numbers
165 // Page key slot definition.
167 // Keys are marked dead, but remain on the page until
168 // cleanup is called. The fence key (highest key) for
169 // a leaf page is always present, even after cleanup.
173 // In addition to the Unique keys that occupy slots
174 // there are Librarian and Duplicate key
175 // slots occupying the key slot array.
177 // The Librarian slots are dead keys that
178 // serve as filler, available to add new Unique
179 // or Dup slots that are inserted into the B-tree.
181 // The Duplicate slots have had their key bytes extended
182 // by 6 bytes to contain a binary duplicate key uniqueifier.
193 uint off:BT_maxbits; // page offset for key start
194 uint type:3; // type of slot
195 uint dead:1; // set for deleted slot
198 // The key structure occupies space at the upper end of
199 // each page. It's a length byte followed by the key
203 unsigned char len; // this can be changed to a ushort or uint
204 unsigned char key[0];
207 // the value structure also occupies space at the upper
208 // end of the page. Each key is immediately followed by a value.
211 unsigned char len; // this can be changed to a ushort or uint
212 unsigned char value[0];
215 #define BT_maxkey 255 // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
218 // The first part of an index page.
219 // It is immediately followed
220 // by the BtSlot array of keys.
222 typedef struct BtPage_ {
223 uint cnt; // count of keys in page
224 uint act; // count of active keys
225 uint min; // next key offset
226 uint garbage; // page garbage in bytes
227 unsigned char lvl; // level of page
228 unsigned char free; // page is on free chain
229 unsigned char kill; // page is being deleted
230 unsigned char nopromote; // page is being constructed
231 unsigned char filler1[6]; // padding to multiple of 8 bytes
232 unsigned char right[BtId]; // page number to right
233 unsigned char left[BtId]; // page number to left
234 unsigned char filler2[2]; // padding to multiple of 8 bytes
235 logseqno lsn; // log sequence number applied
236 uid page_no; // this page number
239 // The loadpage interface object
242 BtPage page; // current page pointer
243 BtLatchSet *latch; // current page latch set
246 // structure for latch manager on ALLOC_page
249 struct BtPage_ alloc[1]; // next page_no in right ptr
250 unsigned char freechain[BtId]; // head of free page_nos chain
251 unsigned char leafchain[BtId]; // head of leaf page_nos chain
252 unsigned long long leafpages; // number of active leaf pages
253 unsigned long long upperpages; // number of active upper pages
254 uint redopages; // number of redo pages in file
255 unsigned char leaf_xtra; // leaf page size in xtra bits
256 unsigned char page_bits; // base page size in bits
259 // The object structure for Btree access
262 uint page_size; // base page size
263 uint page_bits; // base page size in bits
264 uint leaf_xtra; // extra bits shifted onto page_bits for leaf pages
270 BtPageZero *pagezero; // mapped allocation page
271 BtHashEntry *hashtable; // the buffer pool hash table entries
272 BtHashEntry *leaftable; // the leaf pool hash table entries
273 BtLatchSet *latchsets; // mapped latch set from buffer pool
274 BtLatchSet *leafsets; // mapped latch set from leaf pool
275 unsigned char *pagepool; // mapped to the buffer pool pages
276 unsigned char *leafpool; // mapped to the leaf pool pages
277 unsigned char *redobuff; // mapped recovery buffer pointer
278 logseqno lsn, flushlsn; // current & first lsn flushed
279 MutexLatch redo[1]; // redo area lite latch
280 MutexLatch lock[1]; // allocation area lite latch
281 ushort thread_no[1]; // highest thread number issued
282 ushort err_thread; // error thread number
283 uint nlatchpage; // size of buffer pool & latchsets
284 uint latchtotal; // number of page latch entries
285 uint latchhash; // number of latch hash table slots
286 uint latchvictim; // next latch entry to swap out
287 uint nleafpage; // size of leaf pool & leafsets
288 uint leaftotal; // number of leaf latch entries
289 uint leafhash; // number of leaf hash table slots
290 uint leafvictim; // next leaf entry to swap out
291 uint leafpromote; // next leaf entry to promote
292 uint redopage; // page number of redo buffer
293 uint redolast; // last msync size of recovery buff
294 uint redoend; // eof/end element in recovery buff
295 int err; // last error
296 int line; // last error line no
297 int found; // number of keys found by delete
298 int reads, writes; // number of reads and writes
299 int type; // type of LSM tree 0=cache, 1=main
301 HANDLE halloc; // allocation handle
302 HANDLE hpool; // buffer pool handle
304 unsigned char *page0; // memory mapped pagezero of b-tree
308 BtMgr *mgr; // buffer manager for entire process
309 BtMgr *main; // buffer manager for main btree
310 BtPageSet cacheset[1]; // cached page frame for cache btree
311 BtPageSet mainset[1]; // cached page frame for main btree
312 uint cacheslot; // slot number in cacheset
313 uint mainslot; // slot number in mainset
314 ushort phase; // 1 = main btree 0 = cache btree 2 = both
315 ushort thread_no; // thread number
324 // atomic txn structure
327 logseqno reqlsn; // redo log seq no required
328 uint entry:31; // latch table entry number
329 uint reuse:1; // reused previous page
330 uint slot; // slot on page
333 // Catastrophic errors
347 #define CLOCK_bit 0x8000
349 // recovery manager entry types
352 BTRM_eof = 0, // rest of buffer is empty
353 BTRM_add, // add a unique key-value to btree
354 BTRM_dup, // add a duplicate key-value to btree
355 BTRM_del, // delete a key-value from btree
356 BTRM_upd, // update a key with a new value
357 BTRM_new, // allocate a new empty page
358 BTRM_old // reuse an old empty page
361 // recovery manager entry
362 // structure followed by BtKey & BtVal
365 logseqno reqlsn; // log sequence number required
366 logseqno lsn; // log sequence number for entry
367 uint len; // length of entry
368 unsigned char type; // type of entry
369 unsigned char lvl; // level of btree entry pertains to
374 extern void bt_close (BtDb *bt);
375 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
376 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
377 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
378 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
379 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
380 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
381 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
383 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
385 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
386 extern BTERR bt_nextkey (BtDb *bt);
388 extern uint bt_lastkey (BtDb *bt);
389 extern uint bt_prevkey (BtDb *bt);
392 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
393 extern void bt_mgrclose (BtMgr *mgr);
394 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
395 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
397 // atomic transaction functions
398 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
399 BTERR bt_promote (BtDb *bt);
401 // The page is allocated from low and hi ends.
402 // The key slots are allocated from the bottom,
403 // while the text and value of the key
404 // are allocated from the top. When the two
405 // areas meet, the page is split into two.
407 // A key consists of a length byte, two bytes of
408 // index number (0 - 65535), and up to 253 bytes
411 // Associated with each key is a value byte string
412 // containing any value desired.
414 // The b-tree root is always located at page 1.
415 // The first leaf page of level zero is always
416 // located on page 2.
418 // The b-tree pages are linked with next
419 // pointers to facilitate enumerators,
420 // and provide for concurrency.
422 // When the root page fills, it is split in two and
423 // the tree height is raised by a new root at page
424 // one with two keys.
426 // Deleted keys are marked with a dead bit until
427 // page cleanup. The fence key for a leaf node is
430 // To achieve maximum concurrency one page is locked at a time
431 // as the tree is traversed to find leaf key in question. The right
432 // page numbers are used in cases where the page is being split,
435 // Page 0 is dedicated to lock for new page extensions,
436 // and chains empty pages together for reuse. It also
437 // contains the latch manager hash table.
439 // The ParentModification lock on a node is obtained to serialize posting
440 // or changing the fence key for a node.
442 // Empty pages are chained together through the ALLOC page and reused.
444 // Access macros to address slot and key values from the page
445 // Page slots use 1 based indexing.
// slotptr: the BtSlot array starts immediately after the page header (page+1);
//          slot N is element N-1 of that array.
// keyptr:  the key lives at the slot's byte offset `off` from the start of the page.
// valptr:  the value starts right after the key's `len` key bytes.
// NOTE(review): `page` is not parenthesized inside slotptr's (page+1), and
// keyptr/valptr expand their arguments more than once -- pass only simple,
// side-effect-free pointer expressions.
447 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
448 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
449 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
451 void bt_putid(unsigned char *dest, uid id)
456 dest[i] = (unsigned char)id, id >>= 8;
459 uid bt_getid(unsigned char *src)
464 for( i = 0; i < BtId; i++ )
465 id <<= 8, id |= *src++;
470 // lite weight spin lock Latch Manager
472 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
474 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
477 void bt_mutexlock(MutexLatch *latch)
479 uint idx, waited = 0;
483 for( idx = 0; idx < 100; idx++ ) {
484 *prev->value = __sync_fetch_and_or (latch->value, 1);
485 if( !*prev->bits->xcl ) {
487 __sync_fetch_and_sub (latch->bits->waiters, 1);
493 __sync_fetch_and_add (latch->bits->waiters, 1);
494 *prev->bits->waiters += 1;
498 sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
502 int bt_mutextry(MutexLatch *latch)
504 return !__sync_lock_test_and_set (latch->bits->xcl, 1);
507 void bt_releasemutex(MutexLatch *latch)
511 *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
513 if( *prev->bits->waiters )
514 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
517 // reader/writer lock implementation
519 void WriteLock (RWLock *lock, ushort tid, uint line)
521 if( lock->tid == tid ) {
525 bt_mutexlock (lock->xcl);
526 bt_mutexlock (lock->wrt);
527 bt_releasemutex (lock->xcl);
533 void WriteRelease (RWLock *lock)
540 bt_releasemutex (lock->wrt);
543 void ReadLock (RWLock *lock, ushort tid)
545 bt_mutexlock (lock->xcl);
547 if( !__sync_fetch_and_add (&lock->readers, 1) )
548 bt_mutexlock (lock->wrt);
550 bt_releasemutex (lock->xcl);
553 void ReadRelease (RWLock *lock)
555 if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
556 bt_releasemutex (lock->wrt);
559 // recovery manager -- flush dirty pages
561 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
563 uint cnt3 = 0, cnt2 = 0, cnt = 0;
568 // flush dirty pool pages to the btree
570 fprintf(stderr, "Start flushlsn ");
571 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
572 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
573 latch = mgr->latchsets + entry;
574 bt_mutexlock (latch->modify);
575 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
578 bt_writepage(mgr, page, latch->page_no, 0);
579 latch->dirty = 0, cnt++;
581 if( latch->pin & ~CLOCK_bit )
583 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
584 bt_releasemutex (latch->modify);
586 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
587 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
588 latch = mgr->leafsets + entry;
589 bt_mutexlock (latch->modify);
590 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
593 bt_writepage(mgr, page, latch->page_no, 1);
594 latch->dirty = 0, cnt++;
596 if( latch->pin & ~CLOCK_bit )
598 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
599 bt_releasemutex (latch->modify);
601 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
604 // recovery manager -- process current recovery buff on startup
605 // this won't do much if previous session was properly closed.
607 BTERR bt_recoveryredo (BtMgr *mgr)
614 hdr = (BtLogHdr *)mgr->redobuff;
615 mgr->flushlsn = hdr->lsn;
618 hdr = (BtLogHdr *)(mgr->redobuff + offset);
619 switch( hdr->type ) {
623 case BTRM_add: // add a unique key-value to btree
625 case BTRM_dup: // add a duplicate key-value to btree
626 case BTRM_del: // delete a key-value from btree
627 case BTRM_upd: // update a key with a new value
628 case BTRM_new: // allocate a new empty page
629 case BTRM_old: // reuse an old empty page
635 // recovery manager -- append new entry to recovery log
636 // flush dirty pages to disk when it overflows.
638 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
640 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
641 uint amt = sizeof(BtLogHdr);
645 bt_mutexlock (mgr->redo);
648 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
650 // see if new entry fits in buffer
651 // flush and reset if it doesn't
653 if( amt > size - mgr->redoend ) {
654 mgr->flushlsn = mgr->lsn;
655 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
656 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
659 bt_flushlsn(mgr, thread_no);
662 // fill in new entry & either eof or end block
664 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
669 hdr->lsn = ++mgr->lsn;
673 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
674 memset (eof, 0, sizeof(BtLogHdr));
676 // fill in key and value
679 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
680 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
683 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
684 memset (eof, 0, sizeof(BtLogHdr));
687 last = mgr->redolast & ~0xfff;
690 if( end - last + sizeof(BtLogHdr) >= 32768 )
691 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
692 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
696 bt_releasemutex(mgr->redo);
700 // recovery manager -- append transaction to recovery log
701 // flush dirty pages to disk when it overflows.
703 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
705 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
706 uint amt = 0, src, type;
713 // determine amount of redo recovery log space required
715 for( src = 0; src++ < source->cnt; ) {
716 key = keyptr(source,src);
717 val = valptr(source,src);
718 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
719 amt += sizeof(BtLogHdr);
722 bt_mutexlock (mgr->redo);
724 // see if new entry fits in buffer
725 // flush and reset if it doesn't
727 if( amt > size - mgr->redoend ) {
728 mgr->flushlsn = mgr->lsn;
729 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
730 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
733 bt_flushlsn (mgr, thread_no);
736 // assign new lsn to transaction
740 // fill in new entries
742 for( src = 0; src++ < source->cnt; ) {
743 key = keyptr(source, src);
744 val = valptr(source, src);
746 switch( slotptr(source, src)->type ) {
758 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
759 amt += sizeof(BtLogHdr);
761 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
767 // fill in key and value
769 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
770 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
775 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
776 memset (eof, 0, sizeof(BtLogHdr));
779 last = mgr->redolast & ~0xfff;
782 if( end - last + sizeof(BtLogHdr) >= 32768 )
783 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
784 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
788 bt_releasemutex(mgr->redo);
792 // read page into buffer pool from permanent location in Btree file
// The file offset is page_no shifted left by mgr->page_bits. A transfer
// shorter than page_size stores BTERR_read in mgr->err and returns it;
// mgr->reads is bumped with an atomic add.
794 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
796 uint page_size = mgr->page_size;
// widen to the leaf page size (presumably only when `leaf` is set -- the
// guarding line is not visible in this view)
799 page_size <<= mgr->leaf_xtra;
// NOTE(review): pread returns ssize_t while page_size is unsigned int. Where
// ssize_t is no wider than unsigned int, a -1 error return is converted to a
// huge unsigned value and slips past this `<` test -- confirm target ABIs.
801 if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
802 return mgr->err = BTERR_read;
804 __sync_fetch_and_add (&mgr->reads, 1);
808 // write page to permanent location in Btree file
// The file offset is page_no shifted left by mgr->page_bits. A transfer
// shorter than page_size stores BTERR_wrt in mgr->err and returns it;
// mgr->writes is bumped with an atomic add.
810 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
812 uint page_size = mgr->page_size;
// widen to the leaf page size (presumably only when `leaf` is set -- the
// guarding line is not visible in this view)
815 page_size <<= mgr->leaf_xtra;
// NOTE(review): pwrite returns ssize_t while page_size is unsigned int. Where
// ssize_t is no wider than unsigned int, a -1 error return is converted to a
// huge unsigned value and slips past this `<` test -- confirm target ABIs.
817 if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
818 return mgr->err = BTERR_wrt;
820 __sync_fetch_and_add (&mgr->writes, 1);
824 // set CLOCK bit in latch
825 // decrement pin count
827 void bt_unpinlatch (BtLatchSet *latch, uint dirty, ushort thread_no, uint line)
829 bt_mutexlock(latch->modify);
832 latch->pin |= CLOCK_bit;
834 bt_releasemutex(latch->modify);
837 // return the btree cached page address
839 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
841 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
845 page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
847 page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
852 // return next available leaf entry
853 // and with latch entry locked
855 uint bt_availleaf (BtMgr *mgr)
862 entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
864 entry = _InterlockedIncrement (&mgr->leafvictim);
866 entry %= mgr->leaftotal;
871 latch = mgr->leafsets + entry;
873 if( !bt_mutextry(latch->modify) )
876 // return this entry if it is not pinned
881 // if the CLOCK bit is set
884 latch->pin &= ~CLOCK_bit;
885 bt_releasemutex(latch->modify);
889 // return next available latch entry
890 // and with latch entry locked
892 uint bt_availnext (BtMgr *mgr)
899 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
901 entry = _InterlockedIncrement (&mgr->latchvictim);
903 entry %= mgr->latchtotal;
908 latch = mgr->latchsets + entry;
910 if( !bt_mutextry(latch->modify) )
913 // return this entry if it is not pinned
918 // if the CLOCK bit is set
921 latch->pin &= ~CLOCK_bit;
922 bt_releasemutex(latch->modify);
926 // pin leaf in leaf buffer pool
927 // return with latchset pinned
929 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
931 uint hashidx = page_no % mgr->leafhash;
936 // try to find our entry
938 bt_mutexlock(mgr->leaftable[hashidx].latch);
940 if( entry = mgr->leaftable[hashidx].entry )
942 latch = mgr->leafsets + entry;
944 if( page_no == latch->page_no )
946 } while( entry = latch->next );
948 // found our entry: increment pin
951 latch = mgr->leafsets + entry;
952 bt_mutexlock(latch->modify);
953 latch->pin |= CLOCK_bit;
956 bt_releasemutex(latch->modify);
957 bt_releasemutex(mgr->leaftable[hashidx].latch);
961 // find and reuse unpinned entry
964 entry = bt_availleaf (mgr);
965 latch = mgr->leafsets + entry;
967 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
968 oldidx = latch->page_no % mgr->leafhash;
970 // skip over this entry if old latch is not available
973 if( oldidx != hashidx )
974 if( !bt_mutextry (mgr->leaftable[oldidx].latch) ) {
975 bt_releasemutex(latch->modify);
979 // update permanent page area in btree from buffer pool
980 // no read-lock is required since page is not pinned.
983 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
984 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
988 // if latch is on a different hash chain
989 // unlink from the old page_no chain
992 if( oldidx != hashidx ) {
994 mgr->leafsets[latch->prev].next = latch->next;
996 mgr->leaftable[oldidx].entry = latch->next;
999 mgr->leafsets[latch->next].prev = latch->prev;
1001 bt_releasemutex (mgr->leaftable[oldidx].latch);
1004 if( bt_readpage (mgr, page, page_no, 1) )
1005 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1007 // link page as head of hash table chain
1008 // if this is a never before used entry,
1009 // or it was previously on a different
1010 // hash table chain. Otherwise, just
1011 // leave it in its current hash table
1014 if( !latch->page_no || hashidx != oldidx ) {
1015 if( latch->next = mgr->leaftable[hashidx].entry )
1016 mgr->leafsets[latch->next].prev = entry;
1018 mgr->leaftable[hashidx].entry = entry;
1022 // fill in latch structure
1024 latch->pin = CLOCK_bit | 1;
1025 latch->page_no = page_no;
1028 bt_releasemutex (latch->modify);
1029 bt_releasemutex (mgr->leaftable[hashidx].latch);
1033 // pin page in non-leaf buffer pool
1034 // return with latchset pinned
// Looks page_no up on its hash chain (page_no % mgr->latchhash) while holding
// that chain's latch. If no entry matches, an entry obtained from
// bt_availnext is re-targeted at page_no: its old contents go through
// bt_writepage and the requested page is loaded with bt_readpage.
// Both I/O failure paths return NULL after recording mgr->line and
// mgr->err_thread.
1036 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1038 uint hashidx = page_no % mgr->latchhash;
1043 // try to find our entry
1045 bt_mutexlock(mgr->hashtable[hashidx].latch);
1047 if( entry = mgr->hashtable[hashidx].entry ) do
1049 latch = mgr->latchsets + entry;
1051 if( page_no == latch->page_no )
1053 } while( entry = latch->next );
1055 // found our entry: increment pin
1058 latch = mgr->latchsets + entry;
1059 bt_mutexlock(latch->modify);
1060 latch->pin |= CLOCK_bit;
1062 bt_releasemutex(latch->modify);
1063 bt_releasemutex(mgr->hashtable[hashidx].latch);
1067 // find and reuse unpinned entry
1070 entry = bt_availnext (mgr);
1071 latch = mgr->latchsets + entry;
1073 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1074 oldidx = latch->page_no % mgr->latchhash;
1076 // skip over this entry if latch not available
1078 if( latch->page_no )
1079 if( oldidx != hashidx )
1080 if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
1081 bt_releasemutex(latch->modify);
1085 // update permanent page area in btree from buffer pool
1086 // no read-lock is required since page is not pinned.
1089 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1090 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1094 // if latch is on a different hash chain
1095 // unlink from the old page_no chain
1097 if( latch->page_no )
1098 if( oldidx != hashidx ) {
1100 mgr->latchsets[latch->prev].next = latch->next;
1102 mgr->hashtable[oldidx].entry = latch->next;
1105 mgr->latchsets[latch->next].prev = latch->prev;
1107 bt_releasemutex (mgr->hashtable[oldidx].latch);
// bt_readpage already stored the error code in mgr->err; also record the
// failing thread, matching the write-back failure path above and bt_pinleaf.
1110 if( bt_readpage (mgr, page, page_no, 0) )
1111 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1113 // link page as head of hash table chain
1114 // if this is a never before used entry,
1115 // or it was previously on a different
1116 // hash table chain. Otherwise, just
1117 // leave it in its current hash table
1120 if( !latch->page_no || hashidx != oldidx ) {
1121 if( latch->next = mgr->hashtable[hashidx].entry )
1122 mgr->latchsets[latch->next].prev = entry;
1124 mgr->hashtable[hashidx].entry = entry;
1128 // fill in latch structure
1130 latch->pin = CLOCK_bit | 1;
1131 latch->page_no = page_no;
1134 bt_releasemutex (latch->modify);
1135 bt_releasemutex (mgr->hashtable[hashidx].latch);
1139 void bt_mgrclose (BtMgr *mgr)
1141 char *name = mgr->type ? "Main" : "Cache";
1148 // flush previously written dirty pages
1149 // and write recovery buffer to disk
1151 fdatasync (mgr->idx);
1153 if( mgr->redoend ) {
1154 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1155 memset (eof, 0, sizeof(BtLogHdr));
1158 // write remaining dirty pool pages to the btree
1160 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
1161 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1162 latch = mgr->latchsets + entry;
1164 if( latch->dirty ) {
1165 bt_writepage(mgr, page, latch->page_no, 0);
1166 latch->dirty = 0, num++;
1171 fprintf(stderr, "%s %d non-leaf buffer pool pages flushed\n", name, num);
1175 // write remaining dirty leaf pages to the btree
1177 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
1178 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1179 latch = mgr->leafsets + entry;
1181 if( latch->dirty ) {
1182 bt_writepage(mgr, page, latch->page_no, 1);
1183 latch->dirty = 0, num++;
1188 fprintf(stderr, "%s %d leaf buffer pool pages flushed\n", name, num);
1190 // clear redo recovery buffer on disk.
1192 if( mgr->pagezero->redopages ) {
1193 eof = (BtLogHdr *)mgr->redobuff;
1194 memset (eof, 0, sizeof(BtLogHdr));
1195 eof->lsn = mgr->lsn;
1196 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1197 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1201 munmap (mgr->page0, (uid)(mgr->redopage + mgr->pagezero->redopages) << mgr->page_bits);
1203 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1204 munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1205 munmap (mgr->pagezero, mgr->page_size);
1207 FlushViewOfFile(mgr->pagezero, 0);
1208 UnmapViewOfFile(mgr->pagezero);
1209 UnmapViewOfFile(mgr->pagepool);
1210 CloseHandle(mgr->halloc);
1211 CloseHandle(mgr->hpool);
1217 FlushFileBuffers(mgr->idx);
1218 CloseHandle(mgr->idx);
1223 // close and release memory
1225 void bt_close (BtDb *bt)
1230 // open/create new btree buffer manager
1232 // call with file_name, bits in page size (e.g. 16), extra leaf page bits,
1233 // size of page pool (e.g. 300), leaf pool (e.g. 4000) and number of redo pages
1235 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1237 uint lvl, attr, last, slot, idx;
1238 uint nlatchpage, latchhash;
1239 uint nleafpage, leafhash;
1240 unsigned char value[BtId];
1241 int flag, initit = 0;
1242 BtPageZero *pagezero;
1250 // determine sanity of page size and buffer pool
1252 if( leafxtra + pagebits > BT_maxbits )
1253 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1255 if( pagebits < BT_minbits )
1256 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1259 mgr = calloc (1, sizeof(BtMgr));
1261 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1263 if( mgr->idx == -1 ) {
1264 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1265 return free(mgr), NULL;
1268 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1269 attr = FILE_ATTRIBUTE_NORMAL;
1270 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1272 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1273 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1274 return GlobalFree(mgr), NULL;
1279 pagezero = valloc (BT_maxpage);
1282 // read minimum page size to get root info
1283 // to support raw disk partition files
1284 // check if page_bits == 0 on the disk.
1286 if( size = lseek (mgr->idx, 0L, 2) )
1287 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1288 if( pagezero->page_bits ) {
1289 pagebits = pagezero->page_bits;
1290 leafxtra = pagezero->leaf_xtra;
1291 redopages = pagezero->redopages;
1295 return free(mgr), free(pagezero), NULL;
1299 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1300 size = GetFileSize(mgr->idx, amt);
1302 if( size || *amt ) {
1303 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1304 return bt_mgrclose (mgr), NULL;
1305 pagebits = pagezero->page_bits;
1306 leafxtra = pagezero->leaf_xtra;
1307 redopages = pagezero->redopages;
1312 mgr->page_size = 1 << pagebits;
1313 mgr->page_bits = pagebits;
1314 mgr->leaf_xtra = leafxtra;
1316 // calculate number of latch hash table entries
1318 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1320 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1321 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1322 mgr->latchtotal = nodemax;
1324 // calculate number of leaf hash table entries
1326 mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1328 mgr->nleafpage += leafmax << leafxtra; // size of the leaf pool in pages
1329 mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1330 mgr->leaftotal = leafmax;
1332 mgr->redopage = LEAF_page + (1 << leafxtra);
1337 // initialize an empty b-tree with latch page, root page, page of leaves
1338 // and page(s) of latches and page pool cache
1340 ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1341 memset (pagezero, 0, 1 << pagebits);
1342 pagezero->alloc->lvl = MIN_lvl - 1;
1343 pagezero->redopages = redopages;
1344 pagezero->page_bits = mgr->page_bits;
1345 pagezero->leaf_xtra = leafxtra;
1347 bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1348 pagezero->upperpages = 1;
1349 pagezero->leafpages = 1;
1351 // initialize left-most LEAF page in
1352 // alloc->left and count of active leaf pages.
1354 bt_putid (pagezero->alloc->left, LEAF_page);
1356 if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1357 fprintf (stderr, "Unable to create btree page zero\n");
1358 return bt_mgrclose (mgr), NULL;
1361 memset (pagezero, 0, 1 << pagebits);
1363 for( lvl=MIN_lvl; lvl--; ) {
1364 BtSlot *node = slotptr(pagezero->alloc, 1);
1365 node->off = mgr->page_size;
1368 node->off <<= mgr->leaf_xtra;
1370 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1371 node->type = Librarian;
1374 node->off = node[-1].off;
1375 key = keyptr(pagezero->alloc, 2);
1376 key = keyptr(pagezero->alloc, 1);
1377 key->len = 2; // create stopper key
1381 bt_putid(value, MIN_lvl - lvl + 1);
1382 val = valptr(pagezero->alloc, 1);
1383 val->len = lvl ? BtId : 0;
1384 memcpy (val->value, value, val->len);
1386 pagezero->alloc->min = node->off;
1387 pagezero->alloc->lvl = lvl;
1388 pagezero->alloc->cnt = 2;
1389 pagezero->alloc->act = 1;
1390 pagezero->alloc->page_no = MIN_lvl - lvl;
1392 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1393 fprintf (stderr, "Unable to create btree page\n");
1394 return bt_mgrclose (mgr), NULL;
1402 VirtualFree (pagezero, 0, MEM_RELEASE);
1405 // mlock the first segment of 64K pages
1407 flag = PROT_READ | PROT_WRITE;
1408 mgr->page0 = mmap (0, (uid)(mgr->redopage + redopages) << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1410 if( mgr->page0 == MAP_FAILED ) {
1411 fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
1412 return bt_mgrclose (mgr), NULL;
1415 mgr->pagezero = (BtPageZero *)mgr->page0;
1416 mlock (mgr->pagezero, mgr->page_size);
1418 mgr->redobuff = mgr->page0 + (mgr->redopage << mgr->page_bits);
1419 mlock (mgr->redobuff, redopages << mgr->page_bits);
1421 // allocate pool buffers
1423 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1424 if( mgr->pagepool == MAP_FAILED ) {
1425 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1426 return bt_mgrclose (mgr), NULL;
1429 mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1430 if( mgr->leafpool == MAP_FAILED ) {
1431 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1432 return bt_mgrclose (mgr), NULL;
1435 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1436 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1437 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1439 mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1440 mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1441 mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1443 for( idx = 1; idx < mgr->leaftotal; idx++ ) {
1444 latch = mgr->leafsets + idx;
1451 // open BTree access method
1452 // based on buffer manager
//
// Allocates a BtDb handle with malloc, zero-fills it and stores a thread
// number obtained by atomically incrementing the counter that
// mgr->thread_no points to.
// NOTE(review): the embedded line numbers skip here, so the use of `main`,
// any further field setup and the return statement are not visible.
1454 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
// NOTE(review): no NULL check on the malloc result is visible before the
// memset below -- confirm against the full source.
1456 BtDb *bt = malloc (sizeof(*bt));
1458 memset (bt, 0, sizeof(*bt));
// two alternative atomic increments follow (a GCC __sync builtin and an
// _Interlocked intrinsic); presumably selected by a preprocessor
// conditional that is not shown -- TODO confirm
1462 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1464 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1469 // compare two keys, return > 0, = 0, or < 0
1470 // =0: keys are same
1473 // as the comparison value
//
// key1 is a BtKey that carries its own length (key1->len); key2/len2 is a
// raw byte string.  The bytes both keys have in common (the shorter of the
// two lengths) are compared with memcmp.
// NOTE(review): what happens when that common prefix compares equal is
// decided in lines that are not shown in this listing.
1475 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1477 uint len1 = key1->len;
// the assignment inside the condition is intentional: ans keeps the
// memcmp result for the branch that follows
1480 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1491 // place write, read, or parent lock on requested page_no.
//
// Acquires one lock of the given latch set on behalf of thread_no.
// The visible calls cover: read and write on latch->readwr, read and write
// on latch->access, write on latch->parent and write on latch->link.
// Write acquisitions also receive `line` (callers in this file pass
// __LINE__).
// NOTE(review): the dispatch on `mode` is not visible in this listing;
// presumably each call below sits under its own BtLock case -- confirm.
1493 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1497 ReadLock (latch->readwr, thread_no);
1500 WriteLock (latch->readwr, thread_no, line);
1503 ReadLock (latch->access, thread_no);
1506 WriteLock (latch->access, thread_no, line);
1509 WriteLock (latch->parent, thread_no, line);
1512 WriteLock (latch->link, thread_no, line);
1517 // remove write, read, or parent lock on requested page
//
// Counterpart of bt_lockpage: releases one lock of the given latch set.
// The visible release calls take only the lock itself; thread_no and line
// are not passed on in any line shown here.
// NOTE(review): the dispatch on `mode` is not visible in this listing;
// presumably each call below sits under its own BtLock case -- confirm.
1519 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1523 ReadRelease (latch->readwr);
1526 WriteRelease (latch->readwr);
1529 ReadRelease (latch->access);
1532 WriteRelease (latch->access);
1535 WriteRelease (latch->parent);
1538 WriteRelease (latch->link);
1543 // allocate a new page
1544 // return with page latched, but unlocked.
//
// Copies `contents` into a page taken from the matching free chain or,
// when that chain is empty, into a page appended at the position recorded
// in pagezero->alloc->right.  Pages with contents->lvl != 0 use
// pagezero->freechain and the base page size; the other assignments below
// (leafchain, leafpages, size scaled by mgr->leaf_xtra) are presumably the
// level-0 branch.
// On success set->latch / set->page refer to the pinned page; the failure
// returns visible here yield mgr->err.
// NOTE(review): gaps in the embedded numbering mean some lines (braces,
// else, the declaration of page_no, the final return) are not shown.
1546 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1548 uint page_size = mgr->page_size, leaf_xtra = 0;
1549 unsigned char *freechain;
// pick the free chain and the page counter by level
// NOTE(review): the upperpages/leafpages counters are incremented before
// bt_mutexlock (mgr->lock) below, whereas bt_freepage decrements them
// after taking that mutex -- confirm this ordering is intended.
1552 if( contents->lvl ) {
1553 freechain = mgr->pagezero->freechain;
1554 mgr->pagezero->upperpages++;
1556 freechain = mgr->pagezero->leafchain;
1557 mgr->pagezero->leafpages++;
1558 leaf_xtra = mgr->leaf_xtra;
1559 page_size <<= leaf_xtra;
1562 // lock allocation page
1564 bt_mutexlock(mgr->lock);
1566 // use empty chain first
1567 // else allocate new page
// a non-zero id at the head of the chain names a reusable page
1569 if( page_no = bt_getid(freechain) ) {
1570 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1571 set->page = bt_mappage (mgr, set->latch);
// NOTE(review): no bt_releasemutex is visible before this error return,
// so it appears to leave mgr->lock held -- confirm.
1573 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
// unlink the page: the chain head becomes the reused page's right link
1575 bt_putid(freechain, bt_getid(set->page->right));
1577 // the page is currently nopromote and this
1578 // will keep bt_promote out.
1580 // contents will replace this bit
1581 // and pin will keep bt_promote out
1583 contents->page_no = page_no;
1584 contents->nopromote = 0;
// overwrite the pooled page image with the caller's contents
1586 memcpy (set->page, contents, page_size);
1588 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1589 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1591 bt_releasemutex(mgr->lock);
// chain was empty: take the next page number from alloc->right and
// advance it by 1 << leaf_xtra page numbers (1 when leaf_xtra is 0)
1595 page_no = bt_getid(mgr->pagezero->alloc->right);
1596 bt_putid(mgr->pagezero->alloc->right, page_no+(1 << leaf_xtra));
1598 // unlock allocation latch and
1599 // extend file into new page.
1601 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1602 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1604 bt_releasemutex(mgr->lock);
1606 // keep bt_promote out of this page
1607 contents->nopromote = 1;
1608 contents->page_no = page_no;
// write the page image straight to the file at its byte offset;
// a short or failed write is only reported on stderr in the lines shown
1610 if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1611 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
// now bring the freshly written page into the pool and pin it
1613 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1614 set->page = bt_mappage (mgr, set->latch);
1616 return mgr->err_thread = thread_no, mgr->err;
1618 // now pin will keep bt_promote out
1620 set->page->nopromote = 0;
1624 // find slot in page for given key at a given level
//
// Binary search over the page's slots (1 .. page->cnt) for the lowest slot
// whose key is not less than the passed key.  Returns that slot number, or
// 0 when `good` was never set, which the comment below describes as the
// key being on the right link page.
// NOTE(review): the declaration of `good` and the statements controlled
// by two of the `if` lines are in gaps of this listing.
1626 int bt_findslot (BtPage page, unsigned char *key, uint len)
1628 uint diff, higher = page->cnt, low = 1, slot;
1631 // make stopper key an infinite fence value
// a non-zero right link means this page has a right sibling
1633 if( bt_getid (page->right) )
1638 // low is the lowest candidate.
1639 // loop ends when they meet
1641 // higher is already
1642 // tested as .ge. the passed key.
// halve the [low, higher] window until it is empty
1644 while( diff = higher - low ) {
1645 slot = low + ( diff >> 1 );
1646 if( keycmp (keyptr(page, slot), key, len) < 0 )
// slot's key is >= the search key: it becomes the new upper bound
1649 higher = slot, good++;
1652 // return zero if key is on right link page
1654 return good ? higher : 0;
1657 // find and load page at given level for given key
1658 // leave page rd or wr locked as requested
//
// Walks from ROOT_page toward level `lvl`, pinning and locking one page at
// a time, until the page that should hold `key` is reached.  Callers in
// this file treat a non-zero return as the slot on set->page; every return
// visible below is the failure form: 0 with mgr->err = BTERR_struct.
// NOTE(review): the loop construct, several else/continue lines, some
// local declarations and the success return fall in gaps of the embedded
// numbering.
1660 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1662 uid page_no = ROOT_page, prevpage_no = 0;
// drill is the level expected for the page being visited; 0xff is a
// placeholder that is replaced by the root's real level further down
1663 uint drill = 0xff, slot;
1664 BtLatchSet *prevlatch;
1665 uint mode, prevmode;
1670 // start at root of btree and drill down
// level-0 pages are pinned through bt_pinleaf, all others via bt_pinlatch
1673 if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1674 set->page = bt_mappage (mgr, set->latch);
// every page except the root is entered under an Access lock
1678 if( page_no > ROOT_page )
1679 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1681 // release & unpin parent or left sibling page
1684 bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1685 bt_unpinlatch (prevlatch, 0, thread_no, __LINE__);
1689 // obtain mode lock using lock coupling through AccessLock
1690 // determine lock mode of drill level
// only the target level takes the caller's lock mode; any other level
// is read-locked
1692 mode = (drill == lvl) ? lock : BtLockRead;
1693 bt_lockpage(mode, set->latch, thread_no, __LINE__);
1695 // grab our fence key
// i.e. the key stored in the page's last slot
1697 ptr=keyptr(set->page,set->page->cnt);
// arriving at a page whose free flag is set is a structure error
1699 if( set->page->free )
1700 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1702 if( page_no > ROOT_page )
1703 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1705 // re-read and re-lock root after determining actual level of root
// a level mismatch is accepted only on the root page; anywhere else it
// is reported as a structure error
1707 if( set->page->lvl != drill) {
1708 if( set->latch->page_no != ROOT_page )
1709 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1711 drill = set->page->lvl;
// the root is itself the target level but a non-read lock was asked
// for: drop lock and pin (presumably to retry with the requested mode)
1713 if( lock != BtLockRead && drill == lvl ) {
1714 bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1715 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
// remember this page so it can be released after the next one is latched
1720 prevpage_no = set->latch->page_no;
1721 prevlatch = set->latch;
1722 prevpage = set->page;
1725 // if requested key is beyond our fence,
1726 // slide to the right
1728 if( keycmp (ptr, key, len) < 0 )
1729 if( page_no = bt_getid(set->page->right) )
1732 // if page is part of a delete operation,
1733 // slide to the left;
// the left link is read while holding the page's Link lock
1735 if( set->page->kill ) {
1736 bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1737 page_no = bt_getid(set->page->left);
1738 bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1742 // find key on page at this level
1743 // and descend to requested level
1745 if( slot = bt_findslot (set->page, key, len) ) {
1749 // find next non-dead slot -- the fence key if nothing else
1751 while( slotptr(set->page, slot)->dead )
1752 if( slot++ < set->page->cnt )
1755 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
// the slot's value must be exactly BtId bytes: the child page number
1757 val = valptr(set->page, slot);
1759 if( val->len == BtId )
1760 page_no = bt_getid(val->value);
1762 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1768 // slide right into next page
1770 page_no = bt_getid(set->page->right);
1774 // return error on end of right chain
1776 mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1777 return 0; // return error
1780 // return page to free list
1781 // page must be delete & write locked
1782 // and have no keys pointing to it.
//
// Pushes set's page onto the head of the free chain that matches its
// level, sets its free flag, then drops the Delete and Write locks and
// the pin on it.  The chain and counter updates are made between
// bt_mutexlock and bt_releasemutex on mgr->lock.
1784 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1786 unsigned char *freechain;
1788 // lock allocation page
1790 bt_mutexlock (mgr->lock);
// pages with a non-zero level go on freechain; the leafchain assignment
// below is presumably the else branch (the `else` line is not shown)
1792 if( set->page->lvl ) {
1793 freechain = mgr->pagezero->freechain;
1794 mgr->pagezero->upperpages--;
1796 freechain = mgr->pagezero->leafchain;
1797 mgr->pagezero->leafpages--;
// link the page in front of the current chain head: its right link takes
// the old head, and the head now names this page
1802 memcpy(set->page->right, freechain, BtId);
1803 bt_putid(freechain, set->latch->page_no);
1804 set->page->free = 1;
1806 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1807 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1809 // unlock released page
1810 // and unlock allocation page
1812 bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1813 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1814 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1815 bt_releasemutex (mgr->lock);
1818 // a fence key was deleted from a page
1819 // push new fence value upwards
//
// Drops the page's last slot, then replaces the page's entry one level up
// (lvl+1): the new last key is inserted pointing at this page and the old
// fence key is deleted.  The page's Write lock is exchanged for its Parent
// lock while the upper level is updated; on the success path the Parent
// lock and the pin are released at the end.
// Returns mgr->err when the insert or delete fails.
// NOTE(review): both error returns come before the Parent unlock / unpin
// below, so they appear to leave the page Parent-locked and pinned.
1821 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1823 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1824 unsigned char value[BtId];
1828 // remove the old fence value
// save the old fence key, then clear its slot and shrink the slot count
1830 ptr = keyptr(set->page, set->page->cnt);
1831 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1832 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1834 // cache new fence value
1836 ptr = keyptr(set->page, set->page->cnt);
1837 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// hold the Parent lock instead of the Write lock while lvl+1 is modified
1839 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1840 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1842 // insert new (now smaller) fence key
// the value stored with the key is this page's number
1844 bt_putid (value, set->latch->page_no);
1845 ptr = (BtKey*)leftkey;
1847 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1848 return mgr->err_thread = thread_no, mgr->err;
1850 // now delete old fence key
1852 ptr = (BtKey*)rightkey;
1854 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1855 return mgr->err_thread = thread_no, mgr->err;
1857 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1858 bt_unpinlatch(set->latch, 1, thread_no, __LINE__);
1862 // root has a single child
1863 // collapse a level from the tree
//
// Repeatedly replaces the root's contents with those of its only live
// child and frees the child, for as long as the root stays above level 1
// with a single active entry.  Finishes by releasing the root's Write lock
// and pin.  Returns BTERR_struct if the child entry's value is not
// BtId-sized, or mgr->err if the child cannot be pinned.
// NOTE(review): the `do {` opener, the local declarations and the final
// return are in gaps of this listing.
1865 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1872 // find the child entry and promote as new root contents
// scan slots 1..cnt for the first one that is not marked dead
1875 for( idx = 0; idx++ < root->page->cnt; )
1876 if( !slotptr(root->page, idx)->dead )
1879 val = valptr(root->page, idx);
1881 if( val->len == BtId )
1882 page_no = bt_getid (valptr(root->page, idx)->value);
1884 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1886 if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1887 child->page = bt_mappage (mgr, child->latch);
1889 return mgr->err_thread = thread_no, mgr->err;
// bt_freepage expects the page to be Delete and Write locked
1891 bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1892 bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
// copy the child's image over the root (one base-size page), then
// put the child on the free list
1894 memcpy (root->page, child->page, mgr->page_size);
1895 bt_freepage (mgr, child, thread_no);
1897 } while( root->page->lvl > 1 && root->page->act == 1 );
1899 bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1900 bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
1904 // delete a page and manage key
1905 // call with page writelocked
1907 // returns with page unpinned
1908 // from the page pool.
//
// Removes an emptied page by pulling its right sibling's contents into it
// and freeing the sibling instead:
//   1. save this page's fence key,
//   2. write-lock the right sibling and copy it over this page, keeping
//      this page's own left link and page number,
//   3. repoint the far-right page's left link (or pagezero->alloc->left
//      when there is none and lvl is 0) at this page,
//   4. mark the sibling killed, update the two keys at lvl+1, and
//   5. once the sibling is Delete and Write locked, free it.
// NOTE(review): error-path lines after several `if` statements, the
// level test before the page_size shift and the final return are in gaps
// of this listing.
1910 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1912 unsigned char lowerfence[BT_keyarray];
1913 uint page_size = mgr->page_size;
1914 BtPageSet right[1], temp[1];
1915 unsigned char value[BtId];
1916 uid page_no, right2;
// presumably applied to level-0 pages only (the guarding test is not
// visible) -- TODO confirm
1920 page_size <<= mgr->leaf_xtra;
1922 // cache original copy of original fence key
1923 // that is going to be deleted.
1925 ptr = keyptr(set->page, set->page->cnt);
1926 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1928 // pin and lock our right page
1930 page_no = bt_getid(set->page->right);
1932 if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1933 right->page = bt_mappage (mgr, right->latch);
1937 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
// NOTE(review): this error return follows the Write lock taken just above
// with no release in between, so it appears to leave the sibling locked.
1939 if( right->page->kill )
1940 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1942 // pull contents of right sibling over our empty page
1943 // preserving our left page number, and its right page number.
// page_no is reused here to hold our left link across the memcpy
1945 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
1946 page_no = bt_getid(set->page->left);
1947 memcpy (set->page, right->page, page_size);
1948 bt_putid (set->page->left, page_no);
1949 set->page->page_no = set->latch->page_no;
1950 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
1952 // fix left link from far right page
1954 if( right2 = bt_getid (right->page->right) ) {
1955 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
1956 temp->page = bt_mappage (mgr, temp->latch);
1960 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1961 bt_putid (temp->page->left, set->latch->page_no);
1962 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1963 bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
1964 } else if( !lvl ) { // our page is now rightmost leaf
// pagezero->alloc->left is updated under the allocation mutex
1965 bt_mutexlock (mgr->lock);
1966 bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
1967 bt_releasemutex(mgr->lock);
1970 // mark right page as being deleted and release lock
1972 right->page->kill = 1;
1973 bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1975 // redirect the new higher key directly to our new node
// the page now ends with the sibling's fence key; store it at lvl+1
// with this page's number as its value
1977 ptr = keyptr(set->page, set->page->cnt);
1978 bt_putid (value, set->latch->page_no);
1980 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
1983 // delete our original fence key in parent
1985 ptr = (BtKey *)lowerfence;
1987 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1990 // release write lock to our node
1992 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1993 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1995 // wait for all access to drain away with delete lock,
1996 // then obtain write lock to right node and free it.
1998 bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
1999 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2000 bt_freepage (mgr, right, thread_no);
2004 // find and delete key on page by marking delete flag bit
2005 // if page becomes empty, delete it from the btree
//
// Loads the page for `key` at level `lvl` under a Write lock and, if the
// key is present and not already dead, adds its key and value sizes to the
// page's garbage count.  Afterwards it hands off to bt_fixfence (an
// upper-level fence key was removed), bt_collapseroot (root left with one
// active entry) or bt_deletepage (no active entries) as needed; otherwise
// it just releases the Write lock and the pin.
// Returns mgr->err on failure.
// NOTE(review): the lines that mark the slot dead and adjust the active
// count, several return statements and the local declarations are in gaps
// of this listing.
2007 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2009 uint slot, idx, found, fence;
2015 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2016 node = slotptr(set->page, slot);
2017 ptr = keyptr(set->page, slot);
2019 return mgr->err_thread = thread_no, mgr->err;
2021 // if librarian slot, advance to real slot
2023 if( node->type == Librarian ) {
2024 ptr = keyptr(set->page, ++slot);
2025 node = slotptr(set->page, slot);
// remember whether the slot being deleted is the page's last slot
2028 fence = slot == set->page->cnt;
2030 // delete the key, ignore request if already dead
// found ends up non-zero only for an exact key match on a live slot
2032 if( found = !keycmp (ptr, key, len) )
2033 if( found = node->dead == 0 ) {
2034 val = valptr(set->page,slot);
2035 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2039 // collapse empty slots beneath the fence
2040 // on interior nodes
// each pass moves the last slot down over a dead slot just below it
// and clears the vacated top slot
2043 while( idx = set->page->cnt - 1 )
2044 if( slotptr(set->page, idx)->dead ) {
2045 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2046 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2054 // did we delete a fence key in an upper level?
2056 if( lvl && set->page->act && fence )
2057 if( bt_fixfence (mgr, set, lvl, thread_no) )
2058 return mgr->err_thread = thread_no, mgr->err;
2062 // do we need to collapse root?
2064 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2065 if( bt_collapseroot (mgr, set, thread_no) )
2066 return mgr->err_thread = thread_no, mgr->err;
2070 // delete empty page
2072 if( !set->page->act )
2073 return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2075 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2076 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2080 // check page for space available,
2081 // clean if necessary and return
2082 // 0 - page needs splitting
2083 // >0 new slot value
//
// First checks whether the page already has room for one more key of
// `keylen` bytes and value of `vallen` bytes plus two slots.  If not, and
// the page's garbage is at least a fifth of the page size, the page is
// rebuilt from a heap copy: the key/value bytes of each entry that is kept
// are packed again from the top of the page and a dead Librarian slot is
// written ahead of each regular slot.  The space check is then repeated
// against the rebuilt page.
// NOTE(review): the return statements, the `continue` for skipped entries,
// the tracking of the new slot number and the release of `frame` are in
// gaps of this listing.
2085 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2087 uint page_size = mgr->page_size;
2088 BtPage page = set->page, frame;
2089 uint cnt = 0, idx = 0;
2090 uint max = page->cnt;
// level-0 pages are larger by a factor of 1 << leaf_xtra
2095 if( !set->page->lvl )
2096 page_size <<= mgr->leaf_xtra;
// page->min is the lowest used offset; the slot array grows up toward it
2098 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2101 // skip cleanup and proceed to split
2102 // if there's not enough garbage
2105 if( page->garbage < page_size / 5 )
// NOTE(review): the malloc result is passed to memcpy on the very next
// line without a NULL check.
2108 frame = malloc (page_size);
2109 memcpy (frame, page, page_size);
2111 // skip page info and set rest of page to zero
2113 memset (page+1, 0, page_size - sizeof(*page));
2115 page->min = page_size;
2119 // clean up page first by
2120 // removing dead keys
2122 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level-0 page
2126 if( cnt < max || frame->lvl )
2127 if( slotptr(frame,cnt)->dead )
2130 // copy the value across
2132 val = valptr(frame, cnt);
2133 page->min -= val->len + sizeof(BtVal);
2134 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2136 // copy the key across
2138 key = keyptr(frame, cnt);
2139 page->min -= key->len + sizeof(BtKey);
2140 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2142 // make a librarian slot
// the librarian slot shares the offset of the real slot that follows it
2144 slotptr(page, ++idx)->off = page->min;
2145 slotptr(page, idx)->type = Librarian;
2146 slotptr(page, idx)->dead = 1;
2150 slotptr(page, ++idx)->off = page->min;
2151 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
// carry the dead flag over from the old slot
2153 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2160 // see if page has enough space now, or does it need splitting?
2162 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2168 // split the root and raise the height of the btree
//
// Moves the current root contents (the lower keys) into a newly allocated
// left page, then rebuilds the root in place with two entries:
//   slot 1: the left page's fence key, value = left page number
//   slot 2: a 2-byte stopper key,      value = right->page_no
// The root's level is copied into pagezero->alloc->lvl; afterwards the
// root's Write lock and pin and the right page's pin are released.
// Returns mgr->err if the left page cannot be allocated.
// NOTE(review): the lines that fill in the stopper key bytes, set the
// value/key lengths, raise root->page->lvl, release `frame` and return
// are in gaps of this listing.
2170 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2172 unsigned char leftkey[BT_keyarray];
// nxt is the next free offset, working down from the end of the page
2173 uint nxt = mgr->page_size;
2174 unsigned char value[BtId];
// NOTE(review): the malloc result is used by memcpy on the next line
// without a NULL check.
2181 frame = malloc (mgr->page_size);
2182 memcpy (frame, root->page, mgr->page_size);
2184 // save left page fence key for new root
2186 ptr = keyptr(root->page, root->page->cnt);
2187 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2189 // Obtain an empty page to use, and copy the current
2190 // root contents into it, e.g. lower keys
// NOTE(review): the error return directly below does not free `frame`.
2192 if( bt_newpage(mgr, left, frame, thread_no) )
2193 return mgr->err_thread = thread_no, mgr->err;
2195 left_page_no = left->latch->page_no;
2196 bt_unpinlatch (left->latch, 1, thread_no, __LINE__);
2199 // left link the pages together
2201 page = bt_mappage (mgr, right);
2202 bt_putid (page->left, left_page_no);
2204 // preserve the page info at the bottom
2205 // of higher keys and set rest to zero
2207 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2209 // insert stopper key at top of newroot page
2210 // and increase the root height
// value for slot 2: the right page's number
2212 nxt -= BtId + sizeof(BtVal);
2213 bt_putid (value, right->page_no);
2214 val = (BtVal *)((unsigned char *)root->page + nxt);
2215 memcpy (val->value, value, BtId);
// reserve room for a 2-byte key plus its BtKey header
2218 nxt -= 2 + sizeof(BtKey);
2219 slotptr(root->page, 2)->off = nxt;
2220 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2225 // insert lower keys page fence key on newroot page as first key
2227 nxt -= BtId + sizeof(BtVal);
2228 bt_putid (value, left_page_no);
2229 val = (BtVal *)((unsigned char *)root->page + nxt);
2230 memcpy (val->value, value, BtId);
2233 ptr = (BtKey *)leftkey;
2234 nxt -= ptr->len + sizeof(BtKey);
2235 slotptr(root->page, 1)->off = nxt;
2236 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root has no right sibling
2238 bt_putid(root->page->right, 0);
2239 root->page->min = nxt; // reset lowest used offset and key count
2240 root->page->cnt = 2;
2241 root->page->act = 2;
2244 mgr->pagezero->alloc->lvl = root->page->lvl;
2246 // release and unpin root pages
2248 bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2249 bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
2251 bt_unpinlatch (right, 1, thread_no, __LINE__);
2255 // split already locked full node
2257 // return pool entry for new right
2258 // page, pinned & unlocked
//
// Builds the upper part of the page's entries in a heap frame and stores
// it as a new right page via bt_newpage, then rebuilds set->page in place
// from the lower entries and points its right link at the new page.
// Librarian (dead placeholder) slots are written ahead of the regular
// slots in both halves.  When `linkleft` is set and the page is not the
// root, the left link of the old right neighbour (or
// pagezero->alloc->left when there is none and lvl is 0) is pointed at the
// new page.
// The value computed at the end is the new right latch's index within
// mgr->latchsets or mgr->leafsets; callers in this file add it back to
// that base to recover the latch.
// NOTE(review): where cnt is positioned for each half, the `continue`
// lines, the act counters, error returns, the release of `frame` and the
// final return are in gaps of this listing.
2260 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2262 uint page_size = mgr->page_size;
2263 BtPageSet right[1], temp[1];
2264 uint cnt = 0, idx = 0, max;
2265 uint lvl = set->page->lvl;
// level-0 pages are larger by a factor of 1 << leaf_xtra
2273 if( !set->page->lvl )
2274 page_size <<= mgr->leaf_xtra;
2276 // split higher half of keys to frame
// NOTE(review): the malloc result is passed to memset on the next line
// without a NULL check.
2278 frame = malloc (page_size);
2279 memset (frame, 0, page_size);
2280 frame->min = page_size;
2281 max = set->page->cnt;
2285 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level-0 page
2286 if( cnt < max || set->page->lvl )
2287 if( slotptr(set->page, cnt)->dead )
// pack value then key downward from the end of the frame
2290 val = valptr(set->page, cnt);
2291 frame->min -= val->len + sizeof(BtVal);
2292 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2294 key = keyptr(set->page, cnt);
2295 frame->min -= key->len + sizeof(BtKey);
2296 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2298 // add librarian slot
2300 slotptr(frame, ++idx)->off = frame->min;
2301 slotptr(frame, idx)->type = Librarian;
2302 slotptr(frame, idx)->dead = 1;
2306 slotptr(frame, ++idx)->off = frame->min;
2307 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2309 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
// a non-root page passes its old right link on to the new right page
2318 if( set->latch->page_no > ROOT_page ) {
2319 right2 = bt_getid (set->page->right);
2320 bt_putid (frame->right, right2);
2323 bt_putid (frame->left, set->latch->page_no);
2326 // get new free page and write higher keys to it.
2328 if( bt_newpage(mgr, right, frame, thread_no) )
2331 // link far right's left pointer to new page
2333 if( linkleft && set->latch->page_no > ROOT_page )
2335 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2336 temp->page = bt_mappage (mgr, temp->latch);
2340 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2341 bt_putid (temp->page->left, right->latch->page_no);
2342 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2343 bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2344 } else if( !lvl ) { // page is rightmost leaf
2345 bt_mutexlock (mgr->lock);
2346 bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2347 bt_releasemutex(mgr->lock);
2350 // process lower keys
// frame is reused as a snapshot of the original page while the page
// itself is cleared (header kept) and refilled
2352 memcpy (frame, set->page, page_size);
2353 memset (set->page+1, 0, page_size - sizeof(*set->page));
2355 set->page->min = page_size;
2356 set->page->garbage = 0;
2362 // assemble page of smaller keys
2364 while( cnt++ < max ) {
2365 if( slotptr(frame, cnt)->dead )
2367 val = valptr(frame, cnt);
2368 set->page->min -= val->len + sizeof(BtVal);
2369 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2371 key = keyptr(frame, cnt);
2372 set->page->min -= key->len + sizeof(BtKey);
2373 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2375 // add librarian slot
2377 slotptr(set->page, ++idx)->off = set->page->min;
2378 slotptr(set->page, idx)->type = Librarian;
2379 slotptr(set->page, idx)->dead = 1;
2383 slotptr(set->page, ++idx)->off = set->page->min;
2384 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
// chain the rebuilt left page to the new right page
2388 bt_putid(set->page->right, right->latch->page_no);
2389 set->page->cnt = idx;
// index of the right latch within its latch-set array
2392 entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2396 // fix keys for newly split page
2397 // call with both pages pinned & locked
2398 // return unlocked and unpinned
//
// After a split, publishes both halves one level up (lvl+1): the left
// page's last key is inserted pointing at set's page and the right page's
// last key is inserted pointing at `right`.  A root split is delegated to
// bt_splitroot.  While the upper level is updated both pages are held
// under their Parent locks; on the success path those locks and both pins
// are released.  Returns mgr->err if either insert fails.
// NOTE(review): both error returns come before the Parent unlock / unpin
// calls at the bottom, so they appear to leave the pages Parent-locked and
// pinned.  The local declarations, the error line after the far-right
// pin, one line between the two Parent locks and the final return are in
// gaps of this listing.
2400 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2402 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2403 unsigned char value[BtId];
2404 uint lvl = set->page->lvl;
2410 // if current page is the root page, split it
2412 if( set->latch->page_no == ROOT_page )
2413 return bt_splitroot (mgr, set, right, thread_no);
// cache the last (fence) key of each half
2415 ptr = keyptr(set->page, set->page->cnt);
2416 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2418 page = bt_mappage (mgr, right);
2420 ptr = keyptr(page, page->cnt);
2421 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2423 // splice in far right page's left page_no
2425 if( right2 = bt_getid (page->right) ) {
2426 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2427 temp->page = bt_mappage (mgr, temp->latch);
2431 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2432 bt_putid (temp->page->left, right->page_no);
2433 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2434 bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2435 } else if( !lvl ) { // right page is far right page
// pagezero->alloc->left is updated under the allocation mutex
2436 bt_mutexlock (mgr->lock);
2437 bt_putid (mgr->pagezero->alloc->left, right->page_no);
2438 bt_releasemutex(mgr->lock);
2440 // insert new fences in their parent pages
2442 bt_lockpage (BtLockParent, right, thread_no, __LINE__);
// swap the left page's Write lock for its Parent lock
2444 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2445 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2447 // insert new fence for reformulated left block of smaller keys
2449 bt_putid (value, set->latch->page_no);
2450 ptr = (BtKey *)leftkey;
2452 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2453 return mgr->err_thread = thread_no, mgr->err;
2455 // switch fence for right block of larger keys to new right page
2457 bt_putid (value, right->page_no);
2458 ptr = (BtKey *)rightkey;
2460 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2461 return mgr->err_thread = thread_no, mgr->err;
2463 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2464 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2466 bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2467 bt_unpinlatch (right, 1, thread_no, __LINE__);
2471 // install new key and value onto page
2472 // page must already be checked for
//
// Copies the value and then the key onto the page just below page->min,
// and makes room in the slot array so that the new entry sits at `slot`.
// If a dead slot exists between `slot` and the end of the array, the gap
// is closed up to it; otherwise the array grows, and spare Librarian
// slots are interleaved while the existing slots are shifted up.
// NOTE(review): the lines that set the value/key lengths, the `break`,
// the librarian/rate bookkeeping, the final slot type/dead settings and
// the return are in gaps of this listing.
2475 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2477 uint idx, librarian;
2483 // if previous slot is a librarian slot, use it
2486 if( slotptr(set->page, slot-1)->type == Librarian )
2489 // copy value onto page
2491 set->page->min -= vallen + sizeof(BtVal);
2492 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2493 memcpy (val->value, value, vallen);
2496 // copy key onto page
2498 set->page->min -= keylen + sizeof(BtKey);
2499 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2500 memcpy (ptr->key, key, keylen);
2503 // find first empty slot at or above our insert slot
2505 for( idx = slot; idx < set->page->cnt; idx++ )
2506 if( slotptr(set->page, idx)->dead )
2509 // now insert key into array before slot.
2511 // if we're going all the way to the top,
2512 // add as many librarian slots as
2515 if( idx == set->page->cnt ) {
// bytes left for extra slots, keeping a fifth of page->min in reserve;
// note that this expression also increments page->cnt
2516 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2518 librarian = ++idx - slot;
2519 avail /= sizeof(BtSlot);
// NOTE(review): librarian is uint and avail is int, so this comparison
// is made in unsigned arithmetic; a negative avail would compare as a
// very large value -- confirm that is handled in the lines not shown.
2524 if( librarian > avail )
2528 rate = (idx - slot) / librarian;
2529 set->page->cnt += librarian;
2534 librarian = 0, rate = 0;
2536 // transfer slots and add librarian slots
// work from the top down, copying each slot from its old position
2538 while( idx > slot ) {
2539 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2541 // add librarian slot per rate
// a librarian slot takes the offset of the slot just above it
2544 if( (idx - slot)/2 <= librarian * rate ) {
2545 node = slotptr(set->page, --idx);
2546 node->off = node[1].off;
2547 node->type = Librarian;
// finally point the target slot at the key written above
2559 node = slotptr(set->page, slot);
2560 node->off = set->page->min;
2566 // Insert new key into the btree at given level.
2567 // either add a new key or update/add an existing one
//
// Loops until the key has been placed: loads the target page under a
// Write lock, then either
//   - inserts a new slot (bt_cleanpage + bt_insertslot) when the key is
//     not already there or duplicates are allowed,
//   - overwrites the existing value in place when the new value is not
//     longer than the old one, or
//   - writes a fresh key/value pair lower on the page and repoints the
//     existing slot at it.
// Whenever bt_cleanpage reports no room, the page is split with
// bt_splitpage / bt_splitkeys and the search starts over.
// Returns mgr->err on failure.
// NOTE(review): the switch on `type`, the continue/return lines, the
// length assignments and the dead/act updates are in gaps of this
// listing, so the exact branch structure should be confirmed against the
// full source.
2569 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2571 uint slot, idx, len, entry;
2577 while( 1 ) { // find the page and slot for the current key
2578 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2579 node = slotptr(set->page, slot);
2580 ptr = keyptr(set->page, slot);
2584 // if librarian slot == found slot, advance to real slot
2586 if( node->type == Librarian ) {
2587 node = slotptr(set->page, ++slot);
2588 ptr = keyptr(set->page, slot);
2591 // if inserting a duplicate key or unique
2592 // key that doesn't exist on the page,
2593 // check for adequate space on the page
2594 // and insert the new key before slot.
2599 if( !keycmp (ptr, key, keylen) )
// bt_cleanpage returns the (possibly renumbered) slot, or 0 when the
// page has to be split first
2602 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2603 if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
// bt_splitpage yields a latch index; adding the matching latch array
// base turns it back into the right page's latch
2608 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2609 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2612 return mgr->err_thread = thread_no, mgr->err;
2615 if( keycmp (ptr, key, keylen) )
2621 // if key already exists, update value and return
2623 val = valptr(set->page, slot);
// the new value fits in the old value's space: the unused tail is
// counted as garbage and the bytes are overwritten in place
2625 if( val->len >= vallen ) {
2631 set->page->garbage += val->len - vallen;
2633 memcpy (val->value, value, vallen);
2637 // new update value doesn't fit in existing value area
2638 // make sure page has room
// the old key and value bytes are written off as garbage
2641 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2648 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2651 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2652 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2655 return mgr->err_thread = thread_no, mgr->err;
2658 // copy key and value onto page and update slot
2660 set->page->min -= vallen + sizeof(BtVal);
2661 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2662 memcpy (val->value, value, vallen);
2665 set->page->min -= keylen + sizeof(BtKey);
2666 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2667 memcpy (ptr->key, key, keylen);
// repoint the existing slot at the freshly written key
2670 slotptr(set->page,slot)->off = set->page->min;
2673 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2674 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2678 // determine actual page where key is located
2679 // return slot number
//
//   source - transaction page; slot `src` of it names the key to locate
//   locks  - per-operation table indexed by src (slot, entry, reuse)
//   set    - output: latch and mapped page of the leaf being examined
// When the search loop falls through, mgr->err is set to BTERR_atomic
// and mgr->line records the source line; the value returned in that
// case is not visible in this excerpt.
2681 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2683 BtKey *key = keyptr(source,src), *ptr;
2684 uint slot = locks[src].slot;
// an operation flagged `reuse` starts from the latch entry recorded
// for the previous operation
2687 if( locks[src].reuse )
2688 entry = locks[src-1].entry;
2690 entry = locks[src].entry;
2693 set->latch = mgr->leafsets + entry;
2694 set->page = bt_mappage (mgr, set->latch);
2698 // find where our key is located
2699 // on current page or pages split on
2700 // same page txn operations.
2703 set->latch = mgr->leafsets + entry;
2704 set->page = bt_mappage (mgr, set->latch);
2706 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2707 if( slotptr(set->page, slot)->type == Librarian )
// remember the page that actually holds the key for reuse entries
2709 if( locks[src].reuse )
2710 locks[src].entry = entry;
// follow the chain of split pages hanging off this latch
2713 } while( entry = set->latch->split );
2715 mgr->line = __LINE__, mgr->err = BTERR_atomic;
// Insert the key/value held in slot `src` of the transaction page
// `source` into the leaf located by bt_atomicpage.
//   locks     - per-operation table; locks[src].slot is cleared after a split
//   thread_no - caller's thread number, recorded in mgr->err_thread on error
//   lsn       - log sequence number stored into the modified page
// Falls through to a BTERR_atomic error when bt_atomicpage stops
// yielding a slot.  The success return is not visible in this excerpt.
2719 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2721 BtKey *key = keyptr(source, src);
2722 BtVal *val = valptr(source, src);
2727 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
// room available: insert using the slot type taken from the source slot
2728 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2729 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2730 return mgr->err_thread = thread_no, mgr->err;
2732 set->page->lsn = lsn;
// presumably the no-room path: split and retry the loop
2738 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2739 latch = mgr->leafsets + entry;
2743 // splice right page into split chain
2746 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2747 latch->split = set->latch->split;
2748 set->latch->split = entry;
2750 // clear slot number for atomic page
2752 locks[src].slot = 0;
2755 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2758 // perform delete from smaller btree
2759 // insert a delete slot if not found there
//
//   source/src - transaction page and the slot naming the key to delete
//   locks      - per-operation table; locks[src].slot is cleared after a split
//   lsn        - log sequence number stored into the modified page
// Falls through to a BTERR_struct error when bt_atomicpage stops
// yielding a slot.
2761 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2763 BtKey *key = keyptr(source, src);
2764 uint idx, slot, entry;
2771 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2772 node = slotptr(set->page, slot);
2773 ptr = keyptr(set->page, slot);
2774 val = valptr(set->page, slot);
2776 // if slot is not found on cache btree, insert a delete slot
2777 // otherwise ignore the request.
2779 if( keycmp (ptr, key->key, key->len) )
// the delete marker carries no value (NULL, length 0)
2781 if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
2782 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
2783 else { // split page before inserting Delete slot
2784 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2785 latch = mgr->leafsets + entry;
2789 // splice right page into split chain
2792 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2793 latch->split = set->latch->split;
2794 set->latch->split = entry;
2796 // clear slot number for atomic page
2798 locks[src].slot = 0;
2804 // if node is already dead,
2805 // ignore the request.
2807 if( node->type == Delete || node->dead )
// key is present and live: turn its slot into a delete marker
2810 node->type = Delete;
2812 // if main LSM btree, delete the slot
// mgr->found is bumped atomically since it is shared across threads
2819 __sync_fetch_and_add(&mgr->found, 1);
2820 set->page->lsn = lsn;
2824 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2827 // release master's splits from right to left
//
//   entry - index into mgr->leafsets of the latch to release
// The recursive call on latch->split comes before this latch is
// unlocked, which is what gives the right-to-left order.  The guard
// on the recursion is not visible in this excerpt.
2829 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2831 BtLatchSet *latch = mgr->leafsets + entry;
2834 bt_atomicrelease (mgr, latch->split, thread_no);
// drop the write lock, then the pin, on this latch
2837 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2838 bt_unpinlatch(latch, 1, thread_no, __LINE__);
// Comparison callback handed to qsort_r by bt_atomictxn.
//   slot1, slot2 - the two slots being compared
//   page         - the page the slot offsets are relative to
// Returns the result of keycmp on the key of slot1 against the key
// bytes and length of slot2.
2841 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
// resolve each slot's byte offset into a key pointer within the page
2843 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2844 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2846 return keycmp (key1, key2->key, key2->len);
2848 // atomic modification of a batch of keys.
//
//   bt     - database handle; bt->mgr is the tree the batch is applied to
//   source - page whose slots 1..cnt describe the operations
// Steps visible here: order the slots by key, write the batch to the
// redo log when redo pages are configured, execute the batch, then
// promote pages while the leaf page count exceeds the limit below.
// Visible error exits return bt->mgr->err.
2850 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2852 uint src, idx, slot, samepage, entry, que = 0;
2853 BtKey *key, *ptr, *key2;
2859 // stable sort the list of keys into order to
2860 // prevent deadlocks between threads.
// NOTE(review): an insertion-sort loop and a qsort_r call both appear
// below.  Lines between them are missing from this excerpt, so confirm
// whether the loop is still active.  qsort_r makes no stability
// guarantee, which matters if equal keys must keep their order.
2862 for( src = 1; src++ < source->cnt; ) {
2863 *temp = *slotptr(source,src);
2864 key = keyptr (source,src);
// walk left from src, shifting larger keys up by one slot
2866 for( idx = src; --idx; ) {
2867 key2 = keyptr (source,idx);
2868 if( keycmp (key, key2->key, key2->len) < 0 ) {
2869 *slotptr(source,idx+1) = *slotptr(source,idx);
2870 *slotptr(source,idx) = *temp;
// sort starts at slot 1 and passes source as the comparator context
2876 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2877 // add entries to redo log
2879 if( bt->mgr->pagezero->redopages )
2880 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2884 // perform the individual actions in the transaction
2886 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2887 return bt->mgr->err;
2889 // if number of active pages
2890 // is greater than the buffer pool
2891 // promote page into larger btree
// the limit is leaftotal minus the value bt->mgr->thread_no points at
2894 while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2895 if( bt_promote (bt) )
2896 return bt->mgr->err;
2903 // execute the source list of inserts/deletes
//
//   mgr       - tree the operations are applied to
//   source    - page whose slots 1..cnt describe the operations
//   lsn       - forwarded to bt_atomicdelete / bt_atomicinsert
//   lsm       - its use is not visible in this excerpt; bt_atomictxn
//               passes 0, bt_promote passes 1 when redo pages exist
//   thread_no - caller's thread number
// Phase 1 loads and write-locks the leaf page for every key.  Phase 2
// walks the list backwards, applies each same-page group, then posts
// fence keys and sibling links for any splits and releases the pages.
2905 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2907 uint slot, src, idx, samepage, entry;
2908 BtPageSet set[1], prev[1];
2909 unsigned char value[BtId];
// one extra element so the table can be indexed by the 1-based src.
// NOTE(review): no check of the calloc result is visible in this
// excerpt - confirm.
2917 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2919 // Load the leaf page for each key
2920 // group same page references with reuse bit
// src runs from 1 to source->cnt inclusive
2922 for( src = 0; src++ < source->cnt; ) {
2923 key = keyptr(source, src);
2925 // first determine if this modification falls
2926 // on the same page as the previous modification
2927 // note that the far right leaf page is a special case
// ptr is the last key of the previously loaded page (set below), so
// the test compares that key with the current one
2929 if( samepage = src > 1 )
2930 samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
// load with a write lock, remember the page's last key, and start
// with an empty split chain on the latch
2933 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
2934 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
2941 if( slotptr(set->page, slot)->type == Librarian )
// record where this operation landed, as an index into mgr->leafsets
2944 entry = set->latch - mgr->leafsets;
2945 locks[src].reuse = samepage;
2946 locks[src].entry = entry;
2947 locks[src].slot = slot;
2949 // capture current lsn for master page
2951 locks[src].reqlsn = set->page->lsn;
2954 // insert or delete each key
2955 // process any splits or merges
2956 // run through txn list backwards
// samepage is the exclusive upper bound of the group being processed
2958 samepage = source->cnt + 1;
2960 for( src = source->cnt; src; src-- ) {
// presumably skips back to the first operation of a same-page group;
// the statement under this test is not visible
2961 if( locks[src].reuse )
2964 // perform the txn operations
2965 // from smaller to larger on
2968 for( idx = src; idx < samepage; idx++ )
2969 switch( slotptr(source,idx)->type ) {
2971 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2977 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2982 bt_atomicpage (mgr, source, locks, idx, set);
2986 // after the same page operations have finished,
2987 // process master page for splits or deletion.
2989 latch = prev->latch = mgr->leafsets + locks[src].entry;
2990 prev->page = bt_mappage (mgr, prev->latch);
2993 // pick-up all splits from master page
2994 // each one is already pinned & WriteLocked.
2996 while( entry = prev->latch->split ) {
2997 set->latch = mgr->leafsets + entry;
2998 set->page = bt_mappage (mgr, set->latch);
3000 // delete empty master page by undoing its split
3001 // (this is potentially another empty page)
3002 // note that there are no pointers to it yet
// the split page's contents are copied over the empty page, keeping
// that page's left link; the split page is then freed
3004 if( !prev->page->act ) {
3005 memcpy (set->page->left, prev->page->left, BtId);
3006 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3007 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3008 prev->latch->split = set->latch->split;
3009 bt_freepage (mgr, set, thread_no);
3013 // remove empty split page from the split chain
3014 // and return it to the free list. No other
3015 // thread has its page number yet.
3017 if( !set->page->act ) {
3018 memcpy (prev->page->right, set->page->right, BtId);
3019 prev->latch->split = set->latch->split;
3021 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3022 bt_freepage (mgr, set, thread_no);
3026 // update prev's fence key
// the page's last key is inserted at level 1 with the page number as
// its value
3028 ptr = keyptr(prev->page,prev->page->cnt);
3029 bt_putid (value, prev->latch->page_no);
3031 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3034 // splice in the left link into the split page
3036 bt_putid (set->page->left, prev->latch->page_no);
3040 // update left pointer in next right page from last split page
3041 // (if all splits were reversed or none occurred, latch->split == 0)
3043 if( latch->split ) {
3044 // fix left pointer in master's original (now split)
3045 // far right sibling or set rightmost page in page zero
3047 if( right_page_no = bt_getid (prev->page->right) ) {
3048 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3049 set->page = bt_mappage (mgr, set->latch);
// only the sibling's left link changes, so a Link lock is taken
3053 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3054 bt_putid (set->page->left, prev->latch->page_no);
3055 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3056 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
3057 } else { // prev is rightmost page
// page zero's alloc->left is updated under mgr->lock
3058 bt_mutexlock (mgr->lock);
3059 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3060 bt_releasemutex(mgr->lock);
3063 // switch the original fence key from the
3064 // master page to the last split page.
3066 ptr = keyptr(prev->page,prev->page->cnt);
3067 bt_putid (value, prev->latch->page_no);
3069 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3072 // unlock and unpin the split pages
3074 bt_atomicrelease (mgr, latch->split, thread_no);
3076 // unlock and unpin the master page
3079 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3080 bt_unpinlatch(latch, 1, thread_no, __LINE__);
3084 // since there are no splits remaining, we're
3085 // finished if master page occupied
3087 if( prev->page->act ) {
3088 bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3089 bt_unpinlatch(prev->latch, 1, thread_no, __LINE__);
3093 // any and all splits were reversed, and the
3094 // master page located in prev is empty, delete it
3096 if( bt_deletepage (mgr, prev, thread_no, 0) )
3104 // pick & promote a page into the larger btree
//
//   bt - database handle; bt->mgr is the cache tree, bt->main the
//        larger tree that receives the promoted keys
// A candidate latch entry is chosen from a shared counter.  Entries
// that are unused, busy, pinned, rightmost, or flagged nopromote/kill
// are skipped.  The chosen page is write-locked, its slots are applied
// to bt->main through bt_atomicexec, and the page is then deleted
// from the cache tree.
3106 BTERR bt_promote (BtDb *bt)
3108 uint entry, slot, idx;
// atomically take the next candidate index; the two forms below look
// like platform variants (the selecting directives are not visible)
3116 entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3118 entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
// wrap the counter into the range of the leaf latch table
3120 entry %= bt->mgr->leaftotal;
3125 set->latch = bt->mgr->leafsets + entry;
3127 // skip this entry if it has never been used
3129 if( !set->latch->page_no )
// non-blocking attempt on the latch's modify mutex
3132 if( !bt_mutextry(set->latch->modify) )
3135 // skip this entry if it is pinned
// CLOCK_bit is masked off so only the pin count is tested
3137 if( set->latch->pin & ~CLOCK_bit ) {
3138 bt_releasemutex(set->latch->modify);
3142 set->page = bt_mappage (bt->mgr, set->latch);
3144 // entry has no right sibling
3146 if( !bt_getid (set->page->right) ) {
3147 bt_releasemutex(set->latch->modify);
3151 // entry is being killed or constructed
3153 if( set->page->nopromote || set->page->kill ) {
3154 bt_releasemutex(set->latch->modify);
3158 // pin the page for our access
3159 // and leave it locked for the
3160 // duration of the promotion.
3163 bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3164 bt_releasemutex(set->latch->modify);
3166 // transfer slots in our selected page to the main btree
// progress trace for every 100th entry number
3167 if( !(entry % 100) )
3168 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
// the cache page itself serves as the source list; lsn is 0 and the
// lsm flag is 1 when redo pages are configured.
// NOTE(review): the error return below shows no unlock of the write
// lock taken above - confirm against the missing lines.
3170 if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3171 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3172 return bt->main->err;
3175 // now delete the page
3177 if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3178 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3180 return bt->mgr->err;
3184 // find unique key == given key, or first duplicate key in
3185 // leaf level and return number of value bytes
3186 // or (-1) if not found.
//
//   key, keylen   - key to look up
//   value, valmax - caller's buffer and its capacity
// Both trees are probed in turn: type 0 is the cache tree (bt->mgr),
// type 1 is the main tree (bt->main).
3188 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
3197 for( type = 0; type < 2; type++ )
// leaf level (0) is loaded with a read lock
3198 if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3199 node = slotptr(set->page, slot);
3201 // skip librarian slot place holder
3203 if( node->type == Librarian )
3204 node = slotptr(set->page, ++slot);
3206 ptr = keyptr(set->page, slot);
3208 // not there if we reach the stopper key
3209 // or the key doesn't match what's on the page.
// last slot of a page with no right sibling: release and move on
3211 if( slot == set->page->cnt )
3212 if( !bt_getid (set->page->right) ) {
3213 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3214 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3218 if( keycmp (ptr, key, keylen) ) {
3219 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3220 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3224 // key matches, return >= 0 value bytes copied
3225 // or -1 if not there.
// a delete marker or a dead slot counts as not there
3227 if( node->type == Delete || node->dead ) {
3232 val = valptr (set->page,slot);
// presumably clamps valmax to the stored length before the copy;
// the statement under this test is not visible
3234 if( valmax > val->len )
3237 memcpy (value, val->value, valmax);
3246 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3247 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3252 // set cursor to highest slot on right-most page
//
// Positions both cursors: the cache cursor on bt->mgr and the main
// cursor on bt->main.  Each tree's right-most page number is read from
// its page zero alloc->left field.  Each page is pinned, mapped and
// read-locked, and the cursor slot is set to the page's slot count.
// A failed pin returns the owning manager's err code.
3254 BTERR bt_lastkey (BtDb *bt)
3256 uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3257 uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
3259 if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
3260 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
3262 return bt->mgr->err;
3264 bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3265 bt->cacheslot = bt->cacheset->page->cnt;
3267 if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
3268 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
3270 return bt->main->err;
3272 bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3273 bt->mainslot = bt->mainset->page->cnt;
3278 // return previous slot on cursor page
//
//   set  - cursor page set; replaced in place when the cursor moves to
//          another page
//   slot - current slot on that page
// The visible code tests slots for the dead flag and, on moving to
// another page, follows the left link.  The loop and return
// statements are not fully visible in this excerpt.
3280 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
// `us` is the page number of the page we start from
3282 uid next, us = set->latch->page_no;
3286 if( slotptr(set->page, slot)->dead )
3291 next = bt_getid(set->page->left);
// release the current page before pinning the next one
3297 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3298 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3300 if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3301 set->page = bt_mappage (mgr, set->latch);
3305 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
// keep going until the page whose right link points back at `us`
3306 next = bt_getid (set->page->right);
3308 } while( next != us );
// one past the last slot of the new page; presumably decremented by
// code that is not visible here
3310 slot = set->page->cnt + 1;
3314 // advance to previous key
//
// Moves the cache cursor, the main cursor, or both back one slot
// according to bt->phase, then compares the two current keys to pick
// which one to report.
// NOTE(review): declared BTERR, but the visible returns yield slot
// numbers, and index_file loops while the result is non-zero.
3316 BTERR bt_prevkey (BtDb *bt)
3320 // first advance last key(s) one previous slot
// the case labels are not visible; the three arms step the cache
// cursor, the main cursor, and both
3323 switch( bt->phase ) {
3325 bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3328 bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3331 bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3332 bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
// refresh the cached node/key/value pointers for each live cursor
3338 if( bt->cacheslot ) {
3339 bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3340 bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3341 bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3344 if( bt->mainslot ) {
3345 bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3346 bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3347 bt->mainval = valptr(bt->mainset->page, bt->mainslot);
// compare keys only when both cursors are live; the values assigned
// in the single-cursor arms are not visible
3350 if( bt->mainslot && bt->cacheslot )
3351 cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3352 else if( bt->cacheslot )
3354 else if( bt->mainslot )
3359 // cache key is larger
// cache entries that are delete markers get separate handling
3363 if( bt->cachenode->type == Delete )
3365 return bt->cacheslot;
3368 // main key is larger
3372 return bt->mainslot;
3379 if( bt->cachenode->type == Delete )
3382 return bt->cacheslot;
3386 // advance to next slot in cache or main btree
3387 // return 0 for EOF/error
//
//   set  - cursor page set; replaced in place when the cursor moves to
//          the right sibling
//   slot - current slot on that page
3389 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
// scan forward over the rest of the page, testing the dead flag
3395 while( slot++ < set->page->cnt )
3396 if( slotptr(set->page, slot)->dead )
// the final slot qualifies only when the page has a right sibling
3398 else if( slot < set->page->cnt || bt_getid (set->page->right) )
// page exhausted: release it.
// NOTE(review): set->page->right is read after the unlock/unpin
// below - confirm that is safe.
3403 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3404 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3406 if( page_no = bt_getid(set->page->right) )
3407 if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3408 set->page = bt_mappage (mgr, set->latch);
3414 // obtain access lock using lock chaining with Access mode
3416 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3417 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3418 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3423 // advance to next key
//
// Moves the cache cursor, the main cursor, or both forward one slot
// according to bt->phase, then compares the two current keys to pick
// which one to report.
// NOTE(review): declared BTERR, but the visible returns yield slot
// numbers, and index_file loops while the result is non-zero.
3425 BTERR bt_nextkey (BtDb *bt)
3429 // first advance last key(s) one next slot
// the case labels are not visible; the three arms step the cache
// cursor, the main cursor, and both
3432 switch( bt->phase ) {
3434 bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3437 bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3440 bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3441 bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
// refresh the cached node/key/value pointers for each live cursor
3447 if( bt->cacheslot ) {
3448 bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3449 bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3450 bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3453 if( bt->mainslot ) {
3454 bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3455 bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3456 bt->mainval = valptr(bt->mainset->page, bt->mainslot);
// compare keys only when both cursors are live; the values assigned
// in the single-cursor arms are not visible
3459 if( bt->mainslot && bt->cacheslot )
3460 cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3461 else if( bt->mainslot )
3463 else if( bt->cacheslot )
3468 // main key is larger
3469 // return smaller key
// cache entries that are delete markers get separate handling
3473 if( bt->cachenode->type == Delete )
3475 return bt->cacheslot;
3478 // cache key is larger
3482 return bt->mainslot;
3489 if( bt->cachenode->type == Delete )
3492 return bt->cacheslot;
3496 // start sweep of keys
//
//   key, len - key at which the sweep begins (index_file passes NULL, 0)
// Loads the leaf page for the key in both trees with a read lock and
// parks each cursor one slot before the slot found, so the first
// advance lands on it.
3498 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3505 if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
3506 bt->cacheslot = slot - 1;
3508 return bt->mgr->err;
3512 if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
3513 bt->mainslot = slot - 1;
// NOTE(review): this load ran against bt->main, yet the code returned
// is bt->mgr->err; bt_lastkey returns bt->main->err in the matching
// spot.  Looks like the wrong manager - confirm.
3515 return bt->mgr->err;
// Windows variant (uses FILETIME / SYSTEMTIME and GetProcessTimes).
//   type - selects which clock is reported; main() prints the results
//          of 0, 1 and 2 as "real", "user" and "sys"
// Returns seconds as a double, built from SYSTEMTIME fields.
// The dispatch on `type` is not visible in this excerpt.
3524 double getCpuTime(int type)
3527 FILETIME xittime[1];
3528 FILETIME systime[1];
3529 FILETIME usrtime[1];
3530 SYSTEMTIME timeconv[1];
3533 memset (timeconv, 0, sizeof(SYSTEMTIME));
// current system time; the day-of-week field supplies whole days, so
// the value is not an absolute timestamp
3537 GetSystemTimeAsFileTime (xittime);
3538 FileTimeToSystemTime (xittime, timeconv);
3539 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// process user time
3542 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3543 FileTimeToSystemTime (usrtime, timeconv);
// process system (kernel) time
3546 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3547 FileTimeToSystemTime (systime, timeconv);
// add hours, minutes, seconds and milliseconds, all in seconds
3551 ans += (double)timeconv->wHour * 3600;
3552 ans += (double)timeconv->wMinute * 60;
3553 ans += (double)timeconv->wSecond;
3554 ans += (double)timeconv->wMilliseconds / 1000;
3559 #include <sys/resource.h>
// POSIX variant (uses gettimeofday and getrusage).
//   type - selects which clock is reported; main() prints the results
//          of 0, 1 and 2 as "real", "user" and "sys"
// Returns seconds as a double with microsecond resolution.
// The dispatch on `type` is not visible in this excerpt.
3561 double getCpuTime(int type)
3563 struct rusage used[1];
3564 struct timeval tv[1];
// wall-clock time of day
3568 gettimeofday(tv, NULL);
3569 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time consumed by this process
3572 getrusage(RUSAGE_SELF, used);
3573 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time consumed by this process
3576 getrusage(RUSAGE_SELF, used);
3577 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Diagnostic pass over both latch tables of a manager; findings are
// printed to stderr.
//   mgr  - manager whose latchsets and leafsets are inspected
//   type - label printed at the front of every message
// Entry 0 of each table is expected to be all zero bytes.  Entries
// 1..total-1 are reported when their modify lock is set or their pin
// count (CLOCK_bit masked off) is non-zero.
3584 void bt_poolaudit (BtMgr *mgr, char *type)
3586 BtLatchSet *latch, test[1];
// all-zero reference latch to compare entry 0 against
3589 memset (test, 0, sizeof(test));
3591 if( memcmp (test, mgr->latchsets, sizeof(test)) )
3592 fprintf(stderr, "%s latchset zero overwritten\n", type);
3594 if( memcmp (test, mgr->leafsets, sizeof(test)) )
3595 fprintf(stderr, "%s leafset zero overwritten\n", type);
// pre-increment skips entry 0
3597 for( entry = 0; ++entry < mgr->latchtotal; ) {
3598 latch = mgr->latchsets + entry;
// the test guarding this message is not visible
3601 fprintf(stderr, "%s latchset %d is a leaf on page %d\n", type, entry, latch->page_no);
3603 if( *latch->modify->value )
3604 fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3606 if( latch->pin & ~CLOCK_bit )
3607 fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3610 for( entry = 0; ++entry < mgr->leaftotal; ) {
3611 latch = mgr->leafsets + entry;
// the test guarding this message is not visible
3614 fprintf(stderr, "%s leafset %d is not a leaf on page %d\n", type, entry, latch->page_no);
3616 if( *latch->modify->value )
3617 fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3619 if( latch->pin & ~CLOCK_bit )
3620 fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3633 // standalone program to index file of keys
3634 // then list them onto std-out
//
// Thread entry point: arg is a ThreadArg naming the input file, the
// command string, the two managers and the transaction batch size.
// Two signatures appear below; they look like pthread and Windows
// variants (the selecting directives are not visible).
// The dispatch on the command character is not visible either; the
// sections below are identified by the progress messages they print.
// NOTE(review): the error paths call exit(0), so a failure still
// reports a zero exit status - confirm this is intended.
3637 void *index_file (void *arg)
3639 uint __stdcall index_file (void *arg)
3642 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3643 uid next, page_no = LEAF_page; // start on first page of leaves
3644 int ch, len = 0, slot, type = 0;
3645 unsigned char key[BT_maxkey];
// buff doubles as an in-memory BtPage holding a batch of operations;
// nxt starts at the end of the buffer
3646 unsigned char buff[65536];
3647 uint nxt = sizeof(buff);
3648 ThreadArg *args = arg;
3657 bt = bt_open (args->mgr, args->main);
3658 page = (BtPage)buff;
// one command character per input file; when the string is shorter
// than the file index, its last character is used
3660 if( args->idx < strlen (args->type) )
3661 ch = args->type[args->idx];
3663 ch = args->type[strlen(args->type) - 1];
// ---- pennysort section: first 10 bytes of a line are the key ----
3675 if( type == Delete )
3676 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3678 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3680 if( type == Delete )
3681 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3683 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3685 if( in = fopen (args->infile, "rb") )
3686 while( ch = getc(in), ch != EOF )
// direct insert: bytes after the first 10 are the value
3692 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3693 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
// batched path: value bytes, a one-byte value length, then key bytes
// are written into buff at nxt; the slot then records nxt as its
// offset.  The statements that move nxt are not visible.
3699 memcpy (buff + nxt, key + 10, len - 10);
3701 buff[nxt] = len - 10;
3703 memcpy (buff + nxt, key, 10);
3706 slotptr(page,++cnt)->off = nxt;
3707 slotptr(page,cnt)->type = type;
// keep collecting until the batch holds args->num entries
3710 if( cnt < args->num )
3717 if( bt_atomictxn (bt, page) )
3718 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3723 else if( len < BT_maxkey )
3725 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
// ---- indexing section: each line is inserted as a key, no value ----
3729 fprintf(stderr, "started indexing for %s\n", args->infile);
3730 if( in = fopen (args->infile, "r") )
3731 while( ch = getc(in), ch != EOF )
3736 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3737 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3740 else if( len < BT_maxkey )
3742 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
// ---- find section: each line is looked up with bt_findkey ----
3746 fprintf(stderr, "started finding keys for %s\n", args->infile);
3747 if( in = fopen (args->infile, "rb") )
3748 while( ch = getc(in), ch != EOF )
// no buffer is passed, so a result of 0 bytes is treated as a hit
3752 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3754 else if( bt->mgr->err )
3755 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3758 else if( len < BT_maxkey )
3760 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
// ---- forward scan: merged walk of both trees onto stdout ----
3764 fprintf(stderr, "started forward scan\n");
3765 if( bt_startkey (bt, NULL, 0) )
3766 fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3768 while( bt_nextkey (bt) ) {
// phase 1 means the current key comes from the main tree
3769 if( bt->phase == 1 ) {
3770 len = bt->mainkey->len;
3772 if( bt->mainnode->type == Duplicate )
3775 fwrite (bt->mainkey->key, len, 1, stdout);
3776 fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3778 len = bt->cachekey->len;
3780 if( bt->cachenode->type == Duplicate )
3783 fwrite (bt->cachekey->key, len, 1, stdout);
3784 fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3787 fputc ('\n', stdout);
// release the read locks and pins held by both cursors
3791 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3792 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3794 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3795 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3797 fprintf(stderr, " Total keys read %d\n", cnt);
// ---- reverse scan: same output, walking backwards from the end ----
3801 fprintf(stderr, "started reverse scan\n");
3802 if( bt_lastkey (bt) )
3803 fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3805 while( bt_prevkey (bt) ) {
3806 if( bt->phase == 1 ) {
3807 len = bt->mainkey->len;
3809 if( bt->mainnode->type == Duplicate )
3812 fwrite (bt->mainkey->key, len, 1, stdout);
3813 fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3815 len = bt->cachekey->len;
3817 if( bt->cachenode->type == Duplicate )
3820 fwrite (bt->cachekey->key, len, 1, stdout);
3821 fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3824 fputc ('\n', stdout);
3828 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3829 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3831 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3832 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3834 fprintf(stderr, " Total keys read %d\n", cnt);
// ---- count section: sequential read of page headers from disk ----
3838 fprintf(stderr, "started counting LSM cache btree\n");
3839 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3840 page_no = LEAF_page;
3843 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3845 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
// only sizeof(*page) bytes (the page header) are read.
// NOTE(review): the pread result is not checked in the visible code.
3846 pread(bt->mgr->idx, page, sizeof(*page), page_no << bt->mgr->page_bits);
3847 if( !page->lvl && !page->free )
// level-0 pages step by 1 << leaf_xtra page numbers, others by 1
3852 page_no += page->lvl ? 1 : (1 << bt->mgr->leaf_xtra);
3856 cachecnt = --cnt; // remove stopper key
3859 fprintf(stderr, "started counting LSM main btree\n");
3860 next = bt->main->redopage + bt->main->pagezero->redopages;
3861 page_no = LEAF_page;
3864 posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3866 while( page_no < bt_getid(bt->main->pagezero->alloc->right) ) {
3867 pread(bt->main->idx, page, sizeof(*page), page_no << bt->main->page_bits);
3873 page_no += page->lvl ? 1 : (1 << bt->main->leaf_xtra);
3877 cnt--; // remove stopper key
3879 fprintf(stderr, " cache keys counted %d\n", cachecnt);
3880 fprintf(stderr, " main keys counted %d\n", cnt);
3881 fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
3893 typedef struct timeval timer;
// Command-line driver.  Arguments are positional:
//   argv[1] cache btree file      argv[2] main btree file
//   argv[3] command string        argv[4] pagebits
//   argv[5] leafbits              argv[6] poolsize
//   argv[7] leafpool              argv[8] txnsize
//   argv[9] redopages             argv[10] mainbits
//   argv[11] mainleafbits         argv[12] mainpool
//   argv[13] mainleafpool         argv[14..] source files
// It opens both managers, runs index_file on one thread per source
// file, audits the buffer pools, and prints page counters and timings.
// Numeric arguments are parsed with atoi, so malformed input is not
// detected.
3895 int main (int argc, char **argv)
3897 int idx, cnt, len, slot, err;
3905 uint mainleafpool = 0;
3906 uint mainleafxtra = 0;
3922 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3923 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3924 fprintf (stderr, " where main_file is the name of the main btree file\n");
3925 fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
3926 fprintf (stderr, " pagebits is the page size in bits for the cache btree\n");
3927 fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n");
3928 fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n");
3929 fprintf (stderr, " leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3930 fprintf (stderr, " txnsize = n to block transactions into n units, or zero for no transactions\n");
3931 fprintf (stderr, " redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3932 fprintf (stderr, " mainbits is the page size of the main btree in bits\n");
3933 fprintf (stderr, " mainpool is the number of main pages in the main buffer pool\n");
3934 fprintf (stderr, " mainleaf is the number of main pages in the main leaf pool\n");
3935 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// wall-clock start for the "real" figure printed at the end
3939 start = getCpuTime(0);
// optional arguments; the argc guards are not visible in this excerpt
3942 bits = atoi(argv[4]);
3945 leafxtra = atoi(argv[5]);
3948 poolsize = atoi(argv[6]);
3951 fprintf (stderr, "no page pool\n"), exit(1);
3954 leafpool = atoi(argv[7]);
3957 num = atoi(argv[8]);
3960 redopages = atoi(argv[9]);
3962 if( redopages > 65535 )
3963 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3966 mainbits = atoi(argv[10]);
3969 mainleafxtra = atoi(argv[11]);
3972 mainpool = atoi(argv[12]);
3975 mainleafpool = atoi(argv[13]);
// thread handle array: pthread form, then Windows form.
// NOTE(review): no check of these allocations is visible - confirm.
3979 threads = malloc (cnt * sizeof(pthread_t));
3981 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3983 args = malloc ((cnt + 1) * sizeof(ThreadArg));
// cache tree manager; it is the one given the redo pages
3985 mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3988 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// main tree manager, opened with 0 redo pages
3994 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3997 fprintf(stderr, "Index Open Error %s\n", argv[2]);
// one worker per source file; source files start at argv[14]
4007 for( idx = 0; idx < cnt; idx++ ) {
4008 args[idx].infile = argv[idx + 14];
4009 args[idx].type = argv[3];
4010 args[idx].main = main;
4011 args[idx].mgr = mgr;
4012 args[idx].num = num;
4013 args[idx].idx = idx;
4015 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
4016 fprintf(stderr, "Error creating thread %d\n", err);
4018 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
// NOTE(review): this path indexes argv with idx + 10, while the
// threaded path above uses idx + 14 - looks inconsistent, confirm.
4022 args[0].infile = argv[idx + 10];
4023 args[0].type = argv[3];
4024 args[0].main = main;
4031 // wait for termination
4034 for( idx = 0; idx < cnt; idx++ )
4035 pthread_join (threads[idx], NULL);
4037 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
4039 for( idx = 0; idx < cnt; idx++ )
4040 CloseHandle(threads[idx]);
// report latches left locked or pinned in either buffer pool
4042 bt_poolaudit(mgr, "cache");
4045 bt_poolaudit(main, "main");
4047 fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
4049 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
// timings are printed as minutes plus fractional seconds
4056 elapsed = getCpuTime(0) - start;
4057 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4058 elapsed = getCpuTime(1);
4059 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4060 elapsed = getCpuTime(2);
4061 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
// Function wrapper around keyptr: returns the key addressed by slot
// number `slot` on `page`.
4064 BtKey *bt_key (BtPage page, uint slot)
4066 return keyptr(page,slot);
// Function wrapper around slotptr: returns the slot entry numbered
// `slot` on `page`.
4069 BtSlot *bt_slot (BtPage page, uint slot)
4071 return slotptr(page,slot);