1 // btree version threadskv10g futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // LSM B-trees for write optimization
11 // larger sized leaf pages than non-leaf
12 // and LSM B-tree find & count operations
16 // author: karl malbrain, malbrain@cal.berkeley.edu
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
22 The orginal author is Karl Malbrain.
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
56 #define WIN32_LEAN_AND_MEAN
70 typedef unsigned long long uid;
71 typedef unsigned long long logseqno;
74 typedef unsigned long long off64_t;
75 typedef unsigned short ushort;
76 typedef unsigned int uint;
79 #define BT_ro 0x6f72 // ro
80 #define BT_rw 0x7772 // rw
82 #define BT_maxbits 26 // maximum page size in bits
83 #define BT_minbits 9 // minimum page size in bits
84 #define BT_minpage (1 << BT_minbits) // minimum page size
85 #define BT_maxpage (1 << BT_maxbits) // maximum page size
87 // BTree page number constants
88 #define ALLOC_page 0 // allocation page
89 #define ROOT_page 1 // root of the btree
90 #define LEAF_page 2 // first page of leaves
92 // Number of levels to create in a new BTree
97 There are six lock types for each node in four independent sets:
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification.
118 volatile unsigned char xcl[1];
119 volatile unsigned char filler;
120 volatile ushort waiters[1];
126 // definition for reader/writer reentrant lock implementation
132 ushort dup; // re-entrant locks
133 ushort tid; // owner thread-no
134 ushort line; // owner line #
137 // hash table entries
141 uint entry; // Latch table entry at head of chain
144 // latch manager table structure
147 uid page_no; // latch set page number
148 RWLock readwr[1]; // read/write page lock
149 RWLock access[1]; // Access Intent/Page delete
150 RWLock parent[1]; // Posting of fence key in parent
151 RWLock link[1]; // left link update in progress
152 MutexLatch modify[1]; // modify entry lite latch
153 uint split; // right split page atomic insert
154 uint next; // next entry in hash table chain
155 uint prev; // prev entry in hash table chain
156 volatile ushort pin; // number of accessing threads
157 unsigned char dirty; // page in cache is dirty
158 unsigned char leaf; // page in cache is a leaf
161 // Define the length of the page record numbers
165 // Page key slot definition.
167 // Keys are marked dead, but remain on the page until
168 // it cleanup is called. The fence key (highest key) for
169 // a leaf page is always present, even after cleanup.
173 // In addition to the Unique keys that occupy slots
174 // there are Librarian and Duplicate key
175 // slots occupying the key slot array.
177 // The Librarian slots are dead keys that
178 // serve as filler, available to add new Unique
179 // or Dup slots that are inserted into the B-tree.
181 // The Duplicate slots have had their key bytes extended
182 // by 6 bytes to contain a binary duplicate key uniqueifier.
193 uint off:BT_maxbits; // page offset for key start
194 uint type:3; // type of slot
195 uint dead:1; // set for deleted slot
198 // The key structure occupies space at the upper end of
199 // each page. It's a length byte followed by the key
203 unsigned char len; // this can be changed to a ushort or uint
204 unsigned char key[0];
207 // the value structure also occupies space at the upper
208 // end of the page. Each key is immediately followed by a value.
211 unsigned char len; // this can be changed to a ushort or uint
212 unsigned char value[0];
215 #define BT_maxkey 255 // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
218 // The first part of an index page.
219 // It is immediately followed
220 // by the BtSlot array of keys.
222 typedef struct BtPage_ {
223 uint cnt; // count of keys in page
224 uint act; // count of active keys
225 uint min; // next key offset
226 uint garbage; // page garbage in bytes
227 unsigned char lvl; // level of page
228 unsigned char free; // page is on free chain
229 unsigned char kill; // page is being deleted
230 unsigned char nopromote; // page is being constructed
231 unsigned char filler1[6]; // padding to multiple of 8 bytes
232 unsigned char right[BtId]; // page number to right
233 unsigned char left[BtId]; // page number to left
234 unsigned char filler2[2]; // padding to multiple of 8 bytes
235 logseqno lsn; // log sequence number applied
236 uid page_no; // this page number
239 // The loadpage interface object
242 BtPage page; // current page pointer
243 BtLatchSet *latch; // current page latch set
246 // structure for latch manager on ALLOC_page
249 struct BtPage_ alloc[1]; // next page_no in right ptr
250 unsigned char freechain[BtId]; // head of free page_nos chain
251 unsigned char leafchain[BtId]; // head of leaf page_nos chain
252 unsigned long long leafpages; // number of active leaf pages
253 unsigned long long upperpages; // number of active upper pages
254 uint redopages; // number of redo pages in file
255 unsigned char leaf_xtra; // leaf page size in xtra bits
256 unsigned char page_bits; // base page size in bits
259 // The object structure for Btree access
262 uint page_size; // base page size
263 uint page_bits; // base page size in bits
264 uint leaf_xtra; // leaf xtra bits
270 BtPageZero *pagezero; // mapped allocation page
271 BtHashEntry *hashtable; // the buffer pool hash table entries
272 BtHashEntry *leaftable; // the buffer pool hash table entries
273 BtLatchSet *latchsets; // mapped latch set from buffer pool
274 BtLatchSet *leafsets; // mapped latch set from buffer pool
275 unsigned char *pagepool; // mapped to the buffer pool pages
276 unsigned char *leafpool; // mapped to the leaf pool pages
277 unsigned char *redobuff; // mapped recovery buffer pointer
278 logseqno lsn, flushlsn; // current & first lsn flushed
279 MutexLatch redo[1]; // redo area lite latch
280 MutexLatch lock[1]; // allocation area lite latch
281 ushort thread_no[1]; // highest thread number issued
282 ushort err_thread; // error thread number
283 uint nlatchpage; // size of buffer pool & latchsets
284 uint latchtotal; // number of page latch entries
285 uint latchhash; // number of latch hash table slots
286 uint latchvictim; // next latch entry to swap out
287 uint nleafpage; // size of leaf pool & leafsets
288 uint leaftotal; // number of leaf latch entries
289 uint leafhash; // number of leaf hash table slots
290 uint leafvictim; // next leaf entry to swap out
291 uint leafpromote; // next leaf entry to promote
292 uint redopage; // page number of redo buffer
293 uint redolast; // last msync size of recovery buff
294 uint redoend; // eof/end element in recovery buff
295 int err; // last error
296 int line; // last error line no
297 int found; // number of keys found by delete
298 int reads, writes; // number of reads and writes
299 int type; // type of LSM tree 0=cache, 1=main
301 HANDLE halloc; // allocation handle
302 HANDLE hpool; // buffer pool handle
304 unsigned char *page0; // memory mapped pagezero of b-tree
308 BtMgr *mgr; // buffer manager for entire process
309 BtMgr *main; // buffer manager for main btree
310 BtPageSet cacheset[1]; // cached page frame for cache btree
311 BtPageSet mainset[1]; // cached page frame for main btree
312 uint cacheslot; // slot number in cacheset
313 uint mainslot; // slot number in mainset
314 ushort phase; // 1 = main btree 0 = cache btree 2 = both
315 ushort thread_no; // thread number
324 // atomic txn structure
327 logseqno reqlsn; // redo log seq no required
328 uint entry:31; // latch table entry number
329 uint reuse:1; // reused previous page
330 uint slot; // slot on page
333 // Catastrophic errors
347 #define CLOCK_bit 0x8000
349 // recovery manager entry types
352 BTRM_eof = 0, // rest of buffer is emtpy
353 BTRM_add, // add a unique key-value to btree
354 BTRM_dup, // add a duplicate key-value to btree
355 BTRM_del, // delete a key-value from btree
356 BTRM_upd, // update a key with a new value
357 BTRM_new, // allocate a new empty page
358 BTRM_old // reuse an old empty page
361 // recovery manager entry
362 // structure followed by BtKey & BtVal
365 logseqno reqlsn; // log sequence number required
366 logseqno lsn; // log sequence number for entry
367 uint len; // length of entry
368 unsigned char type; // type of entry
369 unsigned char lvl; // level of btree entry pertains to
374 extern void bt_close (BtDb *bt);
375 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
376 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
377 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
378 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
379 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
380 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
381 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
383 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
385 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
386 extern BTERR bt_nextkey (BtDb *bt);
388 extern uint bt_lastkey (BtDb *bt);
389 extern uint bt_prevkey (BtDb *bt);
392 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
393 extern void bt_mgrclose (BtMgr *mgr);
394 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
395 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
397 // atomic transaction functions
398 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
399 BTERR bt_promote (BtDb *bt);
401 // The page is allocated from low and hi ends.
402 // The key slots are allocated from the bottom,
403 // while the text and value of the key
404 // are allocated from the top. When the two
405 // areas meet, the page is split into two.
407 // A key consists of a length byte, two bytes of
408 // index number (0 - 65535), and up to 253 bytes
411 // Associated with each key is a value byte string
412 // containing any value desired.
414 // The b-tree root is always located at page 1.
415 // The first leaf page of level zero is always
416 // located on page 2.
418 // The b-tree pages are linked with next
419 // pointers to facilitate enumerators,
420 // and provide for concurrency.
422 // When to root page fills, it is split in two and
423 // the tree height is raised by a new root at page
424 // one with two keys.
426 // Deleted keys are marked with a dead bit until
427 // page cleanup. The fence key for a leaf node is
430 // To achieve maximum concurrency one page is locked at a time
431 // as the tree is traversed to find leaf key in question. The right
432 // page numbers are used in cases where the page is being split,
435 // Page 0 is dedicated to lock for new page extensions,
436 // and chains empty pages together for reuse. It also
437 // contains the latch manager hash table.
439 // The ParentModification lock on a node is obtained to serialize posting
440 // or changing the fence key for a node.
442 // Empty pages are chained together through the ALLOC page and reused.
444 // Access macros to address slot and key values from the page
445 // Page slots use 1 based indexing.
447 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
448 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
449 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
451 void bt_putid(unsigned char *dest, uid id)
456 dest[i] = (unsigned char)id, id >>= 8;
459 uid bt_getid(unsigned char *src)
464 for( i = 0; i < BtId; i++ )
465 id <<= 8, id |= *src++;
470 // lite weight spin lock Latch Manager
472 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
474 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
477 void bt_mutexlock(MutexLatch *latch)
479 uint idx, waited = 0;
483 for( idx = 0; idx < 100; idx++ ) {
484 *prev->value = __sync_fetch_and_or (latch->value, 1);
485 if( !*prev->bits->xcl ) {
487 __sync_fetch_and_sub (latch->bits->waiters, 1);
493 __sync_fetch_and_add (latch->bits->waiters, 1);
494 *prev->bits->waiters += 1;
498 sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
502 int bt_mutextry(MutexLatch *latch)
504 return !__sync_lock_test_and_set (latch->bits->xcl, 1);
507 void bt_releasemutex(MutexLatch *latch)
511 *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
513 if( *prev->bits->waiters )
514 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
517 // reader/writer lock implementation
519 void WriteLock (RWLock *lock, ushort tid, uint line)
521 if( lock->tid == tid ) {
525 bt_mutexlock (lock->xcl);
526 bt_mutexlock (lock->wrt);
527 bt_releasemutex (lock->xcl);
533 void WriteRelease (RWLock *lock)
540 bt_releasemutex (lock->wrt);
543 void ReadLock (RWLock *lock, ushort tid)
545 bt_mutexlock (lock->xcl);
547 if( !__sync_fetch_and_add (&lock->readers, 1) )
548 bt_mutexlock (lock->wrt);
550 bt_releasemutex (lock->xcl);
553 void ReadRelease (RWLock *lock)
555 if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
556 bt_releasemutex (lock->wrt);
559 // recovery manager -- flush dirty pages
561 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
563 uint cnt3 = 0, cnt2 = 0, cnt = 0;
568 // flush dirty pool pages to the btree
570 fprintf(stderr, "Start flushlsn ");
571 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
572 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
573 latch = mgr->latchsets + entry;
574 bt_mutexlock (latch->modify);
575 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
578 bt_writepage(mgr, page, latch->page_no, 0);
579 latch->dirty = 0, cnt++;
581 if( latch->pin & ~CLOCK_bit )
583 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
584 bt_releasemutex (latch->modify);
586 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
587 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
588 latch = mgr->leafsets + entry;
589 bt_mutexlock (latch->modify);
590 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
593 bt_writepage(mgr, page, latch->page_no, 1);
594 latch->dirty = 0, cnt++;
596 if( latch->pin & ~CLOCK_bit )
598 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
599 bt_releasemutex (latch->modify);
601 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
604 // recovery manager -- process current recovery buff on startup
605 // this won't do much if previous session was properly closed.
607 BTERR bt_recoveryredo (BtMgr *mgr)
614 hdr = (BtLogHdr *)mgr->redobuff;
615 mgr->flushlsn = hdr->lsn;
618 hdr = (BtLogHdr *)(mgr->redobuff + offset);
619 switch( hdr->type ) {
623 case BTRM_add: // add a unique key-value to btree
625 case BTRM_dup: // add a duplicate key-value to btree
626 case BTRM_del: // delete a key-value from btree
627 case BTRM_upd: // update a key with a new value
628 case BTRM_new: // allocate a new empty page
629 case BTRM_old: // reuse an old empty page
635 // recovery manager -- append new entry to recovery log
636 // flush dirty pages to disk when it overflows.
638 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
640 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
641 uint amt = sizeof(BtLogHdr);
645 bt_mutexlock (mgr->redo);
648 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
650 // see if new entry fits in buffer
651 // flush and reset if it doesn't
653 if( amt > size - mgr->redoend ) {
654 mgr->flushlsn = mgr->lsn;
655 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
656 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
659 bt_flushlsn(mgr, thread_no);
662 // fill in new entry & either eof or end block
664 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
669 hdr->lsn = ++mgr->lsn;
673 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
674 memset (eof, 0, sizeof(BtLogHdr));
676 // fill in key and value
679 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
680 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
683 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
684 memset (eof, 0, sizeof(BtLogHdr));
687 last = mgr->redolast & ~0xfff;
690 if( end - last + sizeof(BtLogHdr) >= 32768 )
691 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
692 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
696 bt_releasemutex(mgr->redo);
700 // recovery manager -- append transaction to recovery log
701 // flush dirty pages to disk when it overflows.
703 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
705 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
706 uint amt = 0, src, type;
713 // determine amount of redo recovery log space required
715 for( src = 0; src++ < source->cnt; ) {
716 key = keyptr(source,src);
717 val = valptr(source,src);
718 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
719 amt += sizeof(BtLogHdr);
722 bt_mutexlock (mgr->redo);
724 // see if new entry fits in buffer
725 // flush and reset if it doesn't
727 if( amt > size - mgr->redoend ) {
728 mgr->flushlsn = mgr->lsn;
729 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
730 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
733 bt_flushlsn (mgr, thread_no);
736 // assign new lsn to transaction
740 // fill in new entries
742 for( src = 0; src++ < source->cnt; ) {
743 key = keyptr(source, src);
744 val = valptr(source, src);
746 switch( slotptr(source, src)->type ) {
758 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
759 amt += sizeof(BtLogHdr);
761 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
767 // fill in key and value
769 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
770 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
775 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
776 memset (eof, 0, sizeof(BtLogHdr));
779 last = mgr->redolast & ~0xfff;
782 if( end - last + sizeof(BtLogHdr) >= 32768 )
783 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
784 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
788 bt_releasemutex(mgr->redo);
792 // read page into buffer pool from permanent location in Btree file
794 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
796 uint page_size = mgr->page_size;
799 page_size <<= mgr->leaf_xtra;
801 if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
802 return mgr->err = BTERR_read;
804 __sync_fetch_and_add (&mgr->reads, 1);
808 // write page to permanent location in Btree file
810 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
812 uint page_size = mgr->page_size;
815 page_size <<= mgr->leaf_xtra;
817 if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
818 return mgr->err = BTERR_wrt;
820 __sync_fetch_and_add (&mgr->writes, 1);
824 // set CLOCK bit in latch
825 // decrement pin count
827 void bt_unpinlatch (BtLatchSet *latch, uint dirty, ushort thread_no, uint line)
829 bt_mutexlock(latch->modify);
832 latch->pin |= CLOCK_bit;
834 bt_releasemutex(latch->modify);
837 // return the btree cached page address
839 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
841 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
845 page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
847 page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
852 // return next available leaf entry
853 // and with latch entry locked
855 uint bt_availleaf (BtMgr *mgr)
862 entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
864 entry = _InterlockedIncrement (&mgr->leafvictim);
866 entry %= mgr->leaftotal;
871 latch = mgr->leafsets + entry;
873 if( !bt_mutextry(latch->modify) )
876 // return this entry if it is not pinned
881 // if the CLOCK bit is set
884 latch->pin &= ~CLOCK_bit;
885 bt_releasemutex(latch->modify);
889 // return next available latch entry
890 // and with latch entry locked
892 uint bt_availnext (BtMgr *mgr)
899 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
901 entry = _InterlockedIncrement (&mgr->latchvictim);
903 entry %= mgr->latchtotal;
908 latch = mgr->latchsets + entry;
910 if( !bt_mutextry(latch->modify) )
913 // return this entry if it is not pinned
918 // if the CLOCK bit is set
921 latch->pin &= ~CLOCK_bit;
922 bt_releasemutex(latch->modify);
926 // pin leaf in leaf buffer pool
927 // return with latchset pinned
929 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
931 uint hashidx = page_no % mgr->leafhash;
936 // try to find our entry
938 bt_mutexlock(mgr->leaftable[hashidx].latch);
940 if( entry = mgr->leaftable[hashidx].entry )
942 latch = mgr->leafsets + entry;
944 if( page_no == latch->page_no )
946 } while( entry = latch->next );
948 // found our entry: increment pin
951 latch = mgr->leafsets + entry;
952 bt_mutexlock(latch->modify);
953 latch->pin |= CLOCK_bit;
956 bt_releasemutex(latch->modify);
957 bt_releasemutex(mgr->leaftable[hashidx].latch);
961 // find and reuse unpinned entry
964 entry = bt_availleaf (mgr);
965 latch = mgr->leafsets + entry;
967 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
968 oldidx = latch->page_no % mgr->leafhash;
970 // skip over this entry if old latch is not available
973 if( oldidx != hashidx )
974 if( !bt_mutextry (mgr->leaftable[oldidx].latch) ) {
975 bt_releasemutex(latch->modify);
979 // update permanent page area in btree from buffer pool
980 // no read-lock is required since page is not pinned.
983 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
984 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
988 // if latch is on a different hash chain
989 // unlink from the old page_no chain
992 if( oldidx != hashidx ) {
994 mgr->leafsets[latch->prev].next = latch->next;
996 mgr->leaftable[oldidx].entry = latch->next;
999 mgr->leafsets[latch->next].prev = latch->prev;
1001 bt_releasemutex (mgr->leaftable[oldidx].latch);
1004 if( bt_readpage (mgr, page, page_no, 1) )
1005 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1007 // link page as head of hash table chain
1008 // if this is a never before used entry,
1009 // or it was previously on a different
1010 // hash table chain. Otherwise, just
1011 // leave it in its current hash table
1014 if( !latch->page_no || hashidx != oldidx ) {
1015 if( latch->next = mgr->leaftable[hashidx].entry )
1016 mgr->leafsets[latch->next].prev = entry;
1018 mgr->leaftable[hashidx].entry = entry;
1022 // fill in latch structure
1024 latch->pin = CLOCK_bit | 1;
1025 latch->page_no = page_no;
1028 bt_releasemutex (latch->modify);
1029 bt_releasemutex (mgr->leaftable[hashidx].latch);
1033 // pin page in non-leaf buffer pool
1034 // return with latchset pinned
1036 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1038 uint hashidx = page_no % mgr->latchhash;
1043 // try to find our entry
1045 bt_mutexlock(mgr->hashtable[hashidx].latch);
1047 if( entry = mgr->hashtable[hashidx].entry ) do
1049 latch = mgr->latchsets + entry;
1051 if( page_no == latch->page_no )
1053 } while( entry = latch->next );
1055 // found our entry: increment pin
1058 latch = mgr->latchsets + entry;
1059 bt_mutexlock(latch->modify);
1060 latch->pin |= CLOCK_bit;
1062 bt_releasemutex(latch->modify);
1063 bt_releasemutex(mgr->hashtable[hashidx].latch);
1067 // find and reuse unpinned entry
1070 entry = bt_availnext (mgr);
1071 latch = mgr->latchsets + entry;
1073 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1074 oldidx = latch->page_no % mgr->latchhash;
1076 // skip over this entry if latch not available
1078 if( latch->page_no )
1079 if( oldidx != hashidx )
1080 if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
1081 bt_releasemutex(latch->modify);
1085 // update permanent page area in btree from buffer pool
1086 // no read-lock is required since page is not pinned.
1089 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1090 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1094 // if latch is on a different hash chain
1095 // unlink from the old page_no chain
1097 if( latch->page_no )
1098 if( oldidx != hashidx ) {
1100 mgr->latchsets[latch->prev].next = latch->next;
1102 mgr->hashtable[oldidx].entry = latch->next;
1105 mgr->latchsets[latch->next].prev = latch->prev;
1107 bt_releasemutex (mgr->hashtable[oldidx].latch);
1110 if( bt_readpage (mgr, page, page_no, 0) )
1111 return mgr->line = __LINE__, NULL;
1113 // link page as head of hash table chain
1114 // if this is a never before used entry,
1115 // or it was previously on a different
1116 // hash table chain. Otherwise, just
1117 // leave it in its current hash table
1120 if( !latch->page_no || hashidx != oldidx ) {
1121 if( latch->next = mgr->hashtable[hashidx].entry )
1122 mgr->latchsets[latch->next].prev = entry;
1124 mgr->hashtable[hashidx].entry = entry;
1128 // fill in latch structure
1130 latch->pin = CLOCK_bit | 1;
1131 latch->page_no = page_no;
1134 bt_releasemutex (latch->modify);
1135 bt_releasemutex (mgr->hashtable[hashidx].latch);
1139 void bt_mgrclose (BtMgr *mgr)
1141 char *name = mgr->type ? "Main" : "Cache";
1148 // flush previously written dirty pages
1149 // and write recovery buffer to disk
1151 fdatasync (mgr->idx);
1153 if( mgr->redoend ) {
1154 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1155 memset (eof, 0, sizeof(BtLogHdr));
1158 // write remaining dirty pool pages to the btree
1160 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
1161 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1162 latch = mgr->latchsets + entry;
1164 if( latch->dirty ) {
1165 bt_writepage(mgr, page, latch->page_no, 0);
1166 latch->dirty = 0, num++;
1171 fprintf(stderr, "%s %d non-leaf buffer pool pages flushed\n", name, num);
1175 // write remaining dirty leaf pages to the btree
1177 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
1178 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1179 latch = mgr->leafsets + entry;
1181 if( latch->dirty ) {
1182 bt_writepage(mgr, page, latch->page_no, 1);
1183 latch->dirty = 0, num++;
1188 fprintf(stderr, "%s %d leaf buffer pool pages flushed\n", name, num);
1190 // clear redo recovery buffer on disk.
1192 if( mgr->pagezero->redopages ) {
1193 eof = (BtLogHdr *)mgr->redobuff;
1194 memset (eof, 0, sizeof(BtLogHdr));
1195 eof->lsn = mgr->lsn;
1196 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1197 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1201 munmap (mgr->page0, (uid)(mgr->redopage + mgr->pagezero->redopages) << mgr->page_bits);
1203 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1204 munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1205 munmap (mgr->pagezero, mgr->page_size);
1207 FlushViewOfFile(mgr->pagezero, 0);
1208 UnmapViewOfFile(mgr->pagezero);
1209 UnmapViewOfFile(mgr->pagepool);
1210 CloseHandle(mgr->halloc);
1211 CloseHandle(mgr->hpool);
1217 FlushFileBuffers(mgr->idx);
1218 CloseHandle(mgr->idx);
1223 // close and release memory
1225 void bt_close (BtDb *bt)
1230 // open/create new btree buffer manager
1232 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1233 // size of page pool (e.g. 300) and leaf pool (e.g. 4000)
1235 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1237 uint lvl, attr, last, slot, idx;
1238 uint nlatchpage, latchhash;
1239 uint nleafpage, leafhash;
1240 unsigned char value[BtId];
1241 int flag, initit = 0;
1242 BtPageZero *pagezero;
1250 // determine sanity of page size and buffer pool
1252 if( leafxtra + pagebits > BT_maxbits )
1253 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1255 if( pagebits < BT_minbits )
1256 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1259 mgr = calloc (1, sizeof(BtMgr));
1261 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1263 if( mgr->idx == -1 ) {
1264 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1265 return free(mgr), NULL;
1268 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1269 attr = FILE_ATTRIBUTE_NORMAL;
1270 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1272 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1273 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1274 return GlobalFree(mgr), NULL;
1279 pagezero = valloc (BT_maxpage);
1282 // read minimum page size to get root info
1283 // to support raw disk partition files
1284 // check if page_bits == 0 on the disk.
1286 if( size = lseek (mgr->idx, 0L, 2) )
1287 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1288 if( pagezero->page_bits ) {
1289 pagebits = pagezero->page_bits;
1290 leafxtra = pagezero->leaf_xtra;
1291 redopages = pagezero->redopages;
1295 return free(mgr), free(pagezero), NULL;
1299 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1300 size = GetFileSize(mgr->idx, amt);
1302 if( size || *amt ) {
1303 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1304 return bt_mgrclose (mgr), NULL;
1305 pagebits = pagezero->page_bits;
1306 leafxtra = pagezero->leaf_xtra;
1307 redopages = pagezero->redopages;
1312 mgr->page_size = 1 << pagebits;
1313 mgr->page_bits = pagebits;
1314 mgr->leaf_xtra = leafxtra;
1316 // calculate number of latch hash table entries
1318 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1320 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1321 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1322 mgr->latchtotal = nodemax;
1324 // calculate number of leaf hash table entries
1326 mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1328 mgr->nleafpage += leafmax << leafxtra; // size of the leaf pool in pages
1329 mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1330 mgr->leaftotal = leafmax;
1332 mgr->redopage = LEAF_page + (1 << leafxtra);
1337 // initialize an empty b-tree with latch page, root page, page of leaves
1338 // and page(s) of latches and page pool cache
1340 ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1341 memset (pagezero, 0, 1 << pagebits);
1342 pagezero->alloc->lvl = MIN_lvl - 1;
1343 pagezero->redopages = redopages;
1344 pagezero->page_bits = mgr->page_bits;
1345 pagezero->leaf_xtra = leafxtra;
1347 bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1348 pagezero->upperpages = 1;
1349 pagezero->leafpages = 1;
1351 // initialize left-most LEAF page in
1352 // alloc->left and count of active leaf pages.
1354 bt_putid (pagezero->alloc->left, LEAF_page);
1356 if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1357 fprintf (stderr, "Unable to create btree page zero\n");
1358 return bt_mgrclose (mgr), NULL;
1361 memset (pagezero, 0, 1 << pagebits);
1363 for( lvl=MIN_lvl; lvl--; ) {
1364 BtSlot *node = slotptr(pagezero->alloc, 1);
1365 node->off = mgr->page_size;
1368 node->off <<= mgr->leaf_xtra;
1370 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1371 node->type = Librarian;
1374 node->off = node[-1].off;
1375 key = keyptr(pagezero->alloc, 2);
1376 key = keyptr(pagezero->alloc, 1);
1377 key->len = 2; // create stopper key
1381 bt_putid(value, MIN_lvl - lvl + 1);
1382 val = valptr(pagezero->alloc, 1);
1383 val->len = lvl ? BtId : 0;
1384 memcpy (val->value, value, val->len);
1386 pagezero->alloc->min = node->off;
1387 pagezero->alloc->lvl = lvl;
1388 pagezero->alloc->cnt = 2;
1389 pagezero->alloc->act = 1;
1390 pagezero->alloc->page_no = MIN_lvl - lvl;
1392 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1393 fprintf (stderr, "Unable to create btree page\n");
1394 return bt_mgrclose (mgr), NULL;
1402 VirtualFree (pagezero, 0, MEM_RELEASE);
1405 // mlock the first segment of 64K pages
1407 flag = PROT_READ | PROT_WRITE;
1408 mgr->page0 = mmap (0, (uid)(mgr->redopage + redopages) << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1410 if( mgr->page0 == MAP_FAILED ) {
1411 fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
1412 return bt_mgrclose (mgr), NULL;
1415 mgr->pagezero = (BtPageZero *)mgr->page0;
1416 mlock (mgr->pagezero, mgr->page_size);
1418 mgr->redobuff = mgr->page0 + (mgr->redopage << mgr->page_bits);
1419 mlock (mgr->redobuff, redopages << mgr->page_bits);
1421 // allocate pool buffers
1423 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1424 if( mgr->pagepool == MAP_FAILED ) {
1425 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1426 return bt_mgrclose (mgr), NULL;
1429 mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1430 if( mgr->leafpool == MAP_FAILED ) {
1431 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1432 return bt_mgrclose (mgr), NULL;
1435 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1436 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1437 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1439 mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1440 mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1441 mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1443 for( idx = 1; idx < mgr->leaftotal; idx++ ) {
1444 latch = mgr->leafsets + idx;
1451 // open BTree access method
1452 // based on buffer manager
1454 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1456 BtDb *bt = malloc (sizeof(*bt));
1458 memset (bt, 0, sizeof(*bt));
1462 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1464 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1469 // compare two keys, return > 0, = 0, or < 0
1470 // =0: keys are same
1473 // as the comparison value
1475 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1477 uint len1 = key1->len;
1480 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1491 // place write, read, or parent lock on requested page_no.
1493 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1497 ReadLock (latch->readwr, thread_no);
1500 WriteLock (latch->readwr, thread_no, line);
1503 ReadLock (latch->access, thread_no);
1506 WriteLock (latch->access, thread_no, line);
1509 WriteLock (latch->parent, thread_no, line);
1512 WriteLock (latch->link, thread_no, line);
1517 // remove write, read, or parent lock on requested page
1519 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1523 ReadRelease (latch->readwr);
1526 WriteRelease (latch->readwr);
1529 ReadRelease (latch->access);
1532 WriteRelease (latch->access);
1535 WriteRelease (latch->parent);
1538 WriteRelease (latch->link);
1543 // allocate a new page
1544 // return with page latched, but unlocked.
1546 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1548 uint page_size = mgr->page_size, leaf_xtra = 0;
1549 unsigned char *freechain;
1552 if( contents->lvl ) {
1553 freechain = mgr->pagezero->freechain;
1554 mgr->pagezero->upperpages++;
1556 freechain = mgr->pagezero->leafchain;
1557 mgr->pagezero->leafpages++;
1558 leaf_xtra = mgr->leaf_xtra;
1559 page_size <<= leaf_xtra;
1562 // lock allocation page
1564 bt_mutexlock(mgr->lock);
1566 // use empty chain first
1567 // else allocate new page
1569 if( page_no = bt_getid(freechain) ) {
1570 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1571 set->page = bt_mappage (mgr, set->latch);
1573 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1575 bt_putid(freechain, bt_getid(set->page->right));
1577 // the page is currently nopromote and this
1578 // will keep bt_promote out.
1580 // contents will replace this bit
1581 // and pin will keep bt_promote out
1583 contents->page_no = page_no;
1584 contents->nopromote = 0;
1586 memcpy (set->page, contents, page_size);
1588 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1589 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1591 bt_releasemutex(mgr->lock);
1595 page_no = bt_getid(mgr->pagezero->alloc->right);
1596 bt_putid(mgr->pagezero->alloc->right, page_no+(1 << leaf_xtra));
1598 // unlock allocation latch and
1599 // extend file into new page.
1601 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1602 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1604 bt_releasemutex(mgr->lock);
1606 // keep bt_promote out of this page
1607 contents->nopromote = 1;
1608 contents->page_no = page_no;
1610 if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1611 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1613 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1614 set->page = bt_mappage (mgr, set->latch);
1616 return mgr->err_thread = thread_no, mgr->err;
1618 // now pin will keep bt_promote out
1620 set->page->nopromote = 0;
1624 // find slot in page for given key at a given level
1626 int bt_findslot (BtPage page, unsigned char *key, uint len)
1628 uint diff, higher = page->cnt, low = 1, slot;
1631 // make stopper key an infinite fence value
1633 if( bt_getid (page->right) )
1638 // low is the lowest candidate.
1639 // loop ends when they meet
1641 // higher is already
1642 // tested as .ge. the passed key.
1644 while( diff = higher - low ) {
1645 slot = low + ( diff >> 1 );
1646 if( keycmp (keyptr(page, slot), key, len) < 0 )
1649 higher = slot, good++;
1652 // return zero if key is on right link page
1654 return good ? higher : 0;
1657 // find and load page at given level for given key
1658 // leave page rd or wr locked as requested
1660 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1662 uid page_no = ROOT_page, prevpage_no = 0;
1663 uint drill = 0xff, slot;
1664 BtLatchSet *prevlatch;
1665 uint mode, prevmode;
1670 // start at root of btree and drill down
1673 // determine lock mode of drill level
1674 mode = (drill == lvl) ? lock : BtLockRead;
1676 if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1677 set->page = bt_mappage (mgr, set->latch);
1681 // obtain access lock using lock chaining with Access mode
1683 if( page_no > ROOT_page )
1684 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1686 // release & unpin parent or left sibling page
1689 bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1690 bt_unpinlatch (prevlatch, 0, thread_no, __LINE__);
1694 // obtain mode lock using lock coupling through AccessLock
1696 bt_lockpage(mode, set->latch, thread_no, __LINE__);
1698 // grab our fence key
1700 ptr=keyptr(set->page,set->page->cnt);
1702 if( set->page->free )
1703 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1705 if( page_no > ROOT_page )
1706 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1708 // re-read and re-lock root after determining actual level of root
1710 if( set->page->lvl != drill) {
1711 if( set->latch->page_no != ROOT_page )
1712 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1714 drill = set->page->lvl;
1716 if( lock != BtLockRead && drill == lvl ) {
1717 bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1718 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
1723 prevpage_no = set->latch->page_no;
1724 prevlatch = set->latch;
1725 prevpage = set->page;
1728 // if requested key is beyond our fence,
1729 // slide to the right
1731 if( keycmp (ptr, key, len) < 0 )
1732 if( page_no = bt_getid(set->page->right) )
1735 // if page is part of a delete operation,
1736 // slide to the left;
1738 if( set->page->kill ) {
1739 bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1740 page_no = bt_getid(set->page->left);
1741 bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1745 // find key on page at this level
1746 // and descend to requested level
1748 if( slot = bt_findslot (set->page, key, len) ) {
1752 // find next non-dead slot -- the fence key if nothing else
1754 while( slotptr(set->page, slot)->dead )
1755 if( slot++ < set->page->cnt )
1758 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1760 val = valptr(set->page, slot);
1762 if( val->len == BtId )
1763 page_no = bt_getid(val->value);
1765 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1771 // slide right into next page
1773 page_no = bt_getid(set->page->right);
1777 // return error on end of right chain
1779 mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1780 return 0; // return error
1783 // return page to free list
1784 // page must be delete & write locked
1785 // and have no keys pointing to it.
1787 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1789 unsigned char *freechain;
1791 if( set->page->lvl ) {
1792 freechain = mgr->pagezero->freechain;
1793 mgr->pagezero->upperpages--;
1795 freechain = mgr->pagezero->leafchain;
1796 mgr->pagezero->leafpages--;
1799 // lock allocation page
1801 bt_mutexlock (mgr->lock);
1805 memcpy(set->page->right, freechain, BtId);
1806 bt_putid(freechain, set->latch->page_no);
1807 set->page->free = 1;
1809 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1810 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1812 // unlock released page
1813 // and unlock allocation page
1815 bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1816 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1817 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1818 bt_releasemutex (mgr->lock);
1821 // a fence key was deleted from a page
1822 // push new fence value upwards
1824 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1826 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1827 unsigned char value[BtId];
1831 // remove the old fence value
1833 ptr = keyptr(set->page, set->page->cnt);
1834 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1835 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1837 // cache new fence value
1839 ptr = keyptr(set->page, set->page->cnt);
1840 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1842 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1843 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1845 // insert new (now smaller) fence key
1847 bt_putid (value, set->latch->page_no);
1848 ptr = (BtKey*)leftkey;
1850 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1851 return mgr->err_thread = thread_no, mgr->err;
1853 // now delete old fence key
1855 ptr = (BtKey*)rightkey;
1857 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1858 return mgr->err_thread = thread_no, mgr->err;
1860 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1861 bt_unpinlatch(set->latch, 1, thread_no, __LINE__);
1865 // root has a single child
1866 // collapse a level from the tree
1868 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1875 // find the child entry and promote as new root contents
1878 for( idx = 0; idx++ < root->page->cnt; )
1879 if( !slotptr(root->page, idx)->dead )
1882 val = valptr(root->page, idx);
1884 if( val->len == BtId )
1885 page_no = bt_getid (valptr(root->page, idx)->value);
1887 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1889 if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1890 child->page = bt_mappage (mgr, child->latch);
1892 return mgr->err_thread = thread_no, mgr->err;
1894 bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1895 bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1897 memcpy (root->page, child->page, mgr->page_size);
1898 bt_freepage (mgr, child, thread_no);
1900 } while( root->page->lvl > 1 && root->page->act == 1 );
1902 bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1903 bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
1907 // delete a page and manage key
1908 // call with page writelocked
1910 // returns with page unpinned
1911 // from the page pool.
1913 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1915 unsigned char lowerfence[BT_keyarray];
1916 uint page_size = mgr->page_size;
1917 BtPageSet right[1], temp[1];
1918 unsigned char value[BtId];
1919 uid page_no, right2;
1923 page_size <<= mgr->leaf_xtra;
1925 // cache copy of original fence key
1926 // that is being deleted.
1928 ptr = keyptr(set->page, set->page->cnt);
1929 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1931 // obtain locks on right page
1933 page_no = bt_getid(set->page->right);
1935 if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1936 right->page = bt_mappage (mgr, right->latch);
1940 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1942 if( right->page->kill )
1943 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1945 // pull contents of right peer into our empty page
1946 // preserving our left page number, and its right page number.
1948 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
1949 page_no = bt_getid(set->page->left);
1950 memcpy (set->page, right->page, page_size);
1951 bt_putid (set->page->left, page_no);
1952 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
1954 set->page->page_no = set->latch->page_no;
1956 // fix left link from far right page
1958 if( right2 = bt_getid (right->page->right) ) {
1959 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
1960 temp->page = bt_mappage (mgr, temp->latch);
1964 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1965 bt_putid (temp->page->left, set->latch->page_no);
1966 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1967 bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
1968 } else if( !lvl ) { // our page is now rightmost leaf
1969 bt_mutexlock (mgr->lock);
1970 bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
1971 bt_releasemutex(mgr->lock);
1974 // mark right page deleted and release lock
1976 right->page->kill = 1;
1977 bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1979 // redirect higher key directly to our new node contents
1981 ptr = keyptr(right->page, right->page->cnt);
1982 bt_putid (value, set->latch->page_no);
1984 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
1987 // delete our orignal fence key in parent
1988 // and unlock our page.
1990 ptr = (BtKey *)lowerfence;
1992 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1995 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1996 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1998 // obtain delete and write locks to right node
2000 bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2001 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2002 bt_freepage (mgr, right, thread_no);
2006 // find and delete key on page by marking delete flag bit
2007 // if page becomes empty, delete it from the btree
2009 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2011 uint slot, idx, found, fence;
2017 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2018 node = slotptr(set->page, slot);
2019 ptr = keyptr(set->page, slot);
2021 return mgr->err_thread = thread_no, mgr->err;
2023 // if librarian slot, advance to real slot
2025 if( node->type == Librarian ) {
2026 ptr = keyptr(set->page, ++slot);
2027 node = slotptr(set->page, slot);
2030 fence = slot == set->page->cnt;
2032 // delete the key, ignore request if already dead
2034 if( found = !keycmp (ptr, key, len) )
2035 if( found = node->dead == 0 ) {
2036 val = valptr(set->page,slot);
2037 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2041 // collapse empty slots beneath the fence
2042 // on interiour nodes
2045 while( idx = set->page->cnt - 1 )
2046 if( slotptr(set->page, idx)->dead ) {
2047 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2048 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2056 // did we delete a fence key in an upper level?
2058 if( lvl && set->page->act && fence )
2059 if( bt_fixfence (mgr, set, lvl, thread_no) )
2060 return mgr->err_thread = thread_no, mgr->err;
2064 // do we need to collapse root?
2066 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2067 if( bt_collapseroot (mgr, set, thread_no) )
2068 return mgr->err_thread = thread_no, mgr->err;
2072 // delete empty page
2074 if( !set->page->act )
2075 return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2077 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2078 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2082 // check page for space available,
2083 // clean if necessary and return
2084 // 0 - page needs splitting
2085 // >0 new slot value
2087 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2089 uint page_size = mgr->page_size;
2090 BtPage page = set->page, frame;
2091 uint cnt = 0, idx = 0;
2092 uint max = page->cnt;
2097 if( !set->page->lvl )
2098 page_size <<= mgr->leaf_xtra;
2100 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2103 // skip cleanup and proceed to split
2104 // if there's not enough garbage
2107 if( page->garbage < page_size / 5 )
2110 frame = malloc (page_size);
2111 memcpy (frame, page, page_size);
2113 // skip page info and set rest of page to zero
2115 memset (page+1, 0, page_size - sizeof(*page));
2117 page->min = page_size;
2121 // clean up page first by
2122 // removing deleted keys
2124 while( cnt++ < max ) {
2128 if( cnt < max || frame->lvl )
2129 if( slotptr(frame,cnt)->dead )
2132 // copy the value across
2134 val = valptr(frame, cnt);
2135 page->min -= val->len + sizeof(BtVal);
2136 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2138 // copy the key across
2140 key = keyptr(frame, cnt);
2141 page->min -= key->len + sizeof(BtKey);
2142 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2144 // make a librarian slot
2146 slotptr(page, ++idx)->off = page->min;
2147 slotptr(page, idx)->type = Librarian;
2148 slotptr(page, idx)->dead = 1;
2152 slotptr(page, ++idx)->off = page->min;
2153 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2155 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2162 // see if page has enough space now, or does it need splitting?
2164 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2170 // split the root and raise the height of the btree
2172 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2174 unsigned char leftkey[BT_keyarray];
2175 uint nxt = mgr->page_size;
2176 unsigned char value[BtId];
2183 frame = malloc (mgr->page_size);
2184 memcpy (frame, root->page, mgr->page_size);
2186 // save left page fence key for new root
2188 ptr = keyptr(root->page, root->page->cnt);
2189 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2191 // Obtain an empty page to use, and copy the current
2192 // root contents into it, e.g. lower keys
2194 if( bt_newpage(mgr, left, frame, thread_no) )
2195 return mgr->err_thread = thread_no, mgr->err;
2197 left_page_no = left->latch->page_no;
2198 bt_unpinlatch (left->latch, 1, thread_no, __LINE__);
2201 // left link the pages together
2203 page = bt_mappage (mgr, right);
2204 bt_putid (page->left, left_page_no);
2206 // preserve the page info at the bottom
2207 // of higher keys and set rest to zero
2209 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2211 // insert stopper key at top of newroot page
2212 // and increase the root height
2214 nxt -= BtId + sizeof(BtVal);
2215 bt_putid (value, right->page_no);
2216 val = (BtVal *)((unsigned char *)root->page + nxt);
2217 memcpy (val->value, value, BtId);
2220 nxt -= 2 + sizeof(BtKey);
2221 slotptr(root->page, 2)->off = nxt;
2222 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2227 // insert lower keys page fence key on newroot page as first key
2229 nxt -= BtId + sizeof(BtVal);
2230 bt_putid (value, left_page_no);
2231 val = (BtVal *)((unsigned char *)root->page + nxt);
2232 memcpy (val->value, value, BtId);
2235 ptr = (BtKey *)leftkey;
2236 nxt -= ptr->len + sizeof(BtKey);
2237 slotptr(root->page, 1)->off = nxt;
2238 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2240 bt_putid(root->page->right, 0);
2241 root->page->min = nxt; // reset lowest used offset and key count
2242 root->page->cnt = 2;
2243 root->page->act = 2;
2246 mgr->pagezero->alloc->lvl = root->page->lvl;
2248 // release and unpin root pages
2250 bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2251 bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
2253 bt_unpinlatch (right, 1, thread_no, __LINE__);
2257 // split already locked full node
2259 // return pool entry for new right
2260 // page, pinned & unlocked
2262 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2264 uint page_size = mgr->page_size;
2265 BtPageSet right[1], temp[1];
2266 uint cnt = 0, idx = 0, max;
2267 uint lvl = set->page->lvl;
2275 if( !set->page->lvl )
2276 page_size <<= mgr->leaf_xtra;
2278 // split higher half of keys to frame
2280 frame = malloc (page_size);
2281 memset (frame, 0, page_size);
2282 frame->min = page_size;
2283 max = set->page->cnt;
2287 while( cnt++ < max ) {
2288 if( cnt < max || set->page->lvl )
2289 if( slotptr(set->page, cnt)->dead )
2292 val = valptr(set->page, cnt);
2293 frame->min -= val->len + sizeof(BtVal);
2294 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2296 key = keyptr(set->page, cnt);
2297 frame->min -= key->len + sizeof(BtKey);
2298 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2300 // add librarian slot
2302 slotptr(frame, ++idx)->off = frame->min;
2303 slotptr(frame, idx)->type = Librarian;
2304 slotptr(frame, idx)->dead = 1;
2308 slotptr(frame, ++idx)->off = frame->min;
2309 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2311 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2320 if( set->latch->page_no > ROOT_page ) {
2321 right2 = bt_getid (set->page->right);
2322 bt_putid (frame->right, right2);
2325 bt_putid (frame->left, set->latch->page_no);
2328 // get new free page and write higher keys to it.
2330 if( bt_newpage(mgr, right, frame, thread_no) )
2333 // link far right's left pointer to new page
2335 if( linkleft && set->latch->page_no > ROOT_page )
2337 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2338 temp->page = bt_mappage (mgr, temp->latch);
2342 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2343 bt_putid (temp->page->left, right->latch->page_no);
2344 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2345 bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2346 } else if( !lvl ) { // page is rightmost leaf
2347 bt_mutexlock (mgr->lock);
2348 bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2349 bt_releasemutex(mgr->lock);
2352 // process lower keys
2354 memcpy (frame, set->page, page_size);
2355 memset (set->page+1, 0, page_size - sizeof(*set->page));
2357 set->page->min = page_size;
2358 set->page->garbage = 0;
2364 // assemble page of smaller keys
2366 while( cnt++ < max ) {
2367 if( slotptr(frame, cnt)->dead )
2369 val = valptr(frame, cnt);
2370 set->page->min -= val->len + sizeof(BtVal);
2371 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2373 key = keyptr(frame, cnt);
2374 set->page->min -= key->len + sizeof(BtKey);
2375 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2377 // add librarian slot
2379 slotptr(set->page, ++idx)->off = set->page->min;
2380 slotptr(set->page, idx)->type = Librarian;
2381 slotptr(set->page, idx)->dead = 1;
2385 slotptr(set->page, ++idx)->off = set->page->min;
2386 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2390 bt_putid(set->page->right, right->latch->page_no);
2391 set->page->cnt = idx;
2394 entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2398 // fix keys for newly split page
2399 // call with both pages pinned & locked
2400 // return unlocked and unpinned
2402 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2404 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2405 unsigned char value[BtId];
2406 uint lvl = set->page->lvl;
2412 // if current page is the root page, split it
2414 if( set->latch->page_no == ROOT_page )
2415 return bt_splitroot (mgr, set, right, thread_no);
2417 ptr = keyptr(set->page, set->page->cnt);
2418 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2420 page = bt_mappage (mgr, right);
2422 ptr = keyptr(page, page->cnt);
2423 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2425 // splice in far right page's left page_no
2427 if( right2 = bt_getid (page->right) ) {
2428 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2429 temp->page = bt_mappage (mgr, temp->latch);
2433 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2434 bt_putid (temp->page->left, right->page_no);
2435 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2436 bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2437 } else if( !lvl ) { // right page is far right page
2438 bt_mutexlock (mgr->lock);
2439 bt_putid (mgr->pagezero->alloc->left, right->page_no);
2440 bt_releasemutex(mgr->lock);
2442 // insert new fences in their parent pages
2444 bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2446 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2447 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2449 // insert new fence for reformulated left block of smaller keys
2451 bt_putid (value, set->latch->page_no);
2452 ptr = (BtKey *)leftkey;
2454 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2455 return mgr->err_thread = thread_no, mgr->err;
2457 // switch fence for right block of larger keys to new right page
2459 bt_putid (value, right->page_no);
2460 ptr = (BtKey *)rightkey;
2462 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2463 return mgr->err_thread = thread_no, mgr->err;
2465 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2466 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2468 bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2469 bt_unpinlatch (right, 1, thread_no, __LINE__);
2473 // install new key and value onto page
2474 // page must already be checked for
2477 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2479 uint idx, librarian;
2485 // if previous slot is a librarian slot, use it
2488 if( slotptr(set->page, slot-1)->type == Librarian )
2491 // copy value onto page
2493 set->page->min -= vallen + sizeof(BtVal);
2494 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2495 memcpy (val->value, value, vallen);
2498 // copy key onto page
2500 set->page->min -= keylen + sizeof(BtKey);
2501 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2502 memcpy (ptr->key, key, keylen);
2505 // find first empty slot at or above our insert slot
2507 for( idx = slot; idx < set->page->cnt; idx++ )
2508 if( slotptr(set->page, idx)->dead )
2511 // now insert key into array before slot.
2513 // if we're going all the way to the top,
2514 // add as many librarian slots as
2517 if( idx == set->page->cnt ) {
2518 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2520 librarian = ++idx - slot;
2521 avail /= sizeof(BtSlot);
2526 if( librarian > avail )
2530 rate = (idx - slot) / librarian;
2531 set->page->cnt += librarian;
2536 librarian = 0, rate = 0;
2538 // transfer slots and add librarian slots
2540 while( idx > slot ) {
2541 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2543 // add librarian slot per rate
2546 if( (idx - slot)/2 <= librarian * rate ) {
2547 node = slotptr(set->page, --idx);
2548 node->off = node[1].off;
2549 node->type = Librarian;
2561 node = slotptr(set->page, slot);
2562 node->off = set->page->min;
2568 // Insert new key into the btree at given level.
2569 // either add a new key or update/add an existing one
2571 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2573 uint slot, idx, len, entry;
2579 while( 1 ) { // find the page and slot for the current key
2580 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2581 node = slotptr(set->page, slot);
2582 ptr = keyptr(set->page, slot);
2586 // if librarian slot == found slot, advance to real slot
2588 if( node->type == Librarian ) {
2589 node = slotptr(set->page, ++slot);
2590 ptr = keyptr(set->page, slot);
2593 // if inserting a duplicate key or unique
2594 // key that doesn't exist on the page,
2595 // check for adequate space on the page
2596 // and insert the new key before slot.
2601 if( !keycmp (ptr, key, keylen) )
2604 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2605 if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2610 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2611 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2614 return mgr->err_thread = thread_no, mgr->err;
2617 if( keycmp (ptr, key, keylen) )
2623 // if key already exists, update value and return
2625 val = valptr(set->page, slot);
2627 if( val->len >= vallen ) {
2633 set->page->garbage += val->len - vallen;
2635 memcpy (val->value, value, vallen);
2639 // new update value doesn't fit in existing value area
2640 // make sure page has room
2643 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2650 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2653 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2654 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2657 return mgr->err_thread = thread_no, mgr->err;
2660 // copy key and value onto page and update slot
2662 set->page->min -= vallen + sizeof(BtVal);
2663 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2664 memcpy (val->value, value, vallen);
2667 set->page->min -= keylen + sizeof(BtKey);
2668 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2669 memcpy (ptr->key, key, keylen);
2672 slotptr(set->page,slot)->off = set->page->min;
2675 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2676 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2680 // determine actual page where key is located
2681 // return slot number
2683 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2685 BtKey *key = keyptr(source,src), *ptr;
2686 uint slot = locks[src].slot;
2689 if( locks[src].reuse )
2690 entry = locks[src-1].entry;
2692 entry = locks[src].entry;
2695 set->latch = mgr->leafsets + entry;
2696 set->page = bt_mappage (mgr, set->latch);
2700 // find where our key is located
2701 // on current page or pages split on
2702 // same page txn operations.
2705 set->latch = mgr->leafsets + entry;
2706 set->page = bt_mappage (mgr, set->latch);
2708 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2709 if( slotptr(set->page, slot)->type == Librarian )
2711 if( locks[src].reuse )
2712 locks[src].entry = entry;
2715 } while( entry = set->latch->split );
2717 mgr->line = __LINE__, mgr->err = BTERR_atomic;
2721 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2723 BtKey *key = keyptr(source, src);
2724 BtVal *val = valptr(source, src);
2729 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2730 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2731 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2732 return mgr->err_thread = thread_no, mgr->err;
2734 set->page->lsn = lsn;
2740 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2741 latch = mgr->leafsets + entry;
2745 // splice right page into split chain
2748 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2749 latch->split = set->latch->split;
2750 set->latch->split = entry;
2752 // clear slot number for atomic page
2754 locks[src].slot = 0;
2757 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2760 // perform delete from smaller btree
2761 // insert a delete slot if not found there
2763 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2765 BtKey *key = keyptr(source, src);
2772 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2773 node = slotptr(set->page, slot);
2774 ptr = keyptr(set->page, slot);
2775 val = valptr(set->page, slot);
2777 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2779 // if slot is not found, insert a delete slot
2781 if( keycmp (ptr, key->key, key->len) )
2782 if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2785 // if node is already dead,
2786 // ignore the request.
2791 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2792 set->page->lsn = lsn;
2796 __sync_fetch_and_add(&mgr->found, 1);
2800 // release master's splits from right to left
2802 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2804 BtLatchSet *latch = mgr->leafsets + entry;
2807 bt_atomicrelease (mgr, latch->split, thread_no);
2810 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2811 bt_unpinlatch(latch, 1, thread_no, __LINE__);
2814 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2816 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2817 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2819 return keycmp (key1, key2->key, key2->len);
2821 // atomic modification of a batch of keys.
2823 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2825 uint src, idx, slot, samepage, entry, que = 0;
2826 BtKey *key, *ptr, *key2;
2832 // stable sort the list of keys into order to
2833 // prevent deadlocks between threads.
2835 for( src = 1; src++ < source->cnt; ) {
2836 *temp = *slotptr(source,src);
2837 key = keyptr (source,src);
2839 for( idx = src; --idx; ) {
2840 key2 = keyptr (source,idx);
2841 if( keycmp (key, key2->key, key2->len) < 0 ) {
2842 *slotptr(source,idx+1) = *slotptr(source,idx);
2843 *slotptr(source,idx) = *temp;
2849 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2850 // add entries to redo log
2852 if( bt->mgr->pagezero->redopages )
2853 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2857 // perform the individual actions in the transaction
2859 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2860 return bt->mgr->err;
2862 // if number of active pages
2863 // is greater than the buffer pool
2864 // promote page into larger btree
2867 while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2868 if( bt_promote (bt) )
2869 return bt->mgr->err;
2876 // execute the source list of inserts/deletes
2878 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2880 uint slot, src, idx, samepage, entry;
2881 BtPageSet set[1], prev[1];
2882 unsigned char value[BtId];
2890 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2892 // Load the leaf page for each key
2893 // group same page references with reuse bit
2895 for( src = 0; src++ < source->cnt; ) {
2896 key = keyptr(source, src);
2898 // first determine if this modification falls
2899 // on the same page as the previous modification
2900 // note that the far right leaf page is a special case
2902 if( samepage = src > 1 )
2903 samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
2906 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
2907 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
2914 if( slotptr(set->page, slot)->type == Librarian )
2917 entry = set->latch - mgr->leafsets;
2918 locks[src].reuse = samepage;
2919 locks[src].entry = entry;
2920 locks[src].slot = slot;
2922 // capture current lsn for master page
2924 locks[src].reqlsn = set->page->lsn;
2927 // insert or delete each key
2928 // process any splits or merges
2929 // run through txn list backwards
2931 samepage = source->cnt + 1;
2933 for( src = source->cnt; src; src-- ) {
2934 if( locks[src].reuse )
2937 // perform the txn operations
2938 // from smaller to larger on
2941 for( idx = src; idx < samepage; idx++ )
2942 switch( slotptr(source,idx)->type ) {
2944 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2950 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2955 bt_atomicpage (mgr, source, locks, idx, set);
2959 // after the same page operations have finished,
2960 // process master page for splits or deletion.
2962 latch = prev->latch = mgr->leafsets + locks[src].entry;
2963 prev->page = bt_mappage (mgr, prev->latch);
2966 // pick-up all splits from master page
2967 // each one is already pinned & WriteLocked.
2969 while( entry = prev->latch->split ) {
2970 set->latch = mgr->leafsets + entry;
2971 set->page = bt_mappage (mgr, set->latch);
2973 // delete empty master page by undoing its split
2974 // (this is potentially another empty page)
2975 // note that there are no pointers to it yet
2977 if( !prev->page->act ) {
2978 memcpy (set->page->left, prev->page->left, BtId);
2979 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
2980 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
2981 prev->latch->split = set->latch->split;
2982 bt_freepage (mgr, set, thread_no);
2986 // remove empty split page from the split chain
2987 // and return it to the free list. No other
2988 // thread has its page number yet.
2990 if( !set->page->act ) {
2991 memcpy (prev->page->right, set->page->right, BtId);
2992 prev->latch->split = set->latch->split;
2994 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
2995 bt_freepage (mgr, set, thread_no);
2999 // update prev's fence key
3001 ptr = keyptr(prev->page,prev->page->cnt);
3002 bt_putid (value, prev->latch->page_no);
3004 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3007 // splice in the left link into the split page
3009 bt_putid (set->page->left, prev->latch->page_no);
3013 // update left pointer in next right page from last split page
3014 // (if all splits were reversed or none occurred, latch->split == 0)
3016 if( latch->split ) {
3017 // fix left pointer in master's original (now split)
3018 // far right sibling or set rightmost page in page zero
3020 if( right_page_no = bt_getid (prev->page->right) ) {
3021 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3022 set->page = bt_mappage (mgr, set->latch);
3026 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3027 bt_putid (set->page->left, prev->latch->page_no);
3028 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3029 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
3030 } else { // prev is rightmost page
3031 bt_mutexlock (mgr->lock);
3032 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3033 bt_releasemutex(mgr->lock);
3036 // switch the original fence key from the
3037 // master page to the last split page.
3039 ptr = keyptr(prev->page,prev->page->cnt);
3040 bt_putid (value, prev->latch->page_no);
3042 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3045 // unlock and unpin the split pages
3047 bt_atomicrelease (mgr, latch->split, thread_no);
3049 // unlock and unpin the master page
3052 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3053 bt_unpinlatch(latch, 1, thread_no, __LINE__);
3057 // since there are no splits remaining, we're
3058 // finished if master page occupied
3060 if( prev->page->act ) {
3061 bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3062 bt_unpinlatch(prev->latch, 1, thread_no, __LINE__);
3066 // any and all splits were reversed, and the
3067 // master page located in prev is empty, delete it
3069 if( bt_deletepage (mgr, prev, thread_no, 0) )
3077 // pick & promote a page into the larger btree
3079 BTERR bt_promote (BtDb *bt)
3081 uint entry, slot, idx;
3089 entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3091 entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3093 entry %= bt->mgr->leaftotal;
3098 set->latch = bt->mgr->leafsets + entry;
3100 // skip this entry if it has never been used
3102 if( !set->latch->page_no )
3105 if( !bt_mutextry(set->latch->modify) )
3108 // skip this entry if it is pinned
3110 if( set->latch->pin & ~CLOCK_bit ) {
3111 bt_releasemutex(set->latch->modify);
3115 set->page = bt_mappage (bt->mgr, set->latch);
3117 // entry has no right sibling
3119 if( !bt_getid (set->page->right) ) {
3120 bt_releasemutex(set->latch->modify);
3124 // entry interiour node or being killed or constructed
3126 if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3127 bt_releasemutex(set->latch->modify);
3131 // pin the page for our access
3132 // and leave it locked for the
3133 // duration of the promotion.
3136 bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3137 bt_releasemutex(set->latch->modify);
3139 // transfer slots in our selected page to the main btree
3140 if( !(entry % 100) )
3141 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3143 if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3144 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3145 return bt->main->err;
3148 // now delete the page
3150 if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3151 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3153 return bt->mgr->err;
3157 // find unique key == given key, or first duplicate key in
3158 // leaf level and return number of value bytes
3159 // or (-1) if not found.
3161 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
3170 for( type = 0; type < 2; type++ )
3171 if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3172 node = slotptr(set->page, slot);
3174 // skip librarian slot place holder
3176 if( node->type == Librarian )
3177 node = slotptr(set->page, ++slot);
3179 ptr = keyptr(set->page, slot);
3181 // not there if we reach the stopper key
3182 // or the key doesn't match what's on the page.
3184 if( slot == set->page->cnt )
3185 if( !bt_getid (set->page->right) ) {
3186 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3187 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3191 if( keycmp (ptr, key, keylen) ) {
3192 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3193 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3197 // key matches, return >= 0 value bytes copied
3198 // or -1 if not there.
3200 if( node->type == Delete || node->dead ) {
3205 val = valptr (set->page,slot);
3207 if( valmax > val->len )
3210 memcpy (value, val->value, valmax);
3219 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3220 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3225 // set cursor to highest slot on right-most page
3227 BTERR bt_lastkey (BtDb *bt)
3229 uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3230 uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
3232 if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
3233 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
3235 return bt->mgr->err;
3237 bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3238 bt->cacheslot = bt->cacheset->page->cnt;
3240 if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
3241 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
3243 return bt->main->err;
3245 bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3246 bt->mainslot = bt->mainset->page->cnt;
3251 // return previous slot on cursor page
3253 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3255 uid next, us = set->latch->page_no;
3259 if( slotptr(set->page, slot)->dead )
3264 next = bt_getid(set->page->left);
3270 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3271 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3273 if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3274 set->page = bt_mappage (mgr, set->latch);
3278 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3279 next = bt_getid (set->page->right);
3281 } while( next != us );
3283 slot = set->page->cnt + 1;
3287 // advance to previous key
3289 BTERR bt_prevkey (BtDb *bt)
3293 // first advance last key(s) one previous slot
3296 switch( bt->phase ) {
3298 bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3301 bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3304 bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3305 bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3311 if( bt->cacheslot ) {
3312 bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3313 bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3314 bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3317 if( bt->mainslot ) {
3318 bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3319 bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3320 bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3323 if( bt->mainslot && bt->cacheslot )
3324 cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3325 else if( bt->cacheslot )
3327 else if( bt->mainslot )
3332 // cache key is larger
3336 if( bt->cachenode->type == Delete )
3338 return bt->cacheslot;
3341 // main key is larger
3345 return bt->mainslot;
3352 if( bt->cachenode->type == Delete )
3355 return bt->cacheslot;
3359 // advance to next slot in cache or main btree
3360 // return 0 for EOF/error
3362 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3368 while( slot++ < set->page->cnt )
3369 if( slotptr(set->page, slot)->dead )
3371 else if( slot < set->page->cnt || bt_getid (set->page->right) )
3376 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3377 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3379 if( page_no = bt_getid(set->page->right) )
3380 if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3381 set->page = bt_mappage (mgr, set->latch);
3387 // obtain access lock using lock chaining with Access mode
3389 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3390 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3391 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3396 // advance to next key
3398 BTERR bt_nextkey (BtDb *bt)
3402 // first advance last key(s) one next slot
3405 switch( bt->phase ) {
3407 bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3410 bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3413 bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3414 bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3420 if( bt->cacheslot ) {
3421 bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3422 bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3423 bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3426 if( bt->mainslot ) {
3427 bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3428 bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3429 bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3432 if( bt->mainslot && bt->cacheslot )
3433 cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3434 else if( bt->mainslot )
3436 else if( bt->cacheslot )
3441 // main key is larger
3445 if( bt->cachenode->type == Delete )
3447 return bt->cacheslot;
3450 // cache key is larger
3454 return bt->mainslot;
3461 if( bt->cachenode->type == Delete )
3464 return bt->cacheslot;
3468 // start sweep of keys
3470 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3477 if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
3478 bt->cacheslot = slot - 1;
3480 return bt->mgr->err;
3484 if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
3485 bt->mainslot = slot - 1;
3487 return bt->mgr->err;
3496 double getCpuTime(int type)
3499 FILETIME xittime[1];
3500 FILETIME systime[1];
3501 FILETIME usrtime[1];
3502 SYSTEMTIME timeconv[1];
3505 memset (timeconv, 0, sizeof(SYSTEMTIME));
3509 GetSystemTimeAsFileTime (xittime);
3510 FileTimeToSystemTime (xittime, timeconv);
3511 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3514 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3515 FileTimeToSystemTime (usrtime, timeconv);
3518 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3519 FileTimeToSystemTime (systime, timeconv);
3523 ans += (double)timeconv->wHour * 3600;
3524 ans += (double)timeconv->wMinute * 60;
3525 ans += (double)timeconv->wSecond;
3526 ans += (double)timeconv->wMilliseconds / 1000;
3531 #include <sys/resource.h>
3533 double getCpuTime(int type)
3535 struct rusage used[1];
3536 struct timeval tv[1];
3540 gettimeofday(tv, NULL);
3541 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3544 getrusage(RUSAGE_SELF, used);
3545 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3548 getrusage(RUSAGE_SELF, used);
3549 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3556 void bt_poolaudit (BtMgr *mgr, char *type)
3558 BtLatchSet *latch, test[1];
3561 memset (test, 0, sizeof(test));
3563 if( memcmp (test, mgr->latchsets, sizeof(test)) )
3564 fprintf(stderr, "%s latchset zero overwritten\n", type);
3566 if( memcmp (test, mgr->leafsets, sizeof(test)) )
3567 fprintf(stderr, "%s leafset zero overwritten\n", type);
3569 for( entry = 0; ++entry < mgr->latchtotal; ) {
3570 latch = mgr->latchsets + entry;
3573 fprintf(stderr, "%s latchset %d is a leaf on page %d\n", type, entry, latch->page_no);
3575 if( *latch->modify->value )
3576 fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3578 if( latch->pin & ~CLOCK_bit )
3579 fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3582 for( entry = 0; ++entry < mgr->leaftotal; ) {
3583 latch = mgr->leafsets + entry;
3586 fprintf(stderr, "%s leafset %d is not a leaf on page %d\n", type, entry, latch->page_no);
3588 if( *latch->modify->value )
3589 fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3591 if( latch->pin & ~CLOCK_bit )
3592 fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3605 // standalone program to index file of keys
3606 // then list them onto std-out
3609 void *index_file (void *arg)
3611 uint __stdcall index_file (void *arg)
3614 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3615 uid next, page_no = LEAF_page; // start on first page of leaves
3616 int ch, len = 0, slot, type = 0;
3617 unsigned char key[BT_maxkey];
3618 unsigned char buff[65536];
3619 uint nxt = sizeof(buff);
3620 ThreadArg *args = arg;
3629 bt = bt_open (args->mgr, args->main);
3630 page = (BtPage)buff;
3632 if( args->idx < strlen (args->type) )
3633 ch = args->type[args->idx];
3635 ch = args->type[strlen(args->type) - 1];
3647 if( type == Delete )
3648 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3650 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3652 if( type == Delete )
3653 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3655 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3657 if( in = fopen (args->infile, "rb") )
3658 while( ch = getc(in), ch != EOF )
3664 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3665 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3671 memcpy (buff + nxt, key + 10, len - 10);
3673 buff[nxt] = len - 10;
3675 memcpy (buff + nxt, key, 10);
3678 slotptr(page,++cnt)->off = nxt;
3679 slotptr(page,cnt)->type = type;
3682 if( cnt < args->num )
3689 if( bt_atomictxn (bt, page) )
3690 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3695 else if( len < BT_maxkey )
3697 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3701 fprintf(stderr, "started indexing for %s\n", args->infile);
3702 if( in = fopen (args->infile, "r") )
3703 while( ch = getc(in), ch != EOF )
3708 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3709 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3712 else if( len < BT_maxkey )
3714 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3718 fprintf(stderr, "started finding keys for %s\n", args->infile);
3719 if( in = fopen (args->infile, "rb") )
3720 while( ch = getc(in), ch != EOF )
3724 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3726 else if( bt->mgr->err )
3727 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3730 else if( len < BT_maxkey )
3732 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3736 fprintf(stderr, "started forward scan\n");
3737 if( bt_startkey (bt, NULL, 0) )
3738 fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3740 while( bt_nextkey (bt) ) {
3741 if( bt->phase == 1 ) {
3742 len = bt->mainkey->len;
3744 if( bt->mainnode->type == Duplicate )
3747 fwrite (bt->mainkey->key, len, 1, stdout);
3748 fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3750 len = bt->cachekey->len;
3752 if( bt->cachenode->type == Duplicate )
3755 fwrite (bt->cachekey->key, len, 1, stdout);
3756 fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3759 fputc ('\n', stdout);
3763 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3764 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3766 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3767 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3769 fprintf(stderr, " Total keys read %d\n", cnt);
3773 fprintf(stderr, "started reverse scan\n");
3774 if( bt_lastkey (bt) )
3775 fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3777 while( bt_prevkey (bt) ) {
3778 if( bt->phase == 1 ) {
3779 len = bt->mainkey->len;
3781 if( bt->mainnode->type == Duplicate )
3784 fwrite (bt->mainkey->key, len, 1, stdout);
3785 fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3787 len = bt->cachekey->len;
3789 if( bt->cachenode->type == Duplicate )
3792 fwrite (bt->cachekey->key, len, 1, stdout);
3793 fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3796 fputc ('\n', stdout);
3800 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3801 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3803 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3804 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3806 fprintf(stderr, " Total keys read %d\n", cnt);
3810 fprintf(stderr, "started counting LSM cache btree\n");
3811 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3812 page_no = LEAF_page;
3815 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3817 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3818 pread(bt->mgr->idx, page, sizeof(*page), page_no << bt->mgr->page_bits);
3819 if( !page->lvl && !page->free )
3824 page_no += page->lvl ? 1 : (1 << bt->mgr->leaf_xtra);
3828 cachecnt = --cnt; // remove stopper key
3831 fprintf(stderr, "started counting LSM main btree\n");
3832 next = bt->main->redopage + bt->main->pagezero->redopages;
3833 page_no = LEAF_page;
3836 posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3838 while( page_no < bt_getid(bt->main->pagezero->alloc->right) ) {
3839 pread(bt->main->idx, page, sizeof(*page), page_no << bt->main->page_bits);
3845 page_no += page->lvl ? 1 : (1 << bt->main->leaf_xtra);
3849 cnt--; // remove stopper key
3851 fprintf(stderr, " cache keys counted %d\n", cachecnt);
3852 fprintf(stderr, " main keys counted %d\n", cnt);
3853 fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
3865 typedef struct timeval timer;
3867 int main (int argc, char **argv)
3869 int idx, cnt, len, slot, err;
3877 uint mainleafpool = 0;
3878 uint mainleafxtra = 0;
3894 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3895 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3896 fprintf (stderr, " where main_file is the name of the main btree file\n");
3897 fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
3898 fprintf (stderr, " pagebits is the page size in bits for the cache btree\n");
3899 fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n");
3900 fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n");
3901 fprintf (stderr, " leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3902 fprintf (stderr, " txnsize = n to block transactions into n units, or zero for no transactions\n");
3903 fprintf (stderr, " redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3904 fprintf (stderr, " mainbits is the page size of the main btree in bits\n");
3905 fprintf (stderr, " mainpool is the number of main pages in the main buffer pool\n");
3906 fprintf (stderr, " mainleaf is the number of main pages in the main leaf pool\n");
3907 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3911 start = getCpuTime(0);
3914 bits = atoi(argv[4]);
3917 leafxtra = atoi(argv[5]);
3920 poolsize = atoi(argv[6]);
3923 fprintf (stderr, "no page pool\n"), exit(1);
3926 leafpool = atoi(argv[7]);
3929 num = atoi(argv[8]);
3932 redopages = atoi(argv[9]);
3934 if( redopages > 65535 )
3935 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3938 mainbits = atoi(argv[10]);
3941 mainleafxtra = atoi(argv[11]);
3944 mainpool = atoi(argv[12]);
3947 mainleafpool = atoi(argv[13]);
3951 threads = malloc (cnt * sizeof(pthread_t));
3953 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3955 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3957 mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3960 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3966 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3969 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3979 for( idx = 0; idx < cnt; idx++ ) {
3980 args[idx].infile = argv[idx + 14];
3981 args[idx].type = argv[3];
3982 args[idx].main = main;
3983 args[idx].mgr = mgr;
3984 args[idx].num = num;
3985 args[idx].idx = idx;
3987 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3988 fprintf(stderr, "Error creating thread %d\n", err);
3990 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3994 args[0].infile = argv[idx + 10];
3995 args[0].type = argv[3];
3996 args[0].main = main;
4003 // wait for termination
4006 for( idx = 0; idx < cnt; idx++ )
4007 pthread_join (threads[idx], NULL);
4009 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
4011 for( idx = 0; idx < cnt; idx++ )
4012 CloseHandle(threads[idx]);
4014 bt_poolaudit(mgr, "cache");
4017 bt_poolaudit(main, "main");
4019 fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
4021 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
4028 elapsed = getCpuTime(0) - start;
4029 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4030 elapsed = getCpuTime(1);
4031 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4032 elapsed = getCpuTime(2);
4033 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4036 BtKey *bt_key (BtPage page, uint slot)
4038 return keyptr(page,slot);
4041 BtSlot *bt_slot (BtPage page, uint slot)
4043 return slotptr(page,slot);