1 // btree version threadskv10e futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // LSM B-trees for write optimization
11 // and variable sized leaf pages
15 // author: karl malbrain, malbrain@cal.berkeley.edu
18 This work, including the source code, documentation
19 and related data, is placed into the public domain.
21 The orginal author is Karl Malbrain.
23 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
24 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
25 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
26 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
27 RESULTING FROM THE USE, MODIFICATION, OR
28 REDISTRIBUTION OF THIS SOFTWARE.
31 // Please see the project home page for documentation
32 // code.google.com/p/high-concurrency-btree
34 #define _FILE_OFFSET_BITS 64
35 #define _LARGEFILE64_SOURCE
39 #include <xmmintrin.h>
40 #include <linux/futex.h>
55 #define WIN32_LEAN_AND_MEAN
69 typedef unsigned long long uid;
70 typedef unsigned long long logseqno;
73 typedef unsigned long long off64_t;
74 typedef unsigned short ushort;
75 typedef unsigned int uint;
78 #define BT_ro 0x6f72 // ro
79 #define BT_rw 0x7772 // rw
81 #define BT_maxbits 26 // maximum page size in bits
82 #define BT_minbits 9 // minimum page size in bits
83 #define BT_minpage (1 << BT_minbits) // minimum page size
84 #define BT_maxpage (1 << BT_maxbits) // maximum page size
86 // BTree page number constants
87 #define ALLOC_page 0 // allocation page
88 #define ROOT_page 1 // root of the btree
89 #define LEAF_page 2 // first page of leaves
91 // Number of levels to create in a new BTree
96 There are six lock types for each node in four independent sets:
97 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
98 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
99 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
100 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
101 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
102 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification.
115 volatile uint value[1];
118 // definition for reader/writer reentrant lock implementation
124 ushort dup; // re-entrant locks
125 ushort tid; // owner thread-no
126 uint line; // owner line #
129 // hash table entries
132 BtMutexLatch latch[1];
133 uint entry; // Latch table entry at head of chain
136 // latch manager table structure
139 uid page_no; // latch set page number
140 BtMutexLatch modify[1]; // modify entry lite latch
141 RWLock readwr[1]; // read/write page lock
142 RWLock access[1]; // Access Intent/Page delete
143 RWLock parent[1]; // Posting of fence key in parent
144 RWLock link[1]; // left link update in progress
145 uint split; // right split page atomic insert
146 uint next; // next entry in hash table chain
147 uint prev; // prev entry in hash table chain
148 ushort pin; // number of accessing threads
149 unsigned char dirty; // page in cache is dirty
150 unsigned char leaf; // page in cache is a leaf
153 // Define the length of the page record numbers
157 // Page key slot definition.
159 // Keys are marked dead, but remain on the page until
160 // it cleanup is called. The fence key (highest key) for
161 // a leaf page is always present, even after cleanup.
165 // In addition to the Unique keys that occupy slots
166 // there are Librarian and Duplicate key
167 // slots occupying the key slot array.
169 // The Librarian slots are dead keys that
170 // serve as filler, available to add new Unique
171 // or Dup slots that are inserted into the B-tree.
173 // The Duplicate slots have had their key bytes extended
174 // by 6 bytes to contain a binary duplicate key uniqueifier.
184 uint off:BT_maxbits; // page offset for key start
185 uint type:3; // type of slot
186 uint dead:1; // set for deleted slot
189 // The key structure occupies space at the upper end of
190 // each page. It's a length byte followed by the key
194 unsigned char len; // this can be changed to a ushort or uint
195 unsigned char key[0];
198 // the value structure also occupies space at the upper
199 // end of the page. Each key is immediately followed by a value.
202 unsigned char len; // this can be changed to a ushort or uint
203 unsigned char value[0];
206 #define BT_maxkey 255 // maximum number of bytes in a key
207 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
209 // The first part of an index page.
210 // It is immediately followed
211 // by the BtSlot array of keys.
213 typedef struct BtPage_ {
214 uint cnt; // count of keys in page
215 uint act; // count of active keys
216 uint min; // next key offset
217 uint garbage; // page garbage in bytes
218 unsigned char lvl; // level of page
219 unsigned char free; // page is on free chain
220 unsigned char kill; // page is being deleted
221 unsigned char nopromote; // page is being constructed
222 unsigned char filler1[6]; // padding to multiple of 8 bytes
223 unsigned char right[BtId]; // page number to right
224 unsigned char left[BtId]; // page number to left
225 unsigned char filler2[2]; // padding to multiple of 8 bytes
226 logseqno lsn; // log sequence number applied
227 uid page_no; // this page number
230 // The loadpage interface object
233 BtPage page; // current page pointer
234 BtLatchSet *latch; // current page latch set
237 // structure for latch manager on ALLOC_page
240 struct BtPage_ alloc[1]; // next page_no in right ptr
241 unsigned char freechain[BtId]; // head of free page_nos chain
242 unsigned char leafchain[BtId]; // head of leaf page_nos chain
243 unsigned long long leafpages; // number of active leaf pages
244 uint redopages; // number of redo pages in file
245 unsigned char leaf_xtra; // leaf page size in xtra bits
246 unsigned char page_bits; // base page size in bits
249 // The object structure for Btree access
252 uint page_size; // base page size
253 uint page_bits; // base page size in bits
254 uint leaf_xtra; // leaf xtra bits
260 BtPageZero *pagezero; // mapped allocation page
261 BtHashEntry *hashtable; // the buffer pool hash table entries
262 BtHashEntry *leaftable; // the buffer pool hash table entries
263 BtLatchSet *latchsets; // mapped latch set from buffer pool
264 BtLatchSet *leafsets; // mapped latch set from buffer pool
265 unsigned char *pagepool; // mapped to the buffer pool pages
266 unsigned char *leafpool; // mapped to the leaf pool pages
267 unsigned char *redobuff; // mapped recovery buffer pointer
268 logseqno lsn, flushlsn; // current & first lsn flushed
269 BtMutexLatch redo[1]; // redo area lite latch
270 BtMutexLatch lock[1]; // allocation area lite latch
271 BtMutexLatch maps[1]; // mapping segments lite latch
272 ushort thread_no[1]; // highest thread number issued
273 ushort err_thread; // error thread number
274 uint nlatchpage; // size of buffer pool & latchsets
275 uint latchtotal; // number of page latch entries
276 uint latchhash; // number of latch hash table slots
277 uint latchvictim; // next latch entry to examine
278 uint nleafpage; // size of leaf pool & leafsets
279 uint leaftotal; // number of leaf latch entries
280 uint leafhash; // number of leaf hash table slots
281 uint leafvictim; // next leaf entry to examine
282 uint leafpromote; // next leaf entry to promote
283 uint redopage; // page number of redo buffer
284 uint redolast; // last msync size of recovery buff
285 uint redoend; // eof/end element in recovery buff
286 int err; // last error
287 int line; // last error line no
288 int found; // number of keys found by delete
289 int reads, writes; // number of reads and writes
291 HANDLE halloc; // allocation handle
292 HANDLE hpool; // buffer pool handle
294 uint segments; // number of memory mapped segments
295 unsigned char *pages[64000];// memory mapped segments of b-tree
299 BtMgr *mgr; // buffer manager for entire process
300 BtMgr *main; // buffer manager for main btree
301 BtPage cursor; // cached page frame for start/next
302 ushort thread_no; // thread number
303 unsigned char key[BT_keyarray]; // last found complete key
306 // atomic txn structures
309 logseqno reqlsn; // redo log seq no required
310 uint entry:31; // latch table entry number
311 uint reuse:1; // reused previous page
314 // Catastrophic errors
328 #define CLOCK_bit 0x8000
330 // recovery manager entry types
333 BTRM_eof = 0, // rest of buffer is emtpy
334 BTRM_add, // add a unique key-value to btree
335 BTRM_dup, // add a duplicate key-value to btree
336 BTRM_del, // delete a key-value from btree
337 BTRM_upd, // update a key with a new value
338 BTRM_new, // allocate a new empty page
339 BTRM_old // reuse an old empty page
342 // recovery manager entry
343 // structure followed by BtKey & BtVal
346 logseqno reqlsn; // log sequence number required
347 logseqno lsn; // log sequence number for entry
348 uint len; // length of entry
349 unsigned char type; // type of entry
350 unsigned char lvl; // level of btree entry pertains to
355 extern void bt_close (BtDb *bt);
356 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
357 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
358 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
359 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
360 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
361 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
362 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
364 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
366 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
367 extern uint bt_nextkey (BtDb *bt, uint slot);
368 extern uint bt_prevkey (BtDb *db, uint slot);
369 extern uint bt_lastkey (BtDb *db);
372 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
373 extern void bt_mgrclose (BtMgr *mgr);
374 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
375 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
377 // atomic transaction functions
378 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
379 BTERR bt_txnpromote (BtDb *bt);
381 // The page is allocated from low and hi ends.
382 // The key slots are allocated from the bottom,
383 // while the text and value of the key
384 // are allocated from the top. When the two
385 // areas meet, the page is split into two.
387 // A key consists of a length byte, two bytes of
388 // index number (0 - 65535), and up to 253 bytes
391 // Associated with each key is a value byte string
392 // containing any value desired.
394 // The b-tree root is always located at page 1.
395 // The first leaf page of level zero is always
396 // located on page 2.
398 // The b-tree pages are linked with next
399 // pointers to facilitate enumerators,
400 // and provide for concurrency.
402 // When to root page fills, it is split in two and
403 // the tree height is raised by a new root at page
404 // one with two keys.
406 // Deleted keys are marked with a dead bit until
407 // page cleanup. The fence key for a leaf node is
410 // To achieve maximum concurrency one page is locked at a time
411 // as the tree is traversed to find leaf key in question. The right
412 // page numbers are used in cases where the page is being split,
415 // Page 0 is dedicated to lock for new page extensions,
416 // and chains empty pages together for reuse. It also
417 // contains the latch manager hash table.
419 // The ParentModification lock on a node is obtained to serialize posting
420 // or changing the fence key for a node.
422 // Empty pages are chained together through the ALLOC page and reused.
424 // Access macros to address slot and key values from the page
425 // Page slots use 1 based indexing.
427 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
428 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
429 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
431 void bt_putid(unsigned char *dest, uid id)
436 dest[i] = (unsigned char)id, id >>= 8;
439 uid bt_getid(unsigned char *src)
444 for( i = 0; i < BtId; i++ )
445 id <<= 8, id |= *src++;
450 // lite weight spin lock Latch Manager
452 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
454 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
457 void bt_mutexlock(BtMutexLatch *latch)
461 while( __sync_val_compare_and_swap (latch->value, 0, 1) )
468 int bt_mutextry(BtMutexLatch *latch)
470 return !__sync_val_compare_and_swap (latch->value, 0, 1);
473 void bt_releasemutex(BtMutexLatch *latch)
475 if( !__sync_lock_test_and_set (latch->value, 0) )
479 void bt_mutexlock(BtMutexLatch *latch)
483 for( idx = 0; idx < 100; idx++ )
484 if( !__sync_val_compare_and_swap (latch->value, 0, 1) )
487 while( __sync_lock_test_and_set (latch->value, 2) )
488 sys_futex ((uint *)latch->value, FUTEX_WAIT_PRIVATE, 2, NULL, NULL, 0);
491 int bt_mutextry(BtMutexLatch *latch)
493 return !__sync_val_compare_and_swap (latch->value, 0, 1);
496 void bt_releasemutex(BtMutexLatch *latch)
500 if( *latch->value == 2 )
502 else if( __sync_lock_test_and_set (latch->value, 0) == 1 )
505 if( latch->value[0] )
506 if( __sync_val_compare_and_swap (latch->value, 1, 2) )
509 // for( idx = 0; idx < 200; idx++ )
510 // if( latch->value[0] )
511 // if( __sync_val_compare_and_swap (latch->value, 1, 2) )
514 sys_futex( (uint *)latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
517 // reader/writer lock implementation
519 void WriteLock (RWLock *lock, ushort tid, uint line)
521 if( lock->tid == tid ) {
525 bt_mutexlock (lock->xcl);
526 bt_mutexlock (lock->wrt);
527 bt_releasemutex (lock->xcl);
534 void WriteRelease (RWLock *lock)
541 bt_releasemutex (lock->wrt);
544 void ReadLock (RWLock *lock, ushort tid)
546 bt_mutexlock (lock->xcl);
548 if( !__sync_fetch_and_add (&lock->readers, 1) )
549 bt_mutexlock (lock->wrt);
551 bt_releasemutex (lock->xcl);
554 void ReadRelease (RWLock *lock)
556 if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
557 bt_releasemutex (lock->wrt);
560 // recovery manager -- flush dirty pages
562 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
564 uint cnt3 = 0, cnt2 = 0, cnt = 0;
569 // flush dirty pool pages to the btree
571 fprintf(stderr, "Start flushlsn ");
572 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
573 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
574 latch = mgr->latchsets + entry;
575 bt_mutexlock (latch->modify);
576 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
579 bt_writepage(mgr, page, latch->page_no, 0);
580 latch->dirty = 0, cnt++;
582 if( latch->pin & ~CLOCK_bit )
584 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
585 bt_releasemutex (latch->modify);
587 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
588 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
589 latch = mgr->leafsets + entry;
590 bt_mutexlock (latch->modify);
591 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
594 bt_writepage(mgr, page, latch->page_no, 1);
595 latch->dirty = 0, cnt++;
597 if( latch->pin & ~CLOCK_bit )
599 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
600 bt_releasemutex (latch->modify);
602 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
603 fprintf(stderr, "begin sync");
604 for( segment = 0; segment < mgr->segments; segment++ )
605 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
606 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
607 fprintf(stderr, " end sync\n");
610 // recovery manager -- process current recovery buff on startup
611 // this won't do much if previous session was properly closed.
613 BTERR bt_recoveryredo (BtMgr *mgr)
620 hdr = (BtLogHdr *)mgr->redobuff;
621 mgr->flushlsn = hdr->lsn;
624 hdr = (BtLogHdr *)(mgr->redobuff + offset);
625 switch( hdr->type ) {
629 case BTRM_add: // add a unique key-value to btree
631 case BTRM_dup: // add a duplicate key-value to btree
632 case BTRM_del: // delete a key-value from btree
633 case BTRM_upd: // update a key with a new value
634 case BTRM_new: // allocate a new empty page
635 case BTRM_old: // reuse an old empty page
641 // recovery manager -- append new entry to recovery log
642 // flush dirty pages to disk when it overflows.
644 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
646 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
647 uint amt = sizeof(BtLogHdr);
651 bt_mutexlock (mgr->redo);
654 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
656 // see if new entry fits in buffer
657 // flush and reset if it doesn't
659 if( amt > size - mgr->redoend ) {
660 mgr->flushlsn = mgr->lsn;
661 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
662 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
665 bt_flushlsn(mgr, thread_no);
668 // fill in new entry & either eof or end block
670 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
675 hdr->lsn = ++mgr->lsn;
679 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
680 memset (eof, 0, sizeof(BtLogHdr));
682 // fill in key and value
685 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
686 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
689 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
690 memset (eof, 0, sizeof(BtLogHdr));
693 last = mgr->redolast & ~0xfff;
696 if( end - last + sizeof(BtLogHdr) >= 32768 )
697 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
698 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
702 bt_releasemutex(mgr->redo);
706 // recovery manager -- append transaction to recovery log
707 // flush dirty pages to disk when it overflows.
709 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
711 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
712 uint amt = 0, src, type;
719 // determine amount of redo recovery log space required
721 for( src = 0; src++ < source->cnt; ) {
722 key = keyptr(source,src);
723 val = valptr(source,src);
724 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
725 amt += sizeof(BtLogHdr);
728 bt_mutexlock (mgr->redo);
730 // see if new entry fits in buffer
731 // flush and reset if it doesn't
733 if( amt > size - mgr->redoend ) {
734 mgr->flushlsn = mgr->lsn;
735 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
736 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
739 bt_flushlsn (mgr, thread_no);
742 // assign new lsn to transaction
746 // fill in new entries
748 for( src = 0; src++ < source->cnt; ) {
749 key = keyptr(source, src);
750 val = valptr(source, src);
752 switch( slotptr(source, src)->type ) {
764 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
765 amt += sizeof(BtLogHdr);
767 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
773 // fill in key and value
775 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
776 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
781 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
782 memset (eof, 0, sizeof(BtLogHdr));
785 last = mgr->redolast & ~0xfff;
788 if( end - last + sizeof(BtLogHdr) >= 32768 )
789 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
790 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
794 bt_releasemutex(mgr->redo);
798 // sync a single btree page to disk
800 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
802 uint segment = latch->page_no >> 16;
803 uint page_size = mgr->page_size;
806 if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
810 page_size <<= mgr->leaf_xtra;
812 perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
814 if( msync (perm, page_size, MS_SYNC) < 0 )
815 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
821 // read page into buffer pool from permanent location in Btree file
823 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
825 int flag = PROT_READ | PROT_WRITE;
826 uint page_size = mgr->page_size;
827 uint off = 0, segment, fragment;
831 page_size <<= mgr->leaf_xtra;
833 fragment = page_no & 0xffff;
834 segment = page_no >> 16;
837 while( off < page_size ) {
839 segment++, fragment = 0;
841 if( segment < mgr->segments ) {
842 perm = mgr->pages[segment] + (fragment << mgr->page_bits);
844 memcpy ((unsigned char *)page + off, perm, mgr->page_size);
845 off += mgr->page_size;
850 bt_mutexlock (mgr->maps);
852 if( segment < mgr->segments ) {
853 bt_releasemutex (mgr->maps);
857 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
860 bt_releasemutex (mgr->maps);
863 if( !leaf && !page->lvl )
865 if( leaf && page->lvl )
870 // write page to permanent location in Btree file
872 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
874 int flag = PROT_READ | PROT_WRITE;
875 uint page_size = mgr->page_size;
876 uint off = 0, segment, fragment;
879 if( !leaf && !page->lvl )
881 if( leaf && page->lvl )
883 //if( !page->lvl && mgr->leaf_xtra == 8 )
884 //fprintf(stderr, "WrPage %d\n", (uint)page_no);
886 page_size <<= mgr->leaf_xtra;
888 fragment = page_no & 0xffff;
889 segment = page_no >> 16;
892 while( off < page_size ) {
894 segment++, fragment = 0;
896 if( segment < mgr->segments ) {
897 perm = mgr->pages[segment] + (fragment << mgr->page_bits);
898 memcpy (perm, (unsigned char *)page + off, mgr->page_size);
899 off += mgr->page_size;
904 bt_mutexlock (mgr->maps);
906 if( segment < mgr->segments ) {
907 bt_releasemutex (mgr->maps);
911 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
913 bt_releasemutex (mgr->maps);
919 // set CLOCK bit in latch
920 // decrement pin count
922 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
924 bt_mutexlock(latch->modify);
925 if( !(latch->pin & ~CLOCK_bit) )
927 latch->pin |= CLOCK_bit;
930 bt_releasemutex(latch->modify);
933 // return the btree cached page address
935 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
937 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
941 page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
943 page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
950 // return next available leaf entry
951 // and with latch entry locked
953 uint bt_availleaf (BtMgr *mgr)
960 entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
962 entry = _InterlockedIncrement (&mgr->leafvictim);
964 entry %= mgr->leaftotal;
969 latch = mgr->leafsets + entry;
971 if( !bt_mutextry(latch->modify) )
974 // return this entry if it is not pinned
979 // if the CLOCK bit is set
982 latch->pin &= ~CLOCK_bit;
983 bt_releasemutex(latch->modify);
987 // return next available latch entry
988 // and with latch entry locked
990 uint bt_availnext (BtMgr *mgr)
997 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
999 entry = _InterlockedIncrement (&mgr->latchvictim);
1001 entry %= mgr->latchtotal;
1006 latch = mgr->latchsets + entry;
1008 if( !bt_mutextry(latch->modify) )
1011 // return this entry if it is not pinned
1016 // if the CLOCK bit is set
1017 // reset it to zero.
1019 latch->pin &= ~CLOCK_bit;
1020 bt_releasemutex(latch->modify);
1024 // pin leaf in leaf buffer pool
1025 // return with latchset pinned
1027 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
1029 uint hashidx = page_no % mgr->leafhash;
1034 // try to find our entry
1036 bt_mutexlock(mgr->leaftable[hashidx].latch);
1038 if( entry = mgr->leaftable[hashidx].entry ) do
1040 latch = mgr->leafsets + entry;
1041 if( page_no == latch->page_no )
1043 } while( entry = latch->next );
1045 // found our entry: increment pin
1048 latch = mgr->leafsets + entry;
1049 bt_mutexlock(latch->modify);
1050 latch->pin |= CLOCK_bit;
1054 bt_releasemutex(latch->modify);
1055 bt_releasemutex(mgr->leaftable[hashidx].latch);
1059 // find and reuse unpinned entry
1063 entry = bt_availleaf (mgr);
1064 latch = mgr->leafsets + entry;
1065 if( latch->page_no )
1069 idx = latch->page_no % mgr->leafhash;
1071 // if latch is on a different hash chain
1072 // unlink from the old page_no chain
1074 if( latch->page_no )
1075 if( idx != hashidx ) {
1077 // skip over this entry if latch not available
1079 if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
1080 bt_releasemutex(latch->modify);
1085 mgr->leafsets[latch->prev].next = latch->next;
1087 mgr->leaftable[idx].entry = latch->next;
1090 mgr->leafsets[latch->next].prev = latch->prev;
1092 bt_releasemutex (mgr->leaftable[idx].latch);
1095 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1097 // update permanent page area in btree from buffer pool
1098 // no read-lock is required since page is not pinned.
1100 if( latch->page_no )
1104 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
1105 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1109 if( bt_readpage (mgr, page, page_no, 1) )
1110 return mgr->line = __LINE__, NULL;
1112 // link page as head of hash table chain
1113 // if this is a never before used entry,
1114 // or it was previously on a different
1115 // hash table chain. Otherwise, just
1116 // leave it in its current hash table
1119 if( !latch->page_no || hashidx != idx ) {
1120 if( latch->next = mgr->leaftable[hashidx].entry )
1121 mgr->leafsets[latch->next].prev = entry;
1123 mgr->leaftable[hashidx].entry = entry;
1127 // fill in latch structure
1129 latch->pin = CLOCK_bit | 1;
1130 latch->page_no = page_no;
1133 bt_releasemutex (latch->modify);
1134 bt_releasemutex (mgr->leaftable[hashidx].latch);
1138 // pin page in non-leaf buffer pool
1139 // return with latchset pinned
1141 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1143 uint hashidx = page_no % mgr->latchhash;
1148 // try to find our entry
1150 bt_mutexlock(mgr->hashtable[hashidx].latch);
1152 if( entry = mgr->hashtable[hashidx].entry ) do
1154 latch = mgr->latchsets + entry;
1155 if( page_no == latch->page_no )
1157 } while( entry = latch->next );
1159 // found our entry: increment pin
1162 latch = mgr->latchsets + entry;
1163 bt_mutexlock(latch->modify);
1164 latch->pin |= CLOCK_bit;
1166 bt_releasemutex(latch->modify);
1167 bt_releasemutex(mgr->hashtable[hashidx].latch);
1171 // find and reuse unpinned entry
1175 entry = bt_availnext (mgr);
1176 latch = mgr->latchsets + entry;
1178 idx = latch->page_no % mgr->latchhash;
1180 // if latch is on a different hash chain
1181 // unlink from the old page_no chain
1183 if( latch->page_no )
1184 if( idx != hashidx ) {
1186 // skip over this entry if latch not available
1188 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1189 bt_releasemutex(latch->modify);
1194 mgr->latchsets[latch->prev].next = latch->next;
1196 mgr->hashtable[idx].entry = latch->next;
1199 mgr->latchsets[latch->next].prev = latch->prev;
1201 bt_releasemutex (mgr->hashtable[idx].latch);
1204 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1206 // update permanent page area in btree from buffer pool
1207 // no read-lock is required since page is not pinned.
1209 if( latch->page_no )
1213 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1214 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1218 if( bt_readpage (mgr, page, page_no, 0) )
1219 return mgr->line = __LINE__, NULL;
1221 // link page as head of hash table chain
1222 // if this is a never before used entry,
1223 // or it was previously on a different
1224 // hash table chain. Otherwise, just
1225 // leave it in its current hash table
1228 if( !latch->page_no || hashidx != idx ) {
1229 if( latch->next = mgr->hashtable[hashidx].entry )
1230 mgr->latchsets[latch->next].prev = entry;
1232 mgr->hashtable[hashidx].entry = entry;
1236 // fill in latch structure
1238 latch->pin = CLOCK_bit | 1;
1239 latch->page_no = page_no;
1242 bt_releasemutex (latch->modify);
1243 bt_releasemutex (mgr->hashtable[hashidx].latch);
1247 void bt_mgrclose (BtMgr *mgr)
1255 // flush previously written dirty pages
1256 // and write recovery buffer to disk
1258 fdatasync (mgr->idx);
1260 if( mgr->redoend ) {
1261 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1262 memset (eof, 0, sizeof(BtLogHdr));
1265 // write remaining dirty pool pages to the btree
1267 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1268 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1269 latch = mgr->latchsets + slot;
1271 if( latch->dirty ) {
1272 bt_writepage(mgr, page, latch->page_no, 0);
1273 latch->dirty = 0, num++;
1277 // write remaining dirty leaf pages to the btree
1279 for( slot = 1; slot < mgr->leaftotal; slot++ ) {
1280 page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1281 latch = mgr->leafsets + slot;
1283 if( latch->dirty ) {
1284 bt_writepage(mgr, page, latch->page_no, 1);
1285 latch->dirty = 0, num++;
1289 // clear redo recovery buffer on disk.
1291 if( mgr->pagezero->redopages ) {
1292 eof = (BtLogHdr *)mgr->redobuff;
1293 memset (eof, 0, sizeof(BtLogHdr));
1294 eof->lsn = mgr->lsn;
1295 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1296 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1299 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1302 while( mgr->segments )
1303 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1305 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1306 munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1307 munmap (mgr->pagezero, mgr->page_size);
1309 FlushViewOfFile(mgr->pagezero, 0);
1310 UnmapViewOfFile(mgr->pagezero);
1311 UnmapViewOfFile(mgr->pagepool);
1312 CloseHandle(mgr->halloc);
1313 CloseHandle(mgr->hpool);
1319 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1320 FlushFileBuffers(mgr->idx);
1321 CloseHandle(mgr->idx);
1326 // close and release memory
1328 void bt_close (BtDb *bt)
1335 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1340 // open/create new btree buffer manager
1342 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1343 // size of page pool (e.g. 262144) and leaf pool
1345 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1347 uint lvl, attr, last, slot, idx;
1348 uint nlatchpage, latchhash;
1349 uint nleafpage, leafhash;
1350 unsigned char value[BtId];
1351 int flag, initit = 0;
1352 BtPageZero *pagezero;
1360 // determine sanity of page size and buffer pool
1362 if( leafxtra + pagebits > BT_maxbits )
1363 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1365 if( pagebits < BT_minbits )
1366 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1369 mgr = calloc (1, sizeof(BtMgr));
1371 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1373 if( mgr->idx == -1 ) {
1374 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1375 return free(mgr), NULL;
1378 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1379 attr = FILE_ATTRIBUTE_NORMAL;
1380 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1382 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1383 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1384 return GlobalFree(mgr), NULL;
1389 pagezero = valloc (BT_maxpage);
1392 // read minimum page size to get root info
1393 // to support raw disk partition files
1394 // check if page_bits == 0 on the disk.
1396 if( size = lseek (mgr->idx, 0L, 2) )
1397 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1398 if( pagezero->page_bits ) {
1399 pagebits = pagezero->page_bits;
1400 leafxtra = pagezero->leaf_xtra;
1404 return free(mgr), free(pagezero), NULL;
1408 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1409 size = GetFileSize(mgr->idx, amt);
1411 if( size || *amt ) {
1412 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1413 return bt_mgrclose (mgr), NULL;
1414 pagebits = pagezero->page_bits;
1415 leafxtra = pagezero->leaf_xtra;
1420 mgr->page_size = 1 << pagebits;
1421 mgr->page_bits = pagebits;
1422 mgr->leaf_xtra = leafxtra;
1424 // calculate number of latch hash table entries
1426 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1428 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1429 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1430 mgr->latchtotal = nodemax;
1432 // calculate number of leaf hash table entries
1434 mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1436 mgr->nleafpage += leafmax << leafxtra; // size of the leaf pool in pages
1437 mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1438 mgr->leaftotal = leafmax;
1440 mgr->redopage = LEAF_page + (1 << leafxtra);
1445 // initialize an empty b-tree with latch page, root page, page of leaves
1446 // and page(s) of latches and page pool cache
1448 ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1449 memset (pagezero, 0, 1 << pagebits);
1450 pagezero->alloc->lvl = MIN_lvl - 1;
1451 pagezero->redopages = redopages;
1452 pagezero->page_bits = mgr->page_bits;
1453 pagezero->leaf_xtra = leafxtra;
1455 bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1456 pagezero->leafpages = 1;
1458 // initialize left-most LEAF page in
1459 // alloc->left and count of active leaf pages.
1461 bt_putid (pagezero->alloc->left, LEAF_page);
1463 if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1464 fprintf (stderr, "Unable to create btree page zero\n");
1465 return bt_mgrclose (mgr), NULL;
1468 memset (pagezero, 0, 1 << pagebits);
1470 for( lvl=MIN_lvl; lvl--; ) {
1471 BtSlot *node = slotptr(pagezero->alloc, 1);
1472 node->off = mgr->page_size;
1475 node->off <<= mgr->leaf_xtra;
1477 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1478 key = keyptr(pagezero->alloc, 1);
1479 key->len = 2; // create stopper key
1483 bt_putid(value, MIN_lvl - lvl + 1);
1484 val = valptr(pagezero->alloc, 1);
1485 val->len = lvl ? BtId : 0;
1486 memcpy (val->value, value, val->len);
1488 pagezero->alloc->min = node->off;
1489 pagezero->alloc->lvl = lvl;
1490 pagezero->alloc->cnt = 1;
1491 pagezero->alloc->act = 1;
1492 pagezero->alloc->page_no = MIN_lvl - lvl;
1494 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1495 fprintf (stderr, "Unable to create btree page\n");
1496 return bt_mgrclose (mgr), NULL;
1504 VirtualFree (pagezero, 0, MEM_RELEASE);
1507 // mlock the first segment of 64K pages
1509 flag = PROT_READ | PROT_WRITE;
1510 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1513 if( mgr->pages[0] == MAP_FAILED ) {
1514 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1515 return bt_mgrclose (mgr), NULL;
1518 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1519 mlock (mgr->pagezero, mgr->page_size);
1521 mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
1522 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1524 // allocate pool buffers
1526 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1527 if( mgr->pagepool == MAP_FAILED ) {
1528 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1529 return bt_mgrclose (mgr), NULL;
1532 mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1533 if( mgr->leafpool == MAP_FAILED ) {
1534 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1535 return bt_mgrclose (mgr), NULL;
1538 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1539 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1540 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1542 mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1543 mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1544 mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1549 // open BTree access method
1550 // based on buffer manager
1552 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1554 BtDb *bt = malloc (sizeof(*bt));
1556 memset (bt, 0, sizeof(*bt));
1560 bt->cursor = valloc (mgr->page_size << mgr->leaf_xtra);
1562 bt->cursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
1565 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1567 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1572 // compare two keys, return > 0, = 0, or < 0
1573 // =0: keys are same
1576 // as the comparison value
1578 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1580 uint len1 = key1->len;
1583 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1594 // place write, read, or parent lock on requested page_no.
1596 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1600 ReadLock (latch->readwr, thread_no);
1604 //fprintf(stderr, "WrtRqst %d by %d at %d\n", (uint)latch->page_no, thread_no, line);
1605 WriteLock (latch->readwr, thread_no, line);
1607 //fprintf(stderr, "WrtLock %d by %d at %d\n", (uint)latch->page_no, thread_no, line);
1610 ReadLock (latch->access, thread_no);
1613 WriteLock (latch->access, thread_no, line);
1616 WriteLock (latch->parent, thread_no, line);
1619 WriteLock (latch->link, thread_no, line);
1624 // remove write, read, or parent lock on requested page
1626 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1630 ReadRelease (latch->readwr);
1634 //fprintf(stderr, "Un Lock %d by %d at %d\n", (uint)latch->page_no, thread_no, line);
1635 WriteRelease (latch->readwr);
1638 ReadRelease (latch->access);
1641 WriteRelease (latch->access);
1644 WriteRelease (latch->parent);
1647 WriteRelease (latch->link);
1652 // allocate a new page
1653 // return with page latched, but unlocked.
1655 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1657 uint page_size = mgr->page_size, page_xtra = 0;
1658 unsigned char *freechain;
1662 freechain = mgr->pagezero->freechain;
1664 freechain = mgr->pagezero->leafchain;
1665 mgr->pagezero->leafpages++;
1666 page_xtra = mgr->leaf_xtra;
1667 page_size <<= page_xtra;
1670 // lock allocation page
1672 bt_mutexlock(mgr->lock);
1674 // use empty chain first
1675 // else allocate new page
1677 if( page_no = bt_getid(freechain) ) {
1678 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1679 set->page = bt_mappage (mgr, set->latch);
1681 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1683 bt_putid(freechain, bt_getid(set->page->right));
1685 // the page is currently nopromote and this
1686 // will keep bt_txnpromote out.
1688 // contents will replace this bit
1689 // and pin will keep bt_txnpromote out
1691 contents->page_no = page_no;
1692 contents->nopromote = 0;
1693 set->latch->dirty = 1;
1695 memcpy (set->page, contents, page_size);
1697 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1698 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1700 bt_releasemutex(mgr->lock);
1704 page_no = bt_getid(mgr->pagezero->alloc->right);
1705 bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
1707 // unlock allocation latch and
1708 // extend file into new page.
1710 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1711 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1712 bt_releasemutex(mgr->lock);
1714 // keep bt_txnpromote out of this page
1715 contents->nopromote = 1;
1716 contents->page_no = page_no;
1717 if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1718 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1719 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1720 set->page = bt_mappage (mgr, set->latch);
1722 return mgr->err_thread = thread_no, mgr->err;
1724 // now pin will keep bt_txnpromote out
1726 set->page->nopromote = 0;
1727 set->latch->dirty = 1;
1731 // find slot in page for given key at a given level
1733 int bt_findslot (BtPage page, unsigned char *key, uint len)
1735 uint diff, higher = page->cnt, low = 1, slot;
1738 // make stopper key an infinite fence value
1740 if( bt_getid (page->right) )
1745 // low is the lowest candidate.
1746 // loop ends when they meet
1748 // higher is already
1749 // tested as .ge. the passed key.
1751 while( diff = higher - low ) {
1752 slot = low + ( diff >> 1 );
1753 if( keycmp (keyptr(page, slot), key, len) < 0 )
1756 higher = slot, good++;
1759 // return zero if key is on right link page
1761 return good ? higher : 0;
1764 // find and load page at given level for given key
1765 // leave page rd or wr locked as requested
1767 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1769 uid page_no = ROOT_page, prevpage_no = 0;
1770 uint drill = 0xff, slot;
1771 BtLatchSet *prevlatch;
1772 uint mode, prevmode;
1777 // start at root of btree and drill down
1780 // determine lock mode of drill level
1781 mode = (drill == lvl) ? lock : BtLockRead;
1783 if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1784 set->page = bt_mappage (mgr, set->latch);
1788 // obtain access lock using lock chaining with Access mode
1790 if( page_no > ROOT_page )
1791 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1793 // release & unpin parent or left sibling page
1796 bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1797 bt_unpinlatch (prevlatch, thread_no, __LINE__);
1801 // obtain mode lock using lock coupling through AccessLock
1803 bt_lockpage(mode, set->latch, thread_no, __LINE__);
1805 if( set->page->free )
1806 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1808 if( page_no > ROOT_page )
1809 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1811 // re-read and re-lock root after determining actual level of root
1813 if( set->page->lvl != drill) {
1814 if( set->latch->page_no != ROOT_page )
1815 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1817 drill = set->page->lvl;
1819 if( lock != BtLockRead && drill == lvl ) {
1820 bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1821 bt_unpinlatch (set->latch, thread_no, __LINE__);
1826 prevpage_no = set->latch->page_no;
1827 prevlatch = set->latch;
1828 prevpage = set->page;
1831 // find key on page at this level
1832 // and descend to requested level
1834 if( !set->page->kill )
1835 if( slot = bt_findslot (set->page, key, len) ) {
1839 // find next non-dead slot -- the fence key if nothing else
1841 while( slotptr(set->page, slot)->dead )
1842 if( slot++ < set->page->cnt )
1845 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1847 val = valptr(set->page, slot);
1849 if( val->len == BtId )
1850 page_no = bt_getid(valptr(set->page, slot)->value);
1852 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1858 // slide right into next page
1860 bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1861 page_no = bt_getid(set->page->right);
1862 bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1866 // return error on end of right chain
1868 mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1869 return 0; // return error
1872 // return page to free list
1873 // page must be delete & write locked
1874 // and have no keys pointing to it.
1876 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1878 unsigned char *freechain;
1880 if( (set->latch->pin & ~CLOCK_bit) > 1 )
1882 if( set->page->lvl )
1883 freechain = mgr->pagezero->freechain;
1885 freechain = mgr->pagezero->leafchain;
1886 mgr->pagezero->leafpages--;
1889 // lock allocation page
1891 bt_mutexlock (mgr->lock);
1895 memcpy(set->page->right, freechain, BtId);
1896 bt_putid(freechain, set->latch->page_no);
1897 set->latch->dirty = 1;
1898 set->page->free = 1;
1900 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1901 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1903 // unlock released page
1904 // and unlock allocation page
1906 bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1907 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1908 bt_unpinlatch (set->latch, thread_no, __LINE__);
1909 bt_releasemutex (mgr->lock);
1912 // a fence key was deleted from a page
1913 // push new fence value upwards
1915 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1917 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1918 unsigned char value[BtId];
1922 // remove the old fence value
1924 ptr = keyptr(set->page, set->page->cnt);
1925 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1926 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1927 set->latch->dirty = 1;
1929 // cache new fence value
1931 ptr = keyptr(set->page, set->page->cnt);
1932 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1934 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1935 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1937 // insert new (now smaller) fence key
1939 bt_putid (value, set->latch->page_no);
1940 ptr = (BtKey*)leftkey;
1942 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1943 return mgr->err_thread = thread_no, mgr->err;
1945 // now delete old fence key
1947 ptr = (BtKey*)rightkey;
1949 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1950 return mgr->err_thread = thread_no, mgr->err;
1952 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1953 bt_unpinlatch(set->latch, thread_no, __LINE__);
1957 // root has a single child
1958 // collapse a level from the tree
1960 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1967 // find the child entry and promote as new root contents
1970 for( idx = 0; idx++ < root->page->cnt; )
1971 if( !slotptr(root->page, idx)->dead )
1974 val = valptr(root->page, idx);
1976 if( val->len == BtId )
1977 page_no = bt_getid (valptr(root->page, idx)->value);
1979 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1981 if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1982 child->page = bt_mappage (mgr, child->latch);
1984 return mgr->err_thread = thread_no, mgr->err;
1986 bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1987 bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1989 memcpy (root->page, child->page, mgr->page_size);
1990 root->latch->dirty = 1;
1992 bt_freepage (mgr, child, thread_no);
1994 } while( root->page->lvl > 1 && root->page->act == 1 );
1996 bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1997 bt_unpinlatch (root->latch, thread_no, __LINE__);
2001 // delete a page and manage key
2002 // call with page writelocked
2004 // returns with page unpinned
2005 // from the page pool.
2007 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2009 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
2010 uint page_size = mgr->page_size;
2011 unsigned char value[BtId];
2012 uint lvl = set->page->lvl;
2018 page_size <<= mgr->leaf_xtra;
2020 // cache copy of fence key
2021 // to remove in parent
2023 ptr = keyptr(set->page, set->page->cnt);
2024 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2026 // obtain locks on right page
2028 page_no = bt_getid(set->page->right);
2030 if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
2031 right->page = bt_mappage (mgr, right->latch);
2034 if( right->page->lvl )
2036 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2038 // cache copy of key to update
2040 ptr = keyptr(right->page, right->page->cnt);
2041 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
2043 if( right->page->kill )
2044 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2046 // pull contents of right peer into our empty page
2048 memcpy (right->page->left, set->page->left, BtId);
2049 memcpy (set->page, right->page, page_size);
2050 set->page->page_no = set->latch->page_no;
2051 set->latch->dirty = 1;
2053 // mark right page deleted and point it to left page
2054 // until we can post parent updates that remove access
2055 // to the deleted page.
2057 bt_putid (right->page->right, set->latch->page_no);
2058 right->latch->dirty = 1;
2059 right->page->kill = 1;
2061 bt_lockpage (BtLockParent, right->latch, thread_no, __LINE__);
2062 bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2064 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2065 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2067 // redirect higher key directly to our new node contents
2069 bt_putid (value, set->latch->page_no);
2070 ptr = (BtKey*)higherfence;
2072 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2075 // delete old lower key to our node
2077 ptr = (BtKey*)lowerfence;
2079 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2082 // obtain delete and write locks to right node
2084 bt_unlockpage (BtLockParent, right->latch, thread_no, __LINE__);
2085 bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2086 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2087 while( (right->latch->pin & ~CLOCK_bit) > 1 )
2089 bt_freepage (mgr, right, thread_no);
2090 //fprintf(stderr, "DelPage %d by %d at %d\n", (uint)right->latch->page_no, thread_no, __LINE__);
2091 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2092 bt_unpinlatch (set->latch, thread_no, __LINE__);
2096 // find and delete key on page by marking delete flag bit
2097 // if page becomes empty, delete it from the btree
2099 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2101 uint slot, idx, found, fence;
2107 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2108 node = slotptr(set->page, slot);
2109 ptr = keyptr(set->page, slot);
2111 return mgr->err_thread = thread_no, mgr->err;
2113 // if librarian slot, advance to real slot
2115 if( node->type == Librarian ) {
2116 ptr = keyptr(set->page, ++slot);
2117 node = slotptr(set->page, slot);
2120 fence = slot == set->page->cnt;
2122 // delete the key, ignore request if already dead
2124 if( found = !keycmp (ptr, key, len) )
2125 if( found = node->dead == 0 ) {
2126 val = valptr(set->page,slot);
2127 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2130 // mark node type as delete
2132 node->type = Delete;
2135 // collapse empty slots beneath the fence
2136 // on interiour nodes
2139 while( idx = set->page->cnt - 1 )
2140 if( slotptr(set->page, idx)->dead ) {
2141 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2142 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2150 // did we delete a fence key in an upper level?
2152 if( lvl && set->page->act && fence )
2153 if( bt_fixfence (mgr, set, lvl, thread_no) )
2154 return mgr->err_thread = thread_no, mgr->err;
2158 // do we need to collapse root?
2160 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2161 if( bt_collapseroot (mgr, set, thread_no) )
2162 return mgr->err_thread = thread_no, mgr->err;
2166 // delete empty page
2168 if( !set->page->act )
2169 return bt_deletepage (mgr, set, thread_no);
2171 set->latch->dirty = 1;
2172 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2173 bt_unpinlatch (set->latch, thread_no, __LINE__);
2177 // advance to next slot
2179 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2181 BtLatchSet *prevlatch;
2184 if( slot < set->page->cnt )
2187 prevlatch = set->latch;
2189 if( page_no = bt_getid(set->page->right) )
2190 if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
2191 set->page = bt_mappage (bt->mgr, set->latch);
2195 return bt->mgr->err = BTERR_struct, bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__, 0;
2197 // obtain access lock using lock chaining with Access mode
2199 bt_lockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
2201 bt_unlockpage(BtLockRead, prevlatch, bt->thread_no, __LINE__);
2202 bt_unpinlatch (prevlatch, bt->thread_no, __LINE__);
2204 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
2205 bt_unlockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
2209 // find unique key == given key, or first duplicate key in
2210 // leaf level and return number of value bytes
2211 // or (-1) if not found.
2213 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2221 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2223 ptr = keyptr(set->page, slot);
2225 // skip librarian slot place holder
2227 if( slotptr(set->page, slot)->type == Librarian )
2228 ptr = keyptr(set->page, ++slot);
2230 // return actual key found
2232 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2235 if( slotptr(set->page, slot)->type == Duplicate )
2238 // not there if we reach the stopper key
2240 if( slot == set->page->cnt )
2241 if( !bt_getid (set->page->right) )
2244 // if key exists, return >= 0 value bytes copied
2245 // otherwise return (-1)
2247 if( slotptr(set->page, slot)->dead )
2251 if( !memcmp (ptr->key, key, len) ) {
2252 val = valptr (set->page,slot);
2253 if( valmax > val->len )
2255 memcpy (value, val->value, valmax);
2261 } while( slot = bt_findnext (bt, set, slot) );
2263 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
2264 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
2268 // check page for space available,
2269 // clean if necessary and return
2270 // 0 - page needs splitting
2271 // >0 new slot value
2273 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2275 uint page_size = mgr->page_size;
2276 BtPage page = set->page, frame;
2277 uint cnt = 0, idx = 0;
2278 uint max = page->cnt;
2283 if( !set->page->lvl )
2284 page_size <<= mgr->leaf_xtra;
2286 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2289 // skip cleanup and proceed to split
2290 // if there's not enough garbage
2293 if( page->garbage < page_size / 5 )
2296 frame = malloc (page_size);
2297 memcpy (frame, page, page_size);
2299 // skip page info and set rest of page to zero
2301 memset (page+1, 0, page_size - sizeof(*page));
2302 set->latch->dirty = 1;
2304 page->min = page_size;
2308 // clean up page first by
2309 // removing deleted keys
2311 while( cnt++ < max ) {
2315 if( cnt < max || frame->lvl )
2316 if( slotptr(frame,cnt)->dead )
2319 // copy the value across
2321 val = valptr(frame, cnt);
2322 page->min -= val->len + sizeof(BtVal);
2323 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2325 // copy the key across
2327 key = keyptr(frame, cnt);
2328 page->min -= key->len + sizeof(BtKey);
2329 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2331 // make a librarian slot
2333 slotptr(page, ++idx)->off = page->min;
2334 slotptr(page, idx)->type = Librarian;
2335 slotptr(page, idx)->dead = 1;
2339 slotptr(page, ++idx)->off = page->min;
2340 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2342 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2349 // see if page has enough space now, or does it need splitting?
2351 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2357 // split the root and raise the height of the btree
2359 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2361 unsigned char leftkey[BT_keyarray];
2362 uint nxt = mgr->page_size;
2363 unsigned char value[BtId];
2370 frame = malloc (mgr->page_size);
2371 memcpy (frame, root->page, mgr->page_size);
2373 // save left page fence key for new root
2375 ptr = keyptr(root->page, root->page->cnt);
2376 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2378 // Obtain an empty page to use, and copy the current
2379 // root contents into it, e.g. lower keys
2381 if( bt_newpage(mgr, left, frame, thread_no) )
2382 return mgr->err_thread = thread_no, mgr->err;
2384 left_page_no = left->latch->page_no;
2385 bt_unpinlatch (left->latch, thread_no, __LINE__);
2388 // preserve the page info at the bottom
2389 // of higher keys and set rest to zero
2391 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2393 // insert stopper key at top of newroot page
2394 // and increase the root height
2396 nxt -= BtId + sizeof(BtVal);
2397 bt_putid (value, right->page_no);
2398 val = (BtVal *)((unsigned char *)root->page + nxt);
2399 memcpy (val->value, value, BtId);
2402 nxt -= 2 + sizeof(BtKey);
2403 slotptr(root->page, 2)->off = nxt;
2404 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2409 // insert lower keys page fence key on newroot page as first key
2411 nxt -= BtId + sizeof(BtVal);
2412 bt_putid (value, left_page_no);
2413 val = (BtVal *)((unsigned char *)root->page + nxt);
2414 memcpy (val->value, value, BtId);
2417 ptr = (BtKey *)leftkey;
2418 nxt -= ptr->len + sizeof(BtKey);
2419 slotptr(root->page, 1)->off = nxt;
2420 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2422 bt_putid(root->page->right, 0);
2423 root->page->min = nxt; // reset lowest used offset and key count
2424 root->page->cnt = 2;
2425 root->page->act = 2;
2428 mgr->pagezero->alloc->lvl = root->page->lvl;
2430 // release and unpin root pages
2432 bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2433 bt_unpinlatch (root->latch, thread_no, __LINE__);
2435 bt_unpinlatch (right, thread_no, __LINE__);
2439 // split already locked full node
2441 // return pool entry for new right
2442 // page, pinned & unlocked
2444 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2446 uint page_size = mgr->page_size;
2447 uint cnt = 0, idx = 0, max;
2448 uint lvl = set->page->lvl;
2456 if( !set->page->lvl )
2457 page_size <<= mgr->leaf_xtra;
2459 // split higher half of keys to frame
2461 frame = malloc (page_size);
2462 memset (frame, 0, page_size);
2463 frame->min = page_size;
2464 max = set->page->cnt;
2468 while( cnt++ < max ) {
2469 if( cnt < max || set->page->lvl )
2470 if( slotptr(set->page, cnt)->dead )
2473 src = valptr(set->page, cnt);
2474 frame->min -= src->len + sizeof(BtVal);
2475 memcpy ((unsigned char *)frame + frame->min, src, src->len + sizeof(BtVal));
2477 key = keyptr(set->page, cnt);
2478 frame->min -= key->len + sizeof(BtKey);
2479 ptr = (BtKey*)((unsigned char *)frame + frame->min);
2480 memcpy (ptr, key, key->len + sizeof(BtKey));
2482 // add librarian slot
2484 slotptr(frame, ++idx)->off = frame->min;
2485 slotptr(frame, idx)->type = Librarian;
2486 slotptr(frame, idx)->dead = 1;
2490 slotptr(frame, ++idx)->off = frame->min;
2491 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2493 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2502 if( set->latch->page_no > ROOT_page )
2503 bt_putid (frame->right, bt_getid (set->page->right));
2505 // get new free page and write higher keys to it.
2507 if( bt_newpage(mgr, right, frame, thread_no) )
2510 // process lower keys
2512 memcpy (frame, set->page, page_size);
2513 memset (set->page+1, 0, page_size - sizeof(*set->page));
2514 set->latch->dirty = 1;
2516 set->page->min = page_size;
2517 set->page->garbage = 0;
2523 if( slotptr(frame, max)->type == Librarian )
2526 // assemble page of smaller keys
2528 while( cnt++ < max ) {
2529 if( slotptr(frame, cnt)->dead )
2531 val = valptr(frame, cnt);
2532 set->page->min -= val->len + sizeof(BtVal);
2533 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2535 key = keyptr(frame, cnt);
2536 set->page->min -= key->len + sizeof(BtKey);
2537 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2539 // add librarian slot
2541 slotptr(set->page, ++idx)->off = set->page->min;
2542 slotptr(set->page, idx)->type = Librarian;
2543 slotptr(set->page, idx)->dead = 1;
2547 slotptr(set->page, ++idx)->off = set->page->min;
2548 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2552 bt_putid(set->page->right, right->latch->page_no);
2553 set->page->cnt = idx;
2556 return right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2559 // fix keys for newly split page
2560 // call with both pages pinned & locked
2561 // return unlocked and unpinned
2563 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2565 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2566 unsigned char value[BtId];
2567 uint lvl = set->page->lvl;
2573 // if current page is the root page, split it
2575 if( set->latch->page_no == ROOT_page )
2576 return bt_splitroot (mgr, set, right, thread_no);
2578 ptr = keyptr(set->page, set->page->cnt);
2579 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2581 page = bt_mappage (mgr, right);
2583 ptr = keyptr(page, page->cnt);
2584 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2586 // insert new fences in their parent pages
2588 bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2590 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2591 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2593 // insert new fence for reformulated left block of smaller keys
2595 bt_putid (value, set->latch->page_no);
2596 ptr = (BtKey *)leftkey;
2598 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2599 return mgr->err_thread = thread_no, mgr->err;
2601 // switch fence for right block of larger keys to new right page
2603 bt_putid (value, right->page_no);
2604 ptr = (BtKey *)rightkey;
2606 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2607 return mgr->err_thread = thread_no, mgr->err;
2609 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2610 bt_unpinlatch (set->latch, thread_no, __LINE__);
2612 bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2613 bt_unpinlatch (right, thread_no, __LINE__);
2617 // install new key and value onto page
2618 // page must already be checked for
2621 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2623 uint idx, librarian;
2629 // if found slot > desired slot and previous slot
2630 // is a librarian slot, use it
2633 if( slotptr(set->page, slot-1)->type == Librarian )
2636 // copy value onto page
2638 set->page->min -= vallen + sizeof(BtVal);
2639 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2640 memcpy (val->value, value, vallen);
2643 // copy key onto page
2645 set->page->min -= keylen + sizeof(BtKey);
2646 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2647 memcpy (ptr->key, key, keylen);
2650 // find first empty slot
2652 for( idx = slot; idx < set->page->cnt; idx++ )
2653 if( slotptr(set->page, idx)->dead )
2656 // now insert key into array before slot,
2657 // adding as many librarian slots as
2660 if( idx == set->page->cnt ) {
2661 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2663 librarian = ++idx - slot;
2664 avail /= sizeof(BtSlot);
2669 if( librarian > avail )
2673 rate = (idx - slot) / librarian;
2674 set->page->cnt += librarian;
2679 librarian = 0, rate = 0;
2681 while( idx > slot ) {
2683 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2686 // add librarian slot per rate
2689 if( (idx - slot + 1)/2 <= librarian * rate ) {
2690 node = slotptr(set->page, idx--);
2691 node->off = node[1].off;
2692 node->type = Librarian;
2698 set->latch->dirty = 1;
2703 node = slotptr(set->page, slot);
2704 node->off = set->page->min;
2710 // Insert new key into the btree at given level.
2711 // either add a new key or update/add an existing one
2713 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2715 uint slot, idx, len, entry;
2721 while( 1 ) { // find the page and slot for the current key
2722 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2723 node = slotptr(set->page, slot);
2724 ptr = keyptr(set->page, slot);
2727 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2728 return mgr->err_thread = thread_no, mgr->err;
2731 // if librarian slot == found slot, advance to real slot
2733 if( node->type == Librarian )
2734 if( !keycmp (ptr, key, keylen) ) {
2735 ptr = keyptr(set->page, ++slot);
2736 node = slotptr(set->page, slot);
2739 // if inserting a duplicate key or unique
2740 // key that doesn't exist on the page,
2741 // check for adequate space on the page
2742 // and insert the new key before slot.
2747 if( keycmp (ptr, key, keylen) )
2748 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) ) {
2749 if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2752 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2753 bt_unpinlatch (set->latch, thread_no, __LINE__);
2755 } else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2756 return mgr->err_thread = thread_no, mgr->err;
2757 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2758 return mgr->err_thread = thread_no, mgr->err;
2762 // if key already exists, update value and return
2764 val = valptr(set->page, slot);
2766 if( val->len >= vallen ) {
2767 if( slotptr(set->page, slot)->dead )
2772 set->page->garbage += val->len - vallen;
2773 set->latch->dirty = 1;
2775 memcpy (val->value, value, vallen);
2776 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2777 bt_unpinlatch (set->latch, thread_no, __LINE__);
2781 // new update value doesn't fit in existing value area
2782 // make sure page has room
2785 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2792 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2793 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2794 return mgr->err_thread = thread_no, mgr->err;
2795 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2796 return mgr->err_thread = thread_no, mgr->err;
2800 // copy key and value onto page and update slot
2802 set->page->min -= vallen + sizeof(BtVal);
2803 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2804 memcpy (val->value, value, vallen);
2807 set->latch->dirty = 1;
2808 set->page->min -= keylen + sizeof(BtKey);
2809 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2810 memcpy (ptr->key, key, keylen);
2813 node->off = set->page->min;
2814 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2815 bt_unpinlatch (set->latch, thread_no, __LINE__);
2822 // determine actual page where key is located
2823 // return slot number
2825 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2827 BtKey *key = keyptr(source,src), *ptr;
2828 unsigned char fence[BT_keyarray];
2831 if( locks[src].reuse )
2832 entry = locks[src-1].entry;
2834 entry = locks[src].entry;
2836 // find where our key is located
2837 // on current page or pages split on
2838 // same page txn operations.
2841 set->latch = mgr->leafsets + entry;
2842 set->page = bt_mappage (mgr, set->latch);
2844 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2845 if( slotptr(set->page, slot)->type == Librarian )
2847 if( locks[src].reuse )
2848 locks[src].entry = entry;
2851 } while( entry = set->latch->split );
2853 ptr = keyptr (set->page, set->page->cnt);
2854 memcpy (fence, ptr, ptr->len + 1);
2856 mgr->line = __LINE__, mgr->err = BTERR_atomic;
2860 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2862 BtKey *key = keyptr(source, src);
2863 BtVal *val = valptr(source, src);
2868 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2869 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2870 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2871 return mgr->err_thread = thread_no, mgr->err;
2873 set->page->lsn = lsn;
2879 if( entry = bt_splitpage (mgr, set, thread_no) )
2880 latch = mgr->leafsets + entry;
2882 return mgr->err_thread = thread_no, mgr->err;
2884 // splice right page into split chain
2887 //fprintf(stderr, "SplitPg %d by %d at %d\n", (uint)latch->page_no, thread_no, __LINE__);
2888 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2889 latch->split = set->latch->split;
2890 set->latch->split = entry;
2893 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2896 // perform delete from smaller btree
2897 // insert a delete slot if not found there
2899 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2901 BtKey *key = keyptr(source, src);
2908 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2909 node = slotptr(set->page, slot);
2910 ptr = keyptr(set->page, slot);
2911 val = valptr(set->page, slot);
2913 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2915 // if slot is not found, insert a delete slot
2917 if( keycmp (ptr, key->key, key->len) )
2918 if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2921 // if node is already dead,
2922 // ignore the request.
2927 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2928 set->latch->dirty = 1;
2929 set->page->lsn = lsn;
2933 __sync_fetch_and_add(&mgr->found, 1);
2937 // release master's splits from right to left
2939 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2941 BtLatchSet *latch = mgr->leafsets + entry;
2944 bt_atomicrelease (mgr, latch->split, thread_no);
2947 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2948 bt_unpinlatch(latch, thread_no, __LINE__);
2951 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2953 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2954 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2956 return keycmp (key1, key2->key, key2->len);
2958 // atomic modification of a batch of keys.
2960 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2962 uint src, idx, slot, samepage, entry, que = 0;
2963 BtKey *key, *ptr, *key2;
2969 // stable sort the list of keys into order to
2970 // prevent deadlocks between threads.
2972 for( src = 1; src++ < source->cnt; ) {
2973 *temp = *slotptr(source,src);
2974 key = keyptr (source,src);
2976 for( idx = src; --idx; ) {
2977 key2 = keyptr (source,idx);
2978 if( keycmp (key, key2->key, key2->len) < 0 ) {
2979 *slotptr(source,idx+1) = *slotptr(source,idx);
2980 *slotptr(source,idx) = *temp;
2986 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2987 // add entries to redo log
2989 if( bt->mgr->pagezero->redopages )
2990 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2994 // perform the individual actions in the transaction
2996 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2997 return bt->mgr->err;
2999 // if number of active pages
3000 // is greater than the buffer pool
3001 // promote page into larger btree
3004 while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - 10 )
3005 if( bt_txnpromote (bt) )
3006 return bt->mgr->err;
3013 // execute the source list of inserts/deletes
3015 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
3017 unsigned char fencekey[BT_keyarray], prvfence[BT_keyarray];
3018 uint src, idx, samepage, entry;
3019 BtPageSet set[1], prev[1];
3020 unsigned char value[BtId];
3024 BtKey *key, *ptr, *prv, *cur;
3030 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
3032 // Load the leaf page for each key
3033 // group same page references with reuse bit
3035 for( src = 0; src++ < source->cnt; ) {
3036 key = keyptr(source, src);
3038 // first determine if this modification falls
3039 // on the same page as the previous modification
3040 // note that the far right leaf page is a special case
3042 if( samepage = src > 1 )
3043 samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
3046 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
3047 //memcpy (prvfence, fencekey, 10),
3048 //src>1 ? memcpy (fencekey, ptr->key, 10):NULL,
3050 //cur = keyptr(set->page, slot),
3051 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
3055 entry = set->latch - mgr->leafsets;
3057 // is this actually on the same page?
3060 // for( idx = src; --idx; )
3061 //if( entry == locks[idx].entry ) {
3062 //fprintf(stderr, "Dup page %d by thread %d\n", (uint)set->latch->page_no, thread_no);
3066 locks[src].reuse = samepage;
3067 locks[src].entry = entry;
3069 // capture current lsn for master page
3071 locks[src].reqlsn = set->page->lsn;
3074 // insert or delete each key
3075 // process any splits or merges
3076 // run through txn list backwards
3078 samepage = source->cnt + 1;
3080 for( src = source->cnt; src; src-- ) {
3081 if( locks[src].reuse )
3084 // perform the txn operations
3085 // from smaller to larger on
3088 for( idx = src; idx < samepage; idx++ )
3089 switch( slotptr(source,idx)->type ) {
3091 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3097 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
3102 bt_atomicpage (mgr, source, locks, idx, set);
3106 // after the same page operations have finished,
3107 // process master page for splits or deletion.
3109 latch = prev->latch = mgr->leafsets + locks[src].entry;
3110 prev->page = bt_mappage (mgr, prev->latch);
3113 // pick-up all splits from master page
3114 // each one is already pinned & WriteLocked.
3116 while( entry = prev->latch->split ) {
3117 set->latch = mgr->leafsets + entry;
3118 set->page = bt_mappage (mgr, set->latch);
3120 // delete empty master page by undoing its split
3121 // (this is potentially another empty page)
3122 // note that there are no pointers to it yet
3124 if( !prev->page->act ) {
3125 memcpy (set->page->left, prev->page->left, BtId);
3126 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3127 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3128 prev->latch->split = set->latch->split;
3129 prev->latch->dirty = 1;
3130 bt_freepage (mgr, set, thread_no);
3134 // remove empty split page from the split chain
3135 // and return it to the free list. No other
3136 // thread has its page number yet.
3138 if( !set->page->act ) {
3139 memcpy (prev->page->right, set->page->right, BtId);
3140 prev->latch->split = set->latch->split;
3142 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3143 bt_freepage (mgr, set, thread_no);
3147 // update prev's fence key
3149 ptr = keyptr(prev->page,prev->page->cnt);
3150 bt_putid (value, prev->latch->page_no);
3151 //fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__);
3152 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3155 // splice in the left link into the split page
3157 bt_putid (set->page->left, prev->latch->page_no);
3160 bt_syncpage (mgr, prev->page, prev->latch);
3165 // update left pointer in next right page from last split page
3166 // (if all splits were reversed or none occurred, latch->split == 0)
3168 if( latch->split ) {
3169 // fix left pointer in master's original (now split)
3170 // far right sibling or set rightmost page in page zero
3172 if( right_page_no = bt_getid (prev->page->right) ) {
3173 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3174 set->page = bt_mappage (mgr, set->latch);
3178 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3179 bt_putid (set->page->left, prev->latch->page_no);
3180 set->latch->dirty = 1;
3182 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3183 bt_unpinlatch (set->latch, thread_no, __LINE__);
3184 } else { // prev is rightmost page
3185 bt_mutexlock (mgr->lock);
3186 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3187 bt_releasemutex(mgr->lock);
3190 // switch the original fence key from the
3191 // master page to the last split page.
3193 ptr = keyptr(prev->page,prev->page->cnt);
3194 bt_putid (value, prev->latch->page_no);
3196 //fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__);
3197 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3201 bt_syncpage (mgr, prev->page, prev->latch);
3203 // unlock and unpin the split pages
3205 bt_atomicrelease (mgr, latch->split, thread_no);
3207 // unlock and unpin the master page
3210 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3211 bt_unpinlatch(latch, thread_no, __LINE__);
3215 // since there are no splits remaining, we're
3216 // finished if master page occupied
3218 if( prev->page->act ) {
3219 bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3222 bt_syncpage (mgr, prev->page, prev->latch);
3224 bt_unpinlatch(prev->latch, thread_no, __LINE__);
3228 // any and all splits were reversed, and the
3229 // master page located in prev is empty, delete it
3231 if( bt_deletepage (mgr, prev, thread_no) )
3239 // pick & promote a page into the larger btree
3241 BTERR bt_txnpromote (BtDb *bt)
3243 uint entry, slot, idx;
3251 entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3253 entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3255 entry %= bt->mgr->leaftotal;
3260 set->latch = bt->mgr->leafsets + entry;
3262 if( !bt_mutextry(set->latch->modify) )
3265 // skip this entry if it is pinned
3267 if( set->latch->pin & ~CLOCK_bit ) {
3268 bt_releasemutex(set->latch->modify);
3272 set->page = bt_mappage (bt->mgr, set->latch);
3274 // entry never used or has no left or right sibling
3276 if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3277 bt_releasemutex(set->latch->modify);
3281 // entry interiour node or being killed or constructed
3283 if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3284 bt_releasemutex(set->latch->modify);
3288 // pin the page for our access
3289 // and leave it locked for the
3290 // duration of the promotion.
3293 bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3294 bt_releasemutex(set->latch->modify);
3296 // transfer slots in our selected page to the main btree
3297 if( !(entry % 100) )
3298 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3300 if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3301 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3302 return bt->main->err;
3305 // now delete the page
3307 if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3308 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3310 return bt->mgr->err;
3314 // set cursor to highest slot on highest page
3316 uint bt_lastkey (BtDb *bt)
3318 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3321 if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3322 set->page = bt_mappage (bt->mgr, set->latch);
3326 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3327 memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3328 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3329 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3330 return bt->cursor->cnt;
3333 // return previous slot on cursor page
3335 uint bt_prevkey (BtDb *bt, uint slot)
3337 uid cursor_page = bt->cursor->page_no;
3338 uid ourright, next, us = cursor_page;
3344 ourright = bt_getid(bt->cursor->right);
3347 if( !(next = bt_getid(bt->cursor->left)) )
3353 if( set->latch = bt_pinleaf (bt->mgr, next, bt->thread_no) )
3354 set->page = bt_mappage (bt->mgr, set->latch);
3358 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3359 memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3360 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3361 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3363 next = bt_getid (bt->cursor->right);
3365 if( bt->cursor->kill )
3369 if( next == ourright )
3374 return bt->cursor->cnt;
3377 // return next slot on cursor page
3378 // or slide cursor right into next page
3380 uint bt_nextkey (BtDb *bt, uint slot)
3386 right = bt_getid(bt->cursor->right);
3388 while( slot++ < bt->cursor->cnt )
3389 if( slotptr(bt->cursor,slot)->dead )
3391 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3399 if( set->latch = bt_pinleaf (bt->mgr, right, bt->thread_no) )
3400 set->page = bt_mappage (bt->mgr, set->latch);
3404 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3405 memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3406 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3407 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3412 return bt->mgr->err = 0;
3415 // cache page of keys into cursor and return starting slot for given key
3417 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3422 // cache page for retrieval
3424 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3425 memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3429 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3430 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3437 double getCpuTime(int type)
3440 FILETIME xittime[1];
3441 FILETIME systime[1];
3442 FILETIME usrtime[1];
3443 SYSTEMTIME timeconv[1];
3446 memset (timeconv, 0, sizeof(SYSTEMTIME));
3450 GetSystemTimeAsFileTime (xittime);
3451 FileTimeToSystemTime (xittime, timeconv);
3452 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3455 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3456 FileTimeToSystemTime (usrtime, timeconv);
3459 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3460 FileTimeToSystemTime (systime, timeconv);
3464 ans += (double)timeconv->wHour * 3600;
3465 ans += (double)timeconv->wMinute * 60;
3466 ans += (double)timeconv->wSecond;
3467 ans += (double)timeconv->wMilliseconds / 1000;
3472 #include <sys/resource.h>
3474 double getCpuTime(int type)
3476 struct rusage used[1];
3477 struct timeval tv[1];
3481 gettimeofday(tv, NULL);
3482 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3485 getrusage(RUSAGE_SELF, used);
3486 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3489 getrusage(RUSAGE_SELF, used);
3490 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3497 void bt_poolaudit (BtMgr *mgr)
3502 while( ++entry < mgr->latchtotal ) {
3503 latch = mgr->latchsets + entry;
3505 if( *latch->readwr->wrt->value )
3506 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3508 // if( *latch->access->bits->tid )
3509 // fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3511 // if( *latch->parent->bits->tid )
3512 // fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3514 if( *latch->modify->value )
3515 fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3517 if( latch->pin & ~CLOCK_bit )
3518 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3531 // standalone program to index file of keys
3532 // then list them onto std-out
3535 void *index_file (void *arg)
3537 uint __stdcall index_file (void *arg)
3540 int line = 0, found = 0, cnt = 0, idx;
3541 uid next, page_no = LEAF_page; // start on first page of leaves
3542 int ch, len = 0, slot, type = 0;
3543 unsigned char key[BT_maxkey];
3544 unsigned char txn[65536];
3545 ThreadArg *args = arg;
3554 bt = bt_open (args->mgr, args->main);
3557 if( args->idx < strlen (args->type) )
3558 ch = args->type[args->idx];
3560 ch = args->type[strlen(args->type) - 1];
3572 if( type == Delete )
3573 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3575 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3577 if( type == Delete )
3578 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3580 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3582 if( in = fopen (args->infile, "rb") )
3583 while( ch = getc(in), ch != EOF )
3589 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3590 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3596 memcpy (txn + nxt, key + 10, len - 10);
3598 txn[nxt] = len - 10;
3600 memcpy (txn + nxt, key, 10);
3603 slotptr(page,++cnt)->off = nxt;
3604 slotptr(page,cnt)->type = type;
3607 if( cnt < args->num )
3614 if( bt_atomictxn (bt, page) )
3615 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3620 else if( len < BT_maxkey )
3622 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3626 fprintf(stderr, "started indexing for %s\n", args->infile);
3627 if( in = fopen (args->infile, "r") )
3628 while( ch = getc(in), ch != EOF )
3633 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3634 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3637 else if( len < BT_maxkey )
3639 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3643 fprintf(stderr, "started finding keys for %s\n", args->infile);
3644 if( in = fopen (args->infile, "rb") )
3645 while( ch = getc(in), ch != EOF )
3649 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3651 else if( bt->mgr->err )
3652 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3655 else if( len < BT_maxkey )
3657 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3661 fprintf(stderr, "started scanning\n");
3664 if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3665 set->page = bt_mappage (bt->mgr, set->latch);
3667 fprintf(stderr, "unable to obtain latch"), exit(1);
3669 bt_lockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3670 next = bt_getid (set->page->right);
3672 for( slot = 0; slot++ < set->page->cnt; )
3673 if( next || slot < set->page->cnt )
3674 if( !slotptr(set->page, slot)->dead ) {
3675 ptr = keyptr(set->page, slot);
3678 if( slotptr(set->page, slot)->type == Duplicate )
3681 fwrite (ptr->key, len, 1, stdout);
3682 val = valptr(set->page, slot);
3683 fwrite (val->value, val->len, 1, stdout);
3684 fputc ('\n', stdout);
3688 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3689 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3690 } while( page_no = next );
3692 fprintf(stderr, " Total keys read %d\n", cnt);
3696 fprintf(stderr, "started reverse scan\n");
3697 if( slot = bt_lastkey (bt) )
3698 while( slot = bt_prevkey (bt, slot) ) {
3699 if( slotptr(bt->cursor, slot)->dead )
3702 ptr = keyptr(bt->cursor, slot);
3705 if( slotptr(bt->cursor, slot)->type == Duplicate )
3708 fwrite (ptr->key, len, 1, stdout);
3709 val = valptr(bt->cursor, slot);
3710 fwrite (val->value, val->len, 1, stdout);
3711 fputc ('\n', stdout);
3715 fprintf(stderr, " Total keys read %d\n", cnt);
3720 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3722 fprintf(stderr, "started counting\n");
3723 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3725 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3726 if( bt_readpage (bt->mgr, bt->cursor, page_no, 1) )
3729 if( !bt->cursor->free && !bt->cursor->lvl )
3730 cnt += bt->cursor->act;
3736 cnt--; // remove stopper key
3737 fprintf(stderr, " Total keys counted %d\n", cnt);
3749 typedef struct timeval timer;
3751 int main (int argc, char **argv)
3753 int idx, cnt, len, slot, err;
3761 uint mainleafpool = 0;
3762 uint mainleafxtra = 0;
3778 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3779 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3780 fprintf (stderr, " where main_file is the name of the main btree file\n");
3781 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3782 fprintf (stderr, " pagebits is the page size in bits for the cache btree\n");
3783 fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n");
3784 fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n");
3785 fprintf (stderr, " leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3786 fprintf (stderr, " txnsize = n to block transactions into n units, or zero for no transactions\n");
3787 fprintf (stderr, " redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3788 fprintf (stderr, " mainbits is the page size of the main btree in bits\n");
3789 fprintf (stderr, " mainpool is the number of main pages in the main buffer pool\n");
3790 fprintf (stderr, " mainleaf is the number of main pages in the main leaf pool\n");
3791 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3795 start = getCpuTime(0);
3798 bits = atoi(argv[4]);
3801 leafxtra = atoi(argv[5]);
3804 poolsize = atoi(argv[6]);
3807 fprintf (stderr, "no page pool\n"), exit(1);
3810 leafpool = atoi(argv[7]);
3813 num = atoi(argv[8]);
3816 redopages = atoi(argv[9]);
3818 if( redopages > 65535 )
3819 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3822 mainbits = atoi(argv[10]);
3825 mainleafxtra = atoi(argv[11]);
3828 mainpool = atoi(argv[12]);
3831 mainleafpool = atoi(argv[13]);
3835 threads = malloc (cnt * sizeof(pthread_t));
3837 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3839 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3841 mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3844 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3849 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3852 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3861 for( idx = 0; idx < cnt; idx++ ) {
3862 args[idx].infile = argv[idx + 14];
3863 args[idx].type = argv[3];
3864 args[idx].main = main;
3865 args[idx].mgr = mgr;
3866 args[idx].num = num;
3867 args[idx].idx = idx;
3869 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3870 fprintf(stderr, "Error creating thread %d\n", err);
3872 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3876 args[0].infile = argv[idx + 10];
3877 args[0].type = argv[3];
3878 args[0].main = main;
3885 // wait for termination
3888 for( idx = 0; idx < cnt; idx++ )
3889 pthread_join (threads[idx], NULL);
3891 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3893 for( idx = 0; idx < cnt; idx++ )
3894 CloseHandle(threads[idx]);
3901 fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3907 elapsed = getCpuTime(0) - start;
3908 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3909 elapsed = getCpuTime(1);
3910 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3911 elapsed = getCpuTime(2);
3912 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);