1 // btree version threadskv10b futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer locks,
4 // librarian page split code,
5 // bi-directional cursors
6 // traditional buffer pool manager
7 // ACID batched key-value updates
8 // redo log for failure recovery
9 // and LSM B-trees for write optimization
13 // author: karl malbrain, malbrain@cal.berkeley.edu
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
19 The original author is Karl Malbrain.
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
37 #include <linux/futex.h>
52 #define WIN32_LEAN_AND_MEAN
66 typedef unsigned long long uid;
67 typedef unsigned long long logseqno;
70 typedef unsigned long long off64_t;
71 typedef unsigned short ushort;
72 typedef unsigned int uint;
75 #define BT_ro 0x6f72 // ro
76 #define BT_rw 0x7772 // rw
78 #define BT_maxbits 26 // maximum page size in bits
79 #define BT_minbits 9 // minimum page size in bits
80 #define BT_minpage (1 << BT_minbits) // minimum page size
81 #define BT_maxpage (1 << BT_maxbits) // maximum page size
83 // BTree page number constants
84 #define ALLOC_page 0 // allocation page
85 #define ROOT_page 1 // root of the btree
86 #define LEAF_page 2 // first page of leaves
87 #define REDO_page 3 // first page of redo buffer
89 // Number of levels to create in a new BTree
94 There are six lock types for each node in four independent sets:
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
116 volatile ushort xlock[1]; // one writer has exclusive lock
117 volatile ushort wrt[1]; // count of other writers waiting
126 // definition for reader/writer reentrant lock implementation
132 volatile ushort tid[1];
133 volatile ushort readers[1];
137 volatile ushort waitwrite[1];
138 volatile ushort waitread[1];
139 volatile ushort phase[1]; // phase == 1 for reading after write
140 volatile ushort dup[1]; // reentrant counter
143 // write only reentrant lock
149 volatile ushort tid[1];
150 volatile ushort dup[1];
154 volatile uint waiters[1];
157 // mode & definition for lite latch implementation
160 QueRd = 1, // reader queue
161 QueWr = 2 // writer queue
164 // hash table entries
167 uint entry; // Latch table entry at head of chain
168 BtMutexLatch latch[1];
171 // latch manager table structure
174 uid page_no; // latch set page number
175 RWLock readwr[1]; // read/write page lock
176 RWLock access[1]; // Access Intent/Page delete
177 RWLock parent[1]; // Posting of fence key in parent
178 RWLock atomic[1]; // Atomic update in progress
179 RWLock link[1]; // left link being updated
180 uint split; // right split page atomic insert
181 uint next; // next entry in hash table chain
182 uint prev; // prev entry in hash table chain
183 ushort pin; // number of accessing threads
184 unsigned char dirty; // page in cache is dirty (atomic setable)
185 BtMutexLatch modify[1]; // modify entry lite latch
188 // Define the length of the page record numbers
192 // Page key slot definition.
194 // Keys are marked dead, but remain on the page until
195 // cleanup is called. The fence key (highest key) for
196 // a leaf page is always present, even after cleanup.
200 // In addition to the Unique keys that occupy slots
201 // there are Librarian and Duplicate key
202 // slots occupying the key slot array.
204 // The Librarian slots are dead keys that
205 // serve as filler, available to add new Unique
206 // or Dup slots that are inserted into the B-tree.
208 // The Duplicate slots have had their key bytes extended
209 // by 6 bytes to contain a binary duplicate key uniqueifier.
219 uint off:BT_maxbits; // page offset for key start
220 uint type:3; // type of slot
221 uint dead:1; // set for deleted slot
224 // The key structure occupies space at the upper end of
225 // each page. It's a length byte followed by the key
229 unsigned char len; // this can be changed to a ushort or uint
230 unsigned char key[0];
233 // the value structure also occupies space at the upper
234 // end of the page. Each key is immediately followed by a value.
237 unsigned char len; // this can be changed to a ushort or uint
238 unsigned char value[0];
241 #define BT_maxkey 255 // maximum number of bytes in a key
242 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
244 // The first part of an index page.
245 // It is immediately followed
246 // by the BtSlot array of keys.
248 // note that this structure size
249 // must be a multiple of 8 bytes
250 // in order to place PageZero correctly.
252 typedef struct BtPage_ {
253 uint cnt; // count of keys in page
254 uint act; // count of active keys
255 uint min; // next key offset
256 uint garbage; // page garbage in bytes
257 unsigned char bits:7; // page size in bits
258 unsigned char free:1; // page is on free chain
259 unsigned char lvl:7; // level of page
260 unsigned char kill:1; // page is being deleted
261 unsigned char right[BtId]; // page number to right
262 unsigned char left[BtId]; // page number to left
263 unsigned char filler[2]; // padding to multiple of 8
264 logseqno lsn; // log sequence number applied
265 uid page_no; // this page number
268 // The loadpage interface object
271 BtPage page; // current page pointer
272 BtLatchSet *latch; // current page latch set
275 // structure for latch manager on ALLOC_page
278 struct BtPage_ alloc[1]; // next page_no in right ptr
279 unsigned char freechain[BtId]; // head of free page_nos chain
280 unsigned long long activepages; // number of active pages
281 uint redopages; // number of redo pages in file
284 // The object structure for Btree access
287 uint page_size; // page size
288 uint page_bits; // page size in bits
294 BtPageZero *pagezero; // mapped allocation page
295 BtHashEntry *hashtable; // the buffer pool hash table entries
296 BtLatchSet *latchsets; // mapped latch set from buffer pool
297 unsigned char *pagepool; // mapped to the buffer pool pages
298 unsigned char *redobuff; // mapped recovery buffer pointer
299 logseqno lsn, flushlsn; // current & first lsn flushed
300 BtMutexLatch redo[1]; // redo area lite latch
301 BtMutexLatch lock[1]; // allocation area lite latch
302 BtMutexLatch maps[1]; // mapping segments lite latch
303 ushort thread_no[1]; // next thread number
304 uint nlatchpage; // number of latch pages at BT_latch
305 uint latchtotal; // number of page latch entries
306 uint latchhash; // number of latch hash table slots
307 uint latchvictim; // next latch entry to examine
308 uint latchpromote; // next latch entry to promote
309 uint redolast; // last msync size of recovery buff
310 uint redoend; // eof/end element in recovery buff
311 int err; // last error
312 int line; // last error line no
313 int found; // number of keys found by delete
314 int reads, writes; // number of reads and writes
316 HANDLE halloc; // allocation handle
317 HANDLE hpool; // buffer pool handle
319 uint segments; // number of memory mapped segments
320 unsigned char *pages[64000];// memory mapped segments of b-tree
324 BtMgr *mgr; // buffer manager for entire process
325 BtMgr *main; // buffer manager for main btree
326 BtPage frame; // cached page frame for promote
327 BtPage cursor; // cached page frame for start/next
328 ushort thread_no; // thread number
329 unsigned char key[BT_keyarray]; // last found complete key
332 // atomic txn structures
335 logseqno reqlsn; // redo log seq no required
336 uint entry; // latch table entry number
337 uint slot:31; // page slot number
338 uint reuse:1; // reused previous page
341 // Catastrophic errors
355 #define CLOCK_bit 0x8000
357 // recovery manager entry types
360 BTRM_eof = 0, // rest of buffer is emtpy
361 BTRM_add, // add a unique key-value to btree
362 BTRM_dup, // add a duplicate key-value to btree
363 BTRM_del, // delete a key-value from btree
364 BTRM_upd, // update a key with a new value
365 BTRM_new, // allocate a new empty page
366 BTRM_old // reuse an old empty page
369 // recovery manager entry
370 // structure followed by BtKey & BtVal
373 logseqno reqlsn; // log sequence number required
374 logseqno lsn; // log sequence number for entry
375 uint len; // length of entry
376 unsigned char type; // type of entry
377 unsigned char lvl; // level of btree entry pertains to
382 extern void bt_close (BtDb *bt);
383 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
384 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
385 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
386 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
387 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
388 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
389 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
391 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
393 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
394 extern uint bt_nextkey (BtDb *bt, uint slot);
395 extern uint bt_prevkey (BtDb *db, uint slot);
396 extern uint bt_lastkey (BtDb *db);
399 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
400 extern void bt_mgrclose (BtMgr *mgr);
401 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
402 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
404 // atomic transaction functions
405 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
406 BTERR bt_txnpromote (BtDb *bt);
408 // The page is allocated from low and hi ends.
409 // The key slots are allocated from the bottom,
410 // while the text and value of the key
411 // are allocated from the top. When the two
412 // areas meet, the page is split into two.
414 // A key consists of a length byte, two bytes of
415 // index number (0 - 65535), and up to 253 bytes
418 // Associated with each key is a value byte string
419 // containing any value desired.
421 // The b-tree root is always located at page 1.
422 // The first leaf page of level zero is always
423 // located on page 2.
425 // The b-tree pages are linked with next
426 // pointers to facilitate enumerators,
427 // and provide for concurrency.
429 // When the root page fills, it is split in two and
430 // the tree height is raised by a new root at page
431 // one with two keys.
433 // Deleted keys are marked with a dead bit until
434 // page cleanup. The fence key for a leaf node is
437 // To achieve maximum concurrency one page is locked at a time
438 // as the tree is traversed to find leaf key in question. The right
439 // page numbers are used in cases where the page is being split,
442 // Page 0 is dedicated to lock for new page extensions,
443 // and chains empty pages together for reuse. It also
444 // contains the latch manager hash table.
446 // The ParentModification lock on a node is obtained to serialize posting
447 // or changing the fence key for a node.
449 // Empty pages are chained together through the ALLOC page and reused.
451 // Access macros to address slot and key values from the page
452 // Page slots use 1 based indexing.
// slotptr: address of the 1-based slot entry; the slot array starts
//	immediately after the page header (page + 1).
// keyptr: address of the key for a slot, found at the slot's byte
//	offset (off) measured from the start of the page.
// valptr: address of the value for a slot, which begins right after
//	the last key byte (key bytes + key length).
454 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
455 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
456 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
// Store the page number id into the byte array at dest, one byte per
// step: each step writes the low-order byte of id and then shifts id
// right by 8 bits.
// NOTE(review): the byte order must mirror bt_getid, which reads the
// most significant byte first -- confirm against the loop bounds.
458 void bt_putid(unsigned char *dest, uid id)
463 dest[i] = (unsigned char)id, id >>= 8;
// Rebuild a page number from the BtId bytes at src. Each byte is
// shifted in below the previous ones, so src[0] ends up as the most
// significant byte of the result.
466 uid bt_getid(unsigned char *src)
471 for( i = 0; i < BtId; i++ )
472 id <<= 8, id |= *src++;
477 // lite weight spin lock Latch Manager
// Thin wrapper: forwards its six arguments unchanged to the raw futex
// system call and returns whatever syscall() returns.
479 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
481 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
// Acquire the exclusive mutex latch, sleeping on a futex while another
// thread holds it.
484 void bt_mutexlock(BtMutexLatch *latch)
486 BtMutexLatch prev[1];
// atomically set the exclusive bit; prev keeps the latch word as it
// was before the bit was set
490 *prev->value = __sync_fetch_and_or(latch->value, XCL);
492 if( !*prev->bits->xlock ) { // did we set XCL?
// NOTE(review): removes one WRT unit from the shared word, presumably
// undoing a waiter registration made on an earlier pass -- confirm
494 __sync_fetch_and_sub(latch->value, WRT);
// register as a waiter in both the local snapshot and the shared word,
// so the snapshot can serve as the expected value for the futex wait
499 *prev->bits->wrt += 1;
500 __sync_fetch_and_add(latch->value, WRT);
// sleep on the writer queue; a futex wait blocks only while the latch
// word still equals the expected value passed in
503 sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
508 // try to obtain write lock
510 // return 1 if obtained,
// Attempt to take the mutex latch with a single atomic fetch-or of XCL.
// The result is the negation of the xlock field seen before the
// fetch-or: non-zero when this call set the bit, zero when the bit was
// already set by someone else.
513 int bt_mutextry(BtMutexLatch *latch)
515 BtMutexLatch prev[1];
517 *prev->value = __sync_fetch_and_or(latch->value, XCL);
519 // take write access if exclusive bit was clear
521 return !*prev->bits->xlock;
// Release the mutex latch: atomically clear the exclusive bit, then
// wake one thread sleeping on the writer queue if the latch word held
// a non-zero waiter count at the moment of release.
526 void bt_releasemutex(BtMutexLatch *latch)
528 BtMutexLatch prev[1];
// prev receives the latch word as it was just before XCL was cleared
530 *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
532 if( *prev->bits->wrt )
533 sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
536 // reentrant reader/writer lock implementation
// Acquire the write side of a re-entrant reader/writer lock on behalf
// of thread tid. The lock state is examined and changed while holding
// the lock's internal mutex (lock->xcl); a caller that cannot take the
// lock sleeps on the writer futex queue.
// NOTE(review): prev appears to be a local snapshot of the lock state
// -- confirm where it is filled in.
538 void WriteLock (RWLock *lock, ushort tid)
544 bt_mutexlock(lock->xcl);
547 // is this a re-entrant request?
549 if( *prev->bits->tid == tid )
552 // wait if write already taken, or there are readers
554 else if( *prev->bits->tid || *prev->bits->readers ) {
// remember that this thread has been counted as a waiting writer
556 waited++, *lock->waitwrite += 1;
558 // otherwise, we can take the lock
562 *lock->waitwrite -= 1;
// record the owning thread
564 *lock->bits->tid = tid;
565 *lock->phase = 0; // set writing phase
568 bt_releasemutex(lock->xcl);
// ownership test after the internal mutex has been dropped
570 if( *lock->bits->tid == tid )
// sleep on the writer queue, using the snapshot as the expected value
573 sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr );
// Release the write side of a reader/writer lock. Works under the
// lock's internal mutex: clears the owning thread id, then wakes
// waiting readers if there are any, otherwise one waiting writer.
577 void WriteRelease (RWLock *lock)
579 bt_mutexlock(lock->xcl);
581 // were we reentrant?
585 bt_releasemutex(lock->xcl);
589 // release write lock and
590 // set reading after write phase
592 *lock->bits->tid = 0;
594 // were readers waiting for a write cycle?
596 if( *lock->waitread ) {
// wake up to 32768 sleepers on the reader queue
598 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueRd );
600 // otherwise were writers waiting
602 } else if( *lock->waitwrite ) {
// wake a single sleeper on the writer queue
604 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
607 bt_releasemutex(lock->xcl);
// Acquire the read side of a reader/writer lock. The lock state is
// examined under the lock's internal mutex (lock->xcl); a caller that
// must wait is counted in waitread and sleeps on the reader futex queue.
// NOTE(review): tid is not used in the statements shown here, and prev
// appears to be a local snapshot of the lock state -- confirm.
610 void ReadLock (RWLock *lock, ushort tid)
612 uint xit, waited = 0;
616 bt_mutexlock(lock->xcl);
620 // wait if a write lock is currently active
621 // or we are not in a new read cycle and
622 // writers are waiting.
// operator precedence: parsed as  tid || (!phase && waitwrite)
624 if( *prev->bits->tid || !*prev->phase && *prev->waitwrite ) {
// remember that this thread has been counted as a waiting reader
626 waited++, *lock->waitread += 1;
628 // else we can take the lock
632 *lock->waitread -= 1;
634 *lock->bits->readers += 1;
638 bt_releasemutex(lock->xcl);
640 // did we increment readers?
// sleep on the reader queue, using the snapshot as the expected value
645 sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueRd );
// Release the read side of a reader/writer lock: drop the reader count
// under the lock's internal mutex, and wake one waiting writer when
// the read cycle has ended and no readers remain.
649 void ReadRelease (RWLock *lock)
653 bt_mutexlock(lock->xcl);
// decrement the shared reader count and mirror the result in prev
656 *prev->bits->readers = *lock->bits->readers -= 1;
// no readers queued but writers are: clear the phase flag
658 if( !*lock->waitread && *lock->waitwrite )
659 *prev->phase = *lock->phase = 0; // stop accepting new readers
661 bt_releasemutex(lock->xcl);
663 // were writers waiting for a read cycle to finish?
// the decisions below use the values captured in prev, since the
// internal mutex is no longer held
665 if( !*prev->phase && !*prev->bits->readers )
666 if( *prev->waitwrite )
667 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
670 // recovery manager -- flush dirty pages
// Walk every buffer pool entry, copy pool pages back to their permanent
// location in the btree file, then msync every mapped file segment.
// Progress and msync failures are reported on stderr.
//	mgr		buffer manager whose pool is flushed
//	thread_no	thread number used when read-locking each page
672 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
674 uint cnt3 = 0, cnt2 = 0, cnt = 0;
679 // flush dirty pool pages to the btree
681 fprintf(stderr, "Start flushlsn ");
// pool entries are numbered from 1; entry 0 is not visited
682 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
// pool page for this entry: entry * page size past the pool base
683 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
684 latch = mgr->latchsets + entry;
// hold the entry's modify latch and a page read lock while copying
685 bt_mutexlock (latch->modify);
686 bt_lockpage(BtLockRead, latch, thread_no);
// NOTE(review): presumably guarded by a latch->dirty test -- confirm
689 bt_writepage(mgr, page, latch->page_no);
690 latch->dirty = 0, cnt++;
// pin with the CLOCK bit masked off: non-zero while threads use the entry
692 if( latch->pin & ~CLOCK_bit )
694 bt_unlockpage(BtLockRead, latch);
695 bt_releasemutex (latch->modify);
697 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
698 fprintf(stderr, "begin sync");
// each mapped segment spans 65536 pages
699 for( segment = 0; segment < mgr->segments; segment++ )
700 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
701 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
702 fprintf(stderr, " end sync\n");
705 // recovery manager -- process current recovery buff on startup
706 // this won't do much if previous session was properly closed.
// Startup pass over the redo buffer. The lsn stored in the first log
// header becomes the manager's flushlsn; log headers are then read at
// successive offsets in the buffer and dispatched on their entry type.
708 BTERR bt_recoveryredo (BtMgr *mgr)
// first header in the buffer supplies the flushed lsn
715 hdr = (BtLogHdr *)mgr->redobuff;
716 mgr->flushlsn = hdr->lsn;
// header of the entry at the current offset
719 hdr = (BtLogHdr *)(mgr->redobuff + offset);
720 switch( hdr->type ) {
724 case BTRM_add: // add a unique key-value to btree
726 case BTRM_dup: // add a duplicate key-value to btree
727 case BTRM_del: // delete a key-value from btree
728 case BTRM_upd: // update a key with a new value
729 case BTRM_new: // allocate a new empty page
730 case BTRM_old: // reuse an old empty page
736 // recovery manager -- append new entry to recovery log
737 // flush dirty pages to disk when it overflows.
// Append one entry (log header followed by key and value) to the redo
// buffer while holding the redo latch. When the entry does not fit in
// the remaining space, the buffer is msync'ed and the pool is flushed
// with bt_flushlsn first. The entry is given the next log sequence
// number (++mgr->lsn).
//	mgr		buffer manager owning the redo buffer
//	type, lvl	entry type and btree level for the entry
//	key, val	key and value copied after the log header
//	thread_no	passed through to bt_flushlsn
739 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
// usable bytes: all redo pages less the room for one log header
741 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
742 uint amt = sizeof(BtLogHdr);
746 bt_mutexlock (mgr->redo);
// add the key and value structures plus their payload bytes
749 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
751 // see if new entry fits in buffer
752 // flush and reset if it doesn't
754 if( amt > size - mgr->redoend ) {
755 mgr->flushlsn = mgr->lsn;
// sync from the last sync point, rounded down to a 4096-byte
// boundary, through redoend plus one log header
756 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
757 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
760 bt_flushlsn(mgr, thread_no);
763 // fill in new entry & either eof or end block
765 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
770 hdr->lsn = ++mgr->lsn;
// zeroed header at redoend; type 0 is BTRM_eof
774 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
775 memset (eof, 0, sizeof(BtLogHdr));
777 // fill in key and value
// key structure directly after the header, value directly after the key
780 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
781 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
784 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
785 memset (eof, 0, sizeof(BtLogHdr));
// last sync point rounded down to a 4096-byte boundary
788 last = mgr->redolast & ~0xfff;
// sync only once at least 32768 bytes are covered
791 if( end - last + sizeof(BtLogHdr) >= 32768 )
792 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
793 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
797 bt_releasemutex(mgr->redo);
801 // recovery manager -- append transaction to recovery log
802 // flush dirty pages to disk when it overflows.
// Append the key/value pairs of a transaction page to the redo buffer,
// one log entry per slot. The total space needed is computed before
// the redo latch is taken; if it does not fit in the remaining buffer
// space, the buffer is msync'ed and the pool flushed with bt_flushlsn.
//	mgr		buffer manager owning the redo buffer
//	source		page whose slots hold the transaction's keys and values
//	thread_no	passed through to bt_flushlsn
804 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
// usable bytes: all redo pages less the room for one log header
806 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
807 uint amt = 0, src, type;
814 // determine amount of redo recovery log space required
// src runs over slots 1..cnt (slots are 1-based)
816 for( src = 0; src++ < source->cnt; ) {
817 key = keyptr(source,src);
818 val = valptr(source,src);
819 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
820 amt += sizeof(BtLogHdr);
823 bt_mutexlock (mgr->redo);
825 // see if new entry fits in buffer
826 // flush and reset if it doesn't
828 if( amt > size - mgr->redoend ) {
829 mgr->flushlsn = mgr->lsn;
// sync from the last sync point, rounded down to a 4096-byte
// boundary, through redoend plus one log header
830 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
831 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
834 bt_flushlsn (mgr, thread_no);
837 // assign new lsn to transaction
841 // fill in new entries
843 for( src = 0; src++ < source->cnt; ) {
844 key = keyptr(source, src);
845 val = valptr(source, src);
// the slot type selects how the entry is logged
847 switch( slotptr(source, src)->type ) {
// size of this single entry: header + key + value
859 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
860 amt += sizeof(BtLogHdr);
862 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
868 // fill in key and value
870 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
871 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
// zeroed header at redoend; type 0 is BTRM_eof
876 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
877 memset (eof, 0, sizeof(BtLogHdr));
// last sync point rounded down to a 4096-byte boundary
880 last = mgr->redolast & ~0xfff;
// sync only once at least 32768 bytes are covered
883 if( end - last + sizeof(BtLogHdr) >= 32768 )
884 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
885 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
889 bt_releasemutex(mgr->redo);
893 // sync a single btree page to disk
// Write one pool page to its permanent location with bt_writepage and
// then msync that single page of the file mapping. An msync failure is
// reported on stderr.
//	page	pool copy of the page
//	latch	latch set that supplies the page number
895 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
// high bits of the page number select the mapped segment (65536 pages each)
897 uint segment = latch->page_no >> 16;
900 if( bt_writepage (mgr, page, latch->page_no) )
// low 16 bits of the page number give the page's position in the segment
903 perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
905 if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
906 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
912 // read page into buffer pool from permanent location in Btree file
// Copy page page_no from the memory-mapped btree file into the caller's
// page buffer. If the segment holding the page is not mapped yet, more
// segments are mapped while holding the maps latch.
//	page	destination buffer of mgr->page_size bytes
//	page_no	page number within the btree file
914 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
916 int flag = PROT_READ | PROT_WRITE;
// high bits of the page number select the segment (65536 pages each)
917 uint segment = page_no >> 16;
// segment already mapped: copy straight out of the mapping
921 if( segment < mgr->segments ) {
922 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
924 memcpy (page, perm, mgr->page_size);
929 bt_mutexlock (mgr->maps);
// test again now that the maps latch is held
931 if( segment < mgr->segments ) {
932 bt_releasemutex (mgr->maps);
// map the next unmapped segment: 65536 pages of the file, starting at
// the byte offset of segment number mgr->segments
936 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
939 bt_releasemutex (mgr->maps);
943 // write page to permanent location in Btree file
944 // clear the dirty bit
// Copy the caller's page buffer into page page_no of the memory-mapped
// btree file. If the segment holding the page is not mapped yet, more
// segments are mapped while holding the maps latch.
//	page	source buffer of mgr->page_size bytes
//	page_no	page number within the btree file
946 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
948 int flag = PROT_READ | PROT_WRITE;
// high bits of the page number select the segment (65536 pages each)
949 uint segment = page_no >> 16;
// segment already mapped: copy straight into the mapping
953 if( segment < mgr->segments ) {
954 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
956 memcpy (perm, page, mgr->page_size);
961 bt_mutexlock (mgr->maps);
// test again now that the maps latch is held
963 if( segment < mgr->segments ) {
964 bt_releasemutex (mgr->maps);
// map the next unmapped segment: 65536 pages of the file, starting at
// the byte offset of segment number mgr->segments
968 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
969 bt_releasemutex (mgr->maps);
974 // set CLOCK bit in latch
975 // decrement pin count
// Update a latch entry's pin word while holding its modify latch; the
// CLOCK bit is set so the entry counts as recently used.
977 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
979 bt_mutexlock(latch->modify);
980 latch->pin |= CLOCK_bit;
983 bt_releasemutex(latch->modify);
986 // return the btree cached page address
// Translate a latch set into the address of its page in the buffer
// pool: the latch's index in mgr->latchsets, multiplied by the page
// size, is the page's byte offset from mgr->pagepool.
988 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
// index of this latch set within the latch table
990 uid entry = latch - mgr->latchsets;
991 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
996 // return next available latch entry
997 // and with latch entry locked
// Clock-style search for a reusable latch entry. The shared victim
// counter is advanced atomically, wrapped to the table size, and the
// candidate's modify latch is tried without blocking.
999 uint bt_availnext (BtMgr *mgr)
// GCC builtin form: value of the counter after the increment
1006 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
// Windows intrinsic form of the same increment
1008 entry = _InterlockedIncrement (&mgr->latchvictim);
// wrap the counter value into the latch table
1010 entry %= mgr->latchtotal;
1015 latch = mgr->latchsets + entry;
// non-blocking attempt on the candidate's modify latch
1017 if( !bt_mutextry(latch->modify) )
1020 // return this entry if it is not pinned
1025 // if the CLOCK bit is set
1026 // reset it to zero.
1028 latch->pin &= ~CLOCK_bit;
1029 bt_releasemutex(latch->modify);
1033 // pin page in buffer pool
1034 // return with latchset pinned
// Find or create the buffer pool entry for page_no and pin it.
// The hash chain for page_no is searched under the chain's latch. When
// no entry exists, a victim entry from bt_availnext is unlinked from
// its old chain, its pool page is written back, the new page contents
// are loaded (copied from contents, or read with bt_readpage), and the
// entry is linked into the chain for page_no.
// On a write or read failure the error line is recorded in mgr->line
// and NULL is returned.
//	page_no		page to pin
//	contents	page image that may be copied into the pool page
//	thread_id	NOTE(review): not used in the statements shown -- confirm
1036 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
// hash bucket for the requested page
1038 uint hashidx = page_no % mgr->latchhash;
1043 // try to find our entry
1045 bt_mutexlock(mgr->hashtable[hashidx].latch);
// walk the chain; entry number 0 terminates it
1047 if( entry = mgr->hashtable[hashidx].entry ) do
1049 latch = mgr->latchsets + entry;
1050 if( page_no == latch->page_no )
1052 } while( entry = latch->next );
1054 // found our entry: increment pin
1057 latch = mgr->latchsets + entry;
1058 bt_mutexlock(latch->modify);
1059 latch->pin |= CLOCK_bit;
1062 bt_releasemutex(latch->modify);
1063 bt_releasemutex(mgr->hashtable[hashidx].latch);
1067 // find and reuse unpinned entry
1071 entry = bt_availnext (mgr);
1072 latch = mgr->latchsets + entry;
// hash bucket of the page the victim entry held until now
1074 idx = latch->page_no % mgr->latchhash;
1076 // if latch is on a different hash chain
1077 // unlink from the old page_no chain
// page_no of zero marks an entry that has never been used
1079 if( latch->page_no )
1080 if( idx != hashidx ) {
1082 // skip over this entry if latch not available
1084 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1085 bt_releasemutex(latch->modify);
// detach the victim from its old doubly linked chain
1090 mgr->latchsets[latch->prev].next = latch->next;
1092 mgr->hashtable[idx].entry = latch->next;
1095 mgr->latchsets[latch->next].prev = latch->prev;
1097 bt_releasemutex (mgr->hashtable[idx].latch);
// pool page belonging to the victim entry
1100 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1102 // update permanent page area in btree from buffer pool
1103 // no read-lock is required since page is not pinned.
// the write error code is kept in mgr->err
1106 if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1107 return mgr->line = __LINE__, NULL;
// load the new page: from the caller's image, otherwise from the file
1112 memcpy (page, contents, mgr->page_size);
1114 } else if( bt_readpage (mgr, page, page_no) )
1115 return mgr->line = __LINE__, NULL;
1117 // link page as head of hash table chain
1118 // if this is a never before used entry,
1119 // or it was previously on a different
1120 // hash table chain. Otherwise, just
1121 // leave it in its current hash table
1124 if( !latch->page_no || hashidx != idx ) {
// old chain head, if any, gets this entry as its predecessor
1125 if( latch->next = mgr->hashtable[hashidx].entry )
1126 mgr->latchsets[latch->next].prev = entry;
1128 mgr->hashtable[hashidx].entry = entry;
1132 // fill in latch structure
// one pin, with the CLOCK bit set
1134 latch->pin = CLOCK_bit | 1;
1135 latch->page_no = page_no;
1138 bt_releasemutex (latch->modify);
1139 bt_releasemutex (mgr->hashtable[hashidx].latch);
// Shut down a buffer manager: sync the file, terminate the redo log,
// write remaining dirty pool pages to the btree, reset the redo buffer,
// and release the mappings and handles. Both the unix calls
// (fdatasync, msync, munmap) and the Windows calls (FlushViewOfFile,
// UnmapViewOfFile, CloseHandle, VirtualFree) appear below.
1143 void bt_mgrclose (BtMgr *mgr)
1151 // flush previously written dirty pages
1152 // and write recovery buffer to disk
1154 fdatasync (mgr->idx);
// place a zeroed header (type 0 is BTRM_eof) at the end of the log
1156 if( mgr->redoend ) {
1157 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1158 memset (eof, 0, sizeof(BtLogHdr));
1161 // write remaining dirty pool pages to the btree
// pool entries are numbered from 1
1163 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1164 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1165 latch = mgr->latchsets + slot;
1167 if( latch->dirty ) {
1168 bt_writepage(mgr, page, latch->page_no);
// num counts the pages written, for the report below
1169 latch->dirty = 0, num++;
1173 // clear redo recovery buffer on disk.
// the first header is zeroed, keeps the current lsn, and the first
// 4096 bytes of the redo buffer are synced
1175 if( mgr->pagezero->redopages ) {
1176 eof = (BtLogHdr *)mgr->redobuff;
1177 memset (eof, 0, sizeof(BtLogHdr));
1178 eof->lsn = mgr->lsn;
1179 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1180 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1183 fprintf(stderr, "%d buffer pool pages flushed\n", num);
// unmap file segments from the highest index down
1186 while( mgr->segments )
1187 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
// unmap the buffer pool (nlatchpage pages) and page zero
1189 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1190 munmap (mgr->pagezero, mgr->page_size);
// Windows: flush and unmap the views, then close the mapping handles
1192 FlushViewOfFile(mgr->pagezero, 0);
1193 UnmapViewOfFile(mgr->pagezero);
1194 UnmapViewOfFile(mgr->pagepool);
1195 CloseHandle(mgr->halloc);
1196 CloseHandle(mgr->hpool);
// Windows: release the redo buffer, flush and close the file handle
1202 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1203 FlushFileBuffers(mgr->idx);
1204 CloseHandle(mgr->idx);
1209 // close and release memory
// Release the resources held by a BtDb access object. The statements
// below free the frame and cursor page buffers with VirtualFree.
1211 void bt_close (BtDb *bt)
1220 VirtualFree (bt->frame, 0, MEM_RELEASE);
1222 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1227 // open/create new btree buffer manager
1229 // call with file_name, bits in page size (e.g. 16),
1230 // size of page pool (e.g. 262144), number of redo pages
// Open or create the btree file and build its buffer manager.
//	name		path of the btree file
//	bits		requested page size in bits; replaced by the value stored
//			in page zero when the file already has one
//	nodemax		number of pages in the buffer pool
//	redopages	number of redo pages recorded in a new file's page zero
// Failure paths below return NULL after printing a message to stderr.
// Both the unix calls (open, pread, mmap, mlock) and the Windows calls
// (CreateFile, CreateFileMapping, MapViewOfFile) appear below.
1232 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1234 uint lvl, attr, last, slot, idx;
1235 uint nlatchpage, latchhash;
1236 unsigned char value[BtId];
1237 int flag, initit = 0;
1238 BtPageZero *pagezero;
1246 // determine sanity of page size and buffer pool
1248 if( bits > BT_maxbits )
1250 else if( bits < BT_minbits )
// unix: zero-filled manager structure, then open or create the file
1253 mgr = calloc (1, sizeof(BtMgr));
1255 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1257 if( mgr->idx == -1 ) {
1258 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1259 return free(mgr), NULL;
// Windows: zero-filled manager structure, then open or create the file
1262 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1263 attr = FILE_ATTRIBUTE_NORMAL;
1264 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1266 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1267 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1268 return GlobalFree(mgr), NULL;
// scratch buffer large enough for the maximum page size
1273 pagezero = valloc (BT_maxpage);
1276 // read minimum page size to get root info
1277 // to support raw disk partition files
1278 // check if bits == 0 on the disk.
// non-empty file whose first minimum-size page reads in full and
// carries a non-zero bits field: adopt the stored page size
1280 if( size = lseek (mgr->idx, 0L, 2) )
1281 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1282 if( pagezero->alloc->bits )
1283 bits = pagezero->alloc->bits;
1287 return free(mgr), free(pagezero), NULL;
// Windows version of the same probe
1291 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1292 size = GetFileSize(mgr->idx, amt);
1294 if( size || *amt ) {
1295 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1296 return bt_mgrclose (mgr), NULL;
1297 bits = pagezero->alloc->bits;
1302 mgr->page_size = 1 << bits;
1303 mgr->page_bits = bits;
1305 // calculate number of latch hash table entries
// pages needed for nodemax/16 hash table entries, rounded up
1307 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1309 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
// pages needed for nodemax latch sets, rounded up
1310 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1311 mgr->latchtotal = nodemax;
1316 // initialize an empty b-tree with latch page, root page, page of leaves
1317 // and page(s) of latches and page pool cache
1319 memset (pagezero, 0, 1 << bits);
1320 pagezero->alloc->lvl = MIN_lvl - 1;
1321 pagezero->alloc->bits = mgr->page_bits;
1322 pagezero->redopages = redopages;
// right pointer of the allocation page: stored as redopages + MIN_lvl + 1
1324 bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1325 pagezero->activepages = 2;
1327 // initialize left-most LEAF page in
1328 // alloc->left and count of active leaf pages.
1330 bt_putid (pagezero->alloc->left, LEAF_page);
// size the file to REDO_page + redopages pages
1331 ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1333 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1334 fprintf (stderr, "Unable to create btree page zero\n");
1335 return bt_mgrclose (mgr), NULL;
// reuse the scratch buffer to build the initial page of each level
1338 memset (pagezero, 0, 1 << bits);
1339 pagezero->alloc->bits = mgr->page_bits;
// lvl counts down from MIN_lvl - 1 to 0
1341 for( lvl=MIN_lvl; lvl--; ) {
1342 BtSlot *node = slotptr(pagezero->alloc, 1);
// single key placed at the top of the page: 3 bytes for the key
// plus the value structure (and a page number above level 0)
1343 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1344 key = keyptr(pagezero->alloc, 1);
1345 key->len = 2; // create stopper key
// above level 0 the value is the page number of the next lower level
1349 bt_putid(value, MIN_lvl - lvl + 1);
1350 val = valptr(pagezero->alloc, 1);
1351 val->len = lvl ? BtId : 0;
1352 memcpy (val->value, value, val->len);
1354 pagezero->alloc->min = node->off;
1355 pagezero->alloc->lvl = lvl;
1356 pagezero->alloc->cnt = 1;
1357 pagezero->alloc->act = 1;
// page for level lvl is page number MIN_lvl - lvl
1358 pagezero->alloc->page_no = MIN_lvl - lvl;
1360 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1361 fprintf (stderr, "Unable to create btree page\n");
1362 return bt_mgrclose (mgr), NULL;
1370 VirtualFree (pagezero, 0, MEM_RELEASE);
1373 // mlock the first segment of 64K pages
// unix: map the first 65536 pages of the file
1375 flag = PROT_READ | PROT_WRITE;
1376 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1379 if( mgr->pages[0] == MAP_FAILED ) {
1380 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1381 return bt_mgrclose (mgr), NULL;
// page zero is the start of the first segment
1384 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1385 mlock (mgr->pagezero, mgr->page_size);
// redo buffer starts at page REDO_page of the first segment
1387 mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1388 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
// anonymous shared mapping holding pool pages, latch sets and hash table
1390 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1391 if( mgr->pagepool == MAP_FAILED ) {
1392 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1393 return bt_mgrclose (mgr), NULL;
// Windows: file mapping and view for page zero
1396 flag = PAGE_READWRITE;
1397 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1398 if( !mgr->halloc ) {
1399 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1400 return bt_mgrclose (mgr), NULL;
1403 flag = FILE_MAP_WRITE;
1404 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1405 if( !mgr->pagezero ) {
1406 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1407 return bt_mgrclose (mgr), NULL;
// Windows: mapping for the buffer pool, not backed by the btree file
1410 flag = PAGE_READWRITE;
1411 size = (uid)mgr->nlatchpage << mgr->page_bits;
1412 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1414 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1415 return bt_mgrclose (mgr), NULL;
1418 flag = FILE_MAP_WRITE;
// NOTE(review): the mapping handle above is stored in mgr->hpool, but
// this call passes mgr->pool -- confirm that BtMgr has a pool member
1419 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1420 if( !mgr->pagepool ) {
1421 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1422 return bt_mgrclose (mgr), NULL;
// pool layout: latchtotal pool pages, then latchtotal latch sets, then
// the hash table, which takes whatever space remains in the mapping
1426 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1427 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1428 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1433 // open BTree access method
1434 // based on buffer manager
// Creates a per-caller BtDb handle: the structure is obtained from malloc
// and zero-filled, two page-sized buffers (cursor and frame) are attached,
// and a thread number is drawn from the manager's shared counter.
//   mgr   buffer manager; supplies page_size and the thread_no counter
//   main  second manager; not referenced in the lines visible here
// NOTE(review): no NULL check on the malloc result is visible before the
// memset below -- confirm allocation failure is handled.
1436 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1438 BtDb *bt = malloc (sizeof(*bt));
1440 memset (bt, 0, sizeof(*bt));
// page buffers from valloc; presumably the unix build -- the preprocessor
// lines that select between the two variants are not visible here
1444 bt->cursor = valloc (mgr->page_size);
1445 bt->frame = valloc (mgr->page_size);
// page buffers from VirtualAlloc; presumably the Windows build
1447 bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1448 bt->frame = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
// thread number is the counter's previous value plus one (atomic add)
1451 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
// NOTE(review): the _InterlockedIncrement16 intrinsic is documented with a
// single pointer argument; the extra ", 1" here looks wrong -- confirm.
1453 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1458 // compare two keys, return > 0, = 0, or < 0
1459 // =0: keys are same
1462 // as the comparison value
//   key1        stored key (length in key1->len, bytes in key1->key)
//   key2, len2  raw key bytes and their length
// The byte strings are compared with memcmp over the shorter of the two
// lengths and a nonzero result is captured in ans.  How equal prefixes of
// differing length are ordered is not visible in this excerpt.
1464 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1466 uint len1 = key1->len;
// compare only the common prefix of the two keys
1469 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1480 // place write, read, or parent lock on requested page_no.
// Dispatches on `mode` and takes a read or write lock on one of the
// latch's lock members (readwr, access, parent, atomic, link).
//   mode       lock type requested; may be BtLockAtomic OR-ed with
//              BtLockRead or BtLockWrite
//   latch      latch set of the page being locked
//   thread_no  caller's thread number, passed to ReadLock/WriteLock
// The case labels for the single-mode branches are not visible in this
// excerpt, so the mode-to-member mapping below is by inspection only.
1482 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
// shared lock on the page's read/write member
1486 ReadLock (latch->readwr, thread_no);
// exclusive lock on the page's read/write member
1489 WriteLock (latch->readwr, thread_no);
// shared lock on the access member
1492 ReadLock (latch->access, thread_no);
// exclusive lock on the access member
1495 WriteLock (latch->access, thread_no);
// exclusive lock on the parent member
1498 WriteLock (latch->parent, thread_no);
// exclusive lock on the atomic member
1501 WriteLock (latch->atomic, thread_no);
// combined mode: atomic member first, then a shared read/write lock
1503 case BtLockAtomic | BtLockRead:
1504 WriteLock (latch->atomic, thread_no);
1505 ReadLock (latch->readwr, thread_no);
// combined mode: atomic member first, then an exclusive read/write lock
1507 case BtLockAtomic | BtLockWrite:
1508 WriteLock (latch->atomic, thread_no);
1509 WriteLock (latch->readwr, thread_no);
// exclusive lock on the link member
1512 WriteLock (latch->link, thread_no);
1517 // remove write, read, or parent lock on requested page
// Dispatches on `mode` and releases the matching read or write lock on one
// of the latch's lock members (readwr, access, parent, atomic, link).
//   mode   lock type being released; may be BtLockAtomic OR-ed with
//          BtLockRead or BtLockWrite
//   latch  latch set of the page being unlocked
// The case labels for the single-mode branches are not visible in this
// excerpt.
1519 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
// release shared hold on the read/write member
1523 ReadRelease (latch->readwr);
// release exclusive hold on the read/write member
1526 WriteRelease (latch->readwr);
// release shared hold on the access member
1529 ReadRelease (latch->access);
// release exclusive hold on the access member
1532 WriteRelease (latch->access);
// release the parent member
1535 WriteRelease (latch->parent);
// release the atomic member
1538 WriteRelease (latch->atomic);
// combined mode: atomic member is released first, then the shared
// read/write hold
1540 case BtLockAtomic | BtLockRead:
1541 WriteRelease (latch->atomic);
1542 ReadRelease (latch->readwr);
// combined mode: atomic member is released first, then the exclusive
// read/write hold
1544 case BtLockAtomic | BtLockWrite:
1545 WriteRelease (latch->atomic);
1546 WriteRelease (latch->readwr);
// release the link member
1549 WriteRelease (latch->link);
1554 // allocate a new page
1555 // return with page latched, but unlocked.
// Obtains a page number either from the free chain recorded in page zero
// or by advancing the next-page counter kept in pagezero->alloc->right,
// installs `contents` as the page image, and leaves the pinned latch and
// mapped page in `set`.
//   mgr        buffer manager; mgr->lock is held while page zero is updated
//   set        receives the latch and page pointer of the new page
//   contents   page image to install; its page_no field is overwritten
//   thread_id  passed through to bt_pinlatch
// Returns a BTERR code; BTERR_struct on the visible failure path.  The
// success return is not visible in this excerpt.
1557 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1562 // lock allocation page
1564 bt_mutexlock(mgr->lock);
1566 // use empty chain first
1567 // else allocate new page
// a nonzero freechain id means a previously freed page can be reused
1569 if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1570 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1571 set->page = bt_mappage (mgr, set->latch);
// NOTE(review): no bt_releasemutex is visible before this error return --
// confirm mgr->lock is not left held on this path.
1573 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1575 mgr->pagezero->activepages++;
// pop the page off the free chain: its right link becomes the new head
1576 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1578 // the page is currently free and this
1579 // will keep bt_txnpromote out.
1581 // contents will replace this bit
1582 // and pin will keep bt_txnpromote out
1584 contents->page_no = page_no;
1585 set->latch->dirty = 1;
// overwrite the pooled page with the caller's image
1587 memcpy (set->page, contents, mgr->page_size);
1589 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1590 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1592 bt_releasemutex(mgr->lock);
// no free page available: take the next page number and bump the counter
1596 page_no = bt_getid(mgr->pagezero->alloc->right);
1597 bt_putid(mgr->pagezero->alloc->right, page_no+1);
1599 // unlock allocation latch and
1600 // extend file into new page.
1602 mgr->pagezero->activepages++;
1603 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1604 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1605 bt_releasemutex(mgr->lock);
1607 // keep bt_txnpromote out of this page
1610 contents->page_no = page_no;
// write the image at the page's byte offset in the index file
// NOTE(review): the pwrite result is not checked here -- a short or failed
// write would go unnoticed.
1611 pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1613 // don't load cache from btree page, load it from contents
1615 if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1616 set->page = bt_mappage (mgr, set->latch);
1620 // now pin will keep bt_txnpromote out
1622 set->page->free = 0;
1626 // find slot in page for given key at a given level
// Binary search over the page's slot array, which is 1-based and runs up
// to page->cnt.
//   page      page to search
//   key, len  search key bytes and length
// Returns the slot number the search converged on, or 0 when `good` was
// never set (per the comment at the end: the key is on the right-link page).
1628 int bt_findslot (BtPage page, unsigned char *key, uint len)
1630 uint diff, higher = page->cnt, low = 1, slot;
1633 // make stopper key an infinite fence value
// branches on whether the page has a right sibling; the bodies of both
// arms are not visible in this excerpt
1635 if( bt_getid (page->right) )
1640 // low is the lowest candidate.
1641 // loop ends when they meet
1643 // higher is already
1644 // tested as .ge. the passed key.
// halve the [low, higher) interval until it is empty
1646 while( diff = higher - low ) {
1647 slot = low + ( diff >> 1 );
// slot key sorts before the search key: the answer lies above slot
1648 if( keycmp (keyptr(page, slot), key, len) < 0 )
// otherwise slot is a candidate; note that a comparison succeeded
1651 higher = slot, good++;
1654 // return zero if key is on right link page
1656 return good ? higher : 0;
1659 // find and load page at given level for given key
1660 // leave page rd or wr locked as requested
// Walks from the root toward level `lvl`, following child pointers stored
// in slot values and sliding right along sibling links when needed.
//   mgr        buffer manager
//   set        receives the latch and page of the page finally reached
//   key, len   search key bytes and length
//   lvl        target level
//   lock       lock mode wanted on the target page; pages above the target
//              level are taken with BtLockRead
//   thread_no  caller's thread number for latch and lock calls
// Returns 0 with mgr->err and mgr->line set on the error paths shown below.
// The successful return is not visible in this excerpt; callers treat a
// nonzero result as a slot number.
1662 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1664 uid page_no = ROOT_page, prevpage_no = 0;
// drill starts at 0xff and is replaced by the root's real level once the
// root page has been read (see the re-read step below)
1665 uint drill = 0xff, slot;
1666 BtLatchSet *prevlatch;
1667 uint mode, prevmode;
1671 // start at root of btree and drill down
1674 // determine lock mode of drill level
1675 mode = (drill == lvl) ? lock : BtLockRead;
1677 if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1680 // obtain access lock using lock chaining with Access mode
// the root page is exempt from the access lock
1682 if( page_no > ROOT_page )
1683 bt_lockpage(BtLockAccess, set->latch, thread_no);
1685 set->page = bt_mappage (mgr, set->latch);
1687 // release & unpin parent or left sibling page
1690 bt_unlockpage(prevmode, prevlatch);
1691 bt_unpinlatch (mgr, prevlatch);
1695 // obtain mode lock using lock chaining through AccessLock
1697 bt_lockpage(mode, set->latch, thread_no);
// landing on a page flagged free is reported as a structure error
1699 if( set->page->free )
1700 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
// the access lock is dropped once the mode lock is held
1702 if( page_no > ROOT_page )
1703 bt_unlockpage(BtLockAccess, set->latch);
1705 // re-read and re-lock root after determining actual level of root
// a level mismatch is only legitimate on the root page
1707 if( set->page->lvl != drill) {
1708 if( set->latch->page_no != ROOT_page )
1709 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1711 drill = set->page->lvl;
// the root turned out to be the target level but was locked for read:
// release it (presumably to retry with the requested mode -- the
// statement following the unpin is not visible here)
1713 if( lock != BtLockRead && drill == lvl ) {
1714 bt_unlockpage(mode, set->latch);
1715 bt_unpinlatch (mgr, set->latch);
// remember this page so it can be released after the next one is latched
1720 prevpage_no = set->latch->page_no;
1721 prevlatch = set->latch;
1722 prevpage = set->page;
1725 // find key on page at this level
1726 // and descend to requested level
// pages marked kill are not searched
1728 if( !set->page->kill )
1729 if( slot = bt_findslot (set->page, key, len) ) {
1733 // find next non-dead slot -- the fence key if nothing else
1735 while( slotptr(set->page, slot)->dead )
1736 if( slot++ < set->page->cnt )
1739 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1741 val = valptr(set->page, slot);
// a child pointer must be exactly BtId bytes long
1743 if( val->len == BtId )
1744 page_no = bt_getid(valptr(set->page, slot)->value);
1746 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1752 // slide right into next page
1754 page_no = bt_getid(set->page->right);
1757 // return error on end of right chain
1759 mgr->line = __LINE__, mgr->err = BTERR_struct;
1760 return 0; // return error
1763 // return page to free list
1764 // page must be delete & write locked
1765 // and have no keys pointing to it.
// Pushes the page onto the head of the free chain kept in page zero, marks
// it free and dirty, decrements the active-page count, then drops the
// caller's delete and write locks and the pin.
//   mgr  buffer manager; mgr->lock is held for the whole operation
//   set  latch and page of the page being freed
1767 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1769 // lock allocation page
1771 bt_mutexlock (mgr->lock);
// link the page in front of the current chain head, then make it the head
1775 memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1776 bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1777 set->latch->dirty = 1;
1778 set->page->free = 1;
1780 // decrement active page count
1782 mgr->pagezero->activepages--;
1784 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1785 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1787 // unlock released page
1788 // and unlock allocation page
1790 bt_unlockpage (BtLockDelete, set->latch);
1791 bt_unlockpage (BtLockWrite, set->latch);
1792 bt_unpinlatch (mgr, set->latch);
1793 bt_releasemutex (mgr->lock);
1796 // a fence key was deleted from a page
1797 // push new fence value upwards
// Drops the page's last slot (the old fence), then updates level lvl+1 so
// the page is reachable under its new last key: the new fence is inserted
// first and the old fence is deleted second.
//   mgr        buffer manager
//   set        page whose fence changed; the write lock held on entry is
//              exchanged for a parent lock while level lvl+1 is updated
//   lvl        level of `set`; the key changes are made at lvl+1
//   thread_no  caller's thread number
// Returns a BTERR code; the return statements are not visible in this
// excerpt.
1799 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
// local copies of the fence keys (BtKey header followed by key bytes)
1801 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1802 unsigned char value[BtId];
1806 // remove the old fence value
// save the old fence key, then zero its slot and shrink the slot count
1808 ptr = keyptr(set->page, set->page->cnt);
1809 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1810 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1811 set->latch->dirty = 1;
1813 // cache new fence value
// the slot that is now last supplies the new fence key
1815 ptr = keyptr(set->page, set->page->cnt);
1816 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// take the parent lock before giving up the write lock
1818 bt_lockpage (BtLockParent, set->latch, thread_no);
1819 bt_unlockpage (BtLockWrite, set->latch);
1821 // insert new (now smaller) fence key
// the value stored with the key is this page's own page number
1823 bt_putid (value, set->latch->page_no);
1824 ptr = (BtKey*)leftkey;
1826 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1829 // now delete old fence key
1831 ptr = (BtKey*)rightkey;
1833 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1836 bt_unlockpage (BtLockParent, set->latch);
1837 bt_unpinlatch(mgr, set->latch);
1841 // root has a single child
1842 // collapse a level from the tree
// Repeatedly copies the root's only live child over the root page and
// frees that child, for as long as the root stays above level 1 and has a
// single active key.
//   mgr        buffer manager
//   root       root page set; write-unlocked and unpinned before returning
//   thread_no  caller's thread number
// Returns a BTERR code: BTERR_struct when the child slot's value is not
// BtId bytes long.  Other return statements are not visible here.
1844 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1851 // find the child entry and promote as new root contents
// scan slots 1..cnt for a slot that is not dead
1854 for( idx = 0; idx++ < root->page->cnt; )
1855 if( !slotptr(root->page, idx)->dead )
1858 val = valptr(root->page, idx);
// a child pointer must be exactly BtId bytes long
1860 if( val->len == BtId )
1861 page_no = bt_getid (valptr(root->page, idx)->value);
1863 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1865 if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1866 child->page = bt_mappage (mgr, child->latch);
// bt_freepage expects the page delete- and write-locked
1870 bt_lockpage (BtLockDelete, child->latch, thread_no);
1871 bt_lockpage (BtLockWrite, child->latch, thread_no);
// the child's whole page image becomes the new root contents
1873 memcpy (root->page, child->page, mgr->page_size);
1874 root->latch->dirty = 1;
1876 bt_freepage (mgr, child);
1878 } while( root->page->lvl > 1 && root->page->act == 1 );
1880 bt_unlockpage (BtLockWrite, root->latch);
1881 bt_unpinlatch (mgr, root->latch);
1885 // delete a page and manage keys
1886 // call with page writelocked
1888 // returns the right page pool entry for freeing
1889 // or zero on error.
// Empties a page out of the tree by pulling its right sibling's contents
// into it, marking the sibling killed, and fixing the keys at level lvl+1.
//   mgr        buffer manager
//   set        the page being removed; it survives holding the right
//              sibling's contents
//   thread_no  caller's thread number
//   mode       lock mode taken on the right sibling
// The value returned on success is the right sibling's index in
// mgr->latchsets; the sibling is left pinned with its parent lock held.
1891 uint bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, BtLock mode)
// local copies of fence keys (BtKey header followed by key bytes)
1893 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1894 unsigned char value[BtId];
1895 uint lvl = set->page->lvl;
1900 // cache copy of fence key
1901 // to remove in parent
1903 ptr = keyptr(set->page, set->page->cnt);
1904 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1906 // obtain lock on right page
1908 page_no = bt_getid(set->page->right);
1910 if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1911 right->page = bt_mappage (mgr, right->latch);
1915 bt_lockpage (mode, right->latch, thread_no);
1917 // cache copy of key to update
1919 ptr = keyptr(right->page, right->page->cnt);
1920 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
// a right sibling that is already killed is reported as a structure error
1922 if( right->page->kill )
1923 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1925 // pull contents of right peer into our empty page
// the link lock is held while the page image is replaced; our left link is
// first copied into the sibling so it survives the overwrite
1927 bt_lockpage (BtLockLink, set->latch, thread_no);
1928 memcpy (right->page->left, set->page->left, BtId);
1929 memcpy (set->page, right->page, mgr->page_size);
// the copy brought the sibling's page_no along; restore our own
1930 set->page->page_no = set->latch->page_no;
1931 set->latch->dirty = 1;
1932 bt_unlockpage (BtLockLink, set->latch);
1934 // mark right page deleted and point it to left page
1935 // until we can post parent updates that remove access
1936 // to the deleted page.
1938 bt_putid (right->page->right, set->latch->page_no);
1939 right->latch->dirty = 1;
1940 right->page->kill = 1;
// on both pages the parent lock is taken before the current lock is dropped
1942 bt_lockpage (BtLockParent, right->latch, thread_no);
1943 bt_unlockpage (mode, right->latch);
1945 bt_lockpage (BtLockParent, set->latch, thread_no);
1946 bt_unlockpage (BtLockWrite, set->latch);
1948 // redirect higher key directly to our new node contents
1950 bt_putid (value, set->latch->page_no);
1951 ptr = (BtKey*)higherfence;
1953 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1956 // delete old lower key to our node
1958 ptr = (BtKey*)lowerfence;
1960 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1963 bt_unlockpage (BtLockParent, set->latch);
// pool entry number = offset of the latch within the latchsets array
1964 return right->latch - mgr->latchsets;
1967 // find and delete key on page by marking delete flag bit
1968 // if page becomes empty, delete it from the btree
//   mgr        buffer manager
//   key, len   key bytes and length to delete
//   lvl        tree level on which the key lives
//   thread_no  caller's thread number
// Returns a BTERR code; the return statements are not visible in this
// excerpt.  After the key is marked, one of three follow-ups may run: fence
// repair (bt_fixfence), root collapse (bt_collapseroot) or removal of the
// now-empty page (bt_deletepage + bt_freepage).
1970 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1972 uint slot, idx, found, fence, entry;
1973 BtPageSet set[1], right[1];
// locate the page at level lvl and take it write-locked
1978 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1979 node = slotptr(set->page, slot);
1980 ptr = keyptr(set->page, slot);
1984 // if librarian slot, advance to real slot
1986 if( node->type == Librarian ) {
1987 ptr = keyptr(set->page, ++slot);
1988 node = slotptr(set->page, slot);
// remember whether the slot being deleted is the page's last (fence) slot
1991 fence = slot == set->page->cnt;
1993 // delete the key, ignore request if already dead
// found ends up nonzero only for an exact key match on a live slot
1995 if( found = !keycmp (ptr, key, len) )
1996 if( found = node->dead == 0 ) {
1997 val = valptr(set->page,slot);
// the key and value bytes become reclaimable garbage
1998 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2001 // mark node type as delete
2003 node->type = Delete;
2006 // collapse empty slots beneath the fence
2007 // on interior nodes
// while the slot just below the last one is dead, move the last slot down
// over it and shrink the slot count
2010 while( idx = set->page->cnt - 1 )
2011 if( slotptr(set->page, idx)->dead ) {
2012 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2013 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2021 // did we delete a fence key in an upper level?
2023 if( lvl && set->page->act && fence )
2024 if( bt_fixfence (mgr, set, lvl, thread_no) )
2029 // do we need to collapse root?
2031 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2032 if( bt_collapseroot (mgr, set, thread_no) )
2037 // delete empty page
2039 if( !set->page->act ) {
// bt_deletepage hands back the right sibling's pool entry number
2040 if( entry = bt_deletepage (mgr, set, thread_no, BtLockWrite) )
2041 right->latch = mgr->latchsets + entry;
2045 // obtain delete and write locks to right node
// swap the sibling's parent lock for the delete+write locks that
// bt_freepage releases
2047 bt_unlockpage (BtLockParent, right->latch);
2048 right->page = bt_mappage (mgr, right->latch);
2049 bt_lockpage (BtLockDelete, right->latch, thread_no);
2050 bt_lockpage (BtLockWrite, right->latch, thread_no);
2051 bt_freepage (mgr, right);
2053 bt_unpinlatch (mgr, set->latch);
// ordinary case: page modified in place; mark dirty and release
2057 set->latch->dirty = 1;
2058 bt_unlockpage(BtLockWrite, set->latch);
2059 bt_unpinlatch (mgr, set->latch);
2063 // advance to next slot
// Moves a read cursor forward.  When the current slot is the page's last,
// the right sibling is pinned and read-locked and the previous page is
// released.
//   bt    handle supplying the manager and the caller's thread number
//   set   current page set; replaced by the right sibling when crossing
//   slot  current slot number
// Returns 0 with BTERR_struct recorded in bt->mgr->err on the visible error
// path.  The other return statements are not visible in this excerpt.
2065 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2067 BtLatchSet *prevlatch;
// still inside the current page?
2070 if( slot < set->page->cnt )
2073 prevlatch = set->latch;
// follow the right link; a zero link means there is no sibling
2075 if( page_no = bt_getid(set->page->right) )
2076 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2077 set->page = bt_mappage (bt->mgr, set->latch);
2081 return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2083 // obtain access lock using lock chaining with Access mode
2085 bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
// the old page is released only after the new page's access lock is held
2087 bt_unlockpage(BtLockRead, prevlatch);
2088 bt_unpinlatch (bt->mgr, prevlatch);
// exchange the access lock for a read lock on the new page
2090 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2091 bt_unlockpage(BtLockAccess, set->latch);
2095 // find unique key == given key, or first duplicate key in
2096 // leaf level and return number of value bytes
2097 // or (-1) if not found.
//   bt            handle; bt->key receives a copy of the key examined
//   key, keylen   search key bytes and length
//   value, valmax caller's buffer for the value and its capacity
// The leaf page is loaded read-locked, candidate slots are examined
// (moving on with bt_findnext), and the page is unlocked and unpinned
// before returning.
2099 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
// level 0 is the leaf level
2107 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2109 ptr = keyptr(set->page, slot);
2111 // skip librarian slot place holder
2113 if( slotptr(set->page, slot)->type == Librarian )
2114 ptr = keyptr(set->page, ++slot);
2116 // return actual key found
// copies the BtKey header together with the key bytes
2118 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
// Duplicate-type slots get special handling; the statement that follows is
// not visible in this excerpt
2121 if( slotptr(set->page, slot)->type == Duplicate )
2124 // not there if we reach the stopper key
// last slot of a page with no right sibling
2126 if( slot == set->page->cnt )
2127 if( !bt_getid (set->page->right) )
2130 // if key exists, return >= 0 value bytes copied
2131 // otherwise return (-1)
2133 if( slotptr(set->page, slot)->dead )
// len is assigned on a line not visible here -- TODO confirm its source
2137 if( !memcmp (ptr->key, key, len) ) {
2138 val = valptr (set->page,slot);
// valmax bounds the copy; the body of this test is not visible here
2139 if( valmax > val->len )
2141 memcpy (value, val->value, valmax);
2147 } while( slot = bt_findnext (bt, set, slot) );
2149 bt_unlockpage (BtLockRead, set->latch);
2150 bt_unpinlatch (bt->mgr, set->latch);
2154 // check page for space available,
2155 // clean if necessary and return
2156 // 0 - page needs splitting
2157 // >0 new slot value
//   mgr             buffer manager (page size)
//   set             page to examine; rewritten in place when compacted
//   keylen, vallen  size of the key and value about to be added
//   slot            caller's current slot
// Compaction copies the page to a scratch frame, clears everything after
// the page header, and copies entries back from the top of the page down,
// creating a Librarian placeholder slot alongside each copied entry.
2159 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2161 BtPage page = set->page, frame;
// nxt is the next free offset, working downward from the end of the page
2162 uint nxt = mgr->page_size;
2163 uint cnt = 0, idx = 0;
2164 uint max = page->cnt;
// room test: the lowest used offset (page->min) must stay above the header,
// a slot array grown by two, and the new key and value with their headers
2169 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2172 // skip cleanup and proceed to split
2173 // if there's not enough garbage
// threshold is one fifth of the page size
2176 if( page->garbage < nxt / 5 )
// NOTE(review): the malloc result is passed straight to memcpy with no NULL
// check -- confirm allocation failure is acceptable here.
2179 frame = malloc (mgr->page_size);
2180 memcpy (frame, page, mgr->page_size);
2182 // skip page info and set rest of page to zero
2184 memset (page+1, 0, mgr->page_size - sizeof(*page));
2185 set->latch->dirty = 1;
2190 // clean up page first by
2191 // removing deleted keys
2193 while( cnt++ < max ) {
// the dead test is bypassed for the last slot of a level-0 page
2197 if( cnt < max || frame->lvl )
2198 if( slotptr(frame,cnt)->dead )
2201 // copy the value across
2203 val = valptr(frame, cnt);
2204 nxt -= val->len + sizeof(BtVal);
2205 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2207 // copy the key across
2209 key = keyptr(frame, cnt);
2210 nxt -= key->len + sizeof(BtKey);
2211 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2213 // make a librarian slot
// the placeholder shares the entry's offset and is flagged dead
2215 slotptr(page, ++idx)->off = nxt;
2216 slotptr(page, idx)->type = Librarian;
2217 slotptr(page, idx)->dead = 1;
// the real slot: same offset, type and dead flag carried over from the frame
2221 slotptr(page, ++idx)->off = nxt;
2222 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2224 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2232 // see if page has enough space now, or does it need splitting?
2234 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2240 // split the root and raise the height of the btree
// The root's current contents are moved to a freshly allocated page, and
// the root is rebuilt with two slots: slot 1 carries the moved page's
// fence key and page number, slot 2 is a 2-byte stopper key pointing at
// `right`.
//   mgr      buffer manager
//   root     root page set; write-unlocked and unpinned before returning
//   right    latch of the new right page; unpinned before returning
//   page_no  NOTE(review): despite the name, this value is handed to
//            bt_newpage as its thread-id argument, and bt_splitkeys
//            passes its thread_no here
// Returns a BTERR code; the return statements are not visible in this
// excerpt.
2242 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2244 unsigned char leftkey[BT_keyarray];
// nxt walks downward from the end of the page as entries are laid out
2245 uint nxt = mgr->page_size;
2246 unsigned char value[BtId];
// NOTE(review): the malloc result is passed straight to memcpy with no NULL
// check.
2253 frame = malloc (mgr->page_size);
2254 memcpy (frame, root->page, mgr->page_size);
2256 // save left page fence key for new root
2258 ptr = keyptr(root->page, root->page->cnt);
2259 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2261 // Obtain an empty page to use, and copy the current
2262 // root contents into it, e.g. lower keys
2264 if( bt_newpage(mgr, left, frame, page_no) )
2267 left_page_no = left->latch->page_no;
2268 bt_unpinlatch (mgr, left->latch);
2271 // preserve the page info at the bottom
2272 // of higher keys and set rest to zero
2274 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2276 // insert stopper key at top of newroot page
2277 // and increase the root height
// value for slot 2: page number of the right page
2279 nxt -= BtId + sizeof(BtVal);
2280 bt_putid (value, right->page_no);
2281 val = (BtVal *)((unsigned char *)root->page + nxt);
2282 memcpy (val->value, value, BtId);
// reserve room for a 2-byte key plus its header
2285 nxt -= 2 + sizeof(BtKey);
2286 slotptr(root->page, 2)->off = nxt;
2287 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2292 // insert lower keys page fence key on newroot page as first key
// value for slot 1: page number of the page that received the old contents
2294 nxt -= BtId + sizeof(BtVal);
2295 bt_putid (value, left_page_no);
2296 val = (BtVal *)((unsigned char *)root->page + nxt);
2297 memcpy (val->value, value, BtId);
2300 ptr = (BtKey *)leftkey;
2301 nxt -= ptr->len + sizeof(BtKey);
2302 slotptr(root->page, 1)->off = nxt;
2303 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root has no right sibling
2305 bt_putid(root->page->right, 0);
2306 root->page->min = nxt; // reset lowest used offset and key count
2307 root->page->cnt = 2;
2308 root->page->act = 2;
// mirror the root's level into page zero
2311 mgr->pagezero->alloc->lvl = root->page->lvl;
2313 // release and unpin root pages
2315 bt_unlockpage(BtLockWrite, root->latch);
2316 bt_unpinlatch (mgr, root->latch);
2318 bt_unpinlatch (mgr, right);
2322 // split already locked full node
2324 // return pool entry for new right
2325 // page, pinned & unlocked
// Builds the upper keys in a scratch frame, installs that frame as a new
// page via bt_newpage, then rebuilds the original page from the lower
// keys and links it to the new page.  In both halves a Librarian
// placeholder slot is created alongside copied entries.
//   mgr        buffer manager
//   set        the full page, already locked by the caller
//   thread_no  caller's thread number
// The value returned on success is the new right page's index in
// mgr->latchsets.  Callers treat 0 as failure.
2327 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
// nxt is the next free offset, working downward from the end of the page
2329 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
// NOTE(review): no NULL check on this malloc is visible -- confirm.
2330 BtPage frame = malloc (mgr->page_size);
2331 uint lvl = set->page->lvl;
2338 // split higher half of keys to frame
2340 memset (frame, 0, mgr->page_size);
2341 max = set->page->cnt;
// cnt's starting point for this loop is set on a line not visible here
2345 while( cnt++ < max ) {
// the dead test is bypassed for the last slot of a level-0 page
2346 if( cnt < max || set->page->lvl )
2347 if( slotptr(set->page, cnt)->dead )
// copy the value, then the key, into the frame from the top down
2350 src = valptr(set->page, cnt);
2351 nxt -= src->len + sizeof(BtVal);
2352 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2354 key = keyptr(set->page, cnt);
2355 nxt -= key->len + sizeof(BtKey);
2356 ptr = (BtKey*)((unsigned char *)frame + nxt);
2357 memcpy (ptr, key, key->len + sizeof(BtKey));
2359 // add librarian slot
// the placeholder shares the entry's offset and is flagged dead
2361 slotptr(frame, ++idx)->off = nxt;
2362 slotptr(frame, idx)->type = Librarian;
2363 slotptr(frame, idx)->dead = 1;
// the real slot: same offset, type and dead flag carried over
2367 slotptr(frame, ++idx)->off = nxt;
2368 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2370 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2374 frame->bits = mgr->page_bits;
// for non-root pages the new right page inherits the old right link
2381 if( set->latch->page_no > ROOT_page )
2382 bt_putid (frame->right, bt_getid (set->page->right));
2384 // get new free page and write higher keys to it.
2386 if( bt_newpage(mgr, right, frame, thread_no) )
2389 // process lower keys
// reuse the frame as a snapshot of the original page, then clear the page
// body after its header
2391 memcpy (frame, set->page, mgr->page_size);
2392 memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2393 set->latch->dirty = 1;
2395 nxt = mgr->page_size;
2396 set->page->garbage = 0;
// the action taken when the boundary slot is a Librarian is not visible in
// this excerpt
2402 if( slotptr(frame, max)->type == Librarian )
2405 // assemble page of smaller keys
2407 while( cnt++ < max ) {
2408 if( slotptr(frame, cnt)->dead )
2410 val = valptr(frame, cnt);
2411 nxt -= val->len + sizeof(BtVal);
2412 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2414 key = keyptr(frame, cnt);
2415 nxt -= key->len + sizeof(BtKey);
2416 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2418 // add librarian slot
2420 slotptr(set->page, ++idx)->off = nxt;
2421 slotptr(set->page, idx)->type = Librarian;
2422 slotptr(set->page, idx)->dead = 1;
2426 slotptr(set->page, ++idx)->off = nxt;
2427 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
// link the original page to the new right page and record the new layout
2431 bt_putid(set->page->right, right->latch->page_no);
2432 set->page->min = nxt;
2433 set->page->cnt = idx;
// pool entry number = offset of the latch within the latchsets array
2436 return right->latch - mgr->latchsets;
2439 // fix keys for newly split page
2440 // call with both pages pinned & locked
2441 // return unlocked and unpinned
// Posts the two fence keys produced by a split into level lvl+1: the left
// page's new (smaller) fence, then the old fence re-pointed at the new
// right page.  A split of the root is delegated to bt_splitroot.
//   mgr        buffer manager
//   set        left (original) page set
//   right      latch of the new right page
//   thread_no  caller's thread number
// Returns a BTERR code; apart from the bt_splitroot delegation the return
// statements are not visible in this excerpt.
2443 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
// local copies of the fence keys (BtKey header followed by key bytes)
2445 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2446 unsigned char value[BtId];
2447 uint lvl = set->page->lvl;
2451 // if current page is the root page, split it
2453 if( set->latch->page_no == ROOT_page )
2454 return bt_splitroot (mgr, set, right, thread_no);
// fence of the left page = key in its last slot
2456 ptr = keyptr(set->page, set->page->cnt);
2457 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2459 page = bt_mappage (mgr, right);
// fence of the right page = key in its last slot
2461 ptr = keyptr(page, page->cnt);
2462 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2464 // insert new fences in their parent pages
2466 bt_lockpage (BtLockParent, right, thread_no);
// the left page's parent lock is taken before its write lock is dropped
2468 bt_lockpage (BtLockParent, set->latch, thread_no);
2469 bt_unlockpage (BtLockWrite, set->latch);
2471 // insert new fence for reformulated left block of smaller keys
2473 bt_putid (value, set->latch->page_no);
2474 ptr = (BtKey *)leftkey;
2476 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2479 // switch fence for right block of larger keys to new right page
2481 bt_putid (value, right->page_no);
2482 ptr = (BtKey *)rightkey;
2484 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2487 bt_unlockpage (BtLockParent, set->latch);
2488 bt_unpinlatch (mgr, set->latch);
2490 bt_unlockpage (BtLockParent, right);
2491 bt_unpinlatch (mgr, right);
2495 // install new key and value onto page
2496 // page must already be checked for
// Writes the value and then the key into the free area below page->min,
// opens a position in the slot array at `slot` (reusing a dead slot when
// one exists at or after it, otherwise growing the array and sprinkling
// Librarian placeholders), and points the slot at the new entry.
//   mgr             buffer manager
//   set             target page set
//   slot            position before which the key is inserted
//   key, keylen     key bytes and length
//   value, vallen   value bytes and length
//   type            slot type for the new entry (presumably stored on a
//                   line not visible here)
//   release         presumably selects whether the page is unlocked and
//                   unpinned at the end -- the test itself is not visible
// Returns a BTERR code; the return statements are not visible in this
// excerpt.
2499 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2501 uint idx, librarian;
2507 // if found slot > desired slot and previous slot
2508 // is a librarian slot, use it
2511 if( slotptr(set->page, slot-1)->type == Librarian )
2514 // copy value onto page
// page->min moves down by the size of the value plus its header
2516 set->page->min -= vallen + sizeof(BtVal);
2517 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2518 memcpy (val->value, value, vallen);
2521 // copy key onto page
// page->min moves down again; it now addresses the new key
2523 set->page->min -= keylen + sizeof(BtKey);
2524 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2525 memcpy (ptr->key, key, keylen);
2528 // find first empty slot
2530 for( idx = slot; idx < set->page->cnt; idx++ )
2531 if( slotptr(set->page, idx)->dead )
2534 // now insert key into array before slot,
2535 // adding as many librarian slots as
// no dead slot was found: the array grows by one slot, and avail is
// computed from 4/5 of page->min less the header and the enlarged slot
// array, then converted to a slot count
2538 if( idx == set->page->cnt ) {
2539 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2541 librarian = ++idx - slot;
2542 avail /= sizeof(BtSlot);
// the placeholder count is capped against avail; the capping statement is
// not visible in this excerpt
2547 if( librarian > avail )
2551 rate = (idx - slot) / librarian;
2552 set->page->cnt += librarian;
// a dead slot was found: no placeholders are added
2557 librarian = 0, rate = 0;
// shift slots upward from the top, leaving room for pending placeholders
2559 while( idx > slot ) {
2561 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2564 // add librarian slot per rate
// a placeholder takes the offset of the slot just above it
2567 if( (idx - slot + 1)/2 <= librarian * rate ) {
2568 node = slotptr(set->page, idx--);
2569 node->off = node[1].off;
2570 node->type = Librarian;
2576 set->latch->dirty = 1;
// finally aim the target slot at the key just written
2581 node = slotptr(set->page, slot);
2582 node->off = set->page->min;
2587 bt_unlockpage (BtLockWrite, set->latch);
2588 bt_unpinlatch (mgr, set->latch);
2594 // Insert new key into the btree at given level.
2595 // either add a new key or update/add an existing one
//   mgr             buffer manager
//   key, keylen     key bytes and length
//   lvl             tree level to insert into
//   value, vallen   value bytes and length
//   type            slot type for a newly created entry
//   thread_no       caller's thread number
// Loops: load the page write-locked; for a key not on the page, make room
// (bt_cleanpage) and insert, or split and retry; for an existing key,
// overwrite the value in place when it fits, otherwise relocate the entry.
// Returns a BTERR code; most return statements are not visible in this
// excerpt.
2597 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2599 uint slot, idx, len, entry;
2605 while( 1 ) { // find the page and slot for the current key
2606 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2607 node = slotptr(set->page, slot);
2608 ptr = keyptr(set->page, slot);
// the condition guarding this BTERR_ovflw assignment is not visible here
2611 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2615 // if librarian slot == found slot, advance to real slot
// only when the placeholder's key equals the key being inserted
2617 if( node->type == Librarian )
2618 if( !keycmp (ptr, key, keylen) ) {
2619 ptr = keyptr(set->page, ++slot);
2620 node = slotptr(set->page, slot);
2623 // if inserting a duplicate key or unique
2624 // key that doesn't exist on the page,
2625 // check for adequate space on the page
2626 // and insert the new key before slot.
// key differs from the slot's key: clean and insert (the trailing 1 is
// bt_insertslot's release argument); if there is no room, split the page
// and post the fences
2631 if( keycmp (ptr, key, keylen) )
2632 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2633 return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2634 else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2636 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2641 // if key already exists, update value and return
2643 val = valptr(set->page, slot);
// the new value fits in the existing value area
2645 if( val->len >= vallen ) {
2646 if( slotptr(set->page, slot)->dead )
// bytes no longer needed by the shorter value count as garbage
2651 set->page->garbage += val->len - vallen;
2652 set->latch->dirty = 1;
2654 memcpy (val->value, value, vallen);
2655 bt_unlockpage(BtLockWrite, set->latch);
2656 bt_unpinlatch (mgr, set->latch);
2660 // new update value doesn't fit in existing value area
2661 // make sure page has room
// the whole old entry (key and value with headers) becomes garbage
2664 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2671 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2672 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2674 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2679 // copy key and value onto page and update slot
// the value goes first, then the key below it; page->min ends at the key
2681 set->page->min -= vallen + sizeof(BtVal);
2682 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2683 memcpy (val->value, value, vallen);
2686 set->latch->dirty = 1;
2687 set->page->min -= keylen + sizeof(BtKey);
2688 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2689 memcpy (ptr->key, key, keylen);
// repoint the slot at the relocated entry
2692 node->off = set->page->min;
2693 bt_unlockpage(BtLockWrite, set->latch);
2694 bt_unpinlatch (mgr, set->latch);
2701 // determine actual page where key is located
2702 // return slot number
// Resolves which latched page currently holds source key `src` during an
// atomic transaction, following the per-latch split chain when the
// recorded slot is no longer usable.
//   mgr     buffer manager (latchsets array)
//   source  page holding the transaction's keys
//   locks   per-key lock records, indexed by source slot number
//   src     index of the key within `source`
//   set     receives the latch and page that were located
// BTERR_atomic is recorded in mgr->err when the chain is exhausted; the
// return statements are not visible in this excerpt.
2704 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2706 BtKey *key = keyptr(source,src);
2707 uint slot = locks[src].slot;
// a key flagged reuse starts from the previous key's pool entry and forces
// a fresh slot search
2710 if( src > 1 && locks[src].reuse )
2711 entry = locks[src-1].entry, slot = 0;
2713 entry = locks[src].entry;
2716 set->latch = mgr->latchsets + entry;
2717 set->page = bt_mappage (mgr, set->latch);
2721 // is locks->reuse set? or was slot zeroed?
2722 // if so, find where our key is located
2723 // on current page or pages split on
2724 // same page txn operations.
2727 set->latch = mgr->latchsets + entry;
2728 set->page = bt_mappage (mgr, set->latch);
2730 if( slot = bt_findslot(set->page, key->key, key->len) ) {
// Librarian placeholder hit; the adjustment that follows is not visible
// in this excerpt
2731 if( slotptr(set->page, slot)->type == Librarian )
// remember the entry that was found for keys flagged reuse
2733 if( locks[src].reuse )
2734 locks[src].entry = entry;
// move on to the next page in this latch's split chain, if any
2737 } while( entry = set->latch->split );
2739 mgr->line = __LINE__, mgr->err = BTERR_atomic;
// Inserts source key `src` (with its value and slot type) into the page
// located by bt_atomicpage as part of an atomic transaction.  When the
// page has no room it is split, the new right page is write-locked and
// spliced into the latch's split chain, and the lookup is retried.
//   mgr        buffer manager
//   source     page holding the transaction's keys
//   locks      per-key lock records, indexed by source slot number
//   src        index of the key within `source`
//   thread_no  caller's thread number
//   lsn        log sequence number stamped on the modified page
// BTERR_atomic is returned once the retry loop ends; the other return
// statements are not visible in this excerpt.
2743 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2745 BtKey *key = keyptr(source, src);
2746 BtVal *val = valptr(source, src);
2751 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2752 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
// the trailing 0 is bt_insertslot's release argument
2753 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2755 set->page->lsn = lsn;
// no room on the page: split and work with the new right page's latch
2761 if( entry = bt_splitpage (mgr, set, thread_no) )
2762 latch = mgr->latchsets + entry;
2766 // splice right page into split chain
2769 bt_lockpage(BtLockWrite, latch, thread_no);
2770 latch->split = set->latch->split;
2771 set->latch->split = entry;
// zeroing the recorded slot forces bt_atomicpage to search again
2772 locks[src].slot = 0;
2775 return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2778 // perform delete from smaller btree
2779 // insert a delete slot if not found there
//   mgr        buffer manager
//   source     page holding the transaction's keys
//   locks      per-key lock records, indexed by source slot number
//   src        index of the key within `source`
//   thread_no  caller's thread number (not used in the lines visible here)
//   lsn        log sequence number stamped on the modified page
// Returns a BTERR code: BTERR_struct on the visible error path, otherwise
// bt_insertslot's result when a Delete slot has to be added.  The
// remaining return statements are not visible in this excerpt.
2781 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2783 BtKey *key = keyptr(source, src);
// locate the page and slot for this key
2790 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2791 node = slotptr(set->page, slot);
2792 ptr = keyptr(set->page, slot);
2793 val = valptr(set->page, slot);
2795 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2797 // if slot is not found, insert a delete slot
// key absent from the page: add a zero-length Delete entry (release = 0)
2799 if( keycmp (ptr, key->key, key->len) )
2800 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2802 // if node is already dead,
2803 // ignore the request.
// the deleted entry's key and value bytes become garbage
2808 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2809 set->latch->dirty = 1;
2810 set->page->lsn = lsn;
// atomically bump the manager's found counter
2814 __sync_fetch_and_add(&mgr->found, 1);
// qsortcmp: comparison callback handed to qsort_r by bt_atomictxn.
//
//	slot1, slot2	the two slots being compared
//	page		page the slots belong to; each key lives at slot->off
//			bytes from the start of this page
//
// Returns whatever keycmp reports for key1 against key2.
2818 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2820 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2821 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2823 return keycmp (key1, key2->key, key2->len);
// bt_atomictxn: run a batch of key modifications as one transaction.
//
//	bt	database handle (supplies mgr and thread_no)
//	source	page whose slots 1..cnt describe the keys to modify;
//		the slot array is sorted in place
//
// Steps: sort the slots by key, write the batch to the redo log when
// redo pages are configured, execute the batch with bt_atomicexec, then
// promote pages while the active-page count exceeds the threshold.
// Returns bt->mgr->err when bt_atomicexec or bt_txnpromote fails.
2825 // atomic modification of a batch of keys.
2827 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2829 uint src, idx, slot, samepage, entry, que = 0;
2830 BtKey *key, *ptr, *key2;
2836 // stable sort the list of keys into order to
2837 // prevent deadlocks between threads.
// insertion sort: each slot from 2..cnt is moved left past every
// predecessor whose key compares greater
2839 for( src = 1; src++ < source->cnt; ) {
2840 *temp = *slotptr(source,src);
2841 key = keyptr (source,src);
2843 for( idx = src; --idx; ) {
2844 key2 = keyptr (source,idx);
2845 if( keycmp (key, key2->key, key2->len) < 0 ) {
2846 *slotptr(source,idx+1) = *slotptr(source,idx);
2847 *slotptr(source,idx) = *temp;
// NOTE(review): a qsort_r call follows the insertion sort; confirm which
// of the two is compiled in. qsort_r gives no stability guarantee, while
// the comment above asks for a stable sort.
2853 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2854 // add entries to redo log
2856 if( bt->mgr->pagezero->redopages )
2857 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2861 // perform the individual actions in the transaction
2863 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2864 return bt->mgr->err;
2866 // if number of active pages
2867 // is greater than the buffer pool
2868 // promote page into larger btree
// the threshold actually tested is ten less than the latch total
2871 while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
2872 if( bt_txnpromote (bt) )
2873 return bt->mgr->err;
// bt_atomicexec: apply every slot of the sorted transaction page
// `source` to the btree managed by `mgr`.
//
//	mgr		manager of the btree being modified
//	source		transaction page; slots 1..cnt are processed
//	lsn		log sequence number handed to bt_atomicinsert and
//			bt_atomicdelete
//	lsm		flag supplied by callers: 0 from bt_atomictxn, and
//			from bt_txnpromote 1 when redo pages are configured
//			- TODO confirm its effect
//	thread_no	caller's thread number for the lock routines
//
// Pass 1 (forward) finds and locks the leaf page of each key and records
// it in locks[]. Pass 2 (backward) performs the operations of each
// same-page group, then repairs the master page: split pages are linked
// into the tree and empty pages are removed.
// Callers treat a non-zero return as failure.
2880 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2882 uint src, idx, slot, samepage, entry, que = 0;
2883 BtPageSet set[1], prev[1], right[1];
2884 unsigned char value[BtId];
// one zeroed AtomicTxn per slot; index 0 is unused because slots start at 1
// NOTE(review): confirm the allocation result is checked before use
2892 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2894 // Load the leaf page for each key
2895 // group same page references with reuse bit
2896 // and determine any constraint violations
2898 for( src = 0; src++ < source->cnt; ) {
2899 key = keyptr(source, src);
2902 // first determine if this modification falls
2903 // on the same page as the previous modification
2904 // note that the far right leaf page is a special case
// same page when the page loaded for the previous key has no right
// sibling, or when its last key compares >= this key
2906 if( samepage = src > 1 )
2907 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2908 slot = bt_findslot(set->page, key->key, key->len);
// load the leaf for this key under Atomic and Write locks and start it
// with an empty split chain
2911 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic + BtLockWrite, thread_no) )
2912 set->latch->split = 0;
2916 if( slotptr(set->page, slot)->type == Librarian )
// this entry records the latch entry, the slot and a cleared reuse bit
2920 locks[src].entry = set->latch - mgr->latchsets;
2921 locks[src].slot = slot;
2922 locks[src].reuse = 0;
// this entry is marked reuse with no latch entry or slot of its own
// (presumably the same-page case - TODO confirm)
2924 locks[src].entry = 0;
2925 locks[src].slot = 0;
2926 locks[src].reuse = 1;
2929 // capture current lsn for master page
2931 locks[src].reqlsn = set->page->lsn;
2934 // insert or delete each key
2935 // process any splits or merges
2936 // run through txn list backwards
// samepage now serves as the exclusive upper index of the current group
2938 samepage = source->cnt + 1;
2940 for( src = source->cnt; src; src-- ) {
2941 if( locks[src].reuse )
2944 // perform the txn operations
2945 // from smaller to larger on
// dispatch on the slot type: bt_atomicdelete, bt_atomicinsert, or a bare
// bt_atomicpage lookup
2948 for( idx = src; idx < samepage; idx++ )
2949 switch( slotptr(source,idx)->type ) {
2951 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2957 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2962 bt_atomicpage (mgr, source, locks, idx, set);
2966 // after the same page operations have finished,
2967 // process master page for splits or deletion.
2969 latch = prev->latch = mgr->latchsets + locks[src].entry;
2970 prev->page = bt_mappage (mgr, prev->latch);
2973 // pick-up all splits from master page
2974 // each one is already pinned & WriteLocked.
2976 if( entry = latch->split ) do {
2977 set->latch = mgr->latchsets + entry;
2978 set->page = bt_mappage (mgr, set->latch);
2980 // delete empty master page by undoing its split
2981 // (this is potentially another empty page)
// the split page's contents overwrite the master page, which keeps its
// own left link; the split page is then unchained and freed
2983 if( !prev->page->act ) {
2984 memcpy (set->page->left, prev->page->left, BtId);
2985 memcpy (prev->page, set->page, mgr->page_size);
2986 bt_lockpage (BtLockDelete, set->latch, thread_no);
2987 prev->latch->split = set->latch->split;
2988 prev->latch->dirty = 1;
2989 bt_freepage (mgr, set);
2993 // remove empty split page from the split chain
2994 // and return it to the free list. No other
2995 // thread has its page number yet.
2997 if( !set->page->act ) {
2998 memcpy (prev->page->right, set->page->right, BtId);
2999 prev->latch->split = set->latch->split;
3001 bt_lockpage (BtLockDelete, set->latch, thread_no);
3002 bt_freepage (mgr, set);
3006 // update prev's fence key
// prev's last key is inserted at level 1 with prev's page number as value
3008 ptr = keyptr(prev->page,prev->page->cnt);
3009 bt_putid (value, prev->latch->page_no);
3011 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3014 // splice in the left link into the split page
3016 bt_putid (set->page->left, prev->latch->page_no);
3019 bt_syncpage (mgr, prev->page, prev->latch);
3021 // page is unlocked & unpinned below to avoid bt_txnpromote
3024 } while( entry = prev->latch->split );
3026 // update left pointer in next right page from last split page
3027 // (if all splits were reversed or none occurred, latch->split == 0)
3029 if( latch->split ) {
3030 // fix left pointer in master's original (now split)
3031 // far right sibling or set rightmost page in page zero
3033 if( right_page_no = bt_getid (prev->page->right) ) {
3034 if( set->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
3035 set->page = bt_mappage (mgr, set->latch);
// the right sibling's left link is rewritten under BtLockLink
3039 bt_lockpage (BtLockLink, set->latch, thread_no);
3040 bt_putid (set->page->left, prev->latch->page_no);
3041 set->latch->dirty = 1;
3043 bt_unlockpage (BtLockLink, set->latch);
3044 bt_unpinlatch (mgr, set->latch);
3045 } else { // prev is rightmost page
// page zero's alloc->left is updated while holding mgr->lock
3046 bt_mutexlock (mgr->lock);
3047 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3048 bt_releasemutex(mgr->lock);
3051 // Process last page split in chain
3052 // by switching the key from the master
3053 // page to the last split.
3055 ptr = keyptr(prev->page,prev->page->cnt);
3056 bt_putid (value, prev->latch->page_no);
3058 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3062 bt_syncpage (mgr, prev->page, prev->latch);
3064 // unlock and unpin master page
3066 bt_unlockpage(BtLockAtomic, latch);
3067 bt_unlockpage(BtLockWrite, latch);
3068 bt_unpinlatch(mgr, latch);
3070 // go through the list of splits and
3071 // release the locks and unpin
3073 while( entry = latch->split ) {
3074 latch = mgr->latchsets + entry;
3075 bt_unlockpage(BtLockWrite, latch);
3076 bt_unpinlatch(mgr, latch);
3082 // since there are no splits, we're
3083 // finished if master page occupied
3085 bt_unlockpage(BtLockAtomic, prev->latch);
// master page still has active keys: release the write lock, sync, unpin
3087 if( prev->page->act ) {
3088 bt_unlockpage(BtLockWrite, prev->latch);
3091 bt_syncpage (mgr, prev->page, prev->latch);
3093 bt_unpinlatch(mgr, prev->latch);
3097 // any and all splits were reversed, and the
3098 // master page located in prev is empty, delete it
// bt_deletepage hands back the latch entry of the right page
3100 if( entry = bt_deletepage (mgr, prev, thread_no, BtLockWrite) )
3101 right->latch = mgr->latchsets + entry;
3105 // obtain delete and write locks to right node
3107 bt_unlockpage (BtLockParent, right->latch);
3108 right->page = bt_mappage (mgr, right->latch);
3109 bt_lockpage (BtLockDelete, right->latch, thread_no);
3110 bt_lockpage (BtLockWrite, right->latch, thread_no);
3111 bt_freepage (mgr, right);
3113 bt_unpinlatch (mgr, prev->latch);
// bt_txnpromote: pick a leaf page of the cache btree (bt->mgr) and move
// its slots into the main btree (bt->main).
//
//	bt	database handle; bt->frame receives a private copy of the
//		chosen page
//
// Candidate latch entries are taken from the shared counter
// mgr->latchpromote, modulo the latch total. The visible tests reject an
// entry whose modify mutex cannot be taken, or that is pinned, unused,
// without a right sibling, non-leaf, free or being killed.
// Returns bt->mgr->err or bt->main->err on failure.
3120 // promote a page into the larger btree
3122 BTERR bt_txnpromote (BtDb *bt)
3124 BtPageSet set[1], right[1];
3125 uint entry, slot, idx;
// atomic fetch-and-increment of the candidate counter; the two lines are
// presumably the unix and Windows alternatives - TODO confirm
3132 entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3134 entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3136 entry %= bt->mgr->latchtotal;
3141 set->latch = bt->mgr->latchsets + entry;
3143 if( !bt_mutextry(set->latch->modify) )
3146 // skip this entry if it is pinned
3148 if( set->latch->pin & ~CLOCK_bit ) {
3149 bt_releasemutex(set->latch->modify);
3153 set->page = bt_mappage (bt->mgr, set->latch);
3155 // entry never used or has no right sibling
3157 if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3158 bt_releasemutex(set->latch->modify);
3162 // entry interior node or being killed
3164 if( set->page->lvl || set->page->free || set->page->kill ) {
3165 bt_releasemutex(set->latch->modify);
3169 // pin the page for our usage
3172 bt_releasemutex(set->latch->modify);
3173 bt_lockpage (BtLockAtomic | BtLockWrite, set->latch, bt->thread_no);
// snapshot the page into bt->frame before it is deleted below
3174 memcpy (bt->frame, set->page, bt->mgr->page_size);
// progress message for every page number divisible by 100
// NOTE(review): if page_no is a 64-bit uid, %d is the wrong conversion
3176 if( !(set->latch->page_no % 100) )
3177 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
// bt_deletepage hands back the latch entry of the right page
3179 if( entry = bt_deletepage (bt->mgr, set, bt->thread_no, BtLockAtomic | BtLockWrite) )
3180 right->latch = bt->mgr->latchsets + entry;
3182 return bt->mgr->err;
3184 // obtain delete and write locks to right node
// only the Parent lock is released here; the Delete and Write locks are
// taken after the transfer into the main btree
3186 bt_unlockpage (BtLockParent, right->latch);
3187 right->page = bt_mappage (bt->mgr, right->latch);
3189 // release page with its new contents
3191 bt_unlockpage (BtLockAtomic, set->latch);
3192 bt_unpinlatch (bt->mgr, set->latch);
3194 // transfer slots in our selected page to larger btree
// lsn is passed as 0; the lsm flag is 1 when redo pages are configured
3196 if( bt_atomicexec (bt->main, bt->frame, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3197 return bt->main->err;
3199 // free the page we took over
3201 bt_lockpage (BtLockDelete, right->latch, bt->thread_no);
3202 bt_lockpage (BtLockWrite, right->latch, bt->thread_no);
3203 bt_freepage (bt->mgr, right);
// bt_lastkey: load the page recorded in page zero's alloc->left into the
// cursor.
//
//	bt	database handle; bt->cursor receives a copy of the page
//
// Returns the slot count (cnt) of the copied page.
3208 // set cursor to highest slot on highest page
3210 uint bt_lastkey (BtDb *bt)
3212 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3215 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3216 set->page = bt_mappage (bt->mgr, set->latch);
// copy the page under a read lock, then drop the lock and the pin
3220 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3221 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3222 bt_unlockpage(BtLockRead, set->latch);
3223 bt_unpinlatch (bt->mgr, set->latch);
3224 return bt->cursor->cnt;
// bt_prevkey: step the cursor to the previous slot, moving to the left
// sibling page when needed.
//
//	bt	database handle whose cursor page is being walked
//	slot	current slot on the cursor page
//
// When a new page is loaded the return value is that page's slot count.
3227 // return previous slot on cursor page
3229 uint bt_prevkey (BtDb *bt, uint slot)
3231 uid cursor_page = bt->cursor->page_no;
3232 uid ourright, next, us = cursor_page;
// remember the right link of the page we start from
3238 ourright = bt_getid(bt->cursor->right);
// a zero left link means there is no page further left
3241 if( !(next = bt_getid(bt->cursor->left)) )
3247 if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3248 set->page = bt_mappage (bt->mgr, set->latch);
// copy the candidate page into the cursor under a read lock
3252 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3253 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3254 bt_unlockpage(BtLockRead, set->latch);
3255 bt_unpinlatch (bt->mgr, set->latch);
3257 next = bt_getid (bt->cursor->right);
// the visible tests examine the kill flag of the loaded page and compare
// its right link with the one saved in ourright
// TODO confirm what each branch does
3259 if( bt->cursor->kill )
3263 if( next == ourright )
3268 return bt->cursor->cnt;
// bt_nextkey: advance the cursor to the next live slot, loading the
// right sibling page when the current page is exhausted.
//
//	bt	database handle whose cursor page is being walked
//	slot	current slot on the cursor page
//
// The final visible statement clears bt->mgr->err and returns 0.
3271 // return next slot on cursor page
3272 // or slide cursor right into next page
3274 uint bt_nextkey (BtDb *bt, uint slot)
3280 right = bt_getid(bt->cursor->right);
// scan forward over the remaining slots; dead slots are tested first, and
// the last slot qualifies only when the page has a right sibling
3282 while( slot++ < bt->cursor->cnt )
3283 if( slotptr(bt->cursor,slot)->dead )
3285 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3293 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3294 set->page = bt_mappage (bt->mgr, set->latch);
// copy the right sibling into the cursor under a read lock
3298 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3299 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3300 bt_unlockpage(BtLockRead, set->latch);
3301 bt_unpinlatch (bt->mgr, set->latch);
3306 return bt->mgr->err = 0;
// bt_startkey: position the cursor for a scan beginning at `key`.
//
//	bt	database handle; bt->cursor receives a copy of the leaf page
//	key	key bytes to position on
//	len	length of key in bytes
//
// The leaf is loaded at level 0 under a read lock by bt_loadpage, which
// also supplies the starting slot.
3309 // cache page of keys into cursor and return starting slot for given key
3311 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3316 // cache page for retrieval
3318 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3319 memcpy (bt->cursor, set->page, bt->mgr->page_size);
// release the read lock and the pin taken by bt_loadpage
3323 bt_unlockpage(BtLockRead, set->latch);
3324 bt_unpinlatch (bt->mgr, set->latch);
// getCpuTime (Windows API variant): return a time value in seconds.
//
//	type	selects the clock; main() uses 0 for elapsed real time,
//		1 for user time and 2 for system time
//
// Each reading is converted to a SYSTEMTIME and summed from its hour,
// minute, second and millisecond fields.
3331 double getCpuTime(int type)
3334 FILETIME xittime[1];
3335 FILETIME systime[1];
3336 FILETIME usrtime[1];
3337 SYSTEMTIME timeconv[1];
3340 memset (timeconv, 0, sizeof(SYSTEMTIME));
// wall clock: current system time, with the day of the week folded in
// NOTE(review): this value wraps once a week, so a difference taken across
// that boundary is wrong
3344 GetSystemTimeAsFileTime (xittime);
3345 FileTimeToSystemTime (xittime, timeconv);
3346 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// user-mode time of the current process
3349 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3350 FileTimeToSystemTime (usrtime, timeconv);
// kernel-mode time of the current process
3353 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3354 FileTimeToSystemTime (systime, timeconv);
// fold the broken-down time into seconds
3358 ans += (double)timeconv->wHour * 3600;
3359 ans += (double)timeconv->wMinute * 60;
3360 ans += (double)timeconv->wSecond;
3361 ans += (double)timeconv->wMilliseconds / 1000;
3366 #include <sys/resource.h>
// getCpuTime (gettimeofday/getrusage variant): return a time value in
// seconds.
//
//	type	selects the clock; main() uses 0 for elapsed real time,
//		1 for user time and 2 for system time
3368 double getCpuTime(int type)
3370 struct rusage used[1];
3371 struct timeval tv[1];
// wall clock: seconds plus microseconds from gettimeofday
3375 gettimeofday(tv, NULL);
3376 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time consumed by this process
3379 getrusage(RUSAGE_SELF, used);
3380 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time consumed by this process
3383 getrusage(RUSAGE_SELF, used);
3384 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// bt_poolaudit: report on stderr every latch set that still holds a lock
// or a pin.
//
//	mgr	manager whose latch table (latchsets, latchtotal) is audited
//
// Diagnostic only: nothing is released or repaired.
// NOTE(review): page_no is printed with %d; if it is a 64-bit uid the
// conversion is wrong - confirm against the latch declaration.
3391 void bt_poolaudit (BtMgr *mgr)
// pre-increment: the index held on entry to the loop is never examined
3396 while( ++entry < mgr->latchtotal ) {
3397 latch = mgr->latchsets + entry;
// a non-zero value word means the corresponding lock is still held
3399 if( *latch->readwr->value )
3400 fprintf(stderr, "latchset %d wrtlocked for page %d\n", entry, latch->page_no);
3402 if( *latch->access->value )
3403 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3405 if( *latch->parent->value )
3406 fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3408 if( *latch->atomic->value )
3409 fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3411 if( *latch->modify->value )
3412 fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
// the pin count excludes the clock bit
3414 if( latch->pin & ~CLOCK_bit )
3415 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
// index_file: thread entry point of the standalone driver.
//
//	arg	pointer to this thread's ThreadArg (mgr, main, infile, type,
//		num, idx)
//
// Opens a handle on the two btrees, picks the command character for this
// thread from args->type and runs that command against args->infile.
// The commands visible below are pennysort insert/delete (optionally in
// transactions), indexing, finding, forward scan, reverse scan and
// counting.
// NOTE(review): the error paths call exit(0), which reports success to
// the parent process; only the latch failure in the scan uses exit(1).
3428 // standalone program to index file of keys
3429 // then list them onto std-out
// two signatures: the pthread entry point and the __stdcall one handed to
// _beginthreadex in main()
3432 void *index_file (void *arg)
3434 uint __stdcall index_file (void *arg)
3437 int line = 0, found = 0, cnt = 0, idx;
3438 uid next, page_no = LEAF_page; // start on first page of leaves
3439 int ch, len = 0, slot, type = 0;
3440 unsigned char key[BT_maxkey];
3441 unsigned char txn[65536];
3442 ThreadArg *args = arg;
3451 bt = bt_open (args->mgr, args->main);
// the command is the idx-th character of the command string; threads
// beyond the end of the string use its last character
3454 if( args->idx < strlen (args->type) )
3455 ch = args->type[args->idx];
3457 ch = args->type[strlen(args->type) - 1];
// pennysort: announce the mode, with or without transactions
3469 if( type == Delete )
3470 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3472 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3474 if( type == Delete )
3475 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3477 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3479 if( in = fopen (args->infile, "rb") )
3480 while( ch = getc(in), ch != EOF )
// direct insert: the first 10 bytes of the line are the key, the rest the
// value
3486 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3487 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
// transaction mode: value and key are packed into the txn buffer and a
// slot of the transaction page is pointed at them
3493 memcpy (txn + nxt, key + 10, len - 10);
3495 txn[nxt] = len - 10;
3497 memcpy (txn + nxt, key, 10);
3500 slotptr(page,++cnt)->off = nxt;
3501 slotptr(page,cnt)->type = type;
// cnt is compared with the batch size args->num before the batch is run
3504 if( cnt < args->num )
3511 if( bt_atomictxn (bt, page) )
3512 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3517 else if( len < BT_maxkey )
3519 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
// indexing: each input line is inserted as a key with no value
3523 fprintf(stderr, "started indexing for %s\n", args->infile);
3524 if( in = fopen (args->infile, "r") )
3525 while( ch = getc(in), ch != EOF )
3530 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3531 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3534 else if( len < BT_maxkey )
3536 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
// finding: each input line is looked up with bt_findkey
3540 fprintf(stderr, "started finding keys for %s\n", args->infile);
3541 if( in = fopen (args->infile, "rb") )
3542 while( ch = getc(in), ch != EOF )
3546 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3548 else if( bt->mgr->err )
3549 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3552 else if( len < BT_maxkey )
3554 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
// forward scan: walk the pages through their right links, writing live
// keys and values to stdout
3558 fprintf(stderr, "started scanning\n");
3561 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3562 set->page = bt_mappage (bt->mgr, set->latch);
3564 fprintf(stderr, "unable to obtain latch"), exit(1);
3566 bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3567 next = bt_getid (set->page->right);
// the last slot of a page with no right sibling is skipped
3569 for( slot = 0; slot++ < set->page->cnt; )
3570 if( next || slot < set->page->cnt )
3571 if( !slotptr(set->page, slot)->dead ) {
3572 ptr = keyptr(set->page, slot);
3575 if( slotptr(set->page, slot)->type == Duplicate )
3578 fwrite (ptr->key, len, 1, stdout);
3579 val = valptr(set->page, slot);
3580 fwrite (val->value, val->len, 1, stdout);
3581 fputc ('\n', stdout);
3585 bt_unlockpage (BtLockRead, set->latch);
3586 bt_unpinlatch (bt->mgr, set->latch);
3587 } while( page_no = next );
3589 fprintf(stderr, " Total keys read %d\n", cnt);
// reverse scan: start at the last page and step backwards with bt_prevkey
3593 fprintf(stderr, "started reverse scan\n");
3594 if( slot = bt_lastkey (bt) )
3595 while( slot = bt_prevkey (bt, slot) ) {
3596 if( slotptr(bt->cursor, slot)->dead )
3599 ptr = keyptr(bt->cursor, slot);
3602 if( slotptr(bt->cursor, slot)->type == Duplicate )
3605 fwrite (ptr->key, len, 1, stdout);
3606 val = valptr(bt->cursor, slot);
3607 fwrite (val->value, val->len, 1, stdout);
3608 fputc ('\n', stdout);
3612 fprintf(stderr, " Total keys read %d\n", cnt);
// counting: read pages one at a time and total the active keys of every
// page that is neither free nor above leaf level
3617 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3619 fprintf(stderr, "started counting\n");
3620 next = REDO_page + bt->mgr->pagezero->redopages;
3622 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3623 if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3626 if( !bt->cursor->free && !bt->cursor->lvl )
3627 cnt += bt->cursor->act;
3633 cnt--; // remove stopper key
3634 fprintf(stderr, " Total keys counted %d\n", cnt);
3646 typedef struct timeval timer;
// main: command-line driver. Opens the cache and main btrees, starts one
// index_file thread per source file, waits for the threads and prints
// I/O counters and timings on stderr.
//
// Positional arguments, as read below: argv[1] cache btree file, argv[2]
// main btree file, argv[3] command string, argv[4] page bits, argv[5]
// buffer pool size, argv[6] transaction size, argv[7] redo pages,
// argv[8] main page bits, argv[9] main pool size, argv[10..] source files.
// NOTE(review): the numeric arguments go through atoi, which cannot
// report malformed input.
3648 int main (int argc, char **argv)
3650 int idx, cnt, len, slot, err;
3651 int segsize, bits = 16;
3671 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3672 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3673 fprintf (stderr, " where main_file is the name of the main btree file\n");
3674 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3675 fprintf (stderr, " page_bits is the page size in bits for the cache btree\n");
3676 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3677 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3678 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3679 fprintf (stderr, " main_bits is the page size of the main btree in bits\n");
3680 fprintf (stderr, " main_pool is the number of main pages in the main buffer pool\n");
3681 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// wall-clock reference for the elapsed time printed at the end
3685 start = getCpuTime(0);
3688 bits = atoi(argv[4]);
3691 poolsize = atoi(argv[5]);
3694 fprintf (stderr, "Warning: no mapped_pool\n");
3697 num = atoi(argv[6]);
3700 redopages = atoi(argv[7]);
// warn when the redo area would extend past page 65535
3702 if( redopages + REDO_page > 65535 )
3703 fprintf (stderr, "Warning: Recovery buffer too large\n");
3706 mainbits = atoi(argv[8]);
3709 mainpool = atoi(argv[9]);
// one thread handle per source file (pthread_t or Windows HANDLE) and one
// ThreadArg more than that
// NOTE(review): confirm the allocation results are checked before use
3713 threads = malloc (cnt * sizeof(pthread_t));
3715 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3717 args = malloc ((cnt + 1) * sizeof(ThreadArg));
// the cache btree gets the redo pages; the main btree is opened with none
3719 mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3722 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3727 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3730 fprintf(stderr, "Index Open Error %s\n", argv[2]);
// start one index_file thread per source file
3739 for( idx = 0; idx < cnt; idx++ ) {
3740 args[idx].infile = argv[idx + 10];
3741 args[idx].type = argv[3];
3742 args[idx].main = main;
3743 args[idx].mgr = mgr;
3744 args[idx].num = num;
3745 args[idx].idx = idx;
3747 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3748 fprintf(stderr, "Error creating thread %d\n", err);
3750 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
// args[0] is also filled in outside the loop
// TODO confirm: presumably for a run without source files
3754 args[0].infile = argv[idx + 10];
3755 args[0].type = argv[3];
3756 args[0].main = main;
3763 // wait for termination
3766 for( idx = 0; idx < cnt; idx++ )
3767 pthread_join (threads[idx], NULL);
3769 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3771 for( idx = 0; idx < cnt; idx++ )
3772 CloseHandle(threads[idx]);
// counters gathered by the cache btree manager
3779 fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
// each time is printed as whole minutes plus the remaining seconds
3785 elapsed = getCpuTime(0) - start;
3786 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3787 elapsed = getCpuTime(1);
3788 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3789 elapsed = getCpuTime(2);
3790 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);