1 // btree version threadskv10e futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // LSM B-trees for write optimization
11 // and larger sized leaf pages than non-leaf
15 // author: karl malbrain, malbrain@cal.berkeley.edu
18 This work, including the source code, documentation
19 and related data, is placed into the public domain.
21 The orginal author is Karl Malbrain.
23 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
24 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
25 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
26 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
27 RESULTING FROM THE USE, MODIFICATION, OR
28 REDISTRIBUTION OF THIS SOFTWARE.
31 // Please see the project home page for documentation
32 // code.google.com/p/high-concurrency-btree
34 #define _FILE_OFFSET_BITS 64
35 #define _LARGEFILE64_SOURCE
39 #include <xmmintrin.h>
40 #include <linux/futex.h>
55 #define WIN32_LEAN_AND_MEAN
69 typedef unsigned long long uid;
70 typedef unsigned long long logseqno;
73 typedef unsigned long long off64_t;
74 typedef unsigned short ushort;
75 typedef unsigned int uint;
78 #define BT_ro 0x6f72 // ro
79 #define BT_rw 0x7772 // rw
81 #define BT_maxbits 26 // maximum page size in bits
82 #define BT_minbits 9 // minimum page size in bits
83 #define BT_minpage (1 << BT_minbits) // minimum page size
84 #define BT_maxpage (1 << BT_maxbits) // maximum page size
86 // BTree page number constants
87 #define ALLOC_page 0 // allocation page
88 #define ROOT_page 1 // root of the btree
89 #define LEAF_page 2 // first page of leaves
91 // Number of levels to create in a new BTree
96 There are six lock types for each node in four independent sets:
97 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
98 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
99 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
100 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
101 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
102 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification.
117 volatile unsigned char xcl[1];
118 volatile unsigned char filler;
119 volatile ushort waiters[1];
125 // definition for reader/writer reentrant lock implementation
131 ushort dup; // re-entrant locks
132 ushort tid; // owner thread-no
133 uint line; // owner line #
136 // hash table entries
140 uint entry; // Latch table entry at head of chain
143 // latch manager table structure
146 uid page_no; // latch set page number
147 MutexLatch modify[1]; // modify entry lite latch
148 RWLock readwr[1]; // read/write page lock
149 RWLock access[1]; // Access Intent/Page delete
150 RWLock parent[1]; // Posting of fence key in parent
151 RWLock link[1]; // left link update in progress
152 uint split; // right split page atomic insert
153 uint next; // next entry in hash table chain
154 uint prev; // prev entry in hash table chain
155 volatile ushort pin; // number of accessing threads
156 unsigned char dirty; // page in cache is dirty
157 unsigned char leaf; // page in cache is a leaf
160 // Define the length of the page record numbers
164 // Page key slot definition.
166 // Keys are marked dead, but remain on the page until
167 // it cleanup is called. The fence key (highest key) for
168 // a leaf page is always present, even after cleanup.
172 // In addition to the Unique keys that occupy slots
173 // there are Librarian and Duplicate key
174 // slots occupying the key slot array.
176 // The Librarian slots are dead keys that
177 // serve as filler, available to add new Unique
178 // or Dup slots that are inserted into the B-tree.
180 // The Duplicate slots have had their key bytes extended
181 // by 6 bytes to contain a binary duplicate key uniqueifier.
192 uint off:BT_maxbits; // page offset for key start
193 uint type:3; // type of slot
194 uint dead:1; // set for deleted slot
197 // The key structure occupies space at the upper end of
198 // each page. It's a length byte followed by the key
202 unsigned char len; // this can be changed to a ushort or uint
203 unsigned char key[0];
206 // the value structure also occupies space at the upper
207 // end of the page. Each key is immediately followed by a value.
210 unsigned char len; // this can be changed to a ushort or uint
211 unsigned char value[0];
214 #define BT_maxkey 255 // maximum number of bytes in a key
215 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
217 // The first part of an index page.
218 // It is immediately followed
219 // by the BtSlot array of keys.
221 typedef struct BtPage_ {
222 uint cnt; // count of keys in page
223 uint act; // count of active keys
224 uint min; // next key offset
225 uint garbage; // page garbage in bytes
226 unsigned char lvl; // level of page
227 unsigned char free; // page is on free chain
228 unsigned char kill; // page is being deleted
229 unsigned char nopromote; // page is being constructed
230 unsigned char filler1[6]; // padding to multiple of 8 bytes
231 unsigned char right[BtId]; // page number to right
232 unsigned char left[BtId]; // page number to left
233 unsigned char filler2[2]; // padding to multiple of 8 bytes
234 logseqno lsn; // log sequence number applied
235 uid page_no; // this page number
238 // The loadpage interface object
241 BtPage page; // current page pointer
242 BtLatchSet *latch; // current page latch set
245 // structure for latch manager on ALLOC_page
248 struct BtPage_ alloc[1]; // next page_no in right ptr
249 unsigned char freechain[BtId]; // head of free page_nos chain
250 unsigned char leafchain[BtId]; // head of leaf page_nos chain
251 unsigned long long leafpages; // number of active leaf pages
252 uint redopages; // number of redo pages in file
253 unsigned char leaf_xtra; // leaf page size in xtra bits
254 unsigned char page_bits; // base page size in bits
257 // The object structure for Btree access
260 uint page_size; // base page size
261 uint page_bits; // base page size in bits
262 uint leaf_xtra; // leaf xtra bits
268 BtPageZero *pagezero; // mapped allocation page
269 BtHashEntry *hashtable; // the buffer pool hash table entries
270 BtHashEntry *leaftable; // the buffer pool hash table entries
271 BtLatchSet *latchsets; // mapped latch set from buffer pool
272 BtLatchSet *leafsets; // mapped latch set from buffer pool
273 unsigned char *pagepool; // mapped to the buffer pool pages
274 unsigned char *leafpool; // mapped to the leaf pool pages
275 unsigned char *redobuff; // mapped recovery buffer pointer
276 logseqno lsn, flushlsn; // current & first lsn flushed
277 MutexLatch redo[1]; // redo area lite latch
278 MutexLatch lock[1]; // allocation area lite latch
279 MutexLatch maps[1]; // mapping segments lite latch
280 ushort thread_no[1]; // highest thread number issued
281 ushort err_thread; // error thread number
282 uint nlatchpage; // size of buffer pool & latchsets
283 uint latchtotal; // number of page latch entries
284 uint latchhash; // number of latch hash table slots
285 uint latchvictim; // next latch entry to examine
286 uint nleafpage; // size of leaf pool & leafsets
287 uint leaftotal; // number of leaf latch entries
288 uint leafhash; // number of leaf hash table slots
289 uint leafvictim; // next leaf entry to examine
290 uint leafpromote; // next leaf entry to promote
291 uint redopage; // page number of redo buffer
292 uint redolast; // last msync size of recovery buff
293 uint redoend; // eof/end element in recovery buff
294 int err; // last error
295 int line; // last error line no
296 int found; // number of keys found by delete
297 int reads, writes; // number of reads and writes
299 HANDLE halloc; // allocation handle
300 HANDLE hpool; // buffer pool handle
302 volatile uint segments; // number of memory mapped segments
303 unsigned char *pages[64000];// memory mapped segments of b-tree
307 BtMgr *mgr; // buffer manager for entire process
308 BtMgr *main; // buffer manager for main btree
309 BtPage cursor; // cached page frame for start/next
310 ushort thread_no; // thread number
311 unsigned char key[BT_keyarray]; // last found complete key
314 // atomic txn structures
317 logseqno reqlsn; // redo log seq no required
318 uint entry:31; // latch table entry number
319 uint reuse:1; // reused previous page
322 // Catastrophic errors
336 #define CLOCK_bit 0x8000
338 // recovery manager entry types
341 BTRM_eof = 0, // rest of buffer is emtpy
342 BTRM_add, // add a unique key-value to btree
343 BTRM_dup, // add a duplicate key-value to btree
344 BTRM_del, // delete a key-value from btree
345 BTRM_upd, // update a key with a new value
346 BTRM_new, // allocate a new empty page
347 BTRM_old // reuse an old empty page
350 // recovery manager entry
351 // structure followed by BtKey & BtVal
354 logseqno reqlsn; // log sequence number required
355 logseqno lsn; // log sequence number for entry
356 uint len; // length of entry
357 unsigned char type; // type of entry
358 unsigned char lvl; // level of btree entry pertains to
363 extern void bt_close (BtDb *bt);
364 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
365 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
366 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
367 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
368 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
369 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
370 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
372 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
374 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
375 extern uint bt_nextkey (BtDb *bt, uint slot);
376 extern uint bt_prevkey (BtDb *db, uint slot);
377 extern uint bt_lastkey (BtDb *db);
380 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
381 extern void bt_mgrclose (BtMgr *mgr);
382 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
383 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
385 // atomic transaction functions
386 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
387 BTERR bt_txnpromote (BtDb *bt);
389 // The page is allocated from low and hi ends.
390 // The key slots are allocated from the bottom,
391 // while the text and value of the key
392 // are allocated from the top. When the two
393 // areas meet, the page is split into two.
395 // A key consists of a length byte, two bytes of
396 // index number (0 - 65535), and up to 253 bytes
399 // Associated with each key is a value byte string
400 // containing any value desired.
402 // The b-tree root is always located at page 1.
403 // The first leaf page of level zero is always
404 // located on page 2.
406 // The b-tree pages are linked with next
407 // pointers to facilitate enumerators,
408 // and provide for concurrency.
410 // When to root page fills, it is split in two and
411 // the tree height is raised by a new root at page
412 // one with two keys.
414 // Deleted keys are marked with a dead bit until
415 // page cleanup. The fence key for a leaf node is
418 // To achieve maximum concurrency one page is locked at a time
419 // as the tree is traversed to find leaf key in question. The right
420 // page numbers are used in cases where the page is being split,
423 // Page 0 is dedicated to lock for new page extensions,
424 // and chains empty pages together for reuse. It also
425 // contains the latch manager hash table.
427 // The ParentModification lock on a node is obtained to serialize posting
428 // or changing the fence key for a node.
430 // Empty pages are chained together through the ALLOC page and reused.
432 // Access macros to address slot and key values from the page
433 // Page slots use 1 based indexing.
435 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
436 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
437 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
439 void bt_putid(unsigned char *dest, uid id)
444 dest[i] = (unsigned char)id, id >>= 8;
447 uid bt_getid(unsigned char *src)
452 for( i = 0; i < BtId; i++ )
453 id <<= 8, id |= *src++;
458 // lite weight spin lock Latch Manager
460 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
462 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
465 void bt_mutexlock(MutexLatch *latch)
467 uint idx, waited = 0;
471 for( idx = 0; idx < 100; idx++ ) {
472 *prev->value = __sync_fetch_and_or (latch->value, 1);
473 if( !*prev->bits->xcl ) {
475 __sync_fetch_and_sub (latch->bits->waiters, 1);
481 __sync_fetch_and_add (latch->bits->waiters, 1);
482 *prev->bits->waiters += 1;
486 sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
490 int bt_mutextry(MutexLatch *latch)
492 return !__sync_lock_test_and_set (latch->bits->xcl, 1);
495 void bt_releasemutex(MutexLatch *latch)
497 *latch->bits->xcl = 0;
499 if( *latch->bits->waiters )
500 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
503 // reader/writer lock implementation
505 void WriteLock (RWLock *lock, ushort tid, uint line)
507 if( lock->tid == tid ) {
511 bt_mutexlock (lock->xcl);
512 bt_mutexlock (lock->wrt);
513 bt_releasemutex (lock->xcl);
519 void WriteRelease (RWLock *lock)
526 bt_releasemutex (lock->wrt);
529 void ReadLock (RWLock *lock, ushort tid)
531 bt_mutexlock (lock->xcl);
533 if( !__sync_fetch_and_add (&lock->readers, 1) )
534 bt_mutexlock (lock->wrt);
536 bt_releasemutex (lock->xcl);
539 void ReadRelease (RWLock *lock)
541 if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
542 bt_releasemutex (lock->wrt);
545 // recovery manager -- flush dirty pages
547 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
549 uint cnt3 = 0, cnt2 = 0, cnt = 0;
554 // flush dirty pool pages to the btree
556 fprintf(stderr, "Start flushlsn ");
557 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
558 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
559 latch = mgr->latchsets + entry;
560 bt_mutexlock (latch->modify);
561 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
564 bt_writepage(mgr, page, latch->page_no, 0);
565 latch->dirty = 0, cnt++;
567 if( latch->pin & ~CLOCK_bit )
569 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
570 bt_releasemutex (latch->modify);
572 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
573 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
574 latch = mgr->leafsets + entry;
575 bt_mutexlock (latch->modify);
576 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
579 bt_writepage(mgr, page, latch->page_no, 1);
580 latch->dirty = 0, cnt++;
582 if( latch->pin & ~CLOCK_bit )
584 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
585 bt_releasemutex (latch->modify);
587 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
588 fprintf(stderr, "begin sync");
589 for( segment = 0; segment < mgr->segments; segment++ )
590 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
591 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
592 fprintf(stderr, " end sync\n");
595 // recovery manager -- process current recovery buff on startup
596 // this won't do much if previous session was properly closed.
598 BTERR bt_recoveryredo (BtMgr *mgr)
605 hdr = (BtLogHdr *)mgr->redobuff;
606 mgr->flushlsn = hdr->lsn;
609 hdr = (BtLogHdr *)(mgr->redobuff + offset);
610 switch( hdr->type ) {
614 case BTRM_add: // add a unique key-value to btree
616 case BTRM_dup: // add a duplicate key-value to btree
617 case BTRM_del: // delete a key-value from btree
618 case BTRM_upd: // update a key with a new value
619 case BTRM_new: // allocate a new empty page
620 case BTRM_old: // reuse an old empty page
626 // recovery manager -- append new entry to recovery log
627 // flush dirty pages to disk when it overflows.
629 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
631 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
632 uint amt = sizeof(BtLogHdr);
636 bt_mutexlock (mgr->redo);
639 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
641 // see if new entry fits in buffer
642 // flush and reset if it doesn't
644 if( amt > size - mgr->redoend ) {
645 mgr->flushlsn = mgr->lsn;
646 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
647 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
650 bt_flushlsn(mgr, thread_no);
653 // fill in new entry & either eof or end block
655 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
660 hdr->lsn = ++mgr->lsn;
664 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
665 memset (eof, 0, sizeof(BtLogHdr));
667 // fill in key and value
670 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
671 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
674 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
675 memset (eof, 0, sizeof(BtLogHdr));
678 last = mgr->redolast & ~0xfff;
681 if( end - last + sizeof(BtLogHdr) >= 32768 )
682 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
683 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
687 bt_releasemutex(mgr->redo);
691 // recovery manager -- append transaction to recovery log
692 // flush dirty pages to disk when it overflows.
694 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
696 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
697 uint amt = 0, src, type;
704 // determine amount of redo recovery log space required
706 for( src = 0; src++ < source->cnt; ) {
707 key = keyptr(source,src);
708 val = valptr(source,src);
709 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
710 amt += sizeof(BtLogHdr);
713 bt_mutexlock (mgr->redo);
715 // see if new entry fits in buffer
716 // flush and reset if it doesn't
718 if( amt > size - mgr->redoend ) {
719 mgr->flushlsn = mgr->lsn;
720 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
721 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
724 bt_flushlsn (mgr, thread_no);
727 // assign new lsn to transaction
731 // fill in new entries
733 for( src = 0; src++ < source->cnt; ) {
734 key = keyptr(source, src);
735 val = valptr(source, src);
737 switch( slotptr(source, src)->type ) {
749 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
750 amt += sizeof(BtLogHdr);
752 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
758 // fill in key and value
760 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
761 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
766 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
767 memset (eof, 0, sizeof(BtLogHdr));
770 last = mgr->redolast & ~0xfff;
773 if( end - last + sizeof(BtLogHdr) >= 32768 )
774 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
775 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
779 bt_releasemutex(mgr->redo);
783 // sync a single btree page to disk
785 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
787 uint segment = latch->page_no >> 16;
788 uint page_size = mgr->page_size;
791 if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
795 page_size <<= mgr->leaf_xtra;
797 perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
799 if( msync (perm, page_size, MS_SYNC) < 0 )
800 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
806 // read page into buffer pool from permanent location in Btree file
808 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
810 int flag = PROT_READ | PROT_WRITE;
811 uint page_size = mgr->page_size;
812 uint off = 0, segment, fragment;
816 page_size <<= mgr->leaf_xtra;
818 fragment = page_no & 0xffff;
819 segment = page_no >> 16;
822 while( off < page_size ) {
824 segment++, fragment = 0;
826 if( segment < mgr->segments ) {
827 perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
829 memcpy ((unsigned char *)page + off, perm, mgr->page_size);
830 off += mgr->page_size;
835 bt_mutexlock (mgr->maps);
837 if( segment < mgr->segments ) {
838 bt_releasemutex (mgr->maps);
842 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
845 bt_releasemutex (mgr->maps);
851 // write page to permanent location in Btree file
853 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
855 int flag = PROT_READ | PROT_WRITE;
856 uint page_size = mgr->page_size;
857 uint off = 0, segment, fragment;
861 page_size <<= mgr->leaf_xtra;
863 fragment = page_no & 0xffff;
864 segment = page_no >> 16;
867 while( off < page_size ) {
869 segment++, fragment = 0;
871 if( segment < mgr->segments ) {
872 perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
873 memcpy (perm, (unsigned char *)page + off, mgr->page_size);
874 off += mgr->page_size;
879 bt_mutexlock (mgr->maps);
881 if( segment < mgr->segments ) {
882 bt_releasemutex (mgr->maps);
886 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
888 bt_releasemutex (mgr->maps);
894 // set CLOCK bit in latch
895 // decrement pin count
899 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
901 bt_mutexlock(latch->modify);
902 latch->pin |= CLOCK_bit;
905 bt_releasemutex(latch->modify);
908 // return the btree cached page address
910 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
912 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
916 page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
918 page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
923 // return next available leaf entry
924 // and with latch entry locked
926 uint bt_availleaf (BtMgr *mgr)
933 entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
935 entry = _InterlockedIncrement (&mgr->leafvictim);
937 entry %= mgr->leaftotal;
942 latch = mgr->leafsets + entry;
944 if( !bt_mutextry(latch->modify) )
947 // return this entry if it is not pinned
952 // if the CLOCK bit is set
955 latch->pin &= ~CLOCK_bit;
956 bt_releasemutex(latch->modify);
960 // return next available latch entry
961 // and with latch entry locked
963 uint bt_availnext (BtMgr *mgr)
970 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
972 entry = _InterlockedIncrement (&mgr->latchvictim);
974 entry %= mgr->latchtotal;
979 latch = mgr->latchsets + entry;
981 if( !bt_mutextry(latch->modify) )
984 // return this entry if it is not pinned
989 // if the CLOCK bit is set
992 latch->pin &= ~CLOCK_bit;
993 bt_releasemutex(latch->modify);
997 // pin leaf in leaf buffer pool
998 // return with latchset pinned
1000 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
1002 uint hashidx = page_no % mgr->leafhash;
1007 // try to find our entry
1009 bt_mutexlock(mgr->leaftable[hashidx].latch);
1011 if( entry = mgr->leaftable[hashidx].entry )
1013 latch = mgr->leafsets + entry;
1015 if( page_no == latch->page_no )
1017 } while( entry = latch->next );
1019 // found our entry: increment pin
1022 latch = mgr->leafsets + entry;
1023 bt_mutexlock(latch->modify);
1024 latch->pin |= CLOCK_bit;
1027 bt_releasemutex(latch->modify);
1028 bt_releasemutex(mgr->leaftable[hashidx].latch);
1032 // find and reuse unpinned entry
1035 entry = bt_availleaf (mgr);
1036 latch = mgr->leafsets + entry;
1038 idx = latch->page_no % mgr->leafhash;
1040 // if latch is on a different hash chain
1041 // unlink from the old page_no chain
1043 if( latch->page_no )
1044 if( idx != hashidx ) {
1046 // skip over this entry if latch not available
1048 if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
1049 bt_releasemutex(latch->modify);
1054 mgr->leafsets[latch->prev].next = latch->next;
1056 mgr->leaftable[idx].entry = latch->next;
1059 mgr->leafsets[latch->next].prev = latch->prev;
1061 bt_releasemutex (mgr->leaftable[idx].latch);
1064 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1066 // update permanent page area in btree from buffer pool
1067 // no read-lock is required since page is not pinned.
1070 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
1071 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1075 if( bt_readpage (mgr, page, page_no, 1) )
1076 return mgr->line = __LINE__, NULL;
1078 // link page as head of hash table chain
1079 // if this is a never before used entry,
1080 // or it was previously on a different
1081 // hash table chain. Otherwise, just
1082 // leave it in its current hash table
1085 if( !latch->page_no || hashidx != idx ) {
1086 if( latch->next = mgr->leaftable[hashidx].entry )
1087 mgr->leafsets[latch->next].prev = entry;
1089 mgr->leaftable[hashidx].entry = entry;
1093 // fill in latch structure
1095 latch->pin = CLOCK_bit | 1;
1096 latch->page_no = page_no;
1099 bt_releasemutex (latch->modify);
1100 bt_releasemutex (mgr->leaftable[hashidx].latch);
1104 // pin page in non-leaf buffer pool
1105 // return with latchset pinned
1107 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1109 uint hashidx = page_no % mgr->latchhash;
1114 // try to find our entry
1116 bt_mutexlock(mgr->hashtable[hashidx].latch);
1118 if( entry = mgr->hashtable[hashidx].entry ) do
1120 latch = mgr->latchsets + entry;
1122 if( page_no == latch->page_no )
1124 } while( entry = latch->next );
1126 // found our entry: increment pin
1129 latch = mgr->latchsets + entry;
1130 bt_mutexlock(latch->modify);
1131 latch->pin |= CLOCK_bit;
1133 bt_releasemutex(latch->modify);
1134 bt_releasemutex(mgr->hashtable[hashidx].latch);
1138 // find and reuse unpinned entry
1141 entry = bt_availnext (mgr);
1142 latch = mgr->latchsets + entry;
1144 idx = latch->page_no % mgr->latchhash;
1146 // if latch is on a different hash chain
1147 // unlink from the old page_no chain
1149 if( latch->page_no )
1150 if( idx != hashidx ) {
1152 // skip over this entry if latch not available
1154 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1155 bt_releasemutex(latch->modify);
1160 mgr->latchsets[latch->prev].next = latch->next;
1162 mgr->hashtable[idx].entry = latch->next;
1165 mgr->latchsets[latch->next].prev = latch->prev;
1167 bt_releasemutex (mgr->hashtable[idx].latch);
1170 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1172 // update permanent page area in btree from buffer pool
1173 // no read-lock is required since page is not pinned.
1176 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1177 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1181 if( bt_readpage (mgr, page, page_no, 0) )
1182 return mgr->line = __LINE__, NULL;
1184 // link page as head of hash table chain
1185 // if this is a never before used entry,
1186 // or it was previously on a different
1187 // hash table chain. Otherwise, just
1188 // leave it in its current hash table
1191 if( !latch->page_no || hashidx != idx ) {
1192 if( latch->next = mgr->hashtable[hashidx].entry )
1193 mgr->latchsets[latch->next].prev = entry;
1195 mgr->hashtable[hashidx].entry = entry;
1199 // fill in latch structure
1201 latch->pin = CLOCK_bit | 1;
1202 latch->page_no = page_no;
1205 bt_releasemutex (latch->modify);
1206 bt_releasemutex (mgr->hashtable[hashidx].latch);
1210 void bt_mgrclose (BtMgr *mgr)
1218 // flush previously written dirty pages
1219 // and write recovery buffer to disk
1221 fdatasync (mgr->idx);
1223 if( mgr->redoend ) {
1224 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1225 memset (eof, 0, sizeof(BtLogHdr));
1228 // write remaining dirty pool pages to the btree
1230 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1231 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1232 latch = mgr->latchsets + slot;
1234 if( latch->dirty ) {
1235 bt_writepage(mgr, page, latch->page_no, 0);
1236 latch->dirty = 0, num++;
1240 // write remaining dirty leaf pages to the btree
1242 for( slot = 1; slot < mgr->leaftotal; slot++ ) {
1243 page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1244 latch = mgr->leafsets + slot;
1246 if( latch->dirty ) {
1247 bt_writepage(mgr, page, latch->page_no, 1);
1248 latch->dirty = 0, num++;
1252 // clear redo recovery buffer on disk.
1254 if( mgr->pagezero->redopages ) {
1255 eof = (BtLogHdr *)mgr->redobuff;
1256 memset (eof, 0, sizeof(BtLogHdr));
1257 eof->lsn = mgr->lsn;
1258 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1259 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1262 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1265 while( mgr->segments )
1266 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1268 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1269 munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1270 munmap (mgr->pagezero, mgr->page_size);
1272 FlushViewOfFile(mgr->pagezero, 0);
1273 UnmapViewOfFile(mgr->pagezero);
1274 UnmapViewOfFile(mgr->pagepool);
1275 CloseHandle(mgr->halloc);
1276 CloseHandle(mgr->hpool);
1282 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1283 FlushFileBuffers(mgr->idx);
1284 CloseHandle(mgr->idx);
1289 // close and release memory
1291 void bt_close (BtDb *bt)
1298 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1303 // open/create new btree buffer manager
1305 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1306 // size of page pool (e.g. 262144) and leaf pool
1308 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1310 uint lvl, attr, last, slot, idx;
1311 uint nlatchpage, latchhash;
1312 uint nleafpage, leafhash;
1313 unsigned char value[BtId];
1314 int flag, initit = 0;
1315 BtPageZero *pagezero;
1323 // determine sanity of page size and buffer pool
1325 if( leafxtra + pagebits > BT_maxbits )
1326 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1328 if( pagebits < BT_minbits )
1329 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1332 mgr = calloc (1, sizeof(BtMgr));
1334 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1336 if( mgr->idx == -1 ) {
1337 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1338 return free(mgr), NULL;
1341 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1342 attr = FILE_ATTRIBUTE_NORMAL;
1343 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1345 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1346 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1347 return GlobalFree(mgr), NULL;
1352 pagezero = valloc (BT_maxpage);
1355 // read minimum page size to get root info
1356 // to support raw disk partition files
1357 // check if page_bits == 0 on the disk.
1359 if( size = lseek (mgr->idx, 0L, 2) )
1360 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1361 if( pagezero->page_bits ) {
1362 pagebits = pagezero->page_bits;
1363 leafxtra = pagezero->leaf_xtra;
1367 return free(mgr), free(pagezero), NULL;
1371 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1372 size = GetFileSize(mgr->idx, amt);
1374 if( size || *amt ) {
1375 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1376 return bt_mgrclose (mgr), NULL;
1377 pagebits = pagezero->page_bits;
1378 leafxtra = pagezero->leaf_xtra;
1383 mgr->page_size = 1 << pagebits;
1384 mgr->page_bits = pagebits;
1385 mgr->leaf_xtra = leafxtra;
1387 // calculate number of latch hash table entries
1389 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1391 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1392 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1393 mgr->latchtotal = nodemax;
1395 // calculate number of leaf hash table entries
1397 mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1399 mgr->nleafpage += leafmax << leafxtra; // size of the leaf pool in pages
1400 mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1401 mgr->leaftotal = leafmax;
1403 mgr->redopage = LEAF_page + (1 << leafxtra);
1408 // initialize an empty b-tree with latch page, root page, page of leaves
1409 // and page(s) of latches and page pool cache
1411 ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1412 memset (pagezero, 0, 1 << pagebits);
1413 pagezero->alloc->lvl = MIN_lvl - 1;
1414 pagezero->redopages = redopages;
1415 pagezero->page_bits = mgr->page_bits;
1416 pagezero->leaf_xtra = leafxtra;
1418 bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1419 pagezero->leafpages = 1;
1421 // initialize left-most LEAF page in
1422 // alloc->left and count of active leaf pages.
1424 bt_putid (pagezero->alloc->left, LEAF_page);
1426 if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1427 fprintf (stderr, "Unable to create btree page zero\n");
1428 return bt_mgrclose (mgr), NULL;
1431 memset (pagezero, 0, 1 << pagebits);
1433 for( lvl=MIN_lvl; lvl--; ) {
1434 BtSlot *node = slotptr(pagezero->alloc, 1);
1435 node->off = mgr->page_size;
1438 node->off <<= mgr->leaf_xtra;
1440 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1441 key = keyptr(pagezero->alloc, 1);
1442 key->len = 2; // create stopper key
1446 bt_putid(value, MIN_lvl - lvl + 1);
1447 val = valptr(pagezero->alloc, 1);
1448 val->len = lvl ? BtId : 0;
1449 memcpy (val->value, value, val->len);
1451 pagezero->alloc->min = node->off;
1452 pagezero->alloc->lvl = lvl;
1453 pagezero->alloc->cnt = 1;
1454 pagezero->alloc->act = 1;
1455 pagezero->alloc->page_no = MIN_lvl - lvl;
1457 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1458 fprintf (stderr, "Unable to create btree page\n");
1459 return bt_mgrclose (mgr), NULL;
1467 VirtualFree (pagezero, 0, MEM_RELEASE);
1470 // mlock the first segment of 64K pages
1472 flag = PROT_READ | PROT_WRITE;
1473 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1476 if( mgr->pages[0] == MAP_FAILED ) {
1477 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1478 return bt_mgrclose (mgr), NULL;
1481 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1482 mlock (mgr->pagezero, mgr->page_size);
1484 mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
1485 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1487 // allocate pool buffers
1489 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1490 if( mgr->pagepool == MAP_FAILED ) {
1491 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1492 return bt_mgrclose (mgr), NULL;
1495 mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1496 if( mgr->leafpool == MAP_FAILED ) {
1497 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1498 return bt_mgrclose (mgr), NULL;
1501 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1502 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1503 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1505 mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1506 mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1507 mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1512 // open BTree access method
1513 // based on buffer manager
1515 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1517 BtDb *bt = malloc (sizeof(*bt));
1519 memset (bt, 0, sizeof(*bt));
1523 bt->cursor = valloc (mgr->page_size << mgr->leaf_xtra);
1525 bt->cursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
1528 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1530 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1535 // compare two keys, return > 0, = 0, or < 0
1536 // =0: keys are same
1539 // as the comparison value
1541 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1543 uint len1 = key1->len;
1546 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1557 // place write, read, or parent lock on requested page_no.
1559 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1563 ReadLock (latch->readwr, thread_no);
1567 //fprintf(stderr, "WrtRqst %d by %d at %d\n", (uint)latch->page_no, thread_no, line);
1568 WriteLock (latch->readwr, thread_no, line);
1570 //fprintf(stderr, "WrtLock %d by %d at %d\n", (uint)latch->page_no, thread_no, line);
1573 ReadLock (latch->access, thread_no);
1576 WriteLock (latch->access, thread_no, line);
1579 WriteLock (latch->parent, thread_no, line);
1582 WriteLock (latch->link, thread_no, line);
1587 // remove write, read, or parent lock on requested page
1589 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1593 ReadRelease (latch->readwr);
1597 //fprintf(stderr, "Un Lock %d by %d at %d\n", (uint)latch->page_no, thread_no, line);
1598 WriteRelease (latch->readwr);
1601 ReadRelease (latch->access);
1604 WriteRelease (latch->access);
1607 WriteRelease (latch->parent);
1610 WriteRelease (latch->link);
1615 // allocate a new page
1616 // return with page latched, but unlocked.
1618 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1620 uint page_size = mgr->page_size, page_xtra = 0;
1621 unsigned char *freechain;
1625 freechain = mgr->pagezero->freechain;
1627 freechain = mgr->pagezero->leafchain;
1628 mgr->pagezero->leafpages++;
1629 page_xtra = mgr->leaf_xtra;
1630 page_size <<= page_xtra;
1633 // lock allocation page
1635 bt_mutexlock(mgr->lock);
1637 // use empty chain first
1638 // else allocate new page
1640 if( page_no = bt_getid(freechain) ) {
1641 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1642 set->page = bt_mappage (mgr, set->latch);
1644 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1646 bt_putid(freechain, bt_getid(set->page->right));
1648 // the page is currently nopromote and this
1649 // will keep bt_txnpromote out.
1651 // contents will replace this bit
1652 // and pin will keep bt_txnpromote out
1654 contents->page_no = page_no;
1655 contents->nopromote = 0;
1656 set->latch->dirty = 1;
1658 memcpy (set->page, contents, page_size);
1660 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1661 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1663 bt_releasemutex(mgr->lock);
1667 page_no = bt_getid(mgr->pagezero->alloc->right);
1668 bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
1670 // unlock allocation latch and
1671 // extend file into new page.
1673 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1674 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1675 bt_releasemutex(mgr->lock);
1677 // keep bt_txnpromote out of this page
1678 contents->nopromote = 1;
1679 contents->page_no = page_no;
1680 if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1681 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1682 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1683 set->page = bt_mappage (mgr, set->latch);
1685 return mgr->err_thread = thread_no, mgr->err;
1687 // now pin will keep bt_txnpromote out
1689 set->page->nopromote = 0;
1690 set->latch->dirty = 1;
1694 // find slot in page for given key at a given level
1696 int bt_findslot (BtPage page, unsigned char *key, uint len)
1698 uint diff, higher = page->cnt, low = 1, slot;
1701 // make stopper key an infinite fence value
1703 if( bt_getid (page->right) )
1708 // low is the lowest candidate.
1709 // loop ends when they meet
1711 // higher is already
1712 // tested as .ge. the passed key.
1714 while( diff = higher - low ) {
1715 slot = low + ( diff >> 1 );
1716 if( keycmp (keyptr(page, slot), key, len) < 0 )
1719 higher = slot, good++;
1722 // return zero if key is on right link page
1724 return good ? higher : 0;
1727 // find and load page at given level for given key
1728 // leave page rd or wr locked as requested
1730 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1732 uid page_no = ROOT_page, prevpage_no = 0;
1733 uint drill = 0xff, slot;
1734 BtLatchSet *prevlatch;
1735 uint mode, prevmode;
1740 // start at root of btree and drill down
1743 // determine lock mode of drill level
1744 mode = (drill == lvl) ? lock : BtLockRead;
1746 if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1747 set->page = bt_mappage (mgr, set->latch);
1751 // obtain access lock using lock chaining with Access mode
1753 if( page_no > ROOT_page )
1754 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1756 // release & unpin parent or left sibling page
1759 bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1760 bt_unpinlatch (prevlatch, thread_no, __LINE__);
1764 // obtain mode lock using lock coupling through AccessLock
1766 bt_lockpage(mode, set->latch, thread_no, __LINE__);
1768 // grab our fence key
1770 ptr=keyptr(set->page,set->page->cnt);
1772 if( set->page->free )
1773 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1775 if( page_no > ROOT_page )
1776 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1778 // re-read and re-lock root after determining actual level of root
1780 if( set->page->lvl != drill) {
1781 if( set->latch->page_no != ROOT_page )
1782 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1784 drill = set->page->lvl;
1786 if( lock != BtLockRead && drill == lvl ) {
1787 bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1788 bt_unpinlatch (set->latch, thread_no, __LINE__);
1793 prevpage_no = set->latch->page_no;
1794 prevlatch = set->latch;
1795 prevpage = set->page;
1798 // if requested key is beyond our fence,
1799 // slide to the right
1801 if( keycmp (ptr, key, len) < 0 )
1802 if( page_no = bt_getid(set->page->right) )
1805 // if page is part of a delete operation,
1806 // slide to the left;
1808 if( set->page->kill ) {
1809 bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1810 page_no = bt_getid(set->page->left);
1811 bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1815 // find key on page at this level
1816 // and descend to requested level
1818 if( slot = bt_findslot (set->page, key, len) ) {
1822 // find next non-dead slot -- the fence key if nothing else
1824 while( slotptr(set->page, slot)->dead )
1825 if( slot++ < set->page->cnt )
1828 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1830 val = valptr(set->page, slot);
1832 if( val->len == BtId )
1833 page_no = bt_getid(val->value);
1835 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1841 // slide right into next page
1843 page_no = bt_getid(set->page->right);
1847 // return error on end of right chain
1849 mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1850 return 0; // return error
1853 // return page to free list
1854 // page must be delete & write locked
1855 // and have no keys pointing to it.
1857 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1859 unsigned char *freechain;
1861 if( set->page->lvl )
1862 freechain = mgr->pagezero->freechain;
1864 freechain = mgr->pagezero->leafchain;
1865 mgr->pagezero->leafpages--;
1868 // lock allocation page
1870 bt_mutexlock (mgr->lock);
1874 memcpy(set->page->right, freechain, BtId);
1875 bt_putid(freechain, set->latch->page_no);
1876 set->latch->dirty = 1;
1877 set->page->free = 1;
1879 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1880 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1882 // unlock released page
1883 // and unlock allocation page
1885 bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1886 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1887 bt_unpinlatch (set->latch, thread_no, __LINE__);
1888 bt_releasemutex (mgr->lock);
1891 // a fence key was deleted from a page
1892 // push new fence value upwards
1894 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1896 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1897 unsigned char value[BtId];
1901 // remove the old fence value
1903 ptr = keyptr(set->page, set->page->cnt);
1904 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1905 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1906 set->latch->dirty = 1;
1908 // cache new fence value
1910 ptr = keyptr(set->page, set->page->cnt);
1911 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1913 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1914 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1916 // insert new (now smaller) fence key
1918 bt_putid (value, set->latch->page_no);
1919 ptr = (BtKey*)leftkey;
1921 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1922 return mgr->err_thread = thread_no, mgr->err;
1924 // now delete old fence key
1926 ptr = (BtKey*)rightkey;
1928 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1929 return mgr->err_thread = thread_no, mgr->err;
1931 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1932 bt_unpinlatch(set->latch, thread_no, __LINE__);
1936 // root has a single child
1937 // collapse a level from the tree
1939 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1946 // find the child entry and promote as new root contents
1949 for( idx = 0; idx++ < root->page->cnt; )
1950 if( !slotptr(root->page, idx)->dead )
1953 val = valptr(root->page, idx);
1955 if( val->len == BtId )
1956 page_no = bt_getid (valptr(root->page, idx)->value);
1958 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1960 if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1961 child->page = bt_mappage (mgr, child->latch);
1963 return mgr->err_thread = thread_no, mgr->err;
1965 bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1966 bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1968 memcpy (root->page, child->page, mgr->page_size);
1969 root->latch->dirty = 1;
1971 bt_freepage (mgr, child, thread_no);
1973 } while( root->page->lvl > 1 && root->page->act == 1 );
1975 bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1976 bt_unpinlatch (root->latch, thread_no, __LINE__);
1980 // delete a page and manage key
1981 // call with page writelocked
1983 // returns with page unpinned
1984 // from the page pool.
1986 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1988 unsigned char lowerfence[BT_keyarray];
1989 uint page_size = mgr->page_size;
1990 BtPageSet right[1], temp[1];
1991 unsigned char value[BtId];
1992 uid page_no, right2;
1996 page_size <<= mgr->leaf_xtra;
1998 // cache copy of original fence key
1999 // that is being deleted.
2001 ptr = keyptr(set->page, set->page->cnt);
2002 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2004 // obtain locks on right page
2006 page_no = bt_getid(set->page->right);
2008 if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
2009 right->page = bt_mappage (mgr, right->latch);
2013 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2015 if( right->page->kill )
2016 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2018 // pull contents of right peer into our empty page
2019 // preserving our left page number.
2021 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
2022 page_no = bt_getid(set->page->left);
2023 memcpy (set->page, right->page, page_size);
2024 bt_putid (set->page->left, page_no);
2025 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
2027 set->page->page_no = set->latch->page_no;
2028 set->latch->dirty = 1;
2030 if( right2 = bt_getid (right->page->right) ) {
2031 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2032 temp->page = bt_mappage (mgr, temp->latch);
2036 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2037 bt_putid (temp->page->left, set->latch->page_no);
2038 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2039 bt_unpinlatch (temp->latch, thread_no, __LINE__);
2040 } else { // page is rightmost
2041 bt_mutexlock (mgr->lock);
2042 bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
2043 bt_releasemutex(mgr->lock);
2046 // mark right page deleted and release lock
2048 right->latch->dirty = 1;
2049 right->page->kill = 1;
2051 // redirect higher key directly to our new node contents
2053 ptr = keyptr(right->page, right->page->cnt);
2054 bt_putid (value, set->latch->page_no);
2056 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
2059 bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2061 // delete orignal fence key in parent
2062 // and unlock our page.
2064 ptr = (BtKey *)lowerfence;
2066 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2069 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2070 bt_unpinlatch (set->latch, thread_no, __LINE__);
2072 // obtain delete and write locks to right node
2074 bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2075 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2076 bt_freepage (mgr, right, thread_no);
2080 // find and delete key on page by marking delete flag bit
2081 // if page becomes empty, delete it from the btree
2083 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2085 uint slot, idx, found, fence;
2091 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2092 node = slotptr(set->page, slot);
2093 ptr = keyptr(set->page, slot);
2095 return mgr->err_thread = thread_no, mgr->err;
2097 // if librarian slot, advance to real slot
2099 if( node->type == Librarian ) {
2100 ptr = keyptr(set->page, ++slot);
2101 node = slotptr(set->page, slot);
2104 fence = slot == set->page->cnt;
2106 // delete the key, ignore request if already dead
2108 if( found = !keycmp (ptr, key, len) )
2109 if( found = node->dead == 0 ) {
2110 val = valptr(set->page,slot);
2111 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2114 // mark node type as delete
2116 node->type = Delete;
2119 // collapse empty slots beneath the fence
2120 // on interiour nodes
2123 while( idx = set->page->cnt - 1 )
2124 if( slotptr(set->page, idx)->dead ) {
2125 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2126 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2134 // did we delete a fence key in an upper level?
2136 if( lvl && set->page->act && fence )
2137 if( bt_fixfence (mgr, set, lvl, thread_no) )
2138 return mgr->err_thread = thread_no, mgr->err;
2142 // do we need to collapse root?
2144 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2145 if( bt_collapseroot (mgr, set, thread_no) )
2146 return mgr->err_thread = thread_no, mgr->err;
2150 // delete empty page
2152 if( !set->page->act )
2153 return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2155 set->latch->dirty = 1;
2156 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2157 bt_unpinlatch (set->latch, thread_no, __LINE__);
2161 // advance to next slot
2163 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2165 BtLatchSet *prevlatch;
2168 if( slot < set->page->cnt )
2171 prevlatch = set->latch;
2173 if( page_no = bt_getid(set->page->right) )
2174 if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
2175 set->page = bt_mappage (bt->mgr, set->latch);
2179 return bt->mgr->err = BTERR_struct, bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__, 0;
2181 // obtain access lock using lock chaining with Access mode
2183 bt_lockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
2185 bt_unlockpage(BtLockRead, prevlatch, bt->thread_no, __LINE__);
2186 bt_unpinlatch (prevlatch, bt->thread_no, __LINE__);
2188 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
2189 bt_unlockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
2193 // find unique key == given key, or first duplicate key in
2194 // leaf level and return number of value bytes
2195 // or (-1) if not found.
2197 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2205 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2207 ptr = keyptr(set->page, slot);
2209 // skip librarian slot place holder
2211 if( slotptr(set->page, slot)->type == Librarian )
2212 ptr = keyptr(set->page, ++slot);
2214 // return actual key found
2216 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2219 if( slotptr(set->page, slot)->type == Duplicate )
2222 // not there if we reach the stopper key
2224 if( slot == set->page->cnt )
2225 if( !bt_getid (set->page->right) )
2228 // if key exists, return >= 0 value bytes copied
2229 // otherwise return (-1)
2231 if( slotptr(set->page, slot)->dead )
2235 if( !memcmp (ptr->key, key, len) ) {
2236 val = valptr (set->page,slot);
2237 if( valmax > val->len )
2239 memcpy (value, val->value, valmax);
2245 } while( slot = bt_findnext (bt, set, slot) );
2247 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
2248 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
2252 // check page for space available,
2253 // clean if necessary and return
2254 // 0 - page needs splitting
2255 // >0 new slot value
2257 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2259 uint page_size = mgr->page_size;
2260 BtPage page = set->page, frame;
2261 uint cnt = 0, idx = 0;
2262 uint max = page->cnt;
2267 if( !set->page->lvl )
2268 page_size <<= mgr->leaf_xtra;
2270 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2273 // skip cleanup and proceed to split
2274 // if there's not enough garbage
2277 if( page->garbage < page_size / 5 )
2280 frame = malloc (page_size);
2281 memcpy (frame, page, page_size);
2283 // skip page info and set rest of page to zero
2285 memset (page+1, 0, page_size - sizeof(*page));
2286 set->latch->dirty = 1;
2288 page->min = page_size;
2292 // clean up page first by
2293 // removing deleted keys
2295 while( cnt++ < max ) {
2299 if( cnt < max || frame->lvl )
2300 if( slotptr(frame,cnt)->dead )
2303 // copy the value across
2305 val = valptr(frame, cnt);
2306 page->min -= val->len + sizeof(BtVal);
2307 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2309 // copy the key across
2311 key = keyptr(frame, cnt);
2312 page->min -= key->len + sizeof(BtKey);
2313 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2315 // make a librarian slot
2317 slotptr(page, ++idx)->off = page->min;
2318 slotptr(page, idx)->type = Librarian;
2319 slotptr(page, idx)->dead = 1;
2323 slotptr(page, ++idx)->off = page->min;
2324 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2326 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2333 // see if page has enough space now, or does it need splitting?
2335 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2341 // split the root and raise the height of the btree
2343 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2345 unsigned char leftkey[BT_keyarray];
2346 uint nxt = mgr->page_size;
2347 unsigned char value[BtId];
2354 frame = malloc (mgr->page_size);
2355 memcpy (frame, root->page, mgr->page_size);
2357 // save left page fence key for new root
2359 ptr = keyptr(root->page, root->page->cnt);
2360 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2362 // Obtain an empty page to use, and copy the current
2363 // root contents into it, e.g. lower keys
2365 if( bt_newpage(mgr, left, frame, thread_no) )
2366 return mgr->err_thread = thread_no, mgr->err;
2368 left_page_no = left->latch->page_no;
2369 bt_unpinlatch (left->latch, thread_no, __LINE__);
2372 // preserve the page info at the bottom
2373 // of higher keys and set rest to zero
2375 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2377 // insert stopper key at top of newroot page
2378 // and increase the root height
2380 nxt -= BtId + sizeof(BtVal);
2381 bt_putid (value, right->page_no);
2382 val = (BtVal *)((unsigned char *)root->page + nxt);
2383 memcpy (val->value, value, BtId);
2386 nxt -= 2 + sizeof(BtKey);
2387 slotptr(root->page, 2)->off = nxt;
2388 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2393 // insert lower keys page fence key on newroot page as first key
2395 nxt -= BtId + sizeof(BtVal);
2396 bt_putid (value, left_page_no);
2397 val = (BtVal *)((unsigned char *)root->page + nxt);
2398 memcpy (val->value, value, BtId);
2401 ptr = (BtKey *)leftkey;
2402 nxt -= ptr->len + sizeof(BtKey);
2403 slotptr(root->page, 1)->off = nxt;
2404 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2406 bt_putid(root->page->right, 0);
2407 root->page->min = nxt; // reset lowest used offset and key count
2408 root->page->cnt = 2;
2409 root->page->act = 2;
2412 mgr->pagezero->alloc->lvl = root->page->lvl;
2414 // release and unpin root pages
2416 bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2417 bt_unpinlatch (root->latch, thread_no, __LINE__);
2419 bt_unpinlatch (right, thread_no, __LINE__);
2423 // split already locked full node
2425 // return pool entry for new right
2426 // page, pinned & unlocked
2428 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2430 uint page_size = mgr->page_size;
2431 BtPageSet right[1], temp[1];
2432 uint cnt = 0, idx = 0, max;
2433 uint lvl = set->page->lvl;
2441 if( !set->page->lvl )
2442 page_size <<= mgr->leaf_xtra;
2444 // split higher half of keys to frame
2446 frame = malloc (page_size);
2447 memset (frame, 0, page_size);
2448 frame->min = page_size;
2449 max = set->page->cnt;
2453 while( cnt++ < max ) {
2454 if( cnt < max || set->page->lvl )
2455 if( slotptr(set->page, cnt)->dead )
2458 src = valptr(set->page, cnt);
2459 frame->min -= src->len + sizeof(BtVal);
2460 memcpy ((unsigned char *)frame + frame->min, src, src->len + sizeof(BtVal));
2462 key = keyptr(set->page, cnt);
2463 frame->min -= key->len + sizeof(BtKey);
2464 ptr = (BtKey*)((unsigned char *)frame + frame->min);
2465 memcpy (ptr, key, key->len + sizeof(BtKey));
2467 // add librarian slot
2469 slotptr(frame, ++idx)->off = frame->min;
2470 slotptr(frame, idx)->type = Librarian;
2471 slotptr(frame, idx)->dead = 1;
2475 slotptr(frame, ++idx)->off = frame->min;
2476 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2478 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2487 if( set->latch->page_no > ROOT_page ) {
2488 right2 = bt_getid (set->page->right);
2489 bt_putid (frame->right, right2);
2490 bt_putid (frame->left, set->latch->page_no);
2493 // get new free page and write higher keys to it.
2495 if( bt_newpage(mgr, right, frame, thread_no) )
2498 // link far right's left pointer to new page
2500 if( linkleft && set->latch->page_no > ROOT_page )
2502 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2503 temp->page = bt_mappage (mgr, temp->latch);
2507 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2508 bt_putid (temp->page->left, right->latch->page_no);
2509 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2510 bt_unpinlatch (temp->latch, thread_no, __LINE__);
2511 } else { // page is rightmost
2512 bt_mutexlock (mgr->lock);
2513 bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2514 bt_releasemutex(mgr->lock);
2517 // process lower keys
2519 memcpy (frame, set->page, page_size);
2520 memset (set->page+1, 0, page_size - sizeof(*set->page));
2521 set->latch->dirty = 1;
2523 set->page->min = page_size;
2524 set->page->garbage = 0;
2530 // assemble page of smaller keys
2532 while( cnt++ < max ) {
2533 if( slotptr(frame, cnt)->dead )
2535 val = valptr(frame, cnt);
2536 set->page->min -= val->len + sizeof(BtVal);
2537 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2539 key = keyptr(frame, cnt);
2540 set->page->min -= key->len + sizeof(BtKey);
2541 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2543 // add librarian slot
2545 slotptr(set->page, ++idx)->off = set->page->min;
2546 slotptr(set->page, idx)->type = Librarian;
2547 slotptr(set->page, idx)->dead = 1;
2551 slotptr(set->page, ++idx)->off = set->page->min;
2552 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2556 bt_putid(set->page->right, right->latch->page_no);
2557 set->page->cnt = idx;
2560 entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2564 // fix keys for newly split page
2565 // call with both pages pinned & locked
2566 // return unlocked and unpinned
2568 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2570 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2571 unsigned char value[BtId];
2572 uint lvl = set->page->lvl;
2576 // if current page is the root page, split it
2578 if( set->latch->page_no == ROOT_page )
2579 return bt_splitroot (mgr, set, right, thread_no);
2581 ptr = keyptr(set->page, set->page->cnt);
2582 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2584 page = bt_mappage (mgr, right);
2586 ptr = keyptr(page, page->cnt);
2587 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2589 // insert new fences in their parent pages
2591 bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2593 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2594 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2596 // insert new fence for reformulated left block of smaller keys
2598 bt_putid (value, set->latch->page_no);
2599 ptr = (BtKey *)leftkey;
2601 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2602 return mgr->err_thread = thread_no, mgr->err;
2604 // switch fence for right block of larger keys to new right page
2606 bt_putid (value, right->page_no);
2607 ptr = (BtKey *)rightkey;
2609 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2610 return mgr->err_thread = thread_no, mgr->err;
2612 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2613 bt_unpinlatch (set->latch, thread_no, __LINE__);
2615 bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2616 bt_unpinlatch (right, thread_no, __LINE__);
2620 // install new key and value onto page
2621 // page must already be checked for
2624 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2626 uint idx, librarian;
2632 // if previous slot is a librarian slot, use it
2635 if( slotptr(set->page, slot-1)->type == Librarian )
2638 // copy value onto page
2640 set->page->min -= vallen + sizeof(BtVal);
2641 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2642 memcpy (val->value, value, vallen);
2645 // copy key onto page
2647 set->page->min -= keylen + sizeof(BtKey);
2648 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2649 memcpy (ptr->key, key, keylen);
2652 // find first empty slot
2654 for( idx = slot; idx < set->page->cnt; idx++ )
2655 if( slotptr(set->page, idx)->dead )
2658 // now insert key into array before slot,
2659 // adding as many librarian slots as
2662 if( idx == set->page->cnt ) {
2663 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2665 librarian = ++idx - slot;
2666 avail /= sizeof(BtSlot);
2671 if( librarian > avail )
2675 rate = (idx - slot) / librarian;
2676 set->page->cnt += librarian;
2681 librarian = 0, rate = 0;
2683 while( idx > slot ) {
2685 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2688 // add librarian slot per rate
2691 if( (idx - slot + 1)/2 <= librarian * rate ) {
2692 node = slotptr(set->page, idx--);
2693 node->off = node[1].off;
2694 node->type = Librarian;
2700 set->latch->dirty = 1;
2705 node = slotptr(set->page, slot);
2706 node->off = set->page->min;
2712 // Insert new key into the btree at given level.
2713 // either add a new key or update/add an existing one
2715 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2717 uint slot, idx, len, entry;
2723 while( 1 ) { // find the page and slot for the current key
2724 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2725 node = slotptr(set->page, slot);
2726 ptr = keyptr(set->page, slot);
2729 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2730 return mgr->err_thread = thread_no, mgr->err;
2733 // if librarian slot == found slot, advance to real slot
2735 if( node->type == Librarian ) {
2736 // if( !keycmp (ptr, key, keylen) ) {
2737 ptr = keyptr(set->page, ++slot);
2738 node = slotptr(set->page, slot);
2741 // if inserting a duplicate key or unique
2742 // key that doesn't exist on the page,
2743 // check for adequate space on the page
2744 // and insert the new key before slot.
2749 if( !keycmp (ptr, key, keylen) )
2752 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2753 if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2758 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2759 if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2762 return mgr->err_thread = thread_no, mgr->err;
2765 if( keycmp (ptr, key, keylen) )
2771 // if key already exists, update value and return
2773 val = valptr(set->page, slot);
2775 if( val->len >= vallen ) {
2781 set->page->garbage += val->len - vallen;
2782 set->latch->dirty = 1;
2784 memcpy (val->value, value, vallen);
2788 // new update value doesn't fit in existing value area
2789 // make sure page has room
2792 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2799 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2802 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2803 if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2806 return mgr->err_thread = thread_no, mgr->err;
2809 // copy key and value onto page and update slot
2811 set->page->min -= vallen + sizeof(BtVal);
2812 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2813 memcpy (val->value, value, vallen);
2816 set->latch->dirty = 1;
2817 set->page->min -= keylen + sizeof(BtKey);
2818 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2819 memcpy (ptr->key, key, keylen);
2822 slotptr(set->page,slot)->off = set->page->min;
2825 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2826 bt_unpinlatch (set->latch, thread_no, __LINE__);
2830 // determine actual page where key is located
2831 // return slot number
2833 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2835 BtKey *key = keyptr(source,src), *ptr;
2836 unsigned char fence[BT_keyarray];
2839 if( locks[src].reuse )
2840 entry = locks[src-1].entry;
2842 entry = locks[src].entry;
2844 // find where our key is located
2845 // on current page or pages split on
2846 // same page txn operations.
2849 set->latch = mgr->leafsets + entry;
2850 set->page = bt_mappage (mgr, set->latch);
2852 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2853 if( slotptr(set->page, slot)->type == Librarian )
2855 if( locks[src].reuse )
2856 locks[src].entry = entry;
2859 } while( entry = set->latch->split );
2861 ptr = keyptr (set->page, set->page->cnt);
2862 memcpy (fence, ptr, ptr->len + 1);
2864 mgr->line = __LINE__, mgr->err = BTERR_atomic;
2868 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2870 BtKey *key = keyptr(source, src);
2871 BtVal *val = valptr(source, src);
2876 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2877 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2878 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2879 return mgr->err_thread = thread_no, mgr->err;
2881 set->page->lsn = lsn;
2887 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2888 latch = mgr->leafsets + entry;
2890 return mgr->err_thread = thread_no, mgr->err;
2892 // splice right page into split chain
2895 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2896 latch->split = set->latch->split;
2897 set->latch->split = entry;
2900 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2903 // perform delete from smaller btree
2904 // insert a delete slot if not found there
2906 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2908 BtKey *key = keyptr(source, src);
2915 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2916 node = slotptr(set->page, slot);
2917 ptr = keyptr(set->page, slot);
2918 val = valptr(set->page, slot);
2920 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2922 // if slot is not found, insert a delete slot
2924 if( keycmp (ptr, key->key, key->len) )
2925 if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2928 // if node is already dead,
2929 // ignore the request.
2934 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2935 set->latch->dirty = 1;
2936 set->page->lsn = lsn;
2940 __sync_fetch_and_add(&mgr->found, 1);
2944 // release master's splits from right to left
2946 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2948 BtLatchSet *latch = mgr->leafsets + entry;
2951 bt_atomicrelease (mgr, latch->split, thread_no);
2954 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2955 bt_unpinlatch(latch, thread_no, __LINE__);
2958 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2960 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2961 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2963 return keycmp (key1, key2->key, key2->len);
2965 // atomic modification of a batch of keys.
2967 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2969 uint src, idx, slot, samepage, entry, que = 0;
2970 BtKey *key, *ptr, *key2;
2976 // stable sort the list of keys into order to
2977 // prevent deadlocks between threads.
2979 for( src = 1; src++ < source->cnt; ) {
2980 *temp = *slotptr(source,src);
2981 key = keyptr (source,src);
2983 for( idx = src; --idx; ) {
2984 key2 = keyptr (source,idx);
2985 if( keycmp (key, key2->key, key2->len) < 0 ) {
2986 *slotptr(source,idx+1) = *slotptr(source,idx);
2987 *slotptr(source,idx) = *temp;
2993 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2994 // add entries to redo log
2996 if( bt->mgr->pagezero->redopages )
2997 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
3001 // perform the individual actions in the transaction
3003 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
3004 return bt->mgr->err;
3006 // if number of active pages
3007 // is greater than the buffer pool
3008 // promote page into larger btree
3011 while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
3012 if( bt_txnpromote (bt) )
3013 return bt->mgr->err;
3020 // execute the source list of inserts/deletes
3022 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
3024 uint src, idx, samepage, entry;
3025 BtPageSet set[1], prev[1];
3026 unsigned char value[BtId];
3034 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
3036 // Load the leaf page for each key
3037 // group same page references with reuse bit
3039 for( src = 0; src++ < source->cnt; ) {
3040 key = keyptr(source, src);
3042 // first determine if this modification falls
3043 // on the same page as the previous modification
3044 // note that the far right leaf page is a special case
3046 if( samepage = src > 1 )
3047 samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
3050 if( bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
3051 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
3055 entry = set->latch - mgr->leafsets;
3056 locks[src].reuse = samepage;
3057 locks[src].entry = entry;
3059 // capture current lsn for master page
3061 locks[src].reqlsn = set->page->lsn;
3064 // insert or delete each key
3065 // process any splits or merges
3066 // run through txn list backwards
3068 samepage = source->cnt + 1;
3070 for( src = source->cnt; src; src-- ) {
3071 if( locks[src].reuse )
3074 // perform the txn operations
3075 // from smaller to larger on
3078 for( idx = src; idx < samepage; idx++ )
3079 switch( slotptr(source,idx)->type ) {
3081 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3087 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
3092 bt_atomicpage (mgr, source, locks, idx, set);
3096 // after the same page operations have finished,
3097 // process master page for splits or deletion.
3099 latch = prev->latch = mgr->leafsets + locks[src].entry;
3100 prev->page = bt_mappage (mgr, prev->latch);
3103 // pick-up all splits from master page
3104 // each one is already pinned & WriteLocked.
3106 while( entry = prev->latch->split ) {
3107 set->latch = mgr->leafsets + entry;
3108 set->page = bt_mappage (mgr, set->latch);
3110 // delete empty master page by undoing its split
3111 // (this is potentially another empty page)
3112 // note that there are no pointers to it yet
3114 if( !prev->page->act ) {
3115 memcpy (set->page->left, prev->page->left, BtId);
3116 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3117 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3118 prev->latch->split = set->latch->split;
3119 prev->latch->dirty = 1;
3120 bt_freepage (mgr, set, thread_no);
3124 // remove empty split page from the split chain
3125 // and return it to the free list. No other
3126 // thread has its page number yet.
3128 if( !set->page->act ) {
3129 memcpy (prev->page->right, set->page->right, BtId);
3130 prev->latch->split = set->latch->split;
3132 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3133 bt_freepage (mgr, set, thread_no);
3137 // update prev's fence key
3139 ptr = keyptr(prev->page,prev->page->cnt);
3140 bt_putid (value, prev->latch->page_no);
3142 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3145 // splice in the left link into the split page
3147 bt_putid (set->page->left, prev->latch->page_no);
3150 bt_syncpage (mgr, prev->page, prev->latch);
3155 // update left pointer in next right page from last split page
3156 // (if all splits were reversed or none occurred, latch->split == 0)
3158 if( latch->split ) {
3159 // fix left pointer in master's original (now split)
3160 // far right sibling or set rightmost page in page zero
3162 if( right_page_no = bt_getid (prev->page->right) ) {
3163 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3164 set->page = bt_mappage (mgr, set->latch);
3168 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3169 bt_putid (set->page->left, prev->latch->page_no);
3170 set->latch->dirty = 1;
3172 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3173 bt_unpinlatch (set->latch, thread_no, __LINE__);
3174 } else { // prev is rightmost page
3175 bt_mutexlock (mgr->lock);
3176 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3177 bt_releasemutex(mgr->lock);
3180 // switch the original fence key from the
3181 // master page to the last split page.
3183 ptr = keyptr(prev->page,prev->page->cnt);
3184 bt_putid (value, prev->latch->page_no);
3186 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3190 bt_syncpage (mgr, prev->page, prev->latch);
3192 // unlock and unpin the split pages
3194 bt_atomicrelease (mgr, latch->split, thread_no);
3196 // unlock and unpin the master page
3199 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3200 bt_unpinlatch(latch, thread_no, __LINE__);
3204 // since there are no splits remaining, we're
3205 // finished if master page occupied
3207 if( prev->page->act ) {
3208 bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3211 bt_syncpage (mgr, prev->page, prev->latch);
3213 bt_unpinlatch(prev->latch, thread_no, __LINE__);
3217 // any and all splits were reversed, and the
3218 // master page located in prev is empty, delete it
3220 if( bt_deletepage (mgr, prev, thread_no, 0) )
3228 // pick & promote a page into the larger btree
3230 BTERR bt_txnpromote (BtDb *bt)
3232 uint entry, slot, idx;
3240 entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3242 entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3244 entry %= bt->mgr->leaftotal;
3249 set->latch = bt->mgr->leafsets + entry;
3251 // skip this entry if it has never been used
3253 if( !set->latch->page_no )
3256 if( !bt_mutextry(set->latch->modify) )
3259 // skip this entry if it is pinned
3261 if( set->latch->pin & ~CLOCK_bit ) {
3262 bt_releasemutex(set->latch->modify);
3266 set->page = bt_mappage (bt->mgr, set->latch);
3268 // entry has no right sibling
3270 if( !bt_getid (set->page->right) ) {
3271 bt_releasemutex(set->latch->modify);
3275 // entry interiour node or being killed or constructed
3277 if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3278 bt_releasemutex(set->latch->modify);
3282 // pin the page for our access
3283 // and leave it locked for the
3284 // duration of the promotion.
3287 bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3288 bt_releasemutex(set->latch->modify);
3290 // transfer slots in our selected page to the main btree
3291 if( !(entry % 100) )
3292 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3294 if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3295 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3296 return bt->main->err;
3299 // now delete the page
3301 if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3302 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3304 return bt->mgr->err;
3308 // set cursor to highest slot on highest page
3310 uint bt_lastkey (BtDb *bt)
3312 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3315 if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3316 set->page = bt_mappage (bt->mgr, set->latch);
3320 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3321 memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3322 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3323 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3324 return bt->cursor->cnt;
3327 // return previous slot on cursor page
3329 uint bt_prevkey (BtDb *bt, uint slot)
3331 uid cursor_page = bt->cursor->page_no;
3332 uid ourright, next, us = cursor_page;
3338 ourright = bt_getid(bt->cursor->right);
3341 if( !(next = bt_getid(bt->cursor->left)) )
3347 if( set->latch = bt_pinleaf (bt->mgr, next, bt->thread_no) )
3348 set->page = bt_mappage (bt->mgr, set->latch);
3352 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3353 memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3354 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3355 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3357 next = bt_getid (bt->cursor->right);
3359 if( bt->cursor->kill )
3363 if( next == ourright )
3368 return bt->cursor->cnt;
3371 // return next slot on cursor page
3372 // or slide cursor right into next page
3374 uint bt_nextkey (BtDb *bt, uint slot)
3380 right = bt_getid(bt->cursor->right);
3382 while( slot++ < bt->cursor->cnt )
3383 if( slotptr(bt->cursor,slot)->dead )
3385 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3393 if( set->latch = bt_pinleaf (bt->mgr, right, bt->thread_no) )
3394 set->page = bt_mappage (bt->mgr, set->latch);
3398 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3399 memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3400 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3401 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3406 return bt->mgr->err = 0;
3409 // cache page of keys into cursor and return starting slot for given key
3411 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3416 // cache page for retrieval
3418 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3419 memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3423 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3424 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3431 double getCpuTime(int type)
3434 FILETIME xittime[1];
3435 FILETIME systime[1];
3436 FILETIME usrtime[1];
3437 SYSTEMTIME timeconv[1];
3440 memset (timeconv, 0, sizeof(SYSTEMTIME));
3444 GetSystemTimeAsFileTime (xittime);
3445 FileTimeToSystemTime (xittime, timeconv);
3446 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3449 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3450 FileTimeToSystemTime (usrtime, timeconv);
3453 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3454 FileTimeToSystemTime (systime, timeconv);
3458 ans += (double)timeconv->wHour * 3600;
3459 ans += (double)timeconv->wMinute * 60;
3460 ans += (double)timeconv->wSecond;
3461 ans += (double)timeconv->wMilliseconds / 1000;
3466 #include <sys/resource.h>
3468 double getCpuTime(int type)
3470 struct rusage used[1];
3471 struct timeval tv[1];
3475 gettimeofday(tv, NULL);
3476 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3479 getrusage(RUSAGE_SELF, used);
3480 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3483 getrusage(RUSAGE_SELF, used);
3484 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3491 void bt_poolaudit (BtMgr *mgr, char *type)
3493 BtLatchSet *latch, test[1];
3496 memset (test, 0, sizeof(test));
3498 if( memcmp (test, mgr->latchsets, sizeof(test)) )
3499 fprintf(stderr, "%s latchset zero overwritten\n", type);
3501 if( memcmp (test, mgr->leafsets, sizeof(test)) )
3502 fprintf(stderr, "%s leafset zero overwritten\n", type);
3504 for( entry = 0; ++entry < mgr->latchtotal; ) {
3505 latch = mgr->latchsets + entry;
3507 if( *latch->modify->value )
3508 fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3510 if( latch->pin & ~CLOCK_bit )
3511 fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3514 for( entry = 0; ++entry < mgr->leaftotal; ) {
3515 latch = mgr->leafsets + entry;
3517 if( *latch->modify->value )
3518 fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3520 if( latch->pin & ~CLOCK_bit )
3521 fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3534 // standalone program to index file of keys
3535 // then list them onto std-out
3538 void *index_file (void *arg)
3540 uint __stdcall index_file (void *arg)
3543 int line = 0, found = 0, cnt = 0, idx;
3544 uid next, page_no = LEAF_page; // start on first page of leaves
3545 int ch, len = 0, slot, type = 0;
3546 unsigned char key[BT_maxkey];
3547 unsigned char txn[65536];
3548 ThreadArg *args = arg;
3557 bt = bt_open (args->mgr, args->main);
3560 if( args->idx < strlen (args->type) )
3561 ch = args->type[args->idx];
3563 ch = args->type[strlen(args->type) - 1];
3575 if( type == Delete )
3576 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3578 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3580 if( type == Delete )
3581 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3583 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3585 if( in = fopen (args->infile, "rb") )
3586 while( ch = getc(in), ch != EOF )
3592 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3593 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3599 memcpy (txn + nxt, key + 10, len - 10);
3601 txn[nxt] = len - 10;
3603 memcpy (txn + nxt, key, 10);
3606 slotptr(page,++cnt)->off = nxt;
3607 slotptr(page,cnt)->type = type;
3610 if( cnt < args->num )
3617 if( bt_atomictxn (bt, page) )
3618 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3623 else if( len < BT_maxkey )
3625 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3629 fprintf(stderr, "started indexing for %s\n", args->infile);
3630 if( in = fopen (args->infile, "r") )
3631 while( ch = getc(in), ch != EOF )
3636 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3637 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3640 else if( len < BT_maxkey )
3642 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3646 fprintf(stderr, "started finding keys for %s\n", args->infile);
3647 if( in = fopen (args->infile, "rb") )
3648 while( ch = getc(in), ch != EOF )
3652 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3654 else if( bt->mgr->err )
3655 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3658 else if( len < BT_maxkey )
3660 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3664 fprintf(stderr, "started scanning\n");
3667 if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3668 set->page = bt_mappage (bt->mgr, set->latch);
3670 fprintf(stderr, "unable to obtain latch"), exit(1);
3672 bt_lockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3673 next = bt_getid (set->page->right);
3675 for( slot = 0; slot++ < set->page->cnt; )
3676 if( next || slot < set->page->cnt )
3677 if( !slotptr(set->page, slot)->dead ) {
3678 ptr = keyptr(set->page, slot);
3681 if( slotptr(set->page, slot)->type == Duplicate )
3684 fwrite (ptr->key, len, 1, stdout);
3685 val = valptr(set->page, slot);
3686 fwrite (val->value, val->len, 1, stdout);
3687 fputc ('\n', stdout);
3691 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3692 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3693 } while( page_no = next );
3695 fprintf(stderr, " Total keys read %d\n", cnt);
3699 fprintf(stderr, "started reverse scan\n");
3700 if( slot = bt_lastkey (bt) )
3701 while( slot = bt_prevkey (bt, slot) ) {
3702 if( slotptr(bt->cursor, slot)->dead )
3705 ptr = keyptr(bt->cursor, slot);
3708 if( slotptr(bt->cursor, slot)->type == Duplicate )
3711 fwrite (ptr->key, len, 1, stdout);
3712 val = valptr(bt->cursor, slot);
3713 fwrite (val->value, val->len, 1, stdout);
3714 fputc ('\n', stdout);
3718 fprintf(stderr, " Total keys read %d\n", cnt);
3723 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3725 fprintf(stderr, "started counting\n");
3726 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3728 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3729 if( bt_readpage (bt->mgr, bt->cursor, page_no, 1) )
3732 if( !bt->cursor->free && !bt->cursor->lvl )
3733 cnt += bt->cursor->act;
3739 cnt--; // remove stopper key
3740 fprintf(stderr, " Total keys counted %d\n", cnt);
3752 typedef struct timeval timer;
3754 int main (int argc, char **argv)
3756 int idx, cnt, len, slot, err;
3764 uint mainleafpool = 0;
3765 uint mainleafxtra = 0;
3781 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3782 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3783 fprintf (stderr, " where main_file is the name of the main btree file\n");
3784 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3785 fprintf (stderr, " pagebits is the page size in bits for the cache btree\n");
3786 fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n");
3787 fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n");
3788 fprintf (stderr, " leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3789 fprintf (stderr, " txnsize = n to block transactions into n units, or zero for no transactions\n");
3790 fprintf (stderr, " redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3791 fprintf (stderr, " mainbits is the page size of the main btree in bits\n");
3792 fprintf (stderr, " mainpool is the number of main pages in the main buffer pool\n");
3793 fprintf (stderr, " mainleaf is the number of main pages in the main leaf pool\n");
3794 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3798 start = getCpuTime(0);
3801 bits = atoi(argv[4]);
3804 leafxtra = atoi(argv[5]);
3807 poolsize = atoi(argv[6]);
3810 fprintf (stderr, "no page pool\n"), exit(1);
3813 leafpool = atoi(argv[7]);
3816 num = atoi(argv[8]);
3819 redopages = atoi(argv[9]);
3821 if( redopages > 65535 )
3822 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3825 mainbits = atoi(argv[10]);
3828 mainleafxtra = atoi(argv[11]);
3831 mainpool = atoi(argv[12]);
3834 mainleafpool = atoi(argv[13]);
3838 threads = malloc (cnt * sizeof(pthread_t));
3840 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3842 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3844 mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3847 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3852 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3855 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3864 for( idx = 0; idx < cnt; idx++ ) {
3865 args[idx].infile = argv[idx + 14];
3866 args[idx].type = argv[3];
3867 args[idx].main = main;
3868 args[idx].mgr = mgr;
3869 args[idx].num = num;
3870 args[idx].idx = idx;
3872 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3873 fprintf(stderr, "Error creating thread %d\n", err);
3875 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3879 args[0].infile = argv[idx + 10];
3880 args[0].type = argv[3];
3881 args[0].main = main;
3888 // wait for termination
3891 for( idx = 0; idx < cnt; idx++ )
3892 pthread_join (threads[idx], NULL);
3894 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3896 for( idx = 0; idx < cnt; idx++ )
3897 CloseHandle(threads[idx]);
3899 bt_poolaudit(mgr, "cache");
3902 bt_poolaudit(main, "main");
3904 fprintf(stderr, "cache %d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3906 fprintf(stderr, "main %d reads %d writes %d found\n", main->reads, main->writes, main->found);
3912 elapsed = getCpuTime(0) - start;
3913 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3914 elapsed = getCpuTime(1);
3915 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3916 elapsed = getCpuTime(2);
3917 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);