1 // btree version threadskv10g futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // LSM B-trees for write optimization
11 // larger sized leaf pages than non-leaf
12 // and LSM B-tree find & count operations
16 // author: karl malbrain, malbrain@cal.berkeley.edu
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
22 The original author is Karl Malbrain.
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
56 #define WIN32_LEAN_AND_MEAN
70 typedef unsigned long long uid;
71 typedef unsigned long long logseqno;
74 typedef unsigned long long off64_t;
75 typedef unsigned short ushort;
76 typedef unsigned int uint;
79 #define BT_ro 0x6f72 // ro
80 #define BT_rw 0x7772 // rw
82 #define BT_maxbits 26 // maximum page size in bits
83 #define BT_minbits 9 // minimum page size in bits
84 #define BT_minpage (1 << BT_minbits) // minimum page size
85 #define BT_maxpage (1 << BT_maxbits) // maximum page size
87 // BTree page number constants
88 #define ALLOC_page 0 // allocation page
89 #define ROOT_page 1 // root of the btree
90 #define LEAF_page 2 // first page of leaves
92 // Number of levels to create in a new BTree
97 There are six lock types for each node in four independent sets:
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification.
118 volatile unsigned char xcl[1];
119 volatile unsigned char filler;
120 volatile ushort waiters[1];
126 // definition for reader/writer reentrant lock implementation
132 ushort dup; // re-entrant locks
133 ushort tid; // owner thread-no
134 ushort line; // owner line #
137 // hash table entries
141 uint entry; // Latch table entry at head of chain
144 // latch manager table structure
147 uid page_no; // latch set page number
148 RWLock readwr[1]; // read/write page lock
149 RWLock access[1]; // Access Intent/Page delete
150 RWLock parent[1]; // Posting of fence key in parent
151 RWLock link[1]; // left link update in progress
152 MutexLatch modify[1]; // modify entry lite latch
153 uint split; // right split page atomic insert
154 uint next; // next entry in hash table chain
155 uint prev; // prev entry in hash table chain
156 volatile ushort pin; // number of accessing threads
157 unsigned char dirty; // page in cache is dirty
158 unsigned char leaf; // page in cache is a leaf
161 // Define the length of the page record numbers
165 // Page key slot definition.
167 // Keys are marked dead, but remain on the page until
168 // cleanup is called. The fence key (highest key) for
169 // a leaf page is always present, even after cleanup.
173 // In addition to the Unique keys that occupy slots
174 // there are Librarian and Duplicate key
175 // slots occupying the key slot array.
177 // The Librarian slots are dead keys that
178 // serve as filler, available to add new Unique
179 // or Dup slots that are inserted into the B-tree.
181 // The Duplicate slots have had their key bytes extended
182 // by 6 bytes to contain a binary duplicate key uniqueifier.
193 uint off:BT_maxbits; // page offset for key start
194 uint type:3; // type of slot
195 uint dead:1; // set for deleted slot
198 // The key structure occupies space at the upper end of
199 // each page. It's a length byte followed by the key
203 unsigned char len; // this can be changed to a ushort or uint
204 unsigned char key[0];
207 // the value structure also occupies space at the upper
208 // end of the page. Each key is immediately followed by a value.
211 unsigned char len; // this can be changed to a ushort or uint
212 unsigned char value[0];
215 #define BT_maxkey 255 // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
218 // The first part of an index page.
219 // It is immediately followed
220 // by the BtSlot array of keys.
222 typedef struct BtPage_ {
223 uint cnt; // count of keys in page
224 uint act; // count of active keys
225 uint min; // next key offset
226 uint garbage; // page garbage in bytes
227 unsigned char lvl; // level of page
228 unsigned char free; // page is on free chain
229 unsigned char kill; // page is being deleted
230 unsigned char nopromote; // page is being constructed
231 unsigned char filler1[6]; // padding to multiple of 8 bytes
232 unsigned char right[BtId]; // page number to right
233 unsigned char left[BtId]; // page number to left
234 unsigned char filler2[2]; // padding to multiple of 8 bytes
235 logseqno lsn; // log sequence number applied
236 uid page_no; // this page number
239 // The loadpage interface object
242 BtPage page; // current page pointer
243 BtLatchSet *latch; // current page latch set
246 // structure for latch manager on ALLOC_page
249 struct BtPage_ alloc[1]; // next page_no in right ptr
250 unsigned char freechain[BtId]; // head of free page_nos chain
251 unsigned char leafchain[BtId]; // head of leaf page_nos chain
252 unsigned long long leafpages; // number of active leaf pages
253 unsigned long long upperpages; // number of active upper pages
254 uint redopages; // number of redo pages in file
255 unsigned char leaf_xtra; // leaf page size in xtra bits
256 unsigned char page_bits; // base page size in bits
259 // The object structure for Btree access
262 uint page_size; // base page size
263 uint page_bits; // base page size in bits
264 uint leaf_xtra; // leaf xtra bits
270 BtPageZero *pagezero; // mapped allocation page
271 BtHashEntry *hashtable; // the buffer pool hash table entries
272 BtHashEntry *leaftable; // the leaf pool hash table entries
273 BtLatchSet *latchsets; // mapped latch set from buffer pool
274 BtLatchSet *leafsets; // mapped latch set from leaf pool
275 unsigned char *pagepool; // mapped to the buffer pool pages
276 unsigned char *leafpool; // mapped to the leaf pool pages
277 unsigned char *redobuff; // mapped recovery buffer pointer
278 logseqno lsn, flushlsn; // current & first lsn flushed
279 MutexLatch redo[1]; // redo area lite latch
280 MutexLatch lock[1]; // allocation area lite latch
281 MutexLatch maps[1]; // mapping segments lite latch
282 ushort thread_no[1]; // highest thread number issued
283 ushort err_thread; // error thread number
284 uint nlatchpage; // size of buffer pool & latchsets
285 uint latchtotal; // number of page latch entries
286 uint latchhash; // number of latch hash table slots
287 uint latchvictim; // next latch entry to examine
288 uint nleafpage; // size of leaf pool & leafsets
289 uint leaftotal; // number of leaf latch entries
290 uint leafhash; // number of leaf hash table slots
291 uint leafvictim; // next leaf entry to examine
292 uint leafpromote; // next leaf entry to promote
293 uint redopage; // page number of redo buffer
294 uint redolast; // last msync size of recovery buff
295 uint redoend; // eof/end element in recovery buff
296 int err; // last error
297 int line; // last error line no
298 int found; // number of keys found by delete
299 int reads, writes; // number of reads and writes
301 HANDLE halloc; // allocation handle
302 HANDLE hpool; // buffer pool handle
304 volatile uint segments; // number of memory mapped segments
305 unsigned char *pages[64000];// memory mapped segments of b-tree
309 BtMgr *mgr; // buffer manager for entire process
310 BtMgr *main; // buffer manager for main btree
311 BtPageSet cacheset[1]; // cached page frame for cache btree
312 BtPageSet mainset[1]; // cached page frame for main btree
313 uint cacheslot; // slot number in cacheset
314 uint mainslot; // slot number in mainset
315 ushort phase; // 1 = main btree 0 = cache btree 2 = both
316 ushort thread_no; // thread number
325 // atomic txn structure
328 logseqno reqlsn; // redo log seq no required
329 uint entry:31; // latch table entry number
330 uint reuse:1; // reused previous page
331 uint slot; // slot on page
334 // Catastrophic errors
348 #define CLOCK_bit 0x8000
350 // recovery manager entry types
353 BTRM_eof = 0, // rest of buffer is empty
354 BTRM_add, // add a unique key-value to btree
355 BTRM_dup, // add a duplicate key-value to btree
356 BTRM_del, // delete a key-value from btree
357 BTRM_upd, // update a key with a new value
358 BTRM_new, // allocate a new empty page
359 BTRM_old // reuse an old empty page
362 // recovery manager entry
363 // structure followed by BtKey & BtVal
366 logseqno reqlsn; // log sequence number required
367 logseqno lsn; // log sequence number for entry
368 uint len; // length of entry
369 unsigned char type; // type of entry
370 unsigned char lvl; // level of btree entry pertains to
375 extern void bt_close (BtDb *bt);
376 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
377 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
378 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
379 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
380 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
381 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
382 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
384 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
386 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
387 extern BTERR bt_nextkey (BtDb *bt);
389 extern uint bt_lastkey (BtDb *bt);
390 extern uint bt_prevkey (BtDb *bt);
393 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
394 extern void bt_mgrclose (BtMgr *mgr);
395 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
396 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
398 // atomic transaction functions
399 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
400 BTERR bt_promote (BtDb *bt);
402 // The page is allocated from low and hi ends.
403 // The key slots are allocated from the bottom,
404 // while the text and value of the key
405 // are allocated from the top. When the two
406 // areas meet, the page is split into two.
408 // A key consists of a length byte, two bytes of
409 // index number (0 - 65535), and up to 253 bytes
412 // Associated with each key is a value byte string
413 // containing any value desired.
415 // The b-tree root is always located at page 1.
416 // The first leaf page of level zero is always
417 // located on page 2.
419 // The b-tree pages are linked with next
420 // pointers to facilitate enumerators,
421 // and provide for concurrency.
423 // When the root page fills, it is split in two and
424 // the tree height is raised by a new root at page
425 // one with two keys.
427 // Deleted keys are marked with a dead bit until
428 // page cleanup. The fence key for a leaf node is
431 // To achieve maximum concurrency one page is locked at a time
432 // as the tree is traversed to find leaf key in question. The right
433 // page numbers are used in cases where the page is being split,
436 // Page 0 is dedicated to lock for new page extensions,
437 // and chains empty pages together for reuse. It also
438 // contains the latch manager hash table.
440 // The ParentModification lock on a node is obtained to serialize posting
441 // or changing the fence key for a node.
443 // Empty pages are chained together through the ALLOC page and reused.
445 // Access macros to address slot and key values from the page
446 // Page slots use 1 based indexing.
448 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
449 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
450 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
452 void bt_putid(unsigned char *dest, uid id)
457 dest[i] = (unsigned char)id, id >>= 8;
460 uid bt_getid(unsigned char *src)
465 for( i = 0; i < BtId; i++ )
466 id <<= 8, id |= *src++;
471 // lite weight spin lock Latch Manager
473 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
475 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
478 void bt_mutexlock(MutexLatch *latch)
480 uint idx, waited = 0;
484 for( idx = 0; idx < 100; idx++ ) {
485 *prev->value = __sync_fetch_and_or (latch->value, 1);
486 if( !*prev->bits->xcl ) {
488 __sync_fetch_and_sub (latch->bits->waiters, 1);
494 __sync_fetch_and_add (latch->bits->waiters, 1);
495 *prev->bits->waiters += 1;
499 sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
503 int bt_mutextry(MutexLatch *latch)
505 return !__sync_lock_test_and_set (latch->bits->xcl, 1);
508 void bt_releasemutex(MutexLatch *latch)
512 *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
514 if( *prev->bits->waiters )
515 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
518 // reader/writer lock implementation
520 void WriteLock (RWLock *lock, ushort tid, uint line)
522 if( lock->tid == tid ) {
526 bt_mutexlock (lock->xcl);
527 bt_mutexlock (lock->wrt);
528 bt_releasemutex (lock->xcl);
534 void WriteRelease (RWLock *lock)
541 bt_releasemutex (lock->wrt);
544 void ReadLock (RWLock *lock, ushort tid)
546 bt_mutexlock (lock->xcl);
548 if( !__sync_fetch_and_add (&lock->readers, 1) )
549 bt_mutexlock (lock->wrt);
551 bt_releasemutex (lock->xcl);
554 void ReadRelease (RWLock *lock)
556 if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
557 bt_releasemutex (lock->wrt);
560 // recovery manager -- flush dirty pages
562 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
564 uint cnt3 = 0, cnt2 = 0, cnt = 0;
569 // flush dirty pool pages to the btree
571 fprintf(stderr, "Start flushlsn ");
572 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
573 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
574 latch = mgr->latchsets + entry;
575 bt_mutexlock (latch->modify);
576 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
579 bt_writepage(mgr, page, latch->page_no, 0);
580 latch->dirty = 0, cnt++;
582 if( latch->pin & ~CLOCK_bit )
584 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
585 bt_releasemutex (latch->modify);
587 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
588 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
589 latch = mgr->leafsets + entry;
590 bt_mutexlock (latch->modify);
591 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
594 bt_writepage(mgr, page, latch->page_no, 1);
595 latch->dirty = 0, cnt++;
597 if( latch->pin & ~CLOCK_bit )
599 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
600 bt_releasemutex (latch->modify);
602 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
603 fprintf(stderr, "begin sync");
604 for( segment = 0; segment < mgr->segments; segment++ )
605 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
606 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
607 fprintf(stderr, " end sync\n");
610 // recovery manager -- process current recovery buff on startup
611 // this won't do much if previous session was properly closed.
613 BTERR bt_recoveryredo (BtMgr *mgr)
620 hdr = (BtLogHdr *)mgr->redobuff;
621 mgr->flushlsn = hdr->lsn;
624 hdr = (BtLogHdr *)(mgr->redobuff + offset);
625 switch( hdr->type ) {
629 case BTRM_add: // add a unique key-value to btree
631 case BTRM_dup: // add a duplicate key-value to btree
632 case BTRM_del: // delete a key-value from btree
633 case BTRM_upd: // update a key with a new value
634 case BTRM_new: // allocate a new empty page
635 case BTRM_old: // reuse an old empty page
641 // recovery manager -- append new entry to recovery log
642 // flush dirty pages to disk when it overflows.
644 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
646 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
647 uint amt = sizeof(BtLogHdr);
651 bt_mutexlock (mgr->redo);
654 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
656 // see if new entry fits in buffer
657 // flush and reset if it doesn't
659 if( amt > size - mgr->redoend ) {
660 mgr->flushlsn = mgr->lsn;
661 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
662 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
665 bt_flushlsn(mgr, thread_no);
668 // fill in new entry & either eof or end block
670 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
675 hdr->lsn = ++mgr->lsn;
679 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
680 memset (eof, 0, sizeof(BtLogHdr));
682 // fill in key and value
685 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
686 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
689 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
690 memset (eof, 0, sizeof(BtLogHdr));
693 last = mgr->redolast & ~0xfff;
696 if( end - last + sizeof(BtLogHdr) >= 32768 )
697 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
698 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
702 bt_releasemutex(mgr->redo);
706 // recovery manager -- append transaction to recovery log
707 // flush dirty pages to disk when it overflows.
709 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
711 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
712 uint amt = 0, src, type;
719 // determine amount of redo recovery log space required
721 for( src = 0; src++ < source->cnt; ) {
722 key = keyptr(source,src);
723 val = valptr(source,src);
724 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
725 amt += sizeof(BtLogHdr);
728 bt_mutexlock (mgr->redo);
730 // see if new entry fits in buffer
731 // flush and reset if it doesn't
733 if( amt > size - mgr->redoend ) {
734 mgr->flushlsn = mgr->lsn;
735 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
736 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
739 bt_flushlsn (mgr, thread_no);
742 // assign new lsn to transaction
746 // fill in new entries
748 for( src = 0; src++ < source->cnt; ) {
749 key = keyptr(source, src);
750 val = valptr(source, src);
752 switch( slotptr(source, src)->type ) {
764 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
765 amt += sizeof(BtLogHdr);
767 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
773 // fill in key and value
775 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
776 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
781 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
782 memset (eof, 0, sizeof(BtLogHdr));
785 last = mgr->redolast & ~0xfff;
788 if( end - last + sizeof(BtLogHdr) >= 32768 )
789 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
790 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
794 bt_releasemutex(mgr->redo);
798 // sync a single btree page to disk
800 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
802 uint segment = latch->page_no >> 16;
803 uint page_size = mgr->page_size;
806 if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
810 page_size <<= mgr->leaf_xtra;
812 perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
814 if( msync (perm, page_size, MS_SYNC) < 0 )
815 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
821 // read page into buffer pool from permanent location in Btree file
823 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
825 int flag = PROT_READ | PROT_WRITE;
826 uint page_size = mgr->page_size;
827 uint off = 0, segment, fragment;
831 page_size <<= mgr->leaf_xtra;
833 fragment = page_no & 0xffff;
834 segment = page_no >> 16;
837 while( off < page_size ) {
839 segment++, fragment = 0;
841 if( segment < mgr->segments ) {
842 perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
844 memcpy ((unsigned char *)page + off, perm, mgr->page_size);
845 off += mgr->page_size;
850 bt_mutexlock (mgr->maps);
852 if( segment < mgr->segments ) {
853 bt_releasemutex (mgr->maps);
857 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
860 bt_releasemutex (mgr->maps);
866 // write page to permanent location in Btree file
868 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
870 int flag = PROT_READ | PROT_WRITE;
871 uint page_size = mgr->page_size;
872 uint off = 0, segment, fragment;
876 page_size <<= mgr->leaf_xtra;
878 fragment = page_no & 0xffff;
879 segment = page_no >> 16;
882 while( off < page_size ) {
884 segment++, fragment = 0;
886 if( segment < mgr->segments ) {
887 perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
888 memcpy (perm, (unsigned char *)page + off, mgr->page_size);
889 off += mgr->page_size;
894 bt_mutexlock (mgr->maps);
896 if( segment < mgr->segments ) {
897 bt_releasemutex (mgr->maps);
901 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
903 bt_releasemutex (mgr->maps);
909 // set CLOCK bit in latch
910 // decrement pin count
914 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
916 bt_mutexlock(latch->modify);
917 latch->pin |= CLOCK_bit;
920 bt_releasemutex(latch->modify);
923 // return the btree cached page address
925 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
927 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
931 page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
933 page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
938 // return next available leaf entry
939 // with its latch entry locked
941 uint bt_availleaf (BtMgr *mgr)
948 entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
950 entry = _InterlockedIncrement (&mgr->leafvictim);
952 entry %= mgr->leaftotal;
957 latch = mgr->leafsets + entry;
959 if( !bt_mutextry(latch->modify) )
962 // return this entry if it is not pinned
967 // if the CLOCK bit is set
970 latch->pin &= ~CLOCK_bit;
971 bt_releasemutex(latch->modify);
975 // return next available latch entry
976 // with its latch entry locked
978 uint bt_availnext (BtMgr *mgr)
985 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
987 entry = _InterlockedIncrement (&mgr->latchvictim);
989 entry %= mgr->latchtotal;
994 latch = mgr->latchsets + entry;
996 if( !bt_mutextry(latch->modify) )
999 // return this entry if it is not pinned
1004 // if the CLOCK bit is set
1005 // reset it to zero.
1007 latch->pin &= ~CLOCK_bit;
1008 bt_releasemutex(latch->modify);
1012 // pin leaf in leaf buffer pool
1013 // return with latchset pinned
1015 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
1017 uint hashidx = page_no % mgr->leafhash;
1022 // try to find our entry
1024 bt_mutexlock(mgr->leaftable[hashidx].latch);
1026 if( entry = mgr->leaftable[hashidx].entry )
1028 latch = mgr->leafsets + entry;
1030 if( page_no == latch->page_no )
1032 } while( entry = latch->next );
1034 // found our entry: increment pin
1037 latch = mgr->leafsets + entry;
1038 bt_mutexlock(latch->modify);
1039 latch->pin |= CLOCK_bit;
1042 bt_releasemutex(latch->modify);
1043 bt_releasemutex(mgr->leaftable[hashidx].latch);
1047 // find and reuse unpinned entry
1050 entry = bt_availleaf (mgr);
1051 latch = mgr->leafsets + entry;
1053 idx = latch->page_no % mgr->leafhash;
1055 // if latch is on a different hash chain
1056 // unlink from the old page_no chain
1058 if( latch->page_no )
1059 if( idx != hashidx ) {
1061 // skip over this entry if latch not available
1063 if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
1064 bt_releasemutex(latch->modify);
1069 mgr->leafsets[latch->prev].next = latch->next;
1071 mgr->leaftable[idx].entry = latch->next;
1074 mgr->leafsets[latch->next].prev = latch->prev;
1076 bt_releasemutex (mgr->leaftable[idx].latch);
1079 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1081 // update permanent page area in btree from buffer pool
1082 // no read-lock is required since page is not pinned.
1085 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
1086 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1090 if( bt_readpage (mgr, page, page_no, 1) )
1091 return mgr->line = __LINE__, NULL;
1093 // link page as head of hash table chain
1094 // if this is a never before used entry,
1095 // or it was previously on a different
1096 // hash table chain. Otherwise, just
1097 // leave it in its current hash table
1100 if( !latch->page_no || hashidx != idx ) {
1101 if( latch->next = mgr->leaftable[hashidx].entry )
1102 mgr->leafsets[latch->next].prev = entry;
1104 mgr->leaftable[hashidx].entry = entry;
1108 // fill in latch structure
1110 latch->pin = CLOCK_bit | 1;
1111 latch->page_no = page_no;
1114 bt_releasemutex (latch->modify);
1115 bt_releasemutex (mgr->leaftable[hashidx].latch);
1119 // pin page in non-leaf buffer pool
1120 // return with latchset pinned
1122 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1124 uint hashidx = page_no % mgr->latchhash;
1129 // try to find our entry
1131 bt_mutexlock(mgr->hashtable[hashidx].latch);
1133 if( entry = mgr->hashtable[hashidx].entry ) do
1135 latch = mgr->latchsets + entry;
1137 if( page_no == latch->page_no )
1139 } while( entry = latch->next );
1141 // found our entry: increment pin
1144 latch = mgr->latchsets + entry;
1145 bt_mutexlock(latch->modify);
1146 latch->pin |= CLOCK_bit;
1148 bt_releasemutex(latch->modify);
1149 bt_releasemutex(mgr->hashtable[hashidx].latch);
1153 // find and reuse unpinned entry
1156 entry = bt_availnext (mgr);
1157 latch = mgr->latchsets + entry;
1159 idx = latch->page_no % mgr->latchhash;
1161 // if latch is on a different hash chain
1162 // unlink from the old page_no chain
1164 if( latch->page_no )
1165 if( idx != hashidx ) {
1167 // skip over this entry if latch not available
1169 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1170 bt_releasemutex(latch->modify);
1175 mgr->latchsets[latch->prev].next = latch->next;
1177 mgr->hashtable[idx].entry = latch->next;
1180 mgr->latchsets[latch->next].prev = latch->prev;
1182 bt_releasemutex (mgr->hashtable[idx].latch);
1185 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1187 // update permanent page area in btree from buffer pool
1188 // no read-lock is required since page is not pinned.
1191 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1192 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1196 if( bt_readpage (mgr, page, page_no, 0) )
1197 return mgr->line = __LINE__, NULL;
1199 // link page as head of hash table chain
1200 // if this is a never before used entry,
1201 // or it was previously on a different
1202 // hash table chain. Otherwise, just
1203 // leave it in its current hash table
1206 if( !latch->page_no || hashidx != idx ) {
1207 if( latch->next = mgr->hashtable[hashidx].entry )
1208 mgr->latchsets[latch->next].prev = entry;
1210 mgr->hashtable[hashidx].entry = entry;
1214 // fill in latch structure
1216 latch->pin = CLOCK_bit | 1;
1217 latch->page_no = page_no;
1220 bt_releasemutex (latch->modify);
1221 bt_releasemutex (mgr->hashtable[hashidx].latch);
1225 void bt_mgrclose (BtMgr *mgr)
1233 // flush previously written dirty pages
1234 // and write recovery buffer to disk
1236 fdatasync (mgr->idx);
1238 if( mgr->redoend ) {
1239 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1240 memset (eof, 0, sizeof(BtLogHdr));
1243 // write remaining dirty pool pages to the btree
1245 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1246 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1247 latch = mgr->latchsets + slot;
1249 if( latch->dirty ) {
1250 bt_writepage(mgr, page, latch->page_no, 0);
1251 latch->dirty = 0, num++;
1255 // write remaining dirty leaf pages to the btree
1257 for( slot = 1; slot < mgr->leaftotal; slot++ ) {
1258 page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1259 latch = mgr->leafsets + slot;
1261 if( latch->dirty ) {
1262 bt_writepage(mgr, page, latch->page_no, 1);
1263 latch->dirty = 0, num++;
1267 // clear redo recovery buffer on disk.
1269 if( mgr->pagezero->redopages ) {
1270 eof = (BtLogHdr *)mgr->redobuff;
1271 memset (eof, 0, sizeof(BtLogHdr));
1272 eof->lsn = mgr->lsn;
1273 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1274 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1277 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1280 while( mgr->segments )
1281 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1283 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1284 munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1285 munmap (mgr->pagezero, mgr->page_size);
1287 FlushViewOfFile(mgr->pagezero, 0);
1288 UnmapViewOfFile(mgr->pagezero);
1289 UnmapViewOfFile(mgr->pagepool);
1290 CloseHandle(mgr->halloc);
1291 CloseHandle(mgr->hpool);
1297 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1298 FlushFileBuffers(mgr->idx);
1299 CloseHandle(mgr->idx);
1304 // close and release memory
1306 void bt_close (BtDb *bt)
1311 // open/create new btree buffer manager
1313 // call with file_name, bits in page size (e.g. 16), leaf xtra bits,
1314 // size of page pool (e.g. 262144), size of leaf pool, and number of redo pages
1316 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1318 uint lvl, attr, last, slot, idx;
1319 uint nlatchpage, latchhash;
1320 uint nleafpage, leafhash;
1321 unsigned char value[BtId];
1322 int flag, initit = 0;
1323 BtPageZero *pagezero;
1331 // determine sanity of page size and buffer pool
1333 if( leafxtra + pagebits > BT_maxbits )
1334 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1336 if( pagebits < BT_minbits )
1337 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1340 mgr = calloc (1, sizeof(BtMgr));
1342 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1344 if( mgr->idx == -1 ) {
1345 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1346 return free(mgr), NULL;
1349 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1350 attr = FILE_ATTRIBUTE_NORMAL;
1351 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1353 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1354 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1355 return GlobalFree(mgr), NULL;
1360 pagezero = valloc (BT_maxpage);
1363 // read minimum page size to get root info
1364 // to support raw disk partition files
1365 // check if page_bits == 0 on the disk.
1367 if( size = lseek (mgr->idx, 0L, 2) )
1368 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1369 if( pagezero->page_bits ) {
1370 pagebits = pagezero->page_bits;
1371 leafxtra = pagezero->leaf_xtra;
1375 return free(mgr), free(pagezero), NULL;
1379 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1380 size = GetFileSize(mgr->idx, amt);
1382 if( size || *amt ) {
1383 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1384 return bt_mgrclose (mgr), NULL;
1385 pagebits = pagezero->page_bits;
1386 leafxtra = pagezero->leaf_xtra;
1391 mgr->page_size = 1 << pagebits;
1392 mgr->page_bits = pagebits;
1393 mgr->leaf_xtra = leafxtra;
1395 // calculate number of latch hash table entries
1397 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1399 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1400 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1401 mgr->latchtotal = nodemax;
1403 // calculate number of leaf hash table entries
1405 mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1407 mgr->nleafpage += leafmax << leafxtra; // size of the leaf pool in pages
1408 mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1409 mgr->leaftotal = leafmax;
1411 mgr->redopage = LEAF_page + (1 << leafxtra);
1416 // initialize an empty b-tree with latch page, root page, page of leaves
1417 // and page(s) of latches and page pool cache
1419 ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1420 memset (pagezero, 0, 1 << pagebits);
1421 pagezero->alloc->lvl = MIN_lvl - 1;
1422 pagezero->redopages = redopages;
1423 pagezero->page_bits = mgr->page_bits;
1424 pagezero->leaf_xtra = leafxtra;
1426 bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1427 pagezero->upperpages = 1;
1428 pagezero->leafpages = 1;
1430 // initialize left-most LEAF page in
1431 // alloc->left and count of active leaf pages.
1433 bt_putid (pagezero->alloc->left, LEAF_page);
1435 if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1436 fprintf (stderr, "Unable to create btree page zero\n");
1437 return bt_mgrclose (mgr), NULL;
1440 memset (pagezero, 0, 1 << pagebits);
1442 for( lvl=MIN_lvl; lvl--; ) {
1443 BtSlot *node = slotptr(pagezero->alloc, 1);
1444 node->off = mgr->page_size;
1447 node->off <<= mgr->leaf_xtra;
1449 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1450 node->type = Librarian;
1453 node->off = node[-1].off;
1454 key = keyptr(pagezero->alloc, 2);
1455 key = keyptr(pagezero->alloc, 1);
1456 key->len = 2; // create stopper key
1460 bt_putid(value, MIN_lvl - lvl + 1);
1461 val = valptr(pagezero->alloc, 1);
1462 val->len = lvl ? BtId : 0;
1463 memcpy (val->value, value, val->len);
1465 pagezero->alloc->min = node->off;
1466 pagezero->alloc->lvl = lvl;
1467 pagezero->alloc->cnt = 2;
1468 pagezero->alloc->act = 1;
1469 pagezero->alloc->page_no = MIN_lvl - lvl;
1471 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1472 fprintf (stderr, "Unable to create btree page\n");
1473 return bt_mgrclose (mgr), NULL;
1481 VirtualFree (pagezero, 0, MEM_RELEASE);
1484 // mlock the first segment of 64K pages
1486 flag = PROT_READ | PROT_WRITE;
1487 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1490 if( mgr->pages[0] == MAP_FAILED ) {
1491 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1492 return bt_mgrclose (mgr), NULL;
1495 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1496 mlock (mgr->pagezero, mgr->page_size);
1498 mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
1499 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1501 // allocate pool buffers
1503 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1504 if( mgr->pagepool == MAP_FAILED ) {
1505 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1506 return bt_mgrclose (mgr), NULL;
1509 mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1510 if( mgr->leafpool == MAP_FAILED ) {
1511 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1512 return bt_mgrclose (mgr), NULL;
1515 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1516 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1517 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1519 mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1520 mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1521 mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1526 // open BTree access method
1527 // based on buffer manager
// Allocates and zeroes a BtDb handle, then assigns it a thread number
// by atomically bumping a counter reached through mgr->thread_no.
// The use of `main` is not visible in this excerpt.
1529 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
// NOTE(review): the malloc result is passed to memset with no visible NULL check
1531 BtDb *bt = malloc (sizeof(*bt));
1533 memset (bt, 0, sizeof(*bt));
// gcc variant: fetch-and-add returns the old value, so + 1 gives the new count
1537 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
// NOTE(review): Windows variant -- the MSVC intrinsic _InterlockedIncrement16
// is documented as taking a single pointer argument; confirm this two-argument call.
1539 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1544 // compare two keys, return > 0, = 0, or < 0
1545 // =0: keys are same
1548 // as the comparison value
// key1: stored key (length taken from key1->len)
// key2/len2: raw key bytes and their length
1550 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1552 uint len1 = key1->len;
// compare only the common prefix: the shorter of the two lengths
1555 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1566 // place write, read, or parent lock on requested page_no.
// mode:      which lock to take (the dispatch on mode is not visible in this excerpt)
// latch:     latch set holding the readwr, access, parent and link locks
// thread_no: passed to every lock call
// line:      passed only to the write locks; callers in this file supply __LINE__
1568 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
// shared / exclusive lock on the readwr lock
1572 ReadLock (latch->readwr, thread_no);
1575 WriteLock (latch->readwr, thread_no, line);
// shared / exclusive lock on the access lock
1578 ReadLock (latch->access, thread_no);
1581 WriteLock (latch->access, thread_no, line);
// exclusive lock on the parent lock
1584 WriteLock (latch->parent, thread_no, line);
// exclusive lock on the link lock
1587 WriteLock (latch->link, thread_no, line);
1592 // remove write, read, or parent lock on requested page
// Counterpart of bt_lockpage: releases the same four locks of the latch set.
// thread_no and line are not used by any of the visible release calls.
1594 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
// shared / exclusive release of the readwr lock
1598 ReadRelease (latch->readwr);
1601 WriteRelease (latch->readwr);
// shared / exclusive release of the access lock
1604 ReadRelease (latch->access);
1607 WriteRelease (latch->access);
// exclusive release of the parent lock
1610 WriteRelease (latch->parent);
// exclusive release of the link lock
1613 WriteRelease (latch->link);
1618 // allocate a new page
1619 // return with page latched, but unlocked.
// mgr:       buffer manager
// set:       receives the latch and mapped page of the new page
// contents:  page image to install; its lvl selects upper vs leaf handling,
//            and its page_no and nopromote fields are overwritten here
// thread_no: recorded in mgr->err_thread on failure
// Reuses a page from the free chain when one exists, otherwise takes the
// next page number from pagezero->alloc->right and writes the image to disk.
1621 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1623 uint page_size = mgr->page_size, page_xtra = 0;
1624 unsigned char *freechain;
// pick the free chain and page counter matching the page level
// NOTE(review): the counters are incremented before bt_mutexlock is
// called below -- confirm this update is safe without the lock.
1627 if( contents->lvl ) {
1628 freechain = mgr->pagezero->freechain;
1629 mgr->pagezero->upperpages++;
1631 freechain = mgr->pagezero->leafchain;
1632 mgr->pagezero->leafpages++;
// leaf pages are larger by leaf_xtra bits
1633 page_xtra = mgr->leaf_xtra;
1634 page_size <<= page_xtra;
1637 // lock allocation page
1639 bt_mutexlock(mgr->lock);
1641 // use empty chain first
1642 // else allocate new page
1644 if( page_no = bt_getid(freechain) ) {
1645 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1646 set->page = bt_mappage (mgr, set->latch);
// NOTE(review): this error return follows bt_mutexlock with no visible
// bt_releasemutex in between -- confirm mgr->lock is not left held.
1648 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
// unlink the reused page: the chain head becomes the page's old right link
1650 bt_putid(freechain, bt_getid(set->page->right));
1652 // the page is currently nopromote and this
1653 // will keep bt_promote out.
1655 // contents will replace this bit
1656 // and pin will keep bt_promote out
1658 contents->page_no = page_no;
1659 contents->nopromote = 0;
1660 set->latch->dirty = 1;
1662 memcpy (set->page, contents, page_size);
1664 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1665 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1667 bt_releasemutex(mgr->lock);
// no free page available: take the next page number(s) at the end of the
// file; a leaf consumes 1 << page_xtra page numbers
1671 page_no = bt_getid(mgr->pagezero->alloc->right);
1672 bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
1674 // unlock allocation latch and
1675 // extend file into new page.
1677 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1678 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1679 bt_releasemutex(mgr->lock);
1681 // keep bt_promote out of this page
1682 contents->nopromote = 1;
1683 contents->page_no = page_no;
// NOTE(review): a short or failed write is only reported on stderr;
// execution continues with the pin below.
1684 if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1685 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1686 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1687 set->page = bt_mappage (mgr, set->latch);
1689 return mgr->err_thread = thread_no, mgr->err;
1691 // now pin will keep bt_promote out
1693 set->page->nopromote = 0;
1694 set->latch->dirty = 1;
1698 // find slot in page for given key at a given level
// page:    page to search, slots numbered 1..page->cnt
// key/len: key bytes being looked up
// Binary search between low and higher; returns the slot found, or 0
// when the key belongs on the right link page.
1700 int bt_findslot (BtPage page, unsigned char *key, uint len)
1702 uint diff, higher = page->cnt, low = 1, slot;
1705 // make stopper key an infinite fence value
// the adjustment made when the page has a right sibling is not visible in this excerpt
1707 if( bt_getid (page->right) )
1712 // low is the lowest candidate.
1713 // loop ends when they meet
1715 // higher is already
1716 // tested as .ge. the passed key.
1718 while( diff = higher - low ) {
// probe the midpoint of the remaining range
1719 slot = low + ( diff >> 1 );
1720 if( keycmp (keyptr(page, slot), key, len) < 0 )
// probe key is >= the search key: it becomes the new upper candidate
1723 higher = slot, good++;
1726 // return zero if key is on right link page
1728 return good ? higher : 0;
1731 // find and load page at given level for given key
1732 // leave page rd or wr locked as requested
// mgr:       buffer manager
// set:       receives the latch and mapped page that was found
// key/len:   key being searched for
// lvl:       tree level to stop at
// lock:      lock mode applied at level lvl; other levels use BtLockRead
// thread_no: recorded in mgr->err_thread on failure
// Every visible error path sets mgr->err and returns 0; callers in this
// file treat a non-zero return as the slot number.
1734 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1736 uid page_no = ROOT_page, prevpage_no = 0;
// drill starts at 0xff as a placeholder until the root's real level is read
1737 uint drill = 0xff, slot;
1738 BtLatchSet *prevlatch;
1739 uint mode, prevmode;
1744 // start at root of btree and drill down
1747 // determine lock mode of drill level
1748 mode = (drill == lvl) ? lock : BtLockRead;
// level 0 pages come from the leaf pool, all others from the page pool
1750 if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1751 set->page = bt_mappage (mgr, set->latch);
1755 // obtain access lock using lock chaining with Access mode
1757 if( page_no > ROOT_page )
1758 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1760 // release & unpin parent or left sibling page
1763 bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1764 bt_unpinlatch (prevlatch, thread_no, __LINE__);
1768 // obtain mode lock using lock coupling through AccessLock
1770 bt_lockpage(mode, set->latch, thread_no, __LINE__);
1772 // grab our fence key
// the fence key is the key in the last slot of the page
1774 ptr=keyptr(set->page,set->page->cnt);
// reaching a page marked free is reported as a structure error
1776 if( set->page->free )
1777 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1779 if( page_no > ROOT_page )
1780 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1782 // re-read and re-lock root after determining actual level of root
// a level mismatch is only legal on the root page
1784 if( set->page->lvl != drill) {
1785 if( set->latch->page_no != ROOT_page )
1786 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1788 drill = set->page->lvl;
// the root was locked in read mode but the caller wants another mode at this level
1790 if( lock != BtLockRead && drill == lvl ) {
1791 bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1792 bt_unpinlatch (set->latch, thread_no, __LINE__);
// remember this page so it can be released after the next one is locked
1797 prevpage_no = set->latch->page_no;
1798 prevlatch = set->latch;
1799 prevpage = set->page;
1802 // if requested key is beyond our fence,
1803 // slide to the right
1805 if( keycmp (ptr, key, len) < 0 )
1806 if( page_no = bt_getid(set->page->right) )
1809 // if page is part of a delete operation,
1810 // slide to the left;
// the left link is read under the link lock
1812 if( set->page->kill ) {
1813 bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1814 page_no = bt_getid(set->page->left);
1815 bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1819 // find key on page at this level
1820 // and descend to requested level
1822 if( slot = bt_findslot (set->page, key, len) ) {
1826 // find next non-dead slot -- the fence key if nothing else
1828 while( slotptr(set->page, slot)->dead )
1829 if( slot++ < set->page->cnt )
1832 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
// on upper levels the slot's value must be a page id of exactly BtId bytes
1834 val = valptr(set->page, slot);
1836 if( val->len == BtId )
1837 page_no = bt_getid(val->value);
1839 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1845 // slide right into next page
1847 page_no = bt_getid(set->page->right);
1851 // return error on end of right chain
1853 mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1854 return 0; // return error
1857 // return page to free list
1858 // page must be delete & write locked
1859 // and have no keys pointing to it.
// mgr:       buffer manager
// set:       latch and mapped page being freed; unlocked and unpinned on return
// thread_no: passed through to the unlock and unpin calls
1861 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1863 unsigned char *freechain;
// pick the free chain and page counter matching the page level
// NOTE(review): the counters are decremented before bt_mutexlock is
// called below -- confirm this update is safe without the lock.
1865 if( set->page->lvl ) {
1866 freechain = mgr->pagezero->freechain;
1867 mgr->pagezero->upperpages--;
1869 freechain = mgr->pagezero->leafchain;
1870 mgr->pagezero->leafpages--;
1873 // lock allocation page
1875 bt_mutexlock (mgr->lock);
// push the page on the chain: its right link takes the old chain head,
// and the chain head becomes this page number
1879 memcpy(set->page->right, freechain, BtId);
1880 bt_putid(freechain, set->latch->page_no);
1881 set->latch->dirty = 1;
1882 set->page->free = 1;
1884 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1885 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1887 // unlock released page
1888 // and unlock allocation page
1890 bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1891 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1892 bt_unpinlatch (set->latch, thread_no, __LINE__);
1893 bt_releasemutex (mgr->lock);
1896 // a fence key was deleted from a page
1897 // push new fence value upwards
// mgr:       buffer manager
// set:       page whose last slot (the old fence) is removed; it is
//            unlocked and unpinned on the success path
// lvl:       level of that page; the parent is updated at lvl+1
// thread_no: recorded in mgr->err_thread on failure
// Returns mgr->err when the parent insert or delete fails.
1899 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1901 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1902 unsigned char value[BtId];
1906 // remove the old fence value
// keep a copy of the old fence in rightkey, then clear its slot and shrink cnt
1908 ptr = keyptr(set->page, set->page->cnt);
1909 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1910 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1911 set->latch->dirty = 1;
1913 // cache new fence value
// the key now in the last slot becomes the new fence
1915 ptr = keyptr(set->page, set->page->cnt);
1916 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// swap the write lock for the parent lock before touching the parent level
1918 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1919 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1921 // insert new (now smaller) fence key
1923 bt_putid (value, set->latch->page_no);
1924 ptr = (BtKey*)leftkey;
1926 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1927 return mgr->err_thread = thread_no, mgr->err;
1929 // now delete old fence key
1931 ptr = (BtKey*)rightkey;
1933 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1934 return mgr->err_thread = thread_no, mgr->err;
1936 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1937 bt_unpinlatch(set->latch, thread_no, __LINE__);
1941 // root has a single child
1942 // collapse a level from the tree
// mgr:       buffer manager
// root:      root page set; it is write-unlocked and unpinned at the end
// thread_no: recorded in mgr->err_thread on failure
// Repeats while the root is above level 1 and has a single active key.
1944 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1951 // find the child entry and promote as new root contents
// scan slots 1..cnt for a slot that is not dead
1954 for( idx = 0; idx++ < root->page->cnt; )
1955 if( !slotptr(root->page, idx)->dead )
// the slot's value must be a page id of exactly BtId bytes
1958 val = valptr(root->page, idx);
1960 if( val->len == BtId )
1961 page_no = bt_getid (valptr(root->page, idx)->value);
1963 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1965 if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1966 child->page = bt_mappage (mgr, child->latch);
1968 return mgr->err_thread = thread_no, mgr->err;
// bt_freepage requires the page to be delete and write locked
1970 bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1971 bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
// the child's image replaces the root's contents, then the child is freed
1973 memcpy (root->page, child->page, mgr->page_size);
1974 root->latch->dirty = 1;
1976 bt_freepage (mgr, child, thread_no);
1978 } while( root->page->lvl > 1 && root->page->act == 1 );
1980 bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1981 bt_unpinlatch (root->latch, thread_no, __LINE__);
1985 // delete a page and manage key
1986 // call with page writelocked
1988 // returns with page unpinned
1989 // from the page pool.
// mgr:       buffer manager
// set:       the empty page; it receives its right sibling's contents
// thread_no: passed to the lock, pin and key calls
// lvl:       level of the page; parent keys are changed at lvl+1
// The right sibling is marked killed and finally returned to the free list.
1991 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1993 unsigned char lowerfence[BT_keyarray];
1994 uint page_size = mgr->page_size;
1995 BtPageSet right[1], temp[1];
1996 unsigned char value[BtId];
1997 uid page_no, right2;
// widen to leaf page size (the condition guarding this line is not visible here)
2001 page_size <<= mgr->leaf_xtra;
2003 // cache copy of original fence key
2004 // that is being deleted.
2006 ptr = keyptr(set->page, set->page->cnt);
2007 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2009 // obtain locks on right page
2011 page_no = bt_getid(set->page->right);
2013 if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
2014 right->page = bt_mappage (mgr, right->latch);
2018 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
// NOTE(review): this error return follows the write lock and pin of the
// right page with no visible release -- confirm.
2020 if( right->page->kill )
2021 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2023 // pull contents of right peer into our empty page
2024 // preserving our left page number, and its right page number.
// the left link is saved and restored around the copy, under the link lock
2026 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
2027 page_no = bt_getid(set->page->left);
2028 memcpy (set->page, right->page, page_size);
2029 bt_putid (set->page->left, page_no);
2030 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
// the copied image carried the right page's number; restore our own
2032 set->page->page_no = set->latch->page_no;
2033 set->latch->dirty = 1;
2035 // fix left link from far right page
2037 if( right2 = bt_getid (right->page->right) ) {
2038 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2039 temp->page = bt_mappage (mgr, temp->latch);
2043 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2044 bt_putid (temp->page->left, set->latch->page_no);
2045 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2046 bt_unpinlatch (temp->latch, thread_no, __LINE__);
2047 } else if( !lvl ) { // our page is now rightmost leaf
// record the new rightmost leaf in page zero under the allocation mutex
2048 bt_mutexlock (mgr->lock);
2049 bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
2050 bt_releasemutex(mgr->lock);
2053 // mark right page deleted and release lock
2055 right->latch->dirty = 1;
2056 right->page->kill = 1;
2057 bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2059 // redirect higher key directly to our new node contents
// the right page's fence key is updated in the parent to point at our page
2061 ptr = keyptr(right->page, right->page->cnt);
2062 bt_putid (value, set->latch->page_no);
2064 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
2067 // delete our original fence key in parent
2068 // and unlock our page.
2070 ptr = (BtKey *)lowerfence;
2072 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2075 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2076 bt_unpinlatch (set->latch, thread_no, __LINE__);
2078 // obtain delete and write locks to right node
// bt_freepage releases both locks and unpins the page
2080 bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2081 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2082 bt_freepage (mgr, right, thread_no);
2086 // find and delete key on page by marking delete flag bit
2087 // if page becomes empty, delete it from the btree
// mgr:       buffer manager
// key/len:   key to delete
// lvl:       tree level the key lives on
// thread_no: recorded in mgr->err_thread on failure
// Returns mgr->err when loading the page or a follow-up fix fails.
2089 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2091 uint slot, idx, found, fence;
// locate the page at lvl, write locked, and the candidate slot
2097 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2098 node = slotptr(set->page, slot);
2099 ptr = keyptr(set->page, slot);
2101 return mgr->err_thread = thread_no, mgr->err;
2103 // if librarian slot, advance to real slot
2105 if( node->type == Librarian ) {
2106 ptr = keyptr(set->page, ++slot);
2107 node = slotptr(set->page, slot);
// remember whether the slot is the page's last slot, i.e. its fence key
2110 fence = slot == set->page->cnt;
2112 // delete the key, ignore request if already dead
2114 if( found = !keycmp (ptr, key, len) )
2115 if( found = node->dead == 0 ) {
// account the key and value bytes as reclaimable garbage
2116 val = valptr(set->page,slot);
2117 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2121 // collapse empty slots beneath the fence
2122 // on interior nodes
// a dead slot just below the fence is overwritten by the fence slot and cnt shrinks
2125 while( idx = set->page->cnt - 1 )
2126 if( slotptr(set->page, idx)->dead ) {
2127 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2128 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2136 // did we delete a fence key in an upper level?
2138 if( lvl && set->page->act && fence )
2139 if( bt_fixfence (mgr, set, lvl, thread_no) )
2140 return mgr->err_thread = thread_no, mgr->err;
2144 // do we need to collapse root?
2146 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2147 if( bt_collapseroot (mgr, set, thread_no) )
2148 return mgr->err_thread = thread_no, mgr->err;
2152 // delete empty page
2154 if( !set->page->act )
2155 return bt_deletepage (mgr, set, thread_no, set->page->lvl);
// ordinary case: mark the page dirty and release it
2157 set->latch->dirty = 1;
2158 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2159 bt_unpinlatch (set->latch, thread_no, __LINE__);
2163 // check page for space available,
2164 // clean if necessary and return
2165 // 0 - page needs splitting
2166 // >0 new slot value
// mgr:    buffer manager (supplies page_size and leaf_xtra)
// set:    page to check; rebuilt in place when cleaning is worthwhile
// keylen: length of the key about to be inserted
// slot:   caller's insert slot
// vallen: length of the value about to be inserted
2168 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2170 uint page_size = mgr->page_size;
2171 BtPage page = set->page, frame;
2172 uint cnt = 0, idx = 0;
2173 uint max = page->cnt;
// leaf pages are larger by leaf_xtra bits
2178 if( !set->page->lvl )
2179 page_size <<= mgr->leaf_xtra;
// enough room already: header + two more slots + the new key and value
// still fit below the lowest used offset
2181 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2184 // skip cleanup and proceed to split
2185 // if there's not enough garbage
2188 if( page->garbage < page_size / 5 )
// work from a private copy while the live page is rebuilt
// NOTE(review): the malloc result is used with no visible NULL check
2191 frame = malloc (page_size);
2192 memcpy (frame, page, page_size);
2194 // skip page info and set rest of page to zero
2196 memset (page+1, 0, page_size - sizeof(*page));
2197 set->latch->dirty = 1;
2199 page->min = page_size;
2203 // clean up page first by
2204 // removing deleted keys
2206 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level 0 page
2210 if( cnt < max || frame->lvl )
2211 if( slotptr(frame,cnt)->dead )
2214 // copy the value across
2216 val = valptr(frame, cnt);
2217 page->min -= val->len + sizeof(BtVal);
2218 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2220 // copy the key across
2222 key = keyptr(frame, cnt);
2223 page->min -= key->len + sizeof(BtKey);
2224 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2226 // make a librarian slot
// a dead placeholder slot sharing the offset of the real slot that follows
2228 slotptr(page, ++idx)->off = page->min;
2229 slotptr(page, idx)->type = Librarian;
2230 slotptr(page, idx)->dead = 1;
// the real slot keeps the type and dead flag of the source slot
2234 slotptr(page, ++idx)->off = page->min;
2235 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2237 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2244 // see if page has enough space now, or does it need splitting?
2246 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2252 // split the root and raise the height of the btree
// mgr:       buffer manager
// root:      root page set; rewritten to hold two keys, then unlocked and unpinned
// right:     latch of the page holding the upper half of the keys; unpinned on return
// thread_no: recorded in mgr->err_thread on failure
// The current root contents move to a newly allocated left page.
2254 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2256 unsigned char leftkey[BT_keyarray];
// nxt walks downward from the end of the page as keys and values are placed
2257 uint nxt = mgr->page_size;
2258 unsigned char value[BtId];
// NOTE(review): the malloc result is used with no visible NULL check
2265 frame = malloc (mgr->page_size);
2266 memcpy (frame, root->page, mgr->page_size);
2268 // save left page fence key for new root
2270 ptr = keyptr(root->page, root->page->cnt);
2271 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2273 // Obtain an empty page to use, and copy the current
2274 // root contents into it, e.g. lower keys
2276 if( bt_newpage(mgr, left, frame, thread_no) )
2277 return mgr->err_thread = thread_no, mgr->err;
2279 left_page_no = left->latch->page_no;
2280 bt_unpinlatch (left->latch, thread_no, __LINE__);
2283 // left link the pages together
2285 page = bt_mappage (mgr, right);
2286 bt_putid (page->left, left_page_no);
2288 // preserve the page info at the bottom
2289 // of higher keys and set rest to zero
2291 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2293 // insert stopper key at top of newroot page
2294 // and increase the root height
// the stopper key's value is the right page number
2296 nxt -= BtId + sizeof(BtVal);
2297 bt_putid (value, right->page_no);
2298 val = (BtVal *)((unsigned char *)root->page + nxt);
2299 memcpy (val->value, value, BtId);
// the stopper key itself is two bytes long and goes in slot 2
2302 nxt -= 2 + sizeof(BtKey);
2303 slotptr(root->page, 2)->off = nxt;
2304 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2309 // insert lower keys page fence key on newroot page as first key
2311 nxt -= BtId + sizeof(BtVal);
2312 bt_putid (value, left_page_no);
2313 val = (BtVal *)((unsigned char *)root->page + nxt);
2314 memcpy (val->value, value, BtId);
2317 ptr = (BtKey *)leftkey;
2318 nxt -= ptr->len + sizeof(BtKey);
2319 slotptr(root->page, 1)->off = nxt;
2320 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root has no right sibling and now holds exactly two live keys
2322 bt_putid(root->page->right, 0);
2323 root->page->min = nxt; // reset lowest used offset and key count
2324 root->page->cnt = 2;
2325 root->page->act = 2;
// mirror the root's level into page zero
2328 mgr->pagezero->alloc->lvl = root->page->lvl;
2330 // release and unpin root pages
2332 bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2333 bt_unpinlatch (root->latch, thread_no, __LINE__);
2335 bt_unpinlatch (right, thread_no, __LINE__);
2339 // split already locked full node
2341 // return pool entry for new right
2342 // page, pinned & unlocked
// mgr:       buffer manager
// set:       the full page; keeps the lower keys after the split
// thread_no: passed to the allocation, pin and lock calls
// linkleft:  when non-zero (and the page is not the root) the far right
//            page's left pointer is pointed at the new page
// The entry computed at the end is the new right latch's index within its
// latch set array; callers in this file add the array base back to it.
2344 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2346 uint page_size = mgr->page_size;
2347 BtPageSet right[1], temp[1];
2348 uint cnt = 0, idx = 0, max;
2349 uint lvl = set->page->lvl;
// leaf pages are larger by leaf_xtra bits
2357 if( !set->page->lvl )
2358 page_size <<= mgr->leaf_xtra;
2360 // split higher half of keys to frame
// NOTE(review): the malloc result is used with no visible NULL check
2362 frame = malloc (page_size);
2363 memset (frame, 0, page_size);
2364 frame->min = page_size;
2365 max = set->page->cnt;
2369 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level 0 page
2370 if( cnt < max || set->page->lvl )
2371 if( slotptr(set->page, cnt)->dead )
// copy the value, then the key, downward from the end of the frame
2374 val = valptr(set->page, cnt);
2375 frame->min -= val->len + sizeof(BtVal);
2376 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2378 key = keyptr(set->page, cnt);
2379 frame->min -= key->len + sizeof(BtKey);
2380 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2382 // add librarian slot
// a dead placeholder slot sharing the offset of the real slot that follows
2384 slotptr(frame, ++idx)->off = frame->min;
2385 slotptr(frame, idx)->type = Librarian;
2386 slotptr(frame, idx)->dead = 1;
// the real slot keeps the type and dead flag of the source slot
2390 slotptr(frame, ++idx)->off = frame->min;
2391 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2393 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
// a non-root page hands its right link to the new page
2402 if( set->latch->page_no > ROOT_page ) {
2403 right2 = bt_getid (set->page->right);
2404 bt_putid (frame->right, right2);
// the new page's left link points back at the page being split
2407 bt_putid (frame->left, set->latch->page_no);
2410 // get new free page and write higher keys to it.
2412 if( bt_newpage(mgr, right, frame, thread_no) )
2415 // link far right's left pointer to new page
2417 if( linkleft && set->latch->page_no > ROOT_page )
2419 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2420 temp->page = bt_mappage (mgr, temp->latch);
2424 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2425 bt_putid (temp->page->left, right->latch->page_no);
2426 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2427 bt_unpinlatch (temp->latch, thread_no, __LINE__);
2428 } else if( !lvl ) { // page is rightmost leaf
// record the new rightmost leaf in page zero under the allocation mutex
2429 bt_mutexlock (mgr->lock);
2430 bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2431 bt_releasemutex(mgr->lock);
2434 // process lower keys
// frame is reused as a snapshot of the original page while it is rebuilt
2436 memcpy (frame, set->page, page_size);
2437 memset (set->page+1, 0, page_size - sizeof(*set->page));
2438 set->latch->dirty = 1;
2440 set->page->min = page_size;
2441 set->page->garbage = 0;
2447 // assemble page of smaller keys
2449 while( cnt++ < max ) {
2450 if( slotptr(frame, cnt)->dead )
2452 val = valptr(frame, cnt);
2453 set->page->min -= val->len + sizeof(BtVal);
2454 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2456 key = keyptr(frame, cnt);
2457 set->page->min -= key->len + sizeof(BtKey);
2458 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2460 // add librarian slot
2462 slotptr(set->page, ++idx)->off = set->page->min;
2463 slotptr(set->page, idx)->type = Librarian;
2464 slotptr(set->page, idx)->dead = 1;
2468 slotptr(set->page, ++idx)->off = set->page->min;
2469 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
// the rebuilt page links right to the new page
2473 bt_putid(set->page->right, right->latch->page_no);
2474 set->page->cnt = idx;
// index of the new right latch within the latch set array for this level
2477 entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2481 // fix keys for newly split page
2482 // call with both pages pinned & locked
2483 // return unlocked and unpinned
// mgr:       buffer manager
// set:       left page of the split, holding the smaller keys
// right:     latch of the new right page
// thread_no: recorded in mgr->err_thread on failure
// A split of the root page is delegated to bt_splitroot.
2485 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2487 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2488 unsigned char value[BtId];
2489 uint lvl = set->page->lvl;
2495 // if current page is the root page, split it
2497 if( set->latch->page_no == ROOT_page )
2498 return bt_splitroot (mgr, set, right, thread_no);
// cache the fence keys (last slot) of the left and right pages
2500 ptr = keyptr(set->page, set->page->cnt);
2501 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2503 page = bt_mappage (mgr, right);
2505 ptr = keyptr(page, page->cnt);
2506 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2508 // splice in far right page's left page_no
2510 if( right2 = bt_getid (page->right) ) {
2511 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2512 temp->page = bt_mappage (mgr, temp->latch);
2516 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2517 bt_putid (temp->page->left, right->page_no);
2518 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2519 bt_unpinlatch (temp->latch, thread_no, __LINE__);
2520 } else if( !lvl ) { // right page is far right page
// record the new rightmost leaf in page zero under the allocation mutex
2521 bt_mutexlock (mgr->lock);
2522 bt_putid (mgr->pagezero->alloc->left, right->page_no);
2523 bt_releasemutex(mgr->lock);
2525 // insert new fences in their parent pages
2527 bt_lockpage (BtLockParent, right, thread_no, __LINE__);
// swap the left page's write lock for its parent lock
2529 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2530 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2532 // insert new fence for reformulated left block of smaller keys
2534 bt_putid (value, set->latch->page_no);
2535 ptr = (BtKey *)leftkey;
2537 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2538 return mgr->err_thread = thread_no, mgr->err;
2540 // switch fence for right block of larger keys to new right page
2542 bt_putid (value, right->page_no);
2543 ptr = (BtKey *)rightkey;
2545 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2546 return mgr->err_thread = thread_no, mgr->err;
// both parent entries are in place: release and unpin the two pages
2548 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2549 bt_unpinlatch (set->latch, thread_no, __LINE__);
2551 bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2552 bt_unpinlatch (right, thread_no, __LINE__);
2556 // install new key and value onto page
2557 // page must already be checked for
// mgr:          buffer manager
// set:          page receiving the key; its latch is marked dirty
// slot:         slot the new key is inserted at
// key/keylen:   key bytes and length
// value/vallen: value bytes and length
// type:         slot type for the new entry (its use is not visible in this excerpt)
2560 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2562 uint idx, librarian;
2568 // if previous slot is a librarian slot, use it
2571 if( slotptr(set->page, slot-1)->type == Librarian )
2574 // copy value onto page
// data grows downward: lower min by the record size and write there
2576 set->page->min -= vallen + sizeof(BtVal);
2577 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2578 memcpy (val->value, value, vallen);
2581 // copy key onto page
2583 set->page->min -= keylen + sizeof(BtKey);
2584 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2585 memcpy (ptr->key, key, keylen);
2588 // find first empty slot at or above our insert slot
2590 for( idx = slot; idx < set->page->cnt; idx++ )
2591 if( slotptr(set->page, idx)->dead )
2594 // now insert key into array before slot.
2596 // if we're going all the way to the top,
2597 // add as many librarian slots as
2600 if( idx == set->page->cnt ) {
// slot space left, measured against four fifths of the current min
// offset; note that ++cnt takes effect inside this expression
2601 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2603 librarian = ++idx - slot;
2604 avail /= sizeof(BtSlot);
2609 if( librarian > avail )
2613 rate = (idx - slot) / librarian;
2614 set->page->cnt += librarian;
// this assignment leaves no librarian slots to add (its guarding branch is not visible)
2619 librarian = 0, rate = 0;
2621 // transfer slots and add librarian slots
2623 while( idx > slot ) {
// shift a slot up by the number of librarian slots still to be inserted, plus one
2624 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2626 // add librarian slot per rate
2629 if( (idx - slot)/2 <= librarian * rate ) {
// the librarian slot shares the offset of the slot just above it
2630 node = slotptr(set->page, --idx);
2631 node->off = node[1].off;
2632 node->type = Librarian;
2640 set->latch->dirty = 1;
// fill in the new slot: it points at the key just written at page->min
2645 node = slotptr(set->page, slot);
2646 node->off = set->page->min;
2652 // Insert new key into the btree at given level.
2653 // either add a new key or update/add an existing one
// bt_insertkey: insert a key at btree level lvl, or update the value of a key
// that already exists on the page.
// Inside a loop the target page is loaded write-locked with bt_loadpage and a
// Librarian place-holder slot is stepped over. Then, depending on the key
// comparison and slot type, the code either
//   - makes room with bt_cleanpage and installs a new slot via bt_insertslot,
//   - overwrites the value in place when the old value area is large enough, or
//   - counts the old key/value as garbage and stores key and value afresh.
// When bt_cleanpage returns 0 the page is split with bt_splitpage and
// bt_splitkeys (presumably followed by a retry of the loop; the continue
// statement is not visible in this listing).
//	mgr:	btree manager; mgr->err / mgr->err_thread report failures
//	key/keylen, value/vallen: key and value bytes
//	lvl:	btree level to insert at (also selects latchsets vs leafsets)
//	type:	slot type passed on to bt_insertslot
//	thread_no: caller's thread number for lock bookkeeping
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering), including declarations of set, node, ptr, val and most
// else/return/continue lines.
2655 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2657 uint slot, idx, len, entry;
2663 while( 1 ) { // find the page and slot for the current key
2664 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2665 node = slotptr(set->page, slot);
2666 ptr = keyptr(set->page, slot);
2670 // if librarian slot == found slot, advance to real slot
2672 if( node->type == Librarian ) {
2673 node = slotptr(set->page, ++slot);
2674 ptr = keyptr(set->page, slot);
2677 // if inserting a duplicate key or unique
2678 // key that doesn't exist on the page,
2679 // check for adequate space on the page
2680 // and insert the new key before slot.
2685 if( !keycmp (ptr, key, keylen) )
2688 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2689 if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
// split path: the new right page's latch entry is offset into the pool
// matching this level (latchsets for lvl > 0, leafsets for leaves)
2694 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2695 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2698 return mgr->err_thread = thread_no, mgr->err;
2701 if( keycmp (ptr, key, keylen) )
2707 // if key already exists, update value and return
2709 val = valptr(set->page, slot);
2711 if( val->len >= vallen ) {
// the bytes no longer used by the shorter value are accounted as garbage
2717 set->page->garbage += val->len - vallen;
2718 set->latch->dirty = 1;
2720 memcpy (val->value, value, vallen);
2724 // new update value doesn't fit in existing value area
2725 // make sure page has room
// the whole old key + value area becomes garbage; both are re-stored below
2728 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2735 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2738 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2739 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2742 return mgr->err_thread = thread_no, mgr->err;
2745 // copy key and value onto page and update slot
2747 set->page->min -= vallen + sizeof(BtVal);
2748 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2749 memcpy (val->value, value, vallen);
2752 set->latch->dirty = 1;
2753 set->page->min -= keylen + sizeof(BtKey);
2754 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2755 memcpy (ptr->key, key, keylen);
2758 slotptr(set->page,slot)->off = set->page->min;
// release the write lock and the pin taken by bt_loadpage
2761 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2762 bt_unpinlatch (set->latch, thread_no, __LINE__);
2766 // determine actual page where key is located
2767 // return slot number
// bt_atomicpage: locate the leaf page that holds (or should hold) source key
// number src during an atomic transaction, and return its slot number.
// The starting latch entry comes from locks[src].entry, or from
// locks[src-1].entry when locks[src].reuse is set. Starting there, the chain
// of pages linked through latch->split is searched with bt_findslot; set is
// filled in with the latch and mapped page being examined.
// When the chain is exhausted without a hit, mgr->err is set to BTERR_atomic
// (the corresponding return statement is not visible in this listing).
//	source:	page holding the transaction's keys
//	locks:	per-key transaction state, indexed by source slot number
//	set:	out: latch/page where the key was found
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering), e.g. the declaration of entry and the return statements.
2769 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2771 BtKey *key = keyptr(source,src), *ptr;
2772 uint slot = locks[src].slot;
2775 if( locks[src].reuse )
2776 entry = locks[src-1].entry;
2778 entry = locks[src].entry;
2781 set->latch = mgr->leafsets + entry;
2782 set->page = bt_mappage (mgr, set->latch);
2786 // find where our key is located
2787 // on current page or pages split on
2788 // same page txn operations.
2791 set->latch = mgr->leafsets + entry;
2792 set->page = bt_mappage (mgr, set->latch);
2794 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2795 if( slotptr(set->page, slot)->type == Librarian )
// remember which page of the split chain satisfied a reuse entry
2797 if( locks[src].reuse )
2798 locks[src].entry = entry;
// follow the split chain; a zero split field ends the search
2801 } while( entry = set->latch->split );
2803 mgr->line = __LINE__, mgr->err = BTERR_atomic;
// bt_atomicinsert: insert source key number src into its leaf page as part of
// an atomic transaction.
// Each pass locates the page with bt_atomicpage. If bt_cleanpage yields a
// slot, the key/value are installed with bt_insertslot and the page lsn is
// set to lsn. Otherwise the page is split with bt_splitpage (last argument 0),
// the new right page is write-locked and spliced into the master page's split
// chain, locks[src].slot is cleared, and the search repeats.
// Leaving the loop (bt_atomicpage returned 0) reports BTERR_atomic.
//	source:	page holding the transaction's keys and values
//	locks:	per-key transaction state
//	lsn:	log sequence number stamped on the modified page
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering), e.g. declarations of set, latch, slot, entry and the success return.
2807 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2809 BtKey *key = keyptr(source, src);
2810 BtVal *val = valptr(source, src);
2815 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2816 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2817 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2818 return mgr->err_thread = thread_no, mgr->err;
2820 set->page->lsn = lsn;
// no room on the page: split it and keep the right half on the split chain
2826 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2827 latch = mgr->leafsets + entry;
2831 // splice right page into split chain
2834 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2835 latch->split = set->latch->split;
2836 set->latch->split = entry;
2838 // clear slot number for atomic page
2840 locks[src].slot = 0;
2843 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2846 // perform delete from smaller btree
2847 // insert a delete slot if not found there
// bt_atomicdelete: apply the delete of source key number src as part of an
// atomic transaction.
// The page and slot are located with bt_atomicpage; failure to locate reports
// BTERR_struct. If the key at that slot differs from the requested key, a
// slot of type Delete with an empty value is inserted via bt_insertslot.
// Otherwise the key and value sizes are added to page->garbage, the latch is
// marked dirty, the page lsn is set, and mgr->found is atomically incremented.
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering); in particular the "already dead" check, the statements that mark
// the slot dead and the return statements are not visible.
2849 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2851 BtKey *key = keyptr(source, src);
2858 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2859 node = slotptr(set->page, slot);
2860 ptr = keyptr(set->page, slot);
2861 val = valptr(set->page, slot);
2863 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2865 // if slot is not found, insert a delete slot
2867 if( keycmp (ptr, key->key, key->len) )
2868 if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2871 // if node is already dead,
2872 // ignore the request.
// the space held by the deleted key and value becomes reclaimable garbage
2877 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2878 set->latch->dirty = 1;
2879 set->page->lsn = lsn;
2883 __sync_fetch_and_add(&mgr->found, 1);
2887 // release master's splits from right to left
// bt_atomicrelease: unlock and unpin the leaf latch at index entry together
// with the pages reachable through its split chain.
// The function calls itself on latch->split before releasing its own latch,
// so the last page of the chain is released first (right to left).
//	entry:	index into mgr->leafsets of the first latch to release
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering); the condition guarding the recursive call is not visible.
2889 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2891 BtLatchSet *latch = mgr->leafsets + entry;
2894 bt_atomicrelease (mgr, latch->split, thread_no);
2897 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2898 bt_unpinlatch(latch, thread_no, __LINE__);
2901 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2903 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2904 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2906 return keycmp (key1, key2->key, key2->len);
2908 // atomic modification of a batch of keys.
// bt_atomictxn: apply a batch of key modifications held in source as one
// atomic transaction against the cache btree (bt->mgr).
// Steps visible here:
//   1. sort the source slots by key so that threads take page locks in the
//      same order,
//   2. when pagezero->redopages is non-zero, log the batch with bt_txnredo
//      and keep the returned lsn,
//   3. run the inserts/deletes with bt_atomicexec,
//   4. while pagezero->leafpages exceeds leaftotal minus *mgr->thread_no,
//      move pages into the main btree with bt_promote.
// Returns bt->mgr->err on failure.
// NOTE(review): both a hand-written insertion sort and a qsort_r call are
// visible; presumably they are alternatives selected by a preprocessor
// conditional that this listing omits (gaps in the embedded numbering).
// Declarations of temp and lsn and the final return are likewise not visible.
2910 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2912 uint src, idx, slot, samepage, entry, que = 0;
2913 BtKey *key, *ptr, *key2;
2919 // stable sort the list of keys into order to
2920 // prevent deadlocks between threads.
// insertion sort: slot src is swapped downward while its key is smaller
2922 for( src = 1; src++ < source->cnt; ) {
2923 *temp = *slotptr(source,src);
2924 key = keyptr (source,src);
2926 for( idx = src; --idx; ) {
2927 key2 = keyptr (source,idx);
2928 if( keycmp (key, key2->key, key2->len) < 0 ) {
2929 *slotptr(source,idx+1) = *slotptr(source,idx);
2930 *slotptr(source,idx) = *temp;
// library sort over the slot array starting at slot 1, with the page as context
2936 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2937 // add entries to redo log
2939 if( bt->mgr->pagezero->redopages )
2940 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2944 // perform the individual actions in the transaction
2946 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2947 return bt->mgr->err;
2949 // if number of active pages
2950 // is greater than the buffer pool
2951 // promote page into larger btree
2954 while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2955 if( bt_promote (bt) )
2956 return bt->mgr->err;
2963 // execute the source list of inserts/deletes
// bt_atomicexec: execute the sorted list of inserts/deletes in source against
// the btree managed by mgr.
// Phase 1 (forward over source): load and write-lock the leaf page for each
//   key, recording in locks[src] the latch entry, slot, page lsn and a reuse
//   flag when the key falls on the same page as the previous key.
// Phase 2 (backward over source): for each group of keys sharing a master
//   page, run bt_atomicdelete / bt_atomicinsert per slot type, then walk the
//   master page's split chain: empty pages are removed and freed, surviving
//   pages get their fence keys posted at level 1 with bt_insertkey and their
//   left links fixed, and finally the locks and pins are released.
//	source:	page of transaction keys (slots 1..cnt)
//	lsn:	log sequence number handed to the insert/delete helpers
//	lsm:	flag passed by callers (bt_atomictxn passes 0); its use is not
//		visible in this listing
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering): declarations of locks, key, ptr, latch, right_page_no, the
// case labels of the switch, and most else/continue/return lines.
// NOTE(review): no check of the calloc result is visible below — confirm.
2965 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2967 uint slot, src, idx, samepage, entry;
2968 BtPageSet set[1], prev[1];
2969 unsigned char value[BtId];
// one AtomicTxn per source slot; index 0 is unused because slots start at 1
2977 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2979 // Load the leaf page for each key
2980 // group same page references with reuse bit
2982 for( src = 0; src++ < source->cnt; ) {
2983 key = keyptr(source, src);
2985 // first determine if this modification falls
2986 // on the same page as the previous modification
2987 // note that the far right leaf page is a special case
// ptr is the fence (last) key of the previously loaded page, set further down
2989 if( samepage = src > 1 )
2990 samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
2993 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
2994 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
3001 if( slotptr(set->page, slot)->type == Librarian )
// latch entry number = index of this latch within the leaf latch array
3004 entry = set->latch - mgr->leafsets;
3005 locks[src].reuse = samepage;
3006 locks[src].entry = entry;
3007 locks[src].slot = slot;
3009 // capture current lsn for master page
3011 locks[src].reqlsn = set->page->lsn;
3014 // insert or delete each key
3015 // process any splits or merges
3016 // run through txn list backwards
3018 samepage = source->cnt + 1;
3020 for( src = source->cnt; src; src-- ) {
3021 if( locks[src].reuse )
3024 // perform the txn operations
3025 // from smaller to larger on
3028 for( idx = src; idx < samepage; idx++ )
3029 switch( slotptr(source,idx)->type ) {
3031 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3037 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
3042 bt_atomicpage (mgr, source, locks, idx, set);
3046 // after the same page operations have finished,
3047 // process master page for splits or deletion.
3049 latch = prev->latch = mgr->leafsets + locks[src].entry;
3050 prev->page = bt_mappage (mgr, prev->latch);
3053 // pick-up all splits from master page
3054 // each one is already pinned & WriteLocked.
3056 while( entry = prev->latch->split ) {
3057 set->latch = mgr->leafsets + entry;
3058 set->page = bt_mappage (mgr, set->latch);
3060 // delete empty master page by undoing its split
3061 // (this is potentially another empty page)
3062 // note that there are no pointers to it yet
3064 if( !prev->page->act ) {
// the split page's contents replace the empty master; the split page is freed
3065 memcpy (set->page->left, prev->page->left, BtId);
3066 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3067 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3068 prev->latch->split = set->latch->split;
3069 prev->latch->dirty = 1;
3070 bt_freepage (mgr, set, thread_no);
3074 // remove empty split page from the split chain
3075 // and return it to the free list. No other
3076 // thread has its page number yet.
3078 if( !set->page->act ) {
3079 memcpy (prev->page->right, set->page->right, BtId);
3080 prev->latch->split = set->latch->split;
3082 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3083 bt_freepage (mgr, set, thread_no);
3087 // update prev's fence key
3089 ptr = keyptr(prev->page,prev->page->cnt);
3090 bt_putid (value, prev->latch->page_no);
3092 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3095 // splice in the left link into the split page
3097 bt_putid (set->page->left, prev->latch->page_no);
3100 bt_syncpage (mgr, prev->page, prev->latch);
3105 // update left pointer in next right page from last split page
3106 // (if all splits were reversed or none occurred, latch->split == 0)
3108 if( latch->split ) {
3109 // fix left pointer in master's original (now split)
3110 // far right sibling or set rightmost page in page zero
3112 if( right_page_no = bt_getid (prev->page->right) ) {
3113 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3114 set->page = bt_mappage (mgr, set->latch);
3118 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3119 bt_putid (set->page->left, prev->latch->page_no);
3120 set->latch->dirty = 1;
3122 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3123 bt_unpinlatch (set->latch, thread_no, __LINE__);
3124 } else { // prev is rightmost page
// page zero's alloc->left field records the rightmost leaf page number
3125 bt_mutexlock (mgr->lock);
3126 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3127 bt_releasemutex(mgr->lock);
3130 // switch the original fence key from the
3131 // master page to the last split page.
3133 ptr = keyptr(prev->page,prev->page->cnt);
3134 bt_putid (value, prev->latch->page_no);
3136 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3140 bt_syncpage (mgr, prev->page, prev->latch);
3142 // unlock and unpin the split pages
3144 bt_atomicrelease (mgr, latch->split, thread_no);
3146 // unlock and unpin the master page
3149 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3150 bt_unpinlatch(latch, thread_no, __LINE__);
3154 // since there are no splits remaining, we're
3155 // finished if master page occupied
3157 if( prev->page->act ) {
3158 bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3161 bt_syncpage (mgr, prev->page, prev->latch);
3163 bt_unpinlatch(prev->latch, thread_no, __LINE__);
3167 // any and all splits were reversed, and the
3168 // master page located in prev is empty, delete it
3170 if( bt_deletepage (mgr, prev, thread_no, 0) )
3178 // pick & promote a page into the larger btree
// bt_promote: pick one leaf page of the cache btree (bt->mgr) and move its
// keys into the main btree (bt->main).
// A candidate latch entry is taken from the shared counter mgr->leafpromote
// (atomically incremented, then reduced modulo leaftotal). Candidates are
// rejected when the entry was never used, its modify mutex cannot be taken,
// it is pinned, it has no right sibling, or the page is a non-leaf / marked
// nopromote / being killed. An accepted page is write-locked, its slots are
// applied to the main btree with bt_atomicexec, and the page is then deleted
// from the cache btree with bt_deletepage.
// Returns bt->main->err when the transfer fails, otherwise bt->mgr->err.
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering): the retry loop, the continue statements after each rejection,
// the pin of the chosen latch and the platform #ifdef around the two atomic
// increments are not visible.
// NOTE(review): the progress message prints set->latch->page_no with %d;
// page_no is assigned to a uid elsewhere in this file — confirm the width.
3180 BTERR bt_promote (BtDb *bt)
3182 uint entry, slot, idx;
3190 entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3192 entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3194 entry %= bt->mgr->leaftotal;
3199 set->latch = bt->mgr->leafsets + entry;
3201 // skip this entry if it has never been used
3203 if( !set->latch->page_no )
3206 if( !bt_mutextry(set->latch->modify) )
3209 // skip this entry if it is pinned
3211 if( set->latch->pin & ~CLOCK_bit ) {
3212 bt_releasemutex(set->latch->modify);
3216 set->page = bt_mappage (bt->mgr, set->latch);
3218 // entry has no right sibling
3220 if( !bt_getid (set->page->right) ) {
3221 bt_releasemutex(set->latch->modify);
3225 // entry is an interior node or is being killed or constructed
3227 if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3228 bt_releasemutex(set->latch->modify);
3232 // pin the page for our access
3233 // and leave it locked for the
3234 // duration of the promotion.
3237 bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3238 bt_releasemutex(set->latch->modify);
3240 // transfer slots in our selected page to the main btree
// progress trace for every 100th latch entry
3241 if( !(entry % 100) )
3242 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3244 if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3245 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3246 return bt->main->err;
3249 // now delete the page
3251 if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3252 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3254 return bt->mgr->err;
3258 // find unique key == given key, or first duplicate key in
3259 // leaf level and return number of value bytes
3260 // or (-1) if not found.
// bt_findkey: look a key up in the cache btree first (bt->mgr), then in the
// main btree (bt->main), and copy its value to the caller's buffer.
// For each tree the leaf page is loaded read-locked with bt_loadpage, a
// Librarian place-holder slot is skipped, and the key at the slot is compared
// with the requested key. A matching slot that is dead or of type Delete is
// treated as "not there"; otherwise up to valmax bytes of the value are copied.
//	key/keylen:	key to search for
//	value/valmax:	destination buffer and its capacity
// Per the comment preceding this function, the result is the number of value
// bytes, or -1 when the key is not found (the return statements themselves
// are not visible in this listing).
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering), e.g. declarations of set, slot, type, node, ptr, val and the
// break/continue/return lines.
3262 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
// type 0 searches the cache btree, type 1 the main btree
3271 for( type = 0; type < 2; type++ )
3272 if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3273 node = slotptr(set->page, slot);
3275 // skip librarian slot place holder
3277 if( node->type == Librarian )
3278 node = slotptr(set->page, ++slot);
3280 ptr = keyptr(set->page, slot);
3282 // not there if we reach the stopper key
3283 // or the key doesn't match what's on the page.
// last slot of a page without a right sibling is the stopper key
3285 if( slot == set->page->cnt )
3286 if( !bt_getid (set->page->right) ) {
3287 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3288 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3292 if( keycmp (ptr, key, keylen) ) {
3293 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3294 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3298 // key matches, return >= 0 value bytes copied
3299 // or -1 if not there.
3301 if( node->type == Delete || node->dead ) {
3306 val = valptr (set->page,slot);
3308 if( valmax > val->len )
3311 memcpy (value, val->value, valmax);
3320 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3321 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3326 // set cursor to highest slot on right-most page
// bt_lastkey: position both cursors (cache btree and main btree) on the last
// slot of the leaf page recorded in each tree's pagezero->alloc->left field.
// Each page is pinned with bt_pinleaf, mapped, read-locked, and the cursor
// slot is set to the page's slot count.
// Returns the failing manager's err when a page cannot be pinned.
// NOTE(review): when pinning the main page fails, the cache page locked above
// is not visibly released — confirm against the caller's cleanup.
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering), e.g. the else lines and the final return.
3328 BTERR bt_lastkey (BtDb *bt)
3330 uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3331 uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
3333 if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
3334 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
3336 return bt->mgr->err;
3338 bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3339 bt->cacheslot = bt->cacheset->page->cnt;
3341 if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
3342 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
3344 return bt->main->err;
3346 bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3347 bt->mainslot = bt->mainset->page->cnt;
3352 // return previous slot on cursor page
// bt_prevslot: step a cursor back to the previous live slot, moving to the
// left sibling page when the current page is exhausted.
// On a page change the current page is unlocked and unpinned, the left
// sibling is pinned and read-locked, and its right pointer is compared with
// the page we came from ("us"); the loop repeats until a page whose right
// pointer equals us is reached. The scan then restarts just past that page's
// last slot.
//	set:	in/out cursor page set
//	slot:	current slot number
// Returns the new slot number (the return statements are not visible here).
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering), including the slot-decrement loop and the end-of-tree checks.
3354 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3356 uid next, us = set->latch->page_no;
3360 if( slotptr(set->page, slot)->dead )
3365 next = bt_getid(set->page->left);
3371 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3372 bt_unpinlatch (set->latch, thread_no, __LINE__);
3374 if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3375 set->page = bt_mappage (mgr, set->latch);
3379 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3380 next = bt_getid (set->page->right);
// keep going until the page whose right link points back at our origin page
3382 } while( next != us );
3384 slot = set->page->cnt + 1;
3388 // advance to previous key
// bt_prevkey: move the merged cursor over the cache and main btrees to the
// previous key.
// bt->phase selects which cursor(s) are stepped back with bt_prevslot: the
// cache cursor, the main cursor, or both. The node/key/value pointers of each
// cursor that still has a slot are refreshed, the two current keys are
// compared with keycmp, and the slot of the larger key is returned; cache
// slots of type Delete get special handling.
// Returns the chosen slot number (bt->cacheslot or bt->mainslot).
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering): the case labels, the assignments to bt->phase, the loop around
// the whole sequence and the handling of deleted cache keys are not visible.
3390 BTERR bt_prevkey (BtDb *bt)
3394 // first advance last key(s) one previous slot
3397 switch( bt->phase ) {
3399 bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3402 bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3405 bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3406 bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
// refresh the cached node/key/value pointers for each live cursor
3412 if( bt->cacheslot ) {
3413 bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3414 bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3415 bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3418 if( bt->mainslot ) {
3419 bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3420 bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3421 bt->mainval = valptr(bt->mainset->page, bt->mainslot);
// compare the two current keys when both cursors are live
3424 if( bt->mainslot && bt->cacheslot )
3425 cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3426 else if( bt->cacheslot )
3428 else if( bt->mainslot )
3433 // cache key is larger
3437 if( bt->cachenode->type == Delete )
3439 return bt->cacheslot;
3442 // main key is larger
3446 return bt->mainslot;
3453 if( bt->cachenode->type == Delete )
3456 return bt->cacheslot;
3460 // advance to next slot in cache or main btree
3461 // return 0 for EOF/error
// bt_nextslot: advance a cursor to the next live slot, moving to the right
// sibling page when the current page is exhausted.
// Within a page, dead slots are skipped; the last slot of a page only counts
// when the page has a right sibling. On a page change the current page is
// unlocked and unpinned, the right sibling is pinned and mapped, and a read
// lock is obtained under a temporary BtLockAccess lock.
//	set:	in/out cursor page set
//	slot:	current slot number
// Per the comment preceding this function, 0 is returned for EOF or error
// (the return statements are not visible in this listing).
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering), e.g. the declaration of page_no, the enclosing loop and returns.
3463 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3469 while( slot++ < set->page->cnt )
3470 if( slotptr(set->page, slot)->dead )
3472 else if( slot < set->page->cnt || bt_getid (set->page->right) )
3477 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3478 bt_unpinlatch (set->latch, thread_no, __LINE__);
3480 if( page_no = bt_getid(set->page->right) )
3481 if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3482 set->page = bt_mappage (mgr, set->latch);
3488 // obtain access lock using lock chaining with Access mode
3490 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3491 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3492 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3497 // advance to next key
// bt_nextkey: move the merged cursor over the cache and main btrees to the
// next key.
// bt->phase selects which cursor(s) are advanced with bt_nextslot: the cache
// cursor, the main cursor, or both. The node/key/value pointers of each
// cursor that still has a slot are refreshed, the two current keys are
// compared with keycmp, and the slot of the smaller key is returned; cache
// slots of type Delete get special handling.
// Returns the chosen slot number (bt->cacheslot or bt->mainslot).
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering): the case labels, the assignments to bt->phase, the loop around
// the whole sequence and the handling of deleted cache keys are not visible.
3499 BTERR bt_nextkey (BtDb *bt)
3503 // first advance last key(s) one next slot
3506 switch( bt->phase ) {
3508 bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3511 bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3514 bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3515 bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
// refresh the cached node/key/value pointers for each live cursor
3521 if( bt->cacheslot ) {
3522 bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3523 bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3524 bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3527 if( bt->mainslot ) {
3528 bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3529 bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3530 bt->mainval = valptr(bt->mainset->page, bt->mainslot);
// compare the two current keys when both cursors are live
3533 if( bt->mainslot && bt->cacheslot )
3534 cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3535 else if( bt->mainslot )
3537 else if( bt->cacheslot )
3542 // main key is larger
3546 if( bt->cachenode->type == Delete )
3548 return bt->cacheslot;
3551 // cache key is larger
3555 return bt->mainslot;
3562 if( bt->cachenode->type == Delete )
3565 return bt->cacheslot;
3569 // start sweep of keys
3571 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3578 if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
3579 bt->cacheslot = slot - 1;
3581 return bt->mgr->err;
3585 if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
3586 bt->mainslot = slot - 1;
3588 return bt->mgr->err;
// getCpuTime (Windows variant): return a time reading in seconds as a double.
// The visible code obtains either the system time (GetSystemTimeAsFileTime)
// or the process user/kernel times (GetProcessTimes), converts the FILETIME
// to a SYSTEMTIME, and sums the hour/minute/second/millisecond fields. For
// the system-time reading the day of week is folded in as whole days.
//	type:	selects the reading; the switch and case labels are not visible
//		here (the callers in this file use 0, 1 and 2)
// NOTE(review): only day-of-week through milliseconds are used, so the
// wall-clock reading wraps weekly — confirm that this is intended.
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering), e.g. declarations of ans and crtime, the switch and the return.
3597 double getCpuTime(int type)
3600 FILETIME xittime[1];
3601 FILETIME systime[1];
3602 FILETIME usrtime[1];
3603 SYSTEMTIME timeconv[1];
3606 memset (timeconv, 0, sizeof(SYSTEMTIME));
3610 GetSystemTimeAsFileTime (xittime);
3611 FileTimeToSystemTime (xittime, timeconv);
3612 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3615 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3616 FileTimeToSystemTime (usrtime, timeconv);
3619 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3620 FileTimeToSystemTime (systime, timeconv);
3624 ans += (double)timeconv->wHour * 3600;
3625 ans += (double)timeconv->wMinute * 60;
3626 ans += (double)timeconv->wSecond;
3627 ans += (double)timeconv->wMilliseconds / 1000;
3632 #include <sys/resource.h>
// getCpuTime (POSIX variant): return a time reading in seconds as a double.
//	type 0:	wall-clock time from gettimeofday
//	type 1:	user CPU time consumed by this process (getrusage RUSAGE_SELF)
//	type 2:	system CPU time consumed by this process
// Any other type yields 0. If the underlying call fails, 0 is returned
// instead of a value computed from an uninitialised structure.
double getCpuTime(int type)
{
struct rusage used[1];
struct timeval tv[1];

	switch( type ) {
	case 0:
		if( gettimeofday(tv, NULL) )
			return 0;

		return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;

	case 1:
		if( getrusage(RUSAGE_SELF, used) )
			return 0;

		return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;

	case 2:
		if( getrusage(RUSAGE_SELF, used) )
			return 0;

		return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
	}

	return 0;
}
3657 void bt_poolaudit (BtMgr *mgr, char *type)
3659 BtLatchSet *latch, test[1];
3662 memset (test, 0, sizeof(test));
3664 if( memcmp (test, mgr->latchsets, sizeof(test)) )
3665 fprintf(stderr, "%s latchset zero overwritten\n", type);
3667 if( memcmp (test, mgr->leafsets, sizeof(test)) )
3668 fprintf(stderr, "%s leafset zero overwritten\n", type);
3670 for( entry = 0; ++entry < mgr->latchtotal; ) {
3671 latch = mgr->latchsets + entry;
3673 if( *latch->modify->value )
3674 fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3676 if( latch->pin & ~CLOCK_bit )
3677 fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3680 for( entry = 0; ++entry < mgr->leaftotal; ) {
3681 latch = mgr->leafsets + entry;
3683 if( *latch->modify->value )
3684 fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3686 if( latch->pin & ~CLOCK_bit )
3687 fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3700 // standalone program to index file of keys
3701 // then list them onto std-out
// index_file: worker thread entry point of the standalone test program.
// arg points at a ThreadArg; the command character is taken from args->type
// at position args->idx (or the last character when idx is past the end).
// Judging by the progress messages, the commands cover: pennysort
// insert/delete (optionally batched into transactions of args->num keys via
// bt_atomictxn), plain key indexing, key lookup, forward scan, reverse scan,
// and counting the keys of the cache and main btrees. The switch and its case
// labels are not visible in this listing.
// Two signatures are shown: the pthread form and the Windows __stdcall form
// (presumably selected by a preprocessor conditional that is not visible).
// NOTE(review): error paths print a message and then call exit(0), i.e. the
// process exits with a success status — confirm that this is intended.
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering), including declarations of bt, page, set, in, and the
// per-character key assembly inside the read loops.
3704 void *index_file (void *arg)
3706 uint __stdcall index_file (void *arg)
3709 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3710 uid next, page_no = LEAF_page; // start on first page of leaves
3711 int ch, len = 0, slot, type = 0;
3712 unsigned char key[BT_maxkey];
3713 unsigned char buff[65536];
3714 uint nxt = sizeof(buff);
3715 ThreadArg *args = arg;
3724 bt = bt_open (args->mgr, args->main);
// buff doubles as a btree page image used to batch transaction keys
3725 page = (BtPage)buff;
3727 if( args->idx < strlen (args->type) )
3728 ch = args->type[args->idx];
3730 ch = args->type[strlen(args->type) - 1];
3742 if( type == Delete )
3743 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3745 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3747 if( type == Delete )
3748 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3750 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3752 if( in = fopen (args->infile, "rb") )
3753 while( ch = getc(in), ch != EOF )
// pennysort records: first 10 bytes are the key, the remainder is the value
3759 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3760 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
// transaction mode: value then key are packed downward from the end of buff
3766 memcpy (buff + nxt, key + 10, len - 10);
3768 buff[nxt] = len - 10;
3770 memcpy (buff + nxt, key, 10);
3773 slotptr(page,++cnt)->off = nxt;
3774 slotptr(page,cnt)->type = type;
3777 if( cnt < args->num )
3784 if( bt_atomictxn (bt, page) )
3785 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3790 else if( len < BT_maxkey )
3792 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3796 fprintf(stderr, "started indexing for %s\n", args->infile);
3797 if( in = fopen (args->infile, "r") )
3798 while( ch = getc(in), ch != EOF )
3803 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3804 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3807 else if( len < BT_maxkey )
3809 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3813 fprintf(stderr, "started finding keys for %s\n", args->infile);
3814 if( in = fopen (args->infile, "rb") )
3815 while( ch = getc(in), ch != EOF )
3819 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3821 else if( bt->mgr->err )
3822 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3825 else if( len < BT_maxkey )
3827 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3831 fprintf(stderr, "started forward scan\n");
3832 if( bt_startkey (bt, NULL, 0) )
3833 fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
// phase 1 means the current key comes from the main btree, otherwise the cache
3835 while( bt_nextkey (bt) ) {
3836 if( bt->phase == 1 ) {
3837 len = bt->mainkey->len;
3839 if( bt->mainnode->type == Duplicate )
3842 fwrite (bt->mainkey->key, len, 1, stdout);
3843 fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3845 len = bt->cachekey->len;
3847 if( bt->cachenode->type == Duplicate )
3850 fwrite (bt->cachekey->key, len, 1, stdout);
3851 fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3854 fputc ('\n', stdout);
// release both cursor pages once the scan is finished
3858 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3859 bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
3861 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3862 bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
3864 fprintf(stderr, " Total keys read %d\n", cnt);
3868 fprintf(stderr, "started reverse scan\n");
3869 if( bt_lastkey (bt) )
3870 fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3872 while( bt_prevkey (bt) ) {
3873 if( bt->phase == 1 ) {
3874 len = bt->mainkey->len;
3876 if( bt->mainnode->type == Duplicate )
3879 fwrite (bt->mainkey->key, len, 1, stdout);
3880 fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3882 len = bt->cachekey->len;
3884 if( bt->cachenode->type == Duplicate )
3887 fwrite (bt->cachekey->key, len, 1, stdout);
3888 fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3891 fputc ('\n', stdout);
3895 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3896 bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
3898 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3899 bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
3901 fprintf(stderr, " Total keys read %d\n", cnt);
// count: walk the leaf chain from LEAF_page via right links, summing page->act
3905 fprintf(stderr, "started counting LSM cache btree\n");
3906 page_no = LEAF_page;
3909 if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3910 set->page = bt_mappage (bt->mgr, set->latch);
3914 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3915 cnt += set->page->act;
3916 page_no = bt_getid (set->page->right);
3917 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3918 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3921 cachecnt = --cnt; // remove stopper key
3924 fprintf(stderr, "started counting LSM main btree\n");
3925 page_no = LEAF_page;
3928 if( set->latch = bt_pinleaf (bt->main, page_no, bt->thread_no) )
3929 set->page = bt_mappage (bt->main, set->latch);
3933 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3934 cnt += set->page->act;
3935 page_no = bt_getid (set->page->right);
3936 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3937 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3940 cnt--; // remove stopper key
3942 fprintf(stderr, " cache keys counted %d\n", cachecnt);
3943 fprintf(stderr, " main keys counted %d\n", cnt);
3944 fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
3956 typedef struct timeval timer;
// main: driver of the standalone test program.
// Prints usage, then reads the numeric tuning parameters from argv[4]
// through argv[13] with atoi (page bits, leaf extra bits, pool sizes,
// transaction size, redo pages, and the same for the main btree), opens the
// cache btree (argv[1]) and main btree (argv[2]) managers with bt_mgr, starts
// one index_file worker per source file (argv[14] onward) with the command
// string argv[3], waits for the workers, audits both buffer pools with
// bt_poolaudit, and prints page/IO counters and real/user/sys times.
// NOTE(review): the single-command path below reads args[0].infile from
// argv[idx + 10] while the threaded path uses argv[idx + 14] — looks
// inconsistent with the 13 fixed arguments; confirm.
// NOTE(review): atoi gives no indication of malformed numbers; the only
// visible range check is redopages > 65535.
// NOTE(review): this listing omits some source lines (gaps in the embedded
// numbering): the argc checks guarding each argv access, several
// declarations (mgr, main, args, threads, start, elapsed, ...), the platform
// #ifdef lines and the shutdown calls are not visible.
3958 int main (int argc, char **argv)
3960 int idx, cnt, len, slot, err;
3968 uint mainleafpool = 0;
3969 uint mainleafxtra = 0;
3985 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3986 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3987 fprintf (stderr, " where main_file is the name of the main btree file\n");
3988 fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
3989 fprintf (stderr, " pagebits is the page size in bits for the cache btree\n");
3990 fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n");
3991 fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n");
3992 fprintf (stderr, " leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3993 fprintf (stderr, " txnsize = n to block transactions into n units, or zero for no transactions\n");
3994 fprintf (stderr, " redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3995 fprintf (stderr, " mainbits is the page size of the main btree in bits\n");
3996 fprintf (stderr, " mainpool is the number of main pages in the main buffer pool\n");
3997 fprintf (stderr, " mainleaf is the number of main pages in the main leaf pool\n");
3998 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// wall-clock start for the elapsed-time report at the end
4002 start = getCpuTime(0);
4005 bits = atoi(argv[4]);
4008 leafxtra = atoi(argv[5]);
4011 poolsize = atoi(argv[6]);
4014 fprintf (stderr, "no page pool\n"), exit(1);
4017 leafpool = atoi(argv[7]);
4020 num = atoi(argv[8]);
4023 redopages = atoi(argv[9]);
4025 if( redopages > 65535 )
4026 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
4029 mainbits = atoi(argv[10]);
4032 mainleafxtra = atoi(argv[11]);
4035 mainpool = atoi(argv[12]);
4038 mainleafpool = atoi(argv[13]);
// per-thread handles: pthread_t array on POSIX, HANDLE array on Windows
4042 threads = malloc (cnt * sizeof(pthread_t));
4044 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
4046 args = malloc ((cnt + 1) * sizeof(ThreadArg));
4048 mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
4051 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// the main btree is opened without a redo log (last argument 0)
4056 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
4059 fprintf(stderr, "Index Open Error %s\n", argv[2]);
4068 for( idx = 0; idx < cnt; idx++ ) {
4069 args[idx].infile = argv[idx + 14];
4070 args[idx].type = argv[3];
4071 args[idx].main = main;
4072 args[idx].mgr = mgr;
4073 args[idx].num = num;
4074 args[idx].idx = idx;
4076 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
4077 fprintf(stderr, "Error creating thread %d\n", err);
4079 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
4083 args[0].infile = argv[idx + 10];
4084 args[0].type = argv[3];
4085 args[0].main = main;
4092 // wait for termination
4095 for( idx = 0; idx < cnt; idx++ )
4096 pthread_join (threads[idx], NULL);
4098 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
4100 for( idx = 0; idx < cnt; idx++ )
4101 CloseHandle(threads[idx]);
4103 bt_poolaudit(mgr, "cache");
4106 bt_poolaudit(main, "main");
4108 fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
4110 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
// report elapsed wall-clock, user CPU and system CPU time as minutes + seconds
4116 elapsed = getCpuTime(0) - start;
4117 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4118 elapsed = getCpuTime(1);
4119 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4120 elapsed = getCpuTime(2);
4121 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4124 BtKey *bt_key (BtPage page, uint slot)
4126 return keyptr(page,slot);
4129 BtSlot *bt_slot (BtPage page, uint slot)
4131 return slotptr(page,slot);