1 // btree version threadskv10 futex version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // and LSM B-trees for write optimization
14 // author: karl malbrain, malbrain@cal.berkeley.edu
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
20 The orginal author is Karl Malbrain.
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
38 #include <linux/futex.h>
53 #define WIN32_LEAN_AND_MEAN
67 typedef unsigned long long uid;
68 typedef unsigned long long logseqno;
71 typedef unsigned long long off64_t;
72 typedef unsigned short ushort;
73 typedef unsigned int uint;
76 #define BT_ro 0x6f72 // ro
77 #define BT_rw 0x7772 // rw
79 #define BT_maxbits 26 // maximum page size in bits
80 #define BT_minbits 9 // minimum page size in bits
81 #define BT_minpage (1 << BT_minbits) // minimum page size
82 #define BT_maxpage (1 << BT_maxbits) // maximum page size
84 // BTree page number constants
85 #define ALLOC_page 0 // allocation page
86 #define ROOT_page 1 // root of the btree
87 #define LEAF_page 2 // first page of leaves
88 #define REDO_page 3 // first page of redo buffer
90 // Number of levels to create in a new BTree
95 There are six lock types for each node in four independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
116 volatile ushort xlock[1]; // one writer has exclusive lock
117 volatile ushort wrt[1]; // count of other writers waiting
126 // definition for phase-fair reader/writer lock implementation
129 volatile ushort rin[1];
130 volatile ushort rout[1];
131 volatile ushort ticket[1];
132 volatile ushort serving[1];
135 // write only reentrant lock
141 volatile ushort tid[1];
142 volatile ushort dup[1];
146 volatile uint waiters[1];
154 // mode & definition for lite latch implementation
157 QueRd = 1, // reader queue
158 QueWr = 2 // writer queue
161 // hash table entries
164 uint entry; // Latch table entry at head of chain
165 BtMutexLatch latch[1];
168 // latch manager table structure
171 uid page_no; // latch set page number
172 RWLock readwr[1]; // read/write page lock
173 RWLock access[1]; // Access Intent/Page delete
174 WOLock parent[1]; // Posting of fence key in parent
175 WOLock atomic[1]; // Atomic update in progress
176 uint split; // right split page atomic insert
177 uint next; // next entry in hash table chain
178 uint prev; // prev entry in hash table chain
179 ushort pin; // number of accessing threads
180 unsigned char dirty; // page in cache is dirty (atomic setable)
181 unsigned char promote; // page in cache is dirty (atomic setable)
182 BtMutexLatch modify[1]; // modify entry lite latch
185 // Define the length of the page record numbers
189 // Page key slot definition.
191 // Keys are marked dead, but remain on the page until
192 // it cleanup is called. The fence key (highest key) for
193 // a leaf page is always present, even after cleanup.
197 // In addition to the Unique keys that occupy slots
198 // there are Librarian and Duplicate key
199 // slots occupying the key slot array.
201 // The Librarian slots are dead keys that
202 // serve as filler, available to add new Unique
203 // or Dup slots that are inserted into the B-tree.
205 // The Duplicate slots have had their key bytes extended
206 // by 6 bytes to contain a binary duplicate key uniqueifier.
216 uint off:BT_maxbits; // page offset for key start
217 uint type:3; // type of slot
218 uint dead:1; // set for deleted slot
221 // The key structure occupies space at the upper end of
222 // each page. It's a length byte followed by the key
226 unsigned char len; // this can be changed to a ushort or uint
227 unsigned char key[0];
230 // the value structure also occupies space at the upper
231 // end of the page. Each key is immediately followed by a value.
234 unsigned char len; // this can be changed to a ushort or uint
235 unsigned char value[0];
238 #define BT_maxkey 255 // maximum number of bytes in a key
239 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
241 // The first part of an index page.
242 // It is immediately followed
243 // by the BtSlot array of keys.
245 // note that this structure size
246 // must be a multiple of 8 bytes
247 // in order to place PageZero correctly.
249 typedef struct BtPage_ {
250 uint cnt; // count of keys in page
251 uint act; // count of active keys
252 uint min; // next key offset
253 uint garbage; // page garbage in bytes
254 unsigned char bits:7; // page size in bits
255 unsigned char free:1; // page is on free chain
256 unsigned char lvl:7; // level of page
257 unsigned char kill:1; // page is being deleted
258 unsigned char right[BtId]; // page number to right
259 unsigned char left[BtId]; // page number to left
260 unsigned char filler[2]; // padding to multiple of 8
261 logseqno lsn; // log sequence number applied
262 uid page_no; // this page number
265 // The loadpage interface object
268 BtPage page; // current page pointer
269 BtLatchSet *latch; // current page latch set
272 // structure for latch manager on ALLOC_page
275 struct BtPage_ alloc[1]; // next page_no in right ptr
276 unsigned char freechain[BtId]; // head of free page_nos chain
277 unsigned long long activepages; // number of active pages
278 uint redopages; // number of redo pages in file
281 // The object structure for Btree access
284 uint page_size; // page size
285 uint page_bits; // page size in bits
291 BtPageZero *pagezero; // mapped allocation page
292 BtHashEntry *hashtable; // the buffer pool hash table entries
293 BtLatchSet *latchsets; // mapped latch set from buffer pool
294 unsigned char *pagepool; // mapped to the buffer pool pages
295 unsigned char *redobuff; // mapped recovery buffer pointer
296 logseqno lsn, flushlsn; // current & first lsn flushed
297 BtMutexLatch redo[1]; // redo area lite latch
298 BtMutexLatch lock[1]; // allocation area lite latch
299 BtMutexLatch maps[1]; // mapping segments lite latch
300 ushort thread_no[1]; // next thread number
301 uint nlatchpage; // number of latch pages at BT_latch
302 uint latchtotal; // number of page latch entries
303 uint latchhash; // number of latch hash table slots
304 uint latchvictim; // next latch entry to examine
305 uint latchpromote; // next latch entry to promote
306 uint redolast; // last msync size of recovery buff
307 uint redoend; // eof/end element in recovery buff
308 int err; // last error
309 int line; // last error line no
310 int found; // number of keys found by delete
311 int reads, writes; // number of reads and writes
313 HANDLE halloc; // allocation handle
314 HANDLE hpool; // buffer pool handle
316 uint segments; // number of memory mapped segments
317 unsigned char *pages[64000];// memory mapped segments of b-tree
321 BtMgr *mgr; // buffer manager for entire process
322 BtMgr *main; // buffer manager for main btree
323 BtPage cursor; // cached page frame for start/next
324 ushort thread_no; // thread number
325 unsigned char key[BT_keyarray]; // last found complete key
328 // atomic txn structures
331 logseqno reqlsn; // redo log seq no required
332 uint entry; // latch table entry number
333 uint slot:31; // page slot number
334 uint reuse:1; // reused previous page
337 // Catastrophic errors
351 #define CLOCK_bit 0x8000
353 // recovery manager entry types
356 BTRM_eof = 0, // rest of buffer is emtpy
357 BTRM_add, // add a unique key-value to btree
358 BTRM_dup, // add a duplicate key-value to btree
359 BTRM_del, // delete a key-value from btree
360 BTRM_upd, // update a key with a new value
361 BTRM_new, // allocate a new empty page
362 BTRM_old // reuse an old empty page
365 // recovery manager entry
366 // structure followed by BtKey & BtVal
369 logseqno reqlsn; // log sequence number required
370 logseqno lsn; // log sequence number for entry
371 uint len; // length of entry
372 unsigned char type; // type of entry
373 unsigned char lvl; // level of btree entry pertains to
378 extern void bt_close (BtDb *bt);
379 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
380 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
381 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
382 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
383 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
384 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
385 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
387 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
389 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
390 extern uint bt_nextkey (BtDb *bt, uint slot);
391 extern uint bt_prevkey (BtDb *db, uint slot);
392 extern uint bt_lastkey (BtDb *db);
395 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
396 extern void bt_mgrclose (BtMgr *mgr);
397 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
398 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
400 // atomic transaction functions
401 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
402 BTERR bt_txnpromote (BtDb *bt);
404 // The page is allocated from low and hi ends.
405 // The key slots are allocated from the bottom,
406 // while the text and value of the key
407 // are allocated from the top. When the two
408 // areas meet, the page is split into two.
410 // A key consists of a length byte, two bytes of
411 // index number (0 - 65535), and up to 253 bytes
414 // Associated with each key is a value byte string
415 // containing any value desired.
417 // The b-tree root is always located at page 1.
418 // The first leaf page of level zero is always
419 // located on page 2.
421 // The b-tree pages are linked with next
422 // pointers to facilitate enumerators,
423 // and provide for concurrency.
425 // When to root page fills, it is split in two and
426 // the tree height is raised by a new root at page
427 // one with two keys.
429 // Deleted keys are marked with a dead bit until
430 // page cleanup. The fence key for a leaf node is
433 // To achieve maximum concurrency one page is locked at a time
434 // as the tree is traversed to find leaf key in question. The right
435 // page numbers are used in cases where the page is being split,
438 // Page 0 is dedicated to lock for new page extensions,
439 // and chains empty pages together for reuse. It also
440 // contains the latch manager hash table.
442 // The ParentModification lock on a node is obtained to serialize posting
443 // or changing the fence key for a node.
445 // Empty pages are chained together through the ALLOC page and reused.
447 // Access macros to address slot and key values from the page
448 // Page slots use 1 based indexing.
450 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
451 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
452 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
454 void bt_putid(unsigned char *dest, uid id)
459 dest[i] = (unsigned char)id, id >>= 8;
462 uid bt_getid(unsigned char *src)
467 for( i = 0; i < BtId; i++ )
468 id <<= 8, id |= *src++;
473 // lite weight spin lock Latch Manager
475 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
477 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
480 void bt_mutexlock(BtMutexLatch *latch)
482 BtMutexLatch prev[1];
486 *prev->value = __sync_fetch_and_or(latch->value, XCL);
488 if( !*prev->bits->xlock ) { // did we set XCL?
490 __sync_fetch_and_sub(latch->value, WRT);
495 *prev->bits->wrt += 1;
496 __sync_fetch_and_add(latch->value, WRT);
499 sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
504 // try to obtain write lock
506 // return 1 if obtained,
509 int bt_mutextry(BtMutexLatch *latch)
511 BtMutexLatch prev[1];
513 *prev->value = __sync_fetch_and_or(latch->value, XCL);
515 // take write access if exclusive bit was clear
517 return !*prev->bits->xlock;
522 void bt_releasemutex(BtMutexLatch *latch)
524 BtMutexLatch prev[1];
526 *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
528 if( *prev->bits->wrt )
529 sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
532 // Write-Only Reentrant Lock
534 void WriteOLock (WOLock *lock, ushort tid)
536 uint prev, waited = 0;
539 bt_mutexlock(lock->xcl);
544 if( *lock->bits->tid == tid ) {
545 *lock->bits->dup += 1;
546 bt_releasemutex(lock->xcl);
549 if( !*lock->bits->tid ) {
550 *lock->bits->tid = tid;
551 bt_releasemutex(lock->xcl);
559 bt_releasemutex(lock->xcl);
561 sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, prev, NULL, NULL, QueWr );
565 void WriteORelease (WOLock *lock)
567 bt_mutexlock(lock->xcl);
569 if( *lock->bits->dup ) {
570 *lock->bits->dup -= 1;
571 bt_releasemutex(lock->xcl);
575 *lock->bits->tid = 0;
578 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueWr );
579 bt_releasemutex(lock->xcl);
582 // clear lock of holders and waiters
584 ClearWOLock (WOLock *lock)
587 bt_mutexlock(lock->xcl);
589 if( *lock->waiters ) {
590 bt_releasemutex(lock->xcl);
595 if( *lock->bits->tid ) {
596 bt_releasemutex(lock->xcl);
601 bt_releasemutex(lock->xcl);
606 // Phase-Fair reader/writer lock implementation
608 void WriteLock (RWLock *lock, ushort tid)
613 tix = __sync_fetch_and_add (lock->ticket, 1);
615 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
617 // wait for our ticket to come up
619 while( tix != lock->serving[0] )
626 w = PRES | (tix & PHID);
628 r = __sync_fetch_and_add (lock->rin, w);
630 r = _InterlockedExchangeAdd16 (lock->rin, w);
632 while( r != *lock->rout )
640 void WriteRelease (RWLock *lock)
643 __sync_fetch_and_and (lock->rin, ~MASK);
645 _InterlockedAnd16 (lock->rin, ~MASK);
650 // try to obtain read lock
651 // return 1 if successful
653 int ReadTry (RWLock *lock, ushort tid)
658 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
660 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
663 if( w == (*lock->rin & MASK) ) {
665 __sync_fetch_and_add (lock->rin, -RINC);
667 _InterlockedExchangeAdd16 (lock->rin, -RINC);
675 void ReadLock (RWLock *lock, ushort tid)
680 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
682 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
685 while( w == (*lock->rin & MASK) )
693 void ReadRelease (RWLock *lock)
696 __sync_fetch_and_add (lock->rout, RINC);
698 _InterlockedExchangeAdd16 (lock->rout, RINC);
702 // recovery manager -- flush dirty pages
704 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
706 uint cnt3 = 0, cnt2 = 0, cnt = 0;
711 // flush dirty pool pages to the btree
713 fprintf(stderr, "Start flushlsn ");
714 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
715 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
716 latch = mgr->latchsets + entry;
717 bt_mutexlock (latch->modify);
718 bt_lockpage(BtLockRead, latch, thread_no);
721 bt_writepage(mgr, page, latch->page_no);
722 latch->dirty = 0, cnt++;
724 if( latch->pin & ~CLOCK_bit )
726 bt_unlockpage(BtLockRead, latch);
727 bt_releasemutex (latch->modify);
729 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
730 fprintf(stderr, "begin sync");
731 for( segment = 0; segment < mgr->segments; segment++ )
732 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
733 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
734 fprintf(stderr, " end sync\n");
737 // recovery manager -- process current recovery buff on startup
738 // this won't do much if previous session was properly closed.
740 BTERR bt_recoveryredo (BtMgr *mgr)
747 hdr = (BtLogHdr *)mgr->redobuff;
748 mgr->flushlsn = hdr->lsn;
751 hdr = (BtLogHdr *)(mgr->redobuff + offset);
752 switch( hdr->type ) {
756 case BTRM_add: // add a unique key-value to btree
758 case BTRM_dup: // add a duplicate key-value to btree
759 case BTRM_del: // delete a key-value from btree
760 case BTRM_upd: // update a key with a new value
761 case BTRM_new: // allocate a new empty page
762 case BTRM_old: // reuse an old empty page
768 // recovery manager -- append new entry to recovery log
769 // flush dirty pages to disk when it overflows.
771 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
773 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
774 uint amt = sizeof(BtLogHdr);
778 bt_mutexlock (mgr->redo);
781 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
783 // see if new entry fits in buffer
784 // flush and reset if it doesn't
786 if( amt > size - mgr->redoend ) {
787 mgr->flushlsn = mgr->lsn;
788 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
789 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
792 bt_flushlsn(mgr, thread_no);
795 // fill in new entry & either eof or end block
797 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
802 hdr->lsn = ++mgr->lsn;
806 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
807 memset (eof, 0, sizeof(BtLogHdr));
809 // fill in key and value
812 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
813 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
816 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
817 memset (eof, 0, sizeof(BtLogHdr));
820 last = mgr->redolast & ~0xfff;
823 if( end - last + sizeof(BtLogHdr) >= 32768 )
824 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
825 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
829 bt_releasemutex(mgr->redo);
833 // recovery manager -- append transaction to recovery log
834 // flush dirty pages to disk when it overflows.
836 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
838 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
839 uint amt = 0, src, type;
846 // determine amount of redo recovery log space required
848 for( src = 0; src++ < source->cnt; ) {
849 key = keyptr(source,src);
850 val = valptr(source,src);
851 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
852 amt += sizeof(BtLogHdr);
855 bt_mutexlock (mgr->redo);
857 // see if new entry fits in buffer
858 // flush and reset if it doesn't
860 if( amt > size - mgr->redoend ) {
861 mgr->flushlsn = mgr->lsn;
862 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
863 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
866 bt_flushlsn (mgr, thread_no);
869 // assign new lsn to transaction
873 // fill in new entries
875 for( src = 0; src++ < source->cnt; ) {
876 key = keyptr(source, src);
877 val = valptr(source, src);
879 switch( slotptr(source, src)->type ) {
891 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
892 amt += sizeof(BtLogHdr);
894 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
900 // fill in key and value
902 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
903 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
908 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
909 memset (eof, 0, sizeof(BtLogHdr));
912 last = mgr->redolast & ~0xfff;
915 if( end - last + sizeof(BtLogHdr) >= 32768 )
916 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
917 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
921 bt_releasemutex(mgr->redo);
925 // sync a single btree page to disk
927 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
929 uint segment = latch->page_no >> 16;
932 if( bt_writepage (mgr, page, latch->page_no) )
935 perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
937 if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
938 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
944 // read page into buffer pool from permanent location in Btree file
946 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
948 int flag = PROT_READ | PROT_WRITE;
949 uint segment = page_no >> 16;
953 if( segment < mgr->segments ) {
954 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
955 if( perm->page_no != page_no )
957 memcpy (page, perm, mgr->page_size);
962 bt_mutexlock (mgr->maps);
964 if( segment < mgr->segments ) {
965 bt_releasemutex (mgr->maps);
969 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
972 bt_releasemutex (mgr->maps);
976 // write page to permanent location in Btree file
977 // clear the dirty bit
979 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
981 int flag = PROT_READ | PROT_WRITE;
982 uint segment = page_no >> 16;
986 if( segment < mgr->segments ) {
987 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
988 if( page_no > LEAF_page && perm->page_no != page_no)
990 memcpy (perm, page, mgr->page_size);
995 bt_mutexlock (mgr->maps);
997 if( segment < mgr->segments ) {
998 bt_releasemutex (mgr->maps);
1002 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
1003 bt_releasemutex (mgr->maps);
1008 // set CLOCK bit in latch
1009 // decrement pin count
1011 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
1013 bt_mutexlock(latch->modify);
1014 latch->pin |= CLOCK_bit;
1017 bt_releasemutex(latch->modify);
1020 // return the btree cached page address
1022 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
1024 uid entry = latch - mgr->latchsets;
1025 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
1030 // return next available latch entry
1031 // and with latch entry locked
1033 uint bt_availnext (BtMgr *mgr)
1040 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1042 entry = _InterlockedIncrement (&mgr->latchvictim);
1044 entry %= mgr->latchtotal;
1049 latch = mgr->latchsets + entry;
1051 if( !bt_mutextry(latch->modify) )
1054 // return this entry if it is not pinned
1059 // if the CLOCK bit is set
1060 // reset it to zero.
1062 latch->pin &= ~CLOCK_bit;
1063 bt_releasemutex(latch->modify);
1067 // pin page in buffer pool
1068 // return with latchset pinned
1070 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1072 uint hashidx = page_no % mgr->latchhash;
1077 // try to find our entry
1079 bt_mutexlock(mgr->hashtable[hashidx].latch);
1081 if( entry = mgr->hashtable[hashidx].entry ) do
1083 latch = mgr->latchsets + entry;
1084 if( page_no == latch->page_no )
1086 } while( entry = latch->next );
1088 // found our entry: increment pin
1091 latch = mgr->latchsets + entry;
1092 bt_mutexlock(latch->modify);
1093 latch->pin |= CLOCK_bit;
1097 bt_releasemutex(latch->modify);
1098 bt_releasemutex(mgr->hashtable[hashidx].latch);
1102 // find and reuse unpinned entry
1106 entry = bt_availnext (mgr);
1107 latch = mgr->latchsets + entry;
1109 idx = latch->page_no % mgr->latchhash;
1111 // if latch is on a different hash chain
1112 // unlink from the old page_no chain
1114 if( latch->page_no )
1115 if( idx != hashidx ) {
1117 // skip over this entry if latch not available
1119 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1120 bt_releasemutex(latch->modify);
1125 mgr->latchsets[latch->prev].next = latch->next;
1127 mgr->hashtable[idx].entry = latch->next;
1130 mgr->latchsets[latch->next].prev = latch->prev;
1132 bt_releasemutex (mgr->hashtable[idx].latch);
1135 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1137 // update permanent page area in btree from buffer pool
1138 // no read-lock is required since page is not pinned.
1141 if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1142 return mgr->line = __LINE__, NULL;
1147 memcpy (page, contents, mgr->page_size);
1149 } else if( bt_readpage (mgr, page, page_no) )
1150 return mgr->line = __LINE__, NULL;
1152 // link page as head of hash table chain
1153 // if this is a never before used entry,
1154 // or it was previously on a different
1155 // hash table chain. Otherwise, just
1156 // leave it in its current hash table
1159 if( !latch->page_no || hashidx != idx ) {
1160 if( latch->next = mgr->hashtable[hashidx].entry )
1161 mgr->latchsets[latch->next].prev = entry;
1163 mgr->hashtable[hashidx].entry = entry;
1167 // fill in latch structure
1169 latch->pin = CLOCK_bit | 1;
1170 latch->page_no = page_no;
1173 bt_releasemutex (latch->modify);
1174 bt_releasemutex (mgr->hashtable[hashidx].latch);
1178 void bt_mgrclose (BtMgr *mgr)
1186 // flush previously written dirty pages
1187 // and write recovery buffer to disk
1189 fdatasync (mgr->idx);
1191 if( mgr->redoend ) {
1192 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1193 memset (eof, 0, sizeof(BtLogHdr));
1196 // write remaining dirty pool pages to the btree
1198 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1199 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1200 latch = mgr->latchsets + slot;
1202 if( latch->dirty ) {
1203 bt_writepage(mgr, page, latch->page_no);
1204 latch->dirty = 0, num++;
1208 // clear redo recovery buffer on disk.
1210 if( mgr->pagezero->redopages ) {
1211 eof = (BtLogHdr *)mgr->redobuff;
1212 memset (eof, 0, sizeof(BtLogHdr));
1213 eof->lsn = mgr->lsn;
1214 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1215 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1218 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1221 while( mgr->segments )
1222 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1224 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1225 munmap (mgr->pagezero, mgr->page_size);
1227 FlushViewOfFile(mgr->pagezero, 0);
1228 UnmapViewOfFile(mgr->pagezero);
1229 UnmapViewOfFile(mgr->pagepool);
1230 CloseHandle(mgr->halloc);
1231 CloseHandle(mgr->hpool);
1237 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1238 FlushFileBuffers(mgr->idx);
1239 CloseHandle(mgr->idx);
1244 // close and release memory
1246 void bt_close (BtDb *bt)
1253 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1258 // open/create new btree buffer manager
1260 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1261 // size of page pool (e.g. 262144)
1263 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1265 uint lvl, attr, last, slot, idx;
1266 uint nlatchpage, latchhash;
1267 unsigned char value[BtId];
1268 int flag, initit = 0;
1269 BtPageZero *pagezero;
1277 // determine sanity of page size and buffer pool
1279 if( bits > BT_maxbits )
1281 else if( bits < BT_minbits )
1284 mgr = calloc (1, sizeof(BtMgr));
1286 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1288 if( mgr->idx == -1 ) {
1289 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1290 return free(mgr), NULL;
1293 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1294 attr = FILE_ATTRIBUTE_NORMAL;
1295 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1297 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1298 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1299 return GlobalFree(mgr), NULL;
1304 pagezero = valloc (BT_maxpage);
1307 // read minimum page size to get root info
1308 // to support raw disk partition files
1309 // check if bits == 0 on the disk.
1311 if( size = lseek (mgr->idx, 0L, 2) )
1312 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1313 if( pagezero->alloc->bits )
1314 bits = pagezero->alloc->bits;
1318 return free(mgr), free(pagezero), NULL;
1322 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1323 size = GetFileSize(mgr->idx, amt);
1325 if( size || *amt ) {
1326 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1327 return bt_mgrclose (mgr), NULL;
1328 bits = pagezero->alloc->bits;
1333 mgr->page_size = 1 << bits;
1334 mgr->page_bits = bits;
1336 // calculate number of latch hash table entries
1338 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1340 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1341 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1342 mgr->latchtotal = nodemax;
1347 // initialize an empty b-tree with latch page, root page, page of leaves
1348 // and page(s) of latches and page pool cache
1350 memset (pagezero, 0, 1 << bits);
1351 pagezero->alloc->lvl = MIN_lvl - 1;
1352 pagezero->alloc->bits = mgr->page_bits;
1353 pagezero->redopages = redopages;
1355 bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1356 pagezero->activepages = 2;
1358 // initialize left-most LEAF page in
1359 // alloc->left and count of active leaf pages.
1361 bt_putid (pagezero->alloc->left, LEAF_page);
1362 ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1364 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1365 fprintf (stderr, "Unable to create btree page zero\n");
1366 return bt_mgrclose (mgr), NULL;
1369 memset (pagezero, 0, 1 << bits);
1370 pagezero->alloc->bits = mgr->page_bits;
1372 for( lvl=MIN_lvl; lvl--; ) {
1373 BtSlot *node = slotptr(pagezero->alloc, 1);
1374 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1375 key = keyptr(pagezero->alloc, 1);
1376 key->len = 2; // create stopper key
1380 bt_putid(value, MIN_lvl - lvl + 1);
1381 val = valptr(pagezero->alloc, 1);
1382 val->len = lvl ? BtId : 0;
1383 memcpy (val->value, value, val->len);
1385 pagezero->alloc->min = node->off;
1386 pagezero->alloc->lvl = lvl;
1387 pagezero->alloc->cnt = 1;
1388 pagezero->alloc->act = 1;
1389 pagezero->alloc->page_no = MIN_lvl - lvl;
1391 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1392 fprintf (stderr, "Unable to create btree page\n");
1393 return bt_mgrclose (mgr), NULL;
1401 VirtualFree (pagezero, 0, MEM_RELEASE);
1404 // mlock the first segment of 64K pages
1406 flag = PROT_READ | PROT_WRITE;
1407 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1410 if( mgr->pages[0] == MAP_FAILED ) {
1411 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1412 return bt_mgrclose (mgr), NULL;
1415 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1416 mlock (mgr->pagezero, mgr->page_size);
1418 mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1419 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1421 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1422 if( mgr->pagepool == MAP_FAILED ) {
1423 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1424 return bt_mgrclose (mgr), NULL;
1427 flag = PAGE_READWRITE;
1428 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1429 if( !mgr->halloc ) {
1430 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1431 return bt_mgrclose (mgr), NULL;
1434 flag = FILE_MAP_WRITE;
1435 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1436 if( !mgr->pagezero ) {
1437 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1438 return bt_mgrclose (mgr), NULL;
1441 flag = PAGE_READWRITE;
1442 size = (uid)mgr->nlatchpage << mgr->page_bits;
1443 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1445 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1446 return bt_mgrclose (mgr), NULL;
1449 flag = FILE_MAP_WRITE;
1450 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1451 if( !mgr->pagepool ) {
1452 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1453 return bt_mgrclose (mgr), NULL;
1457 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1458 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1459 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1464 // open BTree access method
1465 // based on buffer manager
1467 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1469 BtDb *bt = malloc (sizeof(*bt));
1471 memset (bt, 0, sizeof(*bt));
1475 bt->cursor = valloc (mgr->page_size);
1477 bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1480 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1482 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1487 // compare two keys, return > 0, = 0, or < 0
1488 // =0: keys are same
1491 // as the comparison value
1493 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1495 uint len1 = key1->len;
1498 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1509 // place write, read, or parent lock on requested page_no.
1511 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1515 ReadLock (latch->readwr, thread_no);
1518 WriteLock (latch->readwr, thread_no);
1521 ReadLock (latch->access, thread_no);
1524 WriteLock (latch->access, thread_no);
1527 WriteOLock (latch->parent, thread_no);
1530 WriteOLock (latch->atomic, thread_no);
1532 case BtLockAtomic | BtLockRead:
1533 WriteOLock (latch->atomic, thread_no);
1534 ReadLock (latch->readwr, thread_no);
1536 case BtLockAtomic | BtLockWrite:
1537 WriteOLock (latch->atomic, thread_no);
1538 WriteLock (latch->readwr, thread_no);
1543 // remove write, read, or parent lock on requested page
1545 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1549 ReadRelease (latch->readwr);
1552 WriteRelease (latch->readwr);
1555 ReadRelease (latch->access);
1558 WriteRelease (latch->access);
1561 WriteORelease (latch->parent);
1564 WriteORelease (latch->atomic);
1566 case BtLockAtomic | BtLockRead:
1567 WriteORelease (latch->atomic);
1568 ReadRelease (latch->readwr);
1570 case BtLockAtomic | BtLockWrite:
1571 WriteORelease (latch->atomic);
1572 WriteRelease (latch->readwr);
1577 // allocate a new page
1578 // return with page latched, but unlocked.
1580 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1585 // lock allocation page
1587 bt_mutexlock(mgr->lock);
1589 // use empty chain first
1590 // else allocate new page
1592 if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1593 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1594 set->page = bt_mappage (mgr, set->latch);
1596 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1598 mgr->pagezero->activepages++;
1599 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1601 // the page is currently free and this
1602 // will keep bt_txnpromote out.
1604 // contents will replace this bit
1605 // and pin will keep bt_txnpromote out
1607 contents->page_no = page_no;
1608 set->latch->dirty = 1;
1610 memcpy (set->page, contents, mgr->page_size);
1612 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1613 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1615 bt_releasemutex(mgr->lock);
1619 page_no = bt_getid(mgr->pagezero->alloc->right);
1620 bt_putid(mgr->pagezero->alloc->right, page_no+1);
1622 // unlock allocation latch and
1623 // extend file into new page.
1625 mgr->pagezero->activepages++;
1626 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1627 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1628 bt_releasemutex(mgr->lock);
1630 // keep bt_txnpromote out of this page
1633 contents->page_no = page_no;
1634 pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1636 // don't load cache from btree page, load it from contents
1638 if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1639 set->page = bt_mappage (mgr, set->latch);
1643 // now pin will keep bt_txnpromote out
1645 set->page->free = 0;
1649 // find slot in page for given key at a given level
1651 int bt_findslot (BtPage page, unsigned char *key, uint len)
1653 uint diff, higher = page->cnt, low = 1, slot;
1656 // make stopper key an infinite fence value
1658 if( bt_getid (page->right) )
1663 // low is the lowest candidate.
1664 // loop ends when they meet
1666 // higher is already
1667 // tested as .ge. the passed key.
1669 while( diff = higher - low ) {
1670 slot = low + ( diff >> 1 );
1671 if( keycmp (keyptr(page, slot), key, len) < 0 )
1674 higher = slot, good++;
1677 // return zero if key is on right link page
1679 return good ? higher : 0;
1682 // find and load page at given level for given key
1683 // leave page rd or wr locked as requested
1685 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1687 uid page_no = ROOT_page, prevpage_no = 0;
1688 uint drill = 0xff, slot;
1689 BtLatchSet *prevlatch;
1690 uint mode, prevmode;
1694 // start at root of btree and drill down
1697 // determine lock mode of drill level
1698 mode = (drill == lvl) ? lock : BtLockRead;
1700 if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1703 // obtain access lock using lock chaining with Access mode
1705 if( page_no > ROOT_page )
1706 bt_lockpage(BtLockAccess, set->latch, thread_no);
1708 set->page = bt_mappage (mgr, set->latch);
1709 if( set->latch->promote )
1712 // release & unpin parent or left sibling page
1715 bt_unlockpage(prevmode, prevlatch);
1716 bt_unpinlatch (mgr, prevlatch);
1720 // obtain mode lock using lock chaining through AccessLock
1722 bt_lockpage(mode, set->latch, thread_no);
1724 if( set->page->free )
1725 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1727 if( page_no > ROOT_page )
1728 bt_unlockpage(BtLockAccess, set->latch);
1730 // re-read and re-lock root after determining actual level of root
1732 if( set->page->lvl != drill) {
1733 if( set->latch->page_no != ROOT_page )
1734 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1736 drill = set->page->lvl;
1738 if( lock != BtLockRead && drill == lvl ) {
1739 bt_unlockpage(mode, set->latch);
1740 bt_unpinlatch (mgr, set->latch);
1745 prevpage_no = set->latch->page_no;
1746 prevlatch = set->latch;
1747 prevpage = set->page;
1750 // find key on page at this level
1751 // and descend to requested level
1753 if( !set->page->kill )
1754 if( slot = bt_findslot (set->page, key, len) ) {
1758 // find next non-dead slot -- the fence key if nothing else
1760 while( slotptr(set->page, slot)->dead )
1761 if( slot++ < set->page->cnt )
1764 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1766 val = valptr(set->page, slot);
1768 if( val->len == BtId )
1769 page_no = bt_getid(valptr(set->page, slot)->value);
1771 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1777 // slide right into next page
1779 page_no = bt_getid(set->page->right);
1782 // return error on end of right chain
1784 mgr->line = __LINE__, mgr->err = BTERR_struct;
1785 return 0; // return error
1788 // return page to free list
1789 // page must be delete & write locked
1790 // and have no keys pointing to it.
1792 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1794 // lock allocation page
1796 bt_mutexlock (mgr->lock);
1800 memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1801 bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1802 set->latch->promote = 0;
1803 set->latch->dirty = 1;
1804 set->page->free = 1;
1806 // decrement active page count
1808 mgr->pagezero->activepages--;
1810 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1811 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1813 // unlock released page
1814 // and unlock allocation page
1816 bt_unlockpage (BtLockDelete, set->latch);
1817 bt_unlockpage (BtLockWrite, set->latch);
1818 bt_unpinlatch (mgr, set->latch);
1819 bt_releasemutex (mgr->lock);
1822 // a fence key was deleted from a page
1823 // push new fence value upwards
1825 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1827 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1828 unsigned char value[BtId];
1832 // remove the old fence value
1834 ptr = keyptr(set->page, set->page->cnt);
1835 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1836 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1837 set->latch->dirty = 1;
1839 // cache new fence value
1841 ptr = keyptr(set->page, set->page->cnt);
1842 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1844 bt_lockpage (BtLockParent, set->latch, thread_no);
1845 bt_unlockpage (BtLockWrite, set->latch);
1847 // insert new (now smaller) fence key
1849 bt_putid (value, set->latch->page_no);
1850 ptr = (BtKey*)leftkey;
1852 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1855 // now delete old fence key
1857 ptr = (BtKey*)rightkey;
1859 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1862 bt_unlockpage (BtLockParent, set->latch);
1863 bt_unpinlatch(mgr, set->latch);
1867 // root has a single child
1868 // collapse a level from the tree
1870 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1877 // find the child entry and promote as new root contents
1880 for( idx = 0; idx++ < root->page->cnt; )
1881 if( !slotptr(root->page, idx)->dead )
1884 val = valptr(root->page, idx);
1886 if( val->len == BtId )
1887 page_no = bt_getid (valptr(root->page, idx)->value);
1889 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1891 if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1892 child->page = bt_mappage (mgr, child->latch);
1896 bt_lockpage (BtLockDelete, child->latch, thread_no);
1897 bt_lockpage (BtLockWrite, child->latch, thread_no);
1899 memcpy (root->page, child->page, mgr->page_size);
1900 root->latch->dirty = 1;
1902 bt_freepage (mgr, child);
1904 } while( root->page->lvl > 1 && root->page->act == 1 );
1906 bt_unlockpage (BtLockWrite, root->latch);
1907 bt_unpinlatch (mgr, root->latch);
1911 // delete a page and manage keys
1912 // call with page writelocked
1914 // returns with page removed
1915 // from the page pool.
1917 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, int delkey)
1919 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1920 unsigned char value[BtId];
1921 uint lvl = set->page->lvl;
1926 // cache copy of fence key
1927 // to remove in parent
1929 ptr = keyptr(set->page, set->page->cnt);
1930 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1932 // obtain lock on right page
1934 page_no = bt_getid(set->page->right);
1936 if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1937 right->page = bt_mappage (mgr, right->latch);
1941 bt_lockpage (BtLockWrite, right->latch, thread_no);
1943 // cache copy of key to update
1945 ptr = keyptr(right->page, right->page->cnt);
1946 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1948 if( right->page->kill )
1949 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1951 // pull contents of right peer into our empty page
1953 memcpy (set->page, right->page, mgr->page_size);
1954 set->page->page_no = set->latch->page_no;
1955 set->latch->dirty = 1;
1957 // mark right page deleted and point it to left page
1958 // until we can post parent updates that remove access
1959 // to the deleted page.
1961 bt_putid (right->page->right, set->latch->page_no);
1962 right->latch->dirty = 1;
1963 right->page->kill = 1;
1965 bt_lockpage (BtLockParent, right->latch, thread_no);
1966 bt_unlockpage (BtLockWrite, right->latch);
1968 bt_lockpage (BtLockParent, set->latch, thread_no);
1969 bt_unlockpage (BtLockWrite, set->latch);
1971 // redirect higher key directly to our new node contents
1973 bt_putid (value, set->latch->page_no);
1974 ptr = (BtKey*)higherfence;
1976 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1979 // delete old lower key to our node
1981 ptr = (BtKey*)lowerfence;
1984 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1987 // obtain delete and write locks to right node
1989 bt_unlockpage (BtLockParent, right->latch);
1990 bt_lockpage (BtLockDelete, right->latch, thread_no);
1991 bt_lockpage (BtLockWrite, right->latch, thread_no);
1992 bt_freepage (mgr, right);
1994 bt_unlockpage (BtLockParent, set->latch);
1995 bt_unpinlatch (mgr, set->latch);
1999 // find and delete key on page by marking delete flag bit
2000 // if page becomes empty, delete it from the btree
2002 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2004 uint slot, idx, found, fence;
2010 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2011 node = slotptr(set->page, slot);
2012 ptr = keyptr(set->page, slot);
2016 // if librarian slot, advance to real slot
2018 if( node->type == Librarian ) {
2019 ptr = keyptr(set->page, ++slot);
2020 node = slotptr(set->page, slot);
2023 fence = slot == set->page->cnt;
2025 // delete the key, ignore request if already dead
2027 if( found = !keycmp (ptr, key, len) )
2028 if( found = node->dead == 0 ) {
2029 val = valptr(set->page,slot);
2030 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2033 // mark node type as delete
2035 node->type = Delete;
2038 // collapse empty slots beneath the fence
2039 // on interiour nodes
2042 while( idx = set->page->cnt - 1 )
2043 if( slotptr(set->page, idx)->dead ) {
2044 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2045 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2053 // did we delete a fence key in an upper level?
2055 if( lvl && set->page->act && fence )
2056 if( bt_fixfence (mgr, set, lvl, thread_no) )
2061 // do we need to collapse root?
2063 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2064 if( bt_collapseroot (mgr, set, thread_no) )
2069 // delete empty page
2071 if( !set->page->act )
2072 return bt_deletepage (mgr, set, thread_no, 1);
2074 set->latch->dirty = 1;
2075 bt_unlockpage(BtLockWrite, set->latch);
2076 bt_unpinlatch (mgr, set->latch);
2080 // advance to next slot
2082 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2084 BtLatchSet *prevlatch;
2087 if( slot < set->page->cnt )
2090 prevlatch = set->latch;
2092 if( page_no = bt_getid(set->page->right) )
2093 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2094 set->page = bt_mappage (bt->mgr, set->latch);
2098 return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2100 // obtain access lock using lock chaining with Access mode
2102 bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2104 bt_unlockpage(BtLockRead, prevlatch);
2105 bt_unpinlatch (bt->mgr, prevlatch);
2107 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2108 bt_unlockpage(BtLockAccess, set->latch);
2112 // find unique key == given key, or first duplicate key in
2113 // leaf level and return number of value bytes
2114 // or (-1) if not found.
2116 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2124 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2126 ptr = keyptr(set->page, slot);
2128 // skip librarian slot place holder
2130 if( slotptr(set->page, slot)->type == Librarian )
2131 ptr = keyptr(set->page, ++slot);
2133 // return actual key found
2135 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2138 if( slotptr(set->page, slot)->type == Duplicate )
2141 // not there if we reach the stopper key
2143 if( slot == set->page->cnt )
2144 if( !bt_getid (set->page->right) )
2147 // if key exists, return >= 0 value bytes copied
2148 // otherwise return (-1)
2150 if( slotptr(set->page, slot)->dead )
2154 if( !memcmp (ptr->key, key, len) ) {
2155 val = valptr (set->page,slot);
2156 if( valmax > val->len )
2158 memcpy (value, val->value, valmax);
2164 } while( slot = bt_findnext (bt, set, slot) );
2166 bt_unlockpage (BtLockRead, set->latch);
2167 bt_unpinlatch (bt->mgr, set->latch);
2171 // check page for space available,
2172 // clean if necessary and return
2173 // 0 - page needs splitting
2174 // >0 new slot value
2176 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2178 BtPage page = set->page, frame;
2179 uint nxt = mgr->page_size;
2180 uint cnt = 0, idx = 0;
2181 uint max = page->cnt;
2186 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2189 // skip cleanup and proceed to split
2190 // if there's not enough garbage
2193 if( page->garbage < nxt / 5 )
2196 frame = malloc (mgr->page_size);
2197 memcpy (frame, page, mgr->page_size);
2199 // skip page info and set rest of page to zero
2201 memset (page+1, 0, mgr->page_size - sizeof(*page));
2202 set->latch->dirty = 1;
2207 // clean up page first by
2208 // removing deleted keys
2210 while( cnt++ < max ) {
2214 if( cnt < max || frame->lvl )
2215 if( slotptr(frame,cnt)->dead )
2218 // copy the value across
2220 val = valptr(frame, cnt);
2221 nxt -= val->len + sizeof(BtVal);
2222 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2224 // copy the key across
2226 key = keyptr(frame, cnt);
2227 nxt -= key->len + sizeof(BtKey);
2228 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2230 // make a librarian slot
2232 slotptr(page, ++idx)->off = nxt;
2233 slotptr(page, idx)->type = Librarian;
2234 slotptr(page, idx)->dead = 1;
2238 slotptr(page, ++idx)->off = nxt;
2239 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2241 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2249 // see if page has enough space now, or does it need splitting?
2251 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2257 // split the root and raise the height of the btree
2259 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2261 unsigned char leftkey[BT_keyarray];
2262 uint nxt = mgr->page_size;
2263 unsigned char value[BtId];
2270 frame = malloc (mgr->page_size);
2271 memcpy (frame, root->page, mgr->page_size);
2273 // save left page fence key for new root
2275 ptr = keyptr(root->page, root->page->cnt);
2276 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2278 // Obtain an empty page to use, and copy the current
2279 // root contents into it, e.g. lower keys
2281 if( bt_newpage(mgr, left, frame, page_no) )
2284 left_page_no = left->latch->page_no;
2285 bt_unpinlatch (mgr, left->latch);
2288 // preserve the page info at the bottom
2289 // of higher keys and set rest to zero
2291 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2293 // insert stopper key at top of newroot page
2294 // and increase the root height
2296 nxt -= BtId + sizeof(BtVal);
2297 bt_putid (value, right->page_no);
2298 val = (BtVal *)((unsigned char *)root->page + nxt);
2299 memcpy (val->value, value, BtId);
2302 nxt -= 2 + sizeof(BtKey);
2303 slotptr(root->page, 2)->off = nxt;
2304 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2309 // insert lower keys page fence key on newroot page as first key
2311 nxt -= BtId + sizeof(BtVal);
2312 bt_putid (value, left_page_no);
2313 val = (BtVal *)((unsigned char *)root->page + nxt);
2314 memcpy (val->value, value, BtId);
2317 ptr = (BtKey *)leftkey;
2318 nxt -= ptr->len + sizeof(BtKey);
2319 slotptr(root->page, 1)->off = nxt;
2320 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2322 bt_putid(root->page->right, 0);
2323 root->page->min = nxt; // reset lowest used offset and key count
2324 root->page->cnt = 2;
2325 root->page->act = 2;
2328 mgr->pagezero->alloc->lvl = root->page->lvl;
2330 // release and unpin root pages
2332 bt_unlockpage(BtLockWrite, root->latch);
2333 bt_unpinlatch (mgr, root->latch);
2335 bt_unpinlatch (mgr, right);
2339 // split already locked full node
2341 // return pool entry for new right
2342 // page, pinned & unlocked
2344 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2346 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2347 BtPage frame = malloc (mgr->page_size);
2348 uint lvl = set->page->lvl;
2355 // split higher half of keys to frame
2357 memset (frame, 0, mgr->page_size);
2358 max = set->page->cnt;
2362 while( cnt++ < max ) {
2363 if( cnt < max || set->page->lvl )
2364 if( slotptr(set->page, cnt)->dead )
2367 src = valptr(set->page, cnt);
2368 nxt -= src->len + sizeof(BtVal);
2369 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2371 key = keyptr(set->page, cnt);
2372 nxt -= key->len + sizeof(BtKey);
2373 ptr = (BtKey*)((unsigned char *)frame + nxt);
2374 memcpy (ptr, key, key->len + sizeof(BtKey));
2376 // add librarian slot
2378 slotptr(frame, ++idx)->off = nxt;
2379 slotptr(frame, idx)->type = Librarian;
2380 slotptr(frame, idx)->dead = 1;
2384 slotptr(frame, ++idx)->off = nxt;
2385 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2387 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2391 frame->bits = mgr->page_bits;
2398 if( set->latch->page_no > ROOT_page )
2399 bt_putid (frame->right, bt_getid (set->page->right));
2401 // get new free page and write higher keys to it.
2403 if( bt_newpage(mgr, right, frame, thread_no) )
2406 // process lower keys
2408 memcpy (frame, set->page, mgr->page_size);
2409 memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2410 set->latch->dirty = 1;
2412 nxt = mgr->page_size;
2413 set->page->garbage = 0;
2419 if( slotptr(frame, max)->type == Librarian )
2422 // assemble page of smaller keys
2424 while( cnt++ < max ) {
2425 if( slotptr(frame, cnt)->dead )
2427 val = valptr(frame, cnt);
2428 nxt -= val->len + sizeof(BtVal);
2429 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2431 key = keyptr(frame, cnt);
2432 nxt -= key->len + sizeof(BtKey);
2433 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2435 // add librarian slot
2437 slotptr(set->page, ++idx)->off = nxt;
2438 slotptr(set->page, idx)->type = Librarian;
2439 slotptr(set->page, idx)->dead = 1;
2443 slotptr(set->page, ++idx)->off = nxt;
2444 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2448 bt_putid(set->page->right, right->latch->page_no);
2449 set->page->min = nxt;
2450 set->page->cnt = idx;
2453 return right->latch - mgr->latchsets;
2456 // fix keys for newly split page
2457 // call with both pages pinned & locked
2458 // return unlocked and unpinned
2460 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2462 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2463 unsigned char value[BtId];
2464 uint lvl = set->page->lvl;
2468 // if current page is the root page, split it
2470 if( set->latch->page_no == ROOT_page )
2471 return bt_splitroot (mgr, set, right, thread_no);
2473 ptr = keyptr(set->page, set->page->cnt);
2474 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2476 page = bt_mappage (mgr, right);
2478 ptr = keyptr(page, page->cnt);
2479 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2481 // insert new fences in their parent pages
2483 bt_lockpage (BtLockParent, right, thread_no);
2485 bt_lockpage (BtLockParent, set->latch, thread_no);
2486 bt_unlockpage (BtLockWrite, set->latch);
2488 // insert new fence for reformulated left block of smaller keys
2490 bt_putid (value, set->latch->page_no);
2491 ptr = (BtKey *)leftkey;
2493 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2496 // switch fence for right block of larger keys to new right page
2498 bt_putid (value, right->page_no);
2499 ptr = (BtKey *)rightkey;
2501 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2504 bt_unlockpage (BtLockParent, set->latch);
2505 bt_unpinlatch (mgr, set->latch);
2507 bt_unlockpage (BtLockParent, right);
2508 bt_unpinlatch (mgr, right);
2512 // install new key and value onto page
2513 // page must already be checked for
2516 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2518 uint idx, librarian;
2524 // if found slot > desired slot and previous slot
2525 // is a librarian slot, use it
2528 if( slotptr(set->page, slot-1)->type == Librarian )
2531 // copy value onto page
2533 set->page->min -= vallen + sizeof(BtVal);
2534 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2535 memcpy (val->value, value, vallen);
2538 // copy key onto page
2540 set->page->min -= keylen + sizeof(BtKey);
2541 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2542 memcpy (ptr->key, key, keylen);
2545 // find first empty slot
2547 for( idx = slot; idx < set->page->cnt; idx++ )
2548 if( slotptr(set->page, idx)->dead )
2551 // now insert key into array before slot,
2552 // adding as many librarian slots as
2555 if( idx == set->page->cnt ) {
2556 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2558 librarian = ++idx - slot;
2559 avail /= sizeof(BtSlot);
2564 if( librarian > avail )
2568 rate = (idx - slot) / librarian;
2569 set->page->cnt += librarian;
2574 librarian = 0, rate = 0;
2576 while( idx > slot ) {
2578 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2581 // add librarian slot per rate
2584 if( (idx - slot + 1)/2 <= librarian * rate ) {
2585 // if( rate && !(idx % rate) ) {
2586 node = slotptr(set->page, idx--);
2587 node->off = node[1].off;
2588 node->type = Librarian;
2595 set->latch->dirty = 1;
2600 node = slotptr(set->page, slot);
2601 node->off = set->page->min;
2606 bt_unlockpage (BtLockWrite, set->latch);
2607 bt_unpinlatch (mgr, set->latch);
2613 // Insert new key into the btree at given level.
2614 // either add a new key or update/add an existing one
2616 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2618 uint slot, idx, len, entry;
2624 while( 1 ) { // find the page and slot for the current key
2625 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2626 node = slotptr(set->page, slot);
2627 ptr = keyptr(set->page, slot);
2630 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2634 // if librarian slot == found slot, advance to real slot
2636 if( node->type == Librarian )
2637 if( !keycmp (ptr, key, keylen) ) {
2638 ptr = keyptr(set->page, ++slot);
2639 node = slotptr(set->page, slot);
2642 // if inserting a duplicate key or unique
2643 // key that doesn't exist on the page,
2644 // check for adequate space on the page
2645 // and insert the new key before slot.
2650 if( keycmp (ptr, key, keylen) )
2651 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2652 return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2653 else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2655 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2660 // if key already exists, update value and return
2662 val = valptr(set->page, slot);
2664 if( val->len >= vallen ) {
2665 if( slotptr(set->page, slot)->dead )
2670 set->page->garbage += val->len - vallen;
2671 set->latch->dirty = 1;
2673 memcpy (val->value, value, vallen);
2674 bt_unlockpage(BtLockWrite, set->latch);
2675 bt_unpinlatch (mgr, set->latch);
2679 // new update value doesn't fit in existing value area
2680 // make sure page has room
2683 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2690 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2691 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2693 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2698 // copy key and value onto page and update slot
2700 set->page->min -= vallen + sizeof(BtVal);
2701 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2702 memcpy (val->value, value, vallen);
2705 set->latch->dirty = 1;
2706 set->page->min -= keylen + sizeof(BtKey);
2707 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2708 memcpy (ptr->key, key, keylen);
2711 node->off = set->page->min;
2712 bt_unlockpage(BtLockWrite, set->latch);
2713 bt_unpinlatch (mgr, set->latch);
2720 // determine actual page where key is located
2721 // return slot number
2723 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2725 BtKey *key = keyptr(source,src);
2726 uint slot = locks[src].slot;
2729 if( src > 1 && locks[src].reuse )
2730 entry = locks[src-1].entry, slot = 0;
2732 entry = locks[src].entry;
2735 set->latch = mgr->latchsets + entry;
2736 set->page = bt_mappage (mgr, set->latch);
2740 // is locks->reuse set? or was slot zeroed?
2741 // if so, find where our key is located
2742 // on current page or pages split on
2743 // same page txn operations.
2746 set->latch = mgr->latchsets + entry;
2747 set->page = bt_mappage (mgr, set->latch);
2749 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2750 if( slotptr(set->page, slot)->type == Librarian )
2752 if( locks[src].reuse )
2753 locks[src].entry = entry;
2756 } while( entry = set->latch->split );
2758 mgr->line = __LINE__, mgr->err = BTERR_atomic;
2762 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2764 BtKey *key = keyptr(source, src);
2765 BtVal *val = valptr(source, src);
2770 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2771 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2772 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2774 set->page->lsn = lsn;
2780 if( entry = bt_splitpage (mgr, set, thread_no) )
2781 latch = mgr->latchsets + entry;
2785 // splice right page into split chain
2788 bt_lockpage(BtLockWrite, latch, thread_no);
2789 latch->split = set->latch->split;
2790 set->latch->split = entry;
2791 locks[src].slot = 0;
2794 return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2797 // perform delete from smaller btree
2798 // insert a delete slot if not found there
2800 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2802 BtKey *key = keyptr(source, src);
2809 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2810 node = slotptr(set->page, slot);
2811 ptr = keyptr(set->page, slot);
2812 val = valptr(set->page, slot);
2814 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2816 // if slot is not found, insert a delete slot
2818 if( keycmp (ptr, key->key, key->len) )
2819 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2821 // if node is already dead,
2822 // ignore the request.
2827 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2828 set->latch->dirty = 1;
2829 set->page->lsn = lsn;
2833 __sync_fetch_and_add(&mgr->found, 1);
2837 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2839 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2840 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2842 return keycmp (key1, key2->key, key2->len);
2844 // atomic modification of a batch of keys.
2846 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2848 uint src, idx, slot, samepage, entry, que = 0;
2849 BtKey *key, *ptr, *key2;
2855 // stable sort the list of keys into order to
2856 // prevent deadlocks between threads.
2858 for( src = 1; src++ < source->cnt; ) {
2859 *temp = *slotptr(source,src);
2860 key = keyptr (source,src);
2862 for( idx = src; --idx; ) {
2863 key2 = keyptr (source,idx);
2864 if( keycmp (key, key2->key, key2->len) < 0 ) {
2865 *slotptr(source,idx+1) = *slotptr(source,idx);
2866 *slotptr(source,idx) = *temp;
2872 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2873 // add entries to redo log
2875 if( bt->mgr->pagezero->redopages )
2876 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2880 // perform the individual actions in the transaction
2882 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2883 return bt->mgr->err;
2885 // if number of active pages
2886 // is greater than the buffer pool
2887 // promote page into larger btree
2890 while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
2891 if( bt_txnpromote (bt) )
2892 return bt->mgr->err;
2899 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2901 uint src, idx, slot, samepage, entry, que = 0;
2902 BtPageSet set[1], prev[1];
2903 unsigned char value[BtId];
2911 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2913 // Load the leaf page for each key
2914 // group same page references with reuse bit
2915 // and determine any constraint violations
2917 for( src = 0; src++ < source->cnt; ) {
2918 key = keyptr(source, src);
2921 // first determine if this modification falls
2922 // on the same page as the previous modification
2923 // note that the far right leaf page is a special case
2925 if( samepage = src > 1 )
2926 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2927 slot = bt_findslot(set->page, key->key, key->len);
2930 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic, thread_no) )
2931 set->latch->split = 0;
2935 if( slotptr(set->page, slot)->type == Librarian )
2936 ptr = keyptr(set->page, ++slot);
2938 ptr = keyptr(set->page, slot);
2941 locks[src].entry = set->latch - mgr->latchsets;
2942 locks[src].slot = slot;
2943 locks[src].reuse = 0;
2945 locks[src].entry = 0;
2946 locks[src].slot = 0;
2947 locks[src].reuse = 1;
2950 // capture current lsn for master page
2952 locks[src].reqlsn = set->page->lsn;
2955 // obtain write lock for each master page
2957 for( src = 0; src++ < source->cnt; ) {
2958 if( locks[src].reuse )
2961 set->latch = mgr->latchsets + locks[src].entry;
2962 bt_lockpage (BtLockWrite, set->latch, thread_no);
2965 // insert or delete each key
2966 // process any splits or merges
2967 // run through txn list backwards
2969 samepage = source->cnt + 1;
2971 for( src = source->cnt; src; src-- ) {
2972 if( locks[src].reuse )
2975 // perform the txn operations
2976 // from smaller to larger on
2979 for( idx = src; idx < samepage; idx++ )
2980 switch( slotptr(source,idx)->type ) {
2982 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2988 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2993 bt_atomicpage (mgr, source, locks, idx, set);
2997 // after the same page operations have finished,
2998 // process master page for splits or deletion.
3000 latch = prev->latch = mgr->latchsets + locks[src].entry;
3001 prev->page = bt_mappage (mgr, prev->latch);
3004 // pick-up all splits from master page
3005 // each one is already pinned & WriteLocked.
3007 if( entry = prev->latch->split ) do {
3008 set->latch = mgr->latchsets + entry;
3009 set->page = bt_mappage (mgr, set->latch);
3011 // delete empty master page by undoing its split
3012 // (this is potentially another empty page)
3013 // note that there are no pointers to it yet
3015 if( !prev->page->act ) {
3016 memcpy (set->page->left, prev->page->left, BtId);
3017 memcpy (prev->page, set->page, mgr->page_size);
3018 bt_lockpage (BtLockDelete, set->latch, thread_no);
3019 prev->latch->split = set->latch->split;
3020 prev->latch->dirty = 1;
3021 bt_freepage (mgr, set);
3025 // remove empty split page from the split chain
3026 // and return it to the free list. No other
3027 // thread has its page number yet.
3029 if( !set->page->act ) {
3030 memcpy (prev->page->right, set->page->right, BtId);
3031 prev->latch->split = set->latch->split;
3033 bt_lockpage (BtLockDelete, set->latch, thread_no);
3034 bt_freepage (mgr, set);
3038 // update prev's fence key
3040 ptr = keyptr(prev->page,prev->page->cnt);
3041 bt_putid (value, prev->latch->page_no);
3043 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3046 // splice in the left link into the split page
3048 bt_putid (set->page->left, prev->latch->page_no);
3051 bt_syncpage (mgr, prev->page, prev->latch);
3053 // page is unpinned below to avoid bt_txnpromote
3055 bt_unlockpage(BtLockWrite, prev->latch);
3057 } while( entry = prev->latch->split );
3059 // update left pointer in next right page from last split page
3060 // (if all splits were reversed or none occurred, latch->split == 0)
3062 if( latch->split ) {
3063 // fix left pointer in master's original (now split)
3064 // far right sibling or set rightmost page in page zero
3066 if( right = bt_getid (prev->page->right) ) {
3067 if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) )
3068 set->page = bt_mappage (mgr, set->latch);
3072 bt_lockpage (BtLockWrite, set->latch, thread_no);
3073 bt_putid (set->page->left, prev->latch->page_no);
3074 set->latch->dirty = 1;
3076 bt_unlockpage (BtLockWrite, set->latch);
3077 bt_unpinlatch (mgr, set->latch);
3078 } else { // prev is rightmost page
3079 bt_mutexlock (mgr->lock);
3080 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3081 bt_releasemutex(mgr->lock);
3084 // Process last page split in chain
3085 // by switching the key from the master
3086 // page to the last split.
3088 ptr = keyptr(prev->page,prev->page->cnt);
3089 bt_putid (value, prev->latch->page_no);
3091 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3094 bt_unlockpage(BtLockWrite, prev->latch);
3097 bt_syncpage (mgr, prev->page, prev->latch);
3099 bt_unlockpage(BtLockAtomic, latch);
3100 bt_unpinlatch(mgr, latch);
3102 // go through the list of splits and
3103 // release the latch pins
3105 while( entry = latch->split ) {
3106 latch = mgr->latchsets + entry;
3107 bt_unpinlatch(mgr, latch);
3113 // since there are no splits, we're
3114 // finished if master page occupied
3116 if( prev->page->act ) {
3117 bt_unlockpage(BtLockAtomic, prev->latch);
3118 bt_unlockpage(BtLockWrite, prev->latch);
3121 bt_syncpage (mgr, prev->page, prev->latch);
3123 bt_unpinlatch(mgr, prev->latch);
3127 // any and all splits were reversed, and the
3128 // master page located in prev is empty, delete it
3130 if( bt_deletepage (mgr, prev, thread_no, 1) )
3138 // promote a page into the larger btree
3140 BTERR bt_txnpromote (BtDb *bt)
3142 uint entry, slot, idx;
3150 entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3152 entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3154 entry %= bt->mgr->latchtotal;
3159 set->latch = bt->mgr->latchsets + entry;
3161 if( !bt_mutextry(set->latch->modify) )
3164 // skip this entry if it is pinned
3166 if( set->latch->pin & ~CLOCK_bit ) {
3167 bt_releasemutex(set->latch->modify);
3171 set->page = bt_mappage (bt->mgr, set->latch);
3173 // entry never used or has no right sibling
3175 if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3176 bt_releasemutex(set->latch->modify);
3180 // entry interiour node or being killed or promoted
3182 if( set->page->lvl || set->page->free || set->page->kill ) {
3183 bt_releasemutex(set->latch->modify);
3187 // pin the page for our useage
3190 set->latch->promote = 1;
3191 bt_releasemutex(set->latch->modify);
3193 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3195 // remove the key for the page
3196 // and wait for other threads to fade away
3198 ptr = keyptr(set->page, set->page->cnt);
3200 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3201 return bt->mgr->err;
3203 bt_unlockpage (BtLockWrite, set->latch);
3204 while( (set->latch->pin & ~CLOCK_bit) > 1 )
3206 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3207 bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
3208 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3210 // transfer slots in our selected page to larger btree
3211 if( !(set->latch->page_no % 100) )
3212 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
3214 if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3215 return bt->main->err;
3217 // now delete the page
3219 bt_unlockpage (BtLockDelete, set->latch);
3220 bt_unlockpage (BtLockAtomic, set->latch);
3221 return bt_deletepage (bt->mgr, set, bt->thread_no, 0);
3225 // set cursor to highest slot on highest page
3227 uint bt_lastkey (BtDb *bt)
3229 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3232 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3233 set->page = bt_mappage (bt->mgr, set->latch);
3237 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3238 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3239 bt_unlockpage(BtLockRead, set->latch);
3240 bt_unpinlatch (bt->mgr, set->latch);
3241 return bt->cursor->cnt;
3244 // return previous slot on cursor page
3246 uint bt_prevkey (BtDb *bt, uint slot)
3248 uid cursor_page = bt->cursor->page_no;
3249 uid ourright, next, us = cursor_page;
3255 ourright = bt_getid(bt->cursor->right);
3258 if( !(next = bt_getid(bt->cursor->left)) )
3264 if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3265 set->page = bt_mappage (bt->mgr, set->latch);
3269 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3270 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3271 bt_unlockpage(BtLockRead, set->latch);
3272 bt_unpinlatch (bt->mgr, set->latch);
3274 next = bt_getid (bt->cursor->right);
3276 if( bt->cursor->kill )
3280 if( next == ourright )
3285 return bt->cursor->cnt;
3288 // return next slot on cursor page
3289 // or slide cursor right into next page
3291 uint bt_nextkey (BtDb *bt, uint slot)
3297 right = bt_getid(bt->cursor->right);
3299 while( slot++ < bt->cursor->cnt )
3300 if( slotptr(bt->cursor,slot)->dead )
3302 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3310 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3311 set->page = bt_mappage (bt->mgr, set->latch);
3315 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3316 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3317 bt_unlockpage(BtLockRead, set->latch);
3318 bt_unpinlatch (bt->mgr, set->latch);
3323 return bt->mgr->err = 0;
3326 // cache page of keys into cursor and return starting slot for given key
3328 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3333 // cache page for retrieval
3335 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3336 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3340 bt_unlockpage(BtLockRead, set->latch);
3341 bt_unpinlatch (bt->mgr, set->latch);
3348 double getCpuTime(int type)
3351 FILETIME xittime[1];
3352 FILETIME systime[1];
3353 FILETIME usrtime[1];
3354 SYSTEMTIME timeconv[1];
3357 memset (timeconv, 0, sizeof(SYSTEMTIME));
3361 GetSystemTimeAsFileTime (xittime);
3362 FileTimeToSystemTime (xittime, timeconv);
3363 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3366 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3367 FileTimeToSystemTime (usrtime, timeconv);
3370 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3371 FileTimeToSystemTime (systime, timeconv);
3375 ans += (double)timeconv->wHour * 3600;
3376 ans += (double)timeconv->wMinute * 60;
3377 ans += (double)timeconv->wSecond;
3378 ans += (double)timeconv->wMilliseconds / 1000;
3383 #include <sys/resource.h>
3385 double getCpuTime(int type)
3387 struct rusage used[1];
3388 struct timeval tv[1];
3392 gettimeofday(tv, NULL);
3393 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3396 getrusage(RUSAGE_SELF, used);
3397 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3400 getrusage(RUSAGE_SELF, used);
3401 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3408 void bt_poolaudit (BtMgr *mgr)
3413 while( ++entry < mgr->latchtotal ) {
3414 latch = mgr->latchsets + entry;
3416 if( *latch->readwr->rin & MASK )
3417 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3419 if( *latch->access->rin & MASK )
3420 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3422 // if( *latch->parent->xcl->value )
3423 // fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3425 // if( *latch->atomic->xcl->value )
3426 // fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3428 // if( *latch->modify->value )
3429 // fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3431 if( latch->pin & ~CLOCK_bit )
3432 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3445 // standalone program to index file of keys
3446 // then list them onto std-out
3449 void *index_file (void *arg)
3451 uint __stdcall index_file (void *arg)
3454 int line = 0, found = 0, cnt = 0, idx;
3455 uid next, page_no = LEAF_page; // start on first page of leaves
3456 int ch, len = 0, slot, type = 0;
3457 unsigned char key[BT_maxkey];
3458 unsigned char txn[65536];
3459 ThreadArg *args = arg;
3468 bt = bt_open (args->mgr, args->main);
3471 if( args->idx < strlen (args->type) )
3472 ch = args->type[args->idx];
3474 ch = args->type[strlen(args->type) - 1];
3486 if( type == Delete )
3487 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3489 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3491 if( type == Delete )
3492 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3494 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3496 if( in = fopen (args->infile, "rb") )
3497 while( ch = getc(in), ch != EOF )
3503 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3504 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3510 memcpy (txn + nxt, key + 10, len - 10);
3512 txn[nxt] = len - 10;
3514 memcpy (txn + nxt, key, 10);
3517 slotptr(page,++cnt)->off = nxt;
3518 slotptr(page,cnt)->type = type;
3521 if( cnt < args->num )
3528 if( bt_atomictxn (bt, page) )
3529 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3534 else if( len < BT_maxkey )
3536 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3540 fprintf(stderr, "started indexing for %s\n", args->infile);
3541 if( in = fopen (args->infile, "r") )
3542 while( ch = getc(in), ch != EOF )
3547 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3548 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3551 else if( len < BT_maxkey )
3553 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3557 fprintf(stderr, "started finding keys for %s\n", args->infile);
3558 if( in = fopen (args->infile, "rb") )
3559 while( ch = getc(in), ch != EOF )
3563 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3565 else if( bt->mgr->err )
3566 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3569 else if( len < BT_maxkey )
3571 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3575 fprintf(stderr, "started scanning\n");
3578 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3579 set->page = bt_mappage (bt->mgr, set->latch);
3581 fprintf(stderr, "unable to obtain latch"), exit(1);
3583 bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3584 next = bt_getid (set->page->right);
3586 for( slot = 0; slot++ < set->page->cnt; )
3587 if( next || slot < set->page->cnt )
3588 if( !slotptr(set->page, slot)->dead ) {
3589 ptr = keyptr(set->page, slot);
3592 if( slotptr(set->page, slot)->type == Duplicate )
3595 fwrite (ptr->key, len, 1, stdout);
3596 val = valptr(set->page, slot);
3597 fwrite (val->value, val->len, 1, stdout);
3598 fputc ('\n', stdout);
3602 bt_unlockpage (BtLockRead, set->latch);
3603 bt_unpinlatch (bt->mgr, set->latch);
3604 } while( page_no = next );
3606 fprintf(stderr, " Total keys read %d\n", cnt);
3610 fprintf(stderr, "started reverse scan\n");
3611 if( slot = bt_lastkey (bt) )
3612 while( slot = bt_prevkey (bt, slot) ) {
3613 if( slotptr(bt->cursor, slot)->dead )
3616 ptr = keyptr(bt->cursor, slot);
3619 if( slotptr(bt->cursor, slot)->type == Duplicate )
3622 fwrite (ptr->key, len, 1, stdout);
3623 val = valptr(bt->cursor, slot);
3624 fwrite (val->value, val->len, 1, stdout);
3625 fputc ('\n', stdout);
3629 fprintf(stderr, " Total keys read %d\n", cnt);
3634 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3636 fprintf(stderr, "started counting\n");
3637 next = REDO_page + bt->mgr->pagezero->redopages;
3639 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3640 if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3643 if( !bt->cursor->free && !bt->cursor->lvl )
3644 cnt += bt->cursor->act;
3650 cnt--; // remove stopper key
3651 fprintf(stderr, " Total keys counted %d\n", cnt);
3663 typedef struct timeval timer;
3665 int main (int argc, char **argv)
3667 int idx, cnt, len, slot, err;
3668 int segsize, bits = 16;
3688 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3689 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3690 fprintf (stderr, " where main_file is the name of the main btree file\n");
3691 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3692 fprintf (stderr, " page_bits is the page size in bits for the cache btree\n");
3693 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3694 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3695 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3696 fprintf (stderr, " main_bits is the page size of the main btree in bits\n");
3697 fprintf (stderr, " main_pool is the number of main pages in the main buffer pool\n");
3698 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3702 start = getCpuTime(0);
3705 bits = atoi(argv[4]);
3708 poolsize = atoi(argv[5]);
3711 fprintf (stderr, "Warning: no mapped_pool\n");
3714 num = atoi(argv[6]);
3717 redopages = atoi(argv[7]);
3719 if( redopages + REDO_page > 65535 )
3720 fprintf (stderr, "Warning: Recovery buffer too large\n");
3723 mainbits = atoi(argv[8]);
3726 mainpool = atoi(argv[9]);
3730 threads = malloc (cnt * sizeof(pthread_t));
3732 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3734 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3736 mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3739 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3744 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3747 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3756 for( idx = 0; idx < cnt; idx++ ) {
3757 args[idx].infile = argv[idx + 10];
3758 args[idx].type = argv[3];
3759 args[idx].main = main;
3760 args[idx].mgr = mgr;
3761 args[idx].num = num;
3762 args[idx].idx = idx;
3764 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3765 fprintf(stderr, "Error creating thread %d\n", err);
3767 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3771 args[0].infile = argv[idx + 10];
3772 args[0].type = argv[3];
3773 args[0].main = main;
3780 // wait for termination
3783 for( idx = 0; idx < cnt; idx++ )
3784 pthread_join (threads[idx], NULL);
3786 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3788 for( idx = 0; idx < cnt; idx++ )
3789 CloseHandle(threads[idx]);
3796 fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3802 elapsed = getCpuTime(0) - start;
3803 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3804 elapsed = getCpuTime(1);
3805 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3806 elapsed = getCpuTime(2);
3807 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);