1 // btree version threadskv10b futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer locks,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // and LSM B-trees for write optimization
14 // author: karl malbrain, malbrain@cal.berkeley.edu
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
20 The orginal author is Karl Malbrain.
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
38 #include <linux/futex.h>
53 #define WIN32_LEAN_AND_MEAN
67 typedef unsigned long long uid;
68 typedef unsigned long long logseqno;
71 typedef unsigned long long off64_t;
72 typedef unsigned short ushort;
73 typedef unsigned int uint;
76 #define BT_ro 0x6f72 // ro
77 #define BT_rw 0x7772 // rw
79 #define BT_maxbits 26 // maximum page size in bits
80 #define BT_minbits 9 // minimum page size in bits
81 #define BT_minpage (1 << BT_minbits) // minimum page size
82 #define BT_maxpage (1 << BT_maxbits) // maximum page size
84 // BTree page number constants
85 #define ALLOC_page 0 // allocation page
86 #define ROOT_page 1 // root of the btree
87 #define LEAF_page 2 // first page of leaves
88 #define REDO_page 3 // first page of redo buffer
90 // Number of levels to create in a new BTree
95 There are six lock types for each node in four independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
117 volatile ushort xlock[1]; // one writer has exclusive lock
118 volatile ushort wrt[1]; // count of other writers waiting
127 // definition for reader/writer reentrant lock implementation
133 volatile ushort tid[1];
134 volatile ushort readers[1];
138 volatile ushort waitwrite[1];
139 volatile ushort waitread[1];
140 volatile ushort phase[1]; // phase == 1 for reading after write
141 volatile ushort dup[1]; // reentrant counter
144 // write only reentrant lock
150 volatile ushort tid[1];
151 volatile ushort dup[1];
155 volatile uint waiters[1];
158 // mode & definition for lite latch implementation
161 QueRd = 1, // reader queue
162 QueWr = 2 // writer queue
165 // hash table entries
168 uint entry; // Latch table entry at head of chain
169 BtMutexLatch latch[1];
172 // latch manager table structure
175 uid page_no; // latch set page number
176 RWLock readwr[1]; // read/write page lock
177 RWLock access[1]; // Access Intent/Page delete
178 RWLock parent[1]; // Posting of fence key in parent
179 RWLock atomic[1]; // Atomic update in progress
180 RWLock link[1]; // left link being updated
181 uint split; // right split page atomic insert
182 uint next; // next entry in hash table chain
183 uint prev; // prev entry in hash table chain
184 ushort pin; // number of accessing threads
185 unsigned char dirty; // page in cache is dirty (atomic setable)
186 BtMutexLatch modify[1]; // modify entry lite latch
189 // Define the length of the page record numbers
193 // Page key slot definition.
195 // Keys are marked dead, but remain on the page until
196 // it cleanup is called. The fence key (highest key) for
197 // a leaf page is always present, even after cleanup.
201 // In addition to the Unique keys that occupy slots
202 // there are Librarian and Duplicate key
203 // slots occupying the key slot array.
205 // The Librarian slots are dead keys that
206 // serve as filler, available to add new Unique
207 // or Dup slots that are inserted into the B-tree.
209 // The Duplicate slots have had their key bytes extended
210 // by 6 bytes to contain a binary duplicate key uniqueifier.
220 uint off:BT_maxbits; // page offset for key start
221 uint type:3; // type of slot
222 uint dead:1; // set for deleted slot
225 // The key structure occupies space at the upper end of
226 // each page. It's a length byte followed by the key
230 unsigned char len; // this can be changed to a ushort or uint
231 unsigned char key[0];
234 // the value structure also occupies space at the upper
235 // end of the page. Each key is immediately followed by a value.
238 unsigned char len; // this can be changed to a ushort or uint
239 unsigned char value[0];
242 #define BT_maxkey 255 // maximum number of bytes in a key
243 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
245 // The first part of an index page.
246 // It is immediately followed
247 // by the BtSlot array of keys.
249 // note that this structure size
250 // must be a multiple of 8 bytes
251 // in order to place PageZero correctly.
253 typedef struct BtPage_ {
254 uint cnt; // count of keys in page
255 uint act; // count of active keys
256 uint min; // next key offset
257 uint garbage; // page garbage in bytes
258 unsigned char bits:7; // page size in bits
259 unsigned char free:1; // page is on free chain
260 unsigned char lvl:7; // level of page
261 unsigned char kill:1; // page is being deleted
262 unsigned char right[BtId]; // page number to right
263 unsigned char left[BtId]; // page number to left
264 unsigned char filler[2]; // padding to multiple of 8
265 logseqno lsn; // log sequence number applied
266 uid page_no; // this page number
269 // The loadpage interface object
272 BtPage page; // current page pointer
273 BtLatchSet *latch; // current page latch set
276 // structure for latch manager on ALLOC_page
279 struct BtPage_ alloc[1]; // next page_no in right ptr
280 unsigned char freechain[BtId]; // head of free page_nos chain
281 unsigned long long activepages; // number of active pages
282 uint redopages; // number of redo pages in file
285 // The object structure for Btree access
288 uint page_size; // page size
289 uint page_bits; // page size in bits
295 BtPageZero *pagezero; // mapped allocation page
296 BtHashEntry *hashtable; // the buffer pool hash table entries
297 BtLatchSet *latchsets; // mapped latch set from buffer pool
298 unsigned char *pagepool; // mapped to the buffer pool pages
299 unsigned char *redobuff; // mapped recovery buffer pointer
300 logseqno lsn, flushlsn; // current & first lsn flushed
301 BtMutexLatch redo[1]; // redo area lite latch
302 BtMutexLatch lock[1]; // allocation area lite latch
303 BtMutexLatch maps[1]; // mapping segments lite latch
304 ushort thread_no[1]; // next thread number
305 uint nlatchpage; // number of latch pages at BT_latch
306 uint latchtotal; // number of page latch entries
307 uint latchhash; // number of latch hash table slots
308 uint latchvictim; // next latch entry to examine
309 uint latchpromote; // next latch entry to promote
310 uint redolast; // last msync size of recovery buff
311 uint redoend; // eof/end element in recovery buff
312 int err; // last error
313 int line; // last error line no
314 int found; // number of keys found by delete
315 int reads, writes; // number of reads and writes
317 HANDLE halloc; // allocation handle
318 HANDLE hpool; // buffer pool handle
320 uint segments; // number of memory mapped segments
321 unsigned char *pages[64000];// memory mapped segments of b-tree
325 BtMgr *mgr; // buffer manager for entire process
326 BtMgr *main; // buffer manager for main btree
327 BtPage frame; // cached page frame for promote
328 BtPage cursor; // cached page frame for start/next
329 ushort thread_no; // thread number
330 unsigned char key[BT_keyarray]; // last found complete key
333 // atomic txn structures
336 logseqno reqlsn; // redo log seq no required
337 uint entry; // latch table entry number
338 uint slot:31; // page slot number
339 uint reuse:1; // reused previous page
342 // Catastrophic errors
356 #define CLOCK_bit 0x8000
358 // recovery manager entry types
361 BTRM_eof = 0, // rest of buffer is emtpy
362 BTRM_add, // add a unique key-value to btree
363 BTRM_dup, // add a duplicate key-value to btree
364 BTRM_del, // delete a key-value from btree
365 BTRM_upd, // update a key with a new value
366 BTRM_new, // allocate a new empty page
367 BTRM_old // reuse an old empty page
370 // recovery manager entry
371 // structure followed by BtKey & BtVal
374 logseqno reqlsn; // log sequence number required
375 logseqno lsn; // log sequence number for entry
376 uint len; // length of entry
377 unsigned char type; // type of entry
378 unsigned char lvl; // level of btree entry pertains to
383 extern void bt_close (BtDb *bt);
384 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
385 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
386 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
387 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
388 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
389 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
390 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
392 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
394 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
395 extern uint bt_nextkey (BtDb *bt, uint slot);
396 extern uint bt_prevkey (BtDb *db, uint slot);
397 extern uint bt_lastkey (BtDb *db);
400 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
401 extern void bt_mgrclose (BtMgr *mgr);
402 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
403 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
405 // atomic transaction functions
406 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
407 BTERR bt_txnpromote (BtDb *bt);
409 // The page is allocated from low and hi ends.
410 // The key slots are allocated from the bottom,
411 // while the text and value of the key
412 // are allocated from the top. When the two
413 // areas meet, the page is split into two.
415 // A key consists of a length byte, two bytes of
416 // index number (0 - 65535), and up to 253 bytes
419 // Associated with each key is a value byte string
420 // containing any value desired.
422 // The b-tree root is always located at page 1.
423 // The first leaf page of level zero is always
424 // located on page 2.
426 // The b-tree pages are linked with next
427 // pointers to facilitate enumerators,
428 // and provide for concurrency.
430 // When to root page fills, it is split in two and
431 // the tree height is raised by a new root at page
432 // one with two keys.
434 // Deleted keys are marked with a dead bit until
435 // page cleanup. The fence key for a leaf node is
438 // To achieve maximum concurrency one page is locked at a time
439 // as the tree is traversed to find leaf key in question. The right
440 // page numbers are used in cases where the page is being split,
443 // Page 0 is dedicated to lock for new page extensions,
444 // and chains empty pages together for reuse. It also
445 // contains the latch manager hash table.
447 // The ParentModification lock on a node is obtained to serialize posting
448 // or changing the fence key for a node.
450 // Empty pages are chained together through the ALLOC page and reused.
452 // Access macros to address slot and key values from the page
453 // Page slots use 1 based indexing.
455 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
456 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
457 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
459 void bt_putid(unsigned char *dest, uid id)
464 dest[i] = (unsigned char)id, id >>= 8;
467 uid bt_getid(unsigned char *src)
472 for( i = 0; i < BtId; i++ )
473 id <<= 8, id |= *src++;
478 // lite weight spin lock Latch Manager
480 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
482 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
485 void bt_mutexlock(BtMutexLatch *latch)
487 BtMutexLatch prev[1];
491 *prev->value = __sync_fetch_and_or(latch->value, XCL);
493 if( !*prev->bits->xlock ) { // did we set XCL?
495 __sync_fetch_and_sub(latch->value, WRT);
500 *prev->bits->wrt += 1;
501 __sync_fetch_and_add(latch->value, WRT);
504 sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
509 // try to obtain write lock
511 // return 1 if obtained,
514 int bt_mutextry(BtMutexLatch *latch)
516 BtMutexLatch prev[1];
518 *prev->value = __sync_fetch_and_or(latch->value, XCL);
520 // take write access if exclusive bit was clear
522 return !*prev->bits->xlock;
527 void bt_releasemutex(BtMutexLatch *latch)
529 BtMutexLatch prev[1];
531 *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
533 if( *prev->bits->wrt )
534 sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
537 // reentrant reader/writer lock implementation
539 void WriteLock (RWLock *lock, ushort tid)
545 bt_mutexlock(lock->xcl);
548 // is this a re-entrant request?
550 if( *prev->bits->tid == tid )
553 // wait if write already taken, or there are readers
555 else if( *prev->bits->tid || *prev->bits->readers ) {
557 waited++, *lock->waitwrite += 1;
559 // otherwise, we can take the lock
563 *lock->waitwrite -= 1;
565 *lock->bits->tid = tid;
566 *lock->phase = 0; // set writing phase
569 bt_releasemutex(lock->xcl);
571 if( *lock->bits->tid == tid )
574 sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr );
578 void WriteRelease (RWLock *lock)
580 bt_mutexlock(lock->xcl);
582 // were we reentrant?
586 bt_releasemutex(lock->xcl);
590 // release write lock and
591 // set reading after write phase
593 *lock->bits->tid = 0;
595 // were readers waiting for a write cycle?
597 if( *lock->waitread ) {
599 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueRd );
601 // otherwise were writers waiting
603 } else if( *lock->waitwrite ) {
605 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
608 bt_releasemutex(lock->xcl);
611 void ReadLock (RWLock *lock, ushort tid)
613 uint xit, waited = 0;
617 bt_mutexlock(lock->xcl);
621 // wait if a write lock is currenty active
622 // or we are not in a new read cycle and
623 // writers are waiting.
625 if( *prev->bits->tid || !*prev->phase && *prev->waitwrite ) {
627 waited++, *lock->waitread += 1;
629 // else we can take the lock
633 *lock->waitread -= 1;
635 *lock->bits->readers += 1;
639 bt_releasemutex(lock->xcl);
641 // did we increment readers?
646 sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueRd );
650 void ReadRelease (RWLock *lock)
654 bt_mutexlock(lock->xcl);
657 *prev->bits->readers = *lock->bits->readers -= 1;
659 if( !*lock->waitread && *lock->waitwrite )
660 *prev->phase = *lock->phase = 0; // stop accepting new readers
662 bt_releasemutex(lock->xcl);
664 // were writers waiting for a read cycle to finish?
666 if( !*prev->phase && !*prev->bits->readers )
667 if( *prev->waitwrite )
668 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
671 // recovery manager -- flush dirty pages
673 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
675 uint cnt3 = 0, cnt2 = 0, cnt = 0;
680 // flush dirty pool pages to the btree
682 fprintf(stderr, "Start flushlsn ");
683 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
684 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
685 latch = mgr->latchsets + entry;
686 bt_mutexlock (latch->modify);
687 bt_lockpage(BtLockRead, latch, thread_no);
690 bt_writepage(mgr, page, latch->page_no);
691 latch->dirty = 0, cnt++;
693 if( latch->pin & ~CLOCK_bit )
695 bt_unlockpage(BtLockRead, latch);
696 bt_releasemutex (latch->modify);
698 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
699 fprintf(stderr, "begin sync");
700 for( segment = 0; segment < mgr->segments; segment++ )
701 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
702 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
703 fprintf(stderr, " end sync\n");
706 // recovery manager -- process current recovery buff on startup
707 // this won't do much if previous session was properly closed.
709 BTERR bt_recoveryredo (BtMgr *mgr)
716 hdr = (BtLogHdr *)mgr->redobuff;
717 mgr->flushlsn = hdr->lsn;
720 hdr = (BtLogHdr *)(mgr->redobuff + offset);
721 switch( hdr->type ) {
725 case BTRM_add: // add a unique key-value to btree
727 case BTRM_dup: // add a duplicate key-value to btree
728 case BTRM_del: // delete a key-value from btree
729 case BTRM_upd: // update a key with a new value
730 case BTRM_new: // allocate a new empty page
731 case BTRM_old: // reuse an old empty page
737 // recovery manager -- append new entry to recovery log
738 // flush dirty pages to disk when it overflows.
740 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
742 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
743 uint amt = sizeof(BtLogHdr);
747 bt_mutexlock (mgr->redo);
750 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
752 // see if new entry fits in buffer
753 // flush and reset if it doesn't
755 if( amt > size - mgr->redoend ) {
756 mgr->flushlsn = mgr->lsn;
757 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
758 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
761 bt_flushlsn(mgr, thread_no);
764 // fill in new entry & either eof or end block
766 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
771 hdr->lsn = ++mgr->lsn;
775 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
776 memset (eof, 0, sizeof(BtLogHdr));
778 // fill in key and value
781 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
782 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
785 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
786 memset (eof, 0, sizeof(BtLogHdr));
789 last = mgr->redolast & ~0xfff;
792 if( end - last + sizeof(BtLogHdr) >= 32768 )
793 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
794 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
798 bt_releasemutex(mgr->redo);
802 // recovery manager -- append transaction to recovery log
803 // flush dirty pages to disk when it overflows.
805 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
807 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
808 uint amt = 0, src, type;
815 // determine amount of redo recovery log space required
817 for( src = 0; src++ < source->cnt; ) {
818 key = keyptr(source,src);
819 val = valptr(source,src);
820 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
821 amt += sizeof(BtLogHdr);
824 bt_mutexlock (mgr->redo);
826 // see if new entry fits in buffer
827 // flush and reset if it doesn't
829 if( amt > size - mgr->redoend ) {
830 mgr->flushlsn = mgr->lsn;
831 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
832 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
835 bt_flushlsn (mgr, thread_no);
838 // assign new lsn to transaction
842 // fill in new entries
844 for( src = 0; src++ < source->cnt; ) {
845 key = keyptr(source, src);
846 val = valptr(source, src);
848 switch( slotptr(source, src)->type ) {
860 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
861 amt += sizeof(BtLogHdr);
863 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
869 // fill in key and value
871 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
872 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
877 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
878 memset (eof, 0, sizeof(BtLogHdr));
881 last = mgr->redolast & ~0xfff;
884 if( end - last + sizeof(BtLogHdr) >= 32768 )
885 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
886 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
890 bt_releasemutex(mgr->redo);
894 // sync a single btree page to disk
896 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
898 uint segment = latch->page_no >> 16;
901 if( bt_writepage (mgr, page, latch->page_no) )
904 perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
906 if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
907 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
913 // read page into buffer pool from permanent location in Btree file
915 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
917 int flag = PROT_READ | PROT_WRITE;
918 uint segment = page_no >> 16;
922 if( segment < mgr->segments ) {
923 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
925 memcpy (page, perm, mgr->page_size);
930 bt_mutexlock (mgr->maps);
932 if( segment < mgr->segments ) {
933 bt_releasemutex (mgr->maps);
937 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
940 bt_releasemutex (mgr->maps);
944 // write page to permanent location in Btree file
945 // clear the dirty bit
947 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
949 int flag = PROT_READ | PROT_WRITE;
950 uint segment = page_no >> 16;
954 if( segment < mgr->segments ) {
955 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
957 memcpy (perm, page, mgr->page_size);
962 bt_mutexlock (mgr->maps);
964 if( segment < mgr->segments ) {
965 bt_releasemutex (mgr->maps);
969 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
970 bt_releasemutex (mgr->maps);
975 // set CLOCK bit in latch
976 // decrement pin count
978 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
980 bt_mutexlock(latch->modify);
981 latch->pin |= CLOCK_bit;
984 bt_releasemutex(latch->modify);
987 // return the btree cached page address
989 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
991 uid entry = latch - mgr->latchsets;
992 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
997 // return next available latch entry
998 // and with latch entry locked
1000 uint bt_availnext (BtMgr *mgr)
1007 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1009 entry = _InterlockedIncrement (&mgr->latchvictim);
1011 entry %= mgr->latchtotal;
1016 latch = mgr->latchsets + entry;
1018 if( !bt_mutextry(latch->modify) )
1021 // return this entry if it is not pinned
1026 // if the CLOCK bit is set
1027 // reset it to zero.
1029 latch->pin &= ~CLOCK_bit;
1030 bt_releasemutex(latch->modify);
1034 // pin page in buffer pool
1035 // return with latchset pinned
1037 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1039 uint hashidx = page_no % mgr->latchhash;
1044 // try to find our entry
1046 bt_mutexlock(mgr->hashtable[hashidx].latch);
1048 if( entry = mgr->hashtable[hashidx].entry ) do
1050 latch = mgr->latchsets + entry;
1051 if( page_no == latch->page_no )
1053 } while( entry = latch->next );
1055 // found our entry: increment pin
1058 latch = mgr->latchsets + entry;
1059 bt_mutexlock(latch->modify);
1060 latch->pin |= CLOCK_bit;
1063 bt_releasemutex(latch->modify);
1064 bt_releasemutex(mgr->hashtable[hashidx].latch);
1068 // find and reuse unpinned entry
1072 entry = bt_availnext (mgr);
1073 latch = mgr->latchsets + entry;
1075 idx = latch->page_no % mgr->latchhash;
1077 // if latch is on a different hash chain
1078 // unlink from the old page_no chain
1080 if( latch->page_no )
1081 if( idx != hashidx ) {
1083 // skip over this entry if latch not available
1085 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1086 bt_releasemutex(latch->modify);
1091 mgr->latchsets[latch->prev].next = latch->next;
1093 mgr->hashtable[idx].entry = latch->next;
1096 mgr->latchsets[latch->next].prev = latch->prev;
1098 bt_releasemutex (mgr->hashtable[idx].latch);
1101 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1103 // update permanent page area in btree from buffer pool
1104 // no read-lock is required since page is not pinned.
1107 if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1108 return mgr->line = __LINE__, NULL;
1113 memcpy (page, contents, mgr->page_size);
1115 } else if( bt_readpage (mgr, page, page_no) )
1116 return mgr->line = __LINE__, NULL;
1118 // link page as head of hash table chain
1119 // if this is a never before used entry,
1120 // or it was previously on a different
1121 // hash table chain. Otherwise, just
1122 // leave it in its current hash table
1125 if( !latch->page_no || hashidx != idx ) {
1126 if( latch->next = mgr->hashtable[hashidx].entry )
1127 mgr->latchsets[latch->next].prev = entry;
1129 mgr->hashtable[hashidx].entry = entry;
1133 // fill in latch structure
1135 latch->pin = CLOCK_bit | 1;
1136 latch->page_no = page_no;
1139 bt_releasemutex (latch->modify);
1140 bt_releasemutex (mgr->hashtable[hashidx].latch);
1144 void bt_mgrclose (BtMgr *mgr)
1152 // flush previously written dirty pages
1153 // and write recovery buffer to disk
1155 fdatasync (mgr->idx);
1157 if( mgr->redoend ) {
1158 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1159 memset (eof, 0, sizeof(BtLogHdr));
1162 // write remaining dirty pool pages to the btree
1164 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1165 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1166 latch = mgr->latchsets + slot;
1168 if( latch->dirty ) {
1169 bt_writepage(mgr, page, latch->page_no);
1170 latch->dirty = 0, num++;
1174 // clear redo recovery buffer on disk.
1176 if( mgr->pagezero->redopages ) {
1177 eof = (BtLogHdr *)mgr->redobuff;
1178 memset (eof, 0, sizeof(BtLogHdr));
1179 eof->lsn = mgr->lsn;
1180 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1181 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1184 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1187 while( mgr->segments )
1188 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1190 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1191 munmap (mgr->pagezero, mgr->page_size);
1193 FlushViewOfFile(mgr->pagezero, 0);
1194 UnmapViewOfFile(mgr->pagezero);
1195 UnmapViewOfFile(mgr->pagepool);
1196 CloseHandle(mgr->halloc);
1197 CloseHandle(mgr->hpool);
1203 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1204 FlushFileBuffers(mgr->idx);
1205 CloseHandle(mgr->idx);
1210 // close and release memory
1212 void bt_close (BtDb *bt)
1221 VirtualFree (bt->frame, 0, MEM_RELEASE);
1223 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1228 // open/create new btree buffer manager
1230 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1231 // size of page pool (e.g. 262144)
1233 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1235 uint lvl, attr, last, slot, idx;
1236 uint nlatchpage, latchhash;
1237 unsigned char value[BtId];
1238 int flag, initit = 0;
1239 BtPageZero *pagezero;
1247 // determine sanity of page size and buffer pool
1249 if( bits > BT_maxbits )
1251 else if( bits < BT_minbits )
1254 mgr = calloc (1, sizeof(BtMgr));
1256 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1258 if( mgr->idx == -1 ) {
1259 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1260 return free(mgr), NULL;
1263 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1264 attr = FILE_ATTRIBUTE_NORMAL;
1265 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1267 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1268 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1269 return GlobalFree(mgr), NULL;
1274 pagezero = valloc (BT_maxpage);
1277 // read minimum page size to get root info
1278 // to support raw disk partition files
1279 // check if bits == 0 on the disk.
1281 if( size = lseek (mgr->idx, 0L, 2) )
1282 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1283 if( pagezero->alloc->bits )
1284 bits = pagezero->alloc->bits;
1288 return free(mgr), free(pagezero), NULL;
1292 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1293 size = GetFileSize(mgr->idx, amt);
1295 if( size || *amt ) {
1296 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1297 return bt_mgrclose (mgr), NULL;
1298 bits = pagezero->alloc->bits;
1303 mgr->page_size = 1 << bits;
1304 mgr->page_bits = bits;
1306 // calculate number of latch hash table entries
1308 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1310 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1311 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1312 mgr->latchtotal = nodemax;
1317 // initialize an empty b-tree with latch page, root page, page of leaves
1318 // and page(s) of latches and page pool cache
1320 memset (pagezero, 0, 1 << bits);
1321 pagezero->alloc->lvl = MIN_lvl - 1;
1322 pagezero->alloc->bits = mgr->page_bits;
1323 pagezero->redopages = redopages;
1325 bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1326 pagezero->activepages = 2;
1328 // initialize left-most LEAF page in
1329 // alloc->left and count of active leaf pages.
1331 bt_putid (pagezero->alloc->left, LEAF_page);
1332 ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1334 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1335 fprintf (stderr, "Unable to create btree page zero\n");
1336 return bt_mgrclose (mgr), NULL;
1339 memset (pagezero, 0, 1 << bits);
1340 pagezero->alloc->bits = mgr->page_bits;
1342 for( lvl=MIN_lvl; lvl--; ) {
1343 BtSlot *node = slotptr(pagezero->alloc, 1);
1344 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1345 key = keyptr(pagezero->alloc, 1);
1346 key->len = 2; // create stopper key
1350 bt_putid(value, MIN_lvl - lvl + 1);
1351 val = valptr(pagezero->alloc, 1);
1352 val->len = lvl ? BtId : 0;
1353 memcpy (val->value, value, val->len);
1355 pagezero->alloc->min = node->off;
1356 pagezero->alloc->lvl = lvl;
1357 pagezero->alloc->cnt = 1;
1358 pagezero->alloc->act = 1;
1359 pagezero->alloc->page_no = MIN_lvl - lvl;
1361 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1362 fprintf (stderr, "Unable to create btree page\n");
1363 return bt_mgrclose (mgr), NULL;
1371 VirtualFree (pagezero, 0, MEM_RELEASE);
1374 // mlock the first segment of 64K pages
1376 flag = PROT_READ | PROT_WRITE;
1377 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1380 if( mgr->pages[0] == MAP_FAILED ) {
1381 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1382 return bt_mgrclose (mgr), NULL;
1385 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1386 mlock (mgr->pagezero, mgr->page_size);
1388 mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1389 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1391 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1392 if( mgr->pagepool == MAP_FAILED ) {
1393 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1394 return bt_mgrclose (mgr), NULL;
1397 flag = PAGE_READWRITE;
1398 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1399 if( !mgr->halloc ) {
1400 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1401 return bt_mgrclose (mgr), NULL;
1404 flag = FILE_MAP_WRITE;
1405 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1406 if( !mgr->pagezero ) {
1407 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1408 return bt_mgrclose (mgr), NULL;
1411 flag = PAGE_READWRITE;
1412 size = (uid)mgr->nlatchpage << mgr->page_bits;
1413 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1415 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1416 return bt_mgrclose (mgr), NULL;
1419 flag = FILE_MAP_WRITE;
1420 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1421 if( !mgr->pagepool ) {
1422 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1423 return bt_mgrclose (mgr), NULL;
1427 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1428 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1429 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1434 // open BTree access method
1435 // based on buffer manager
1437 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1439 BtDb *bt = malloc (sizeof(*bt));
1441 memset (bt, 0, sizeof(*bt));
1445 bt->cursor = valloc (mgr->page_size);
1446 bt->frame = valloc (mgr->page_size);
1448 bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1449 bt->frame = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1452 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1454 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1459 // compare two keys, return > 0, = 0, or < 0
1460 // =0: keys are same
1463 // as the comparison value
1465 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1467 uint len1 = key1->len;
1470 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1481 // place write, read, or parent lock on requested page_no.
1483 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1487 ReadLock (latch->readwr, thread_no);
1490 WriteLock (latch->readwr, thread_no);
1493 ReadLock (latch->access, thread_no);
1496 WriteLock (latch->access, thread_no);
1499 WriteLock (latch->parent, thread_no);
1502 WriteLock (latch->atomic, thread_no);
1504 case BtLockAtomic | BtLockRead:
1505 WriteLock (latch->atomic, thread_no);
1506 ReadLock (latch->readwr, thread_no);
1508 case BtLockAtomic | BtLockWrite:
1509 WriteLock (latch->atomic, thread_no);
1510 WriteLock (latch->readwr, thread_no);
1513 WriteLock (latch->link, thread_no);
1518 // remove write, read, or parent lock on requested page
1520 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1524 ReadRelease (latch->readwr);
1527 WriteRelease (latch->readwr);
1530 ReadRelease (latch->access);
1533 WriteRelease (latch->access);
1536 WriteRelease (latch->parent);
1539 WriteRelease (latch->atomic);
1541 case BtLockAtomic | BtLockRead:
1542 WriteRelease (latch->atomic);
1543 ReadRelease (latch->readwr);
1545 case BtLockAtomic | BtLockWrite:
1546 WriteRelease (latch->atomic);
1547 WriteRelease (latch->readwr);
1550 WriteRelease (latch->link);
1555 // allocate a new page
1556 // return with page latched, but unlocked.
1558 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1563 // lock allocation page
1565 bt_mutexlock(mgr->lock);
1567 // use empty chain first
1568 // else allocate new page
1570 if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1571 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1572 set->page = bt_mappage (mgr, set->latch);
1574 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1576 mgr->pagezero->activepages++;
1577 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1579 // the page is currently free and this
1580 // will keep bt_txnpromote out.
1582 // contents will replace this bit
1583 // and pin will keep bt_txnpromote out
1585 contents->page_no = page_no;
1586 set->latch->dirty = 1;
1588 memcpy (set->page, contents, mgr->page_size);
1590 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1591 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1593 bt_releasemutex(mgr->lock);
1597 page_no = bt_getid(mgr->pagezero->alloc->right);
1598 bt_putid(mgr->pagezero->alloc->right, page_no+1);
1600 // unlock allocation latch and
1601 // extend file into new page.
1603 mgr->pagezero->activepages++;
1604 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1605 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1606 bt_releasemutex(mgr->lock);
1608 // keep bt_txnpromote out of this page
1611 contents->page_no = page_no;
1612 pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1614 // don't load cache from btree page, load it from contents
1616 if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1617 set->page = bt_mappage (mgr, set->latch);
1621 // now pin will keep bt_txnpromote out
1623 set->page->free = 0;
1627 // find slot in page for given key at a given level
1629 int bt_findslot (BtPage page, unsigned char *key, uint len)
1631 uint diff, higher = page->cnt, low = 1, slot;
1634 // make stopper key an infinite fence value
1636 if( bt_getid (page->right) )
1641 // low is the lowest candidate.
1642 // loop ends when they meet
1644 // higher is already
1645 // tested as .ge. the passed key.
1647 while( diff = higher - low ) {
1648 slot = low + ( diff >> 1 );
1649 if( keycmp (keyptr(page, slot), key, len) < 0 )
1652 higher = slot, good++;
1655 // return zero if key is on right link page
1657 return good ? higher : 0;
1660 // find and load page at given level for given key
1661 // leave page rd or wr locked as requested
1663 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1665 uid page_no = ROOT_page, prevpage_no = 0;
1666 uint drill = 0xff, slot;
1667 BtLatchSet *prevlatch;
1668 uint mode, prevmode;
1672 // start at root of btree and drill down
1675 // determine lock mode of drill level
1676 mode = (drill == lvl) ? lock : BtLockRead;
1678 if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1681 // obtain access lock using lock chaining with Access mode
1683 if( page_no > ROOT_page )
1684 bt_lockpage(BtLockAccess, set->latch, thread_no);
1686 set->page = bt_mappage (mgr, set->latch);
1688 // release & unpin parent or left sibling page
1691 bt_unlockpage(prevmode, prevlatch);
1692 bt_unpinlatch (mgr, prevlatch);
1696 // obtain mode lock using lock chaining through AccessLock
1698 bt_lockpage(mode, set->latch, thread_no);
1700 if( set->page->free )
1701 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1703 if( page_no > ROOT_page )
1704 bt_unlockpage(BtLockAccess, set->latch);
1706 // re-read and re-lock root after determining actual level of root
1708 if( set->page->lvl != drill) {
1709 if( set->latch->page_no != ROOT_page )
1710 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1712 drill = set->page->lvl;
1714 if( lock != BtLockRead && drill == lvl ) {
1715 bt_unlockpage(mode, set->latch);
1716 bt_unpinlatch (mgr, set->latch);
1721 prevpage_no = set->latch->page_no;
1722 prevlatch = set->latch;
1723 prevpage = set->page;
1726 // find key on page at this level
1727 // and descend to requested level
1729 if( !set->page->kill )
1730 if( slot = bt_findslot (set->page, key, len) ) {
1734 // find next non-dead slot -- the fence key if nothing else
1736 while( slotptr(set->page, slot)->dead )
1737 if( slot++ < set->page->cnt )
1740 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1742 val = valptr(set->page, slot);
1744 if( val->len == BtId )
1745 page_no = bt_getid(valptr(set->page, slot)->value);
1747 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1753 // slide right into next page
1755 page_no = bt_getid(set->page->right);
1758 // return error on end of right chain
1760 mgr->line = __LINE__, mgr->err = BTERR_struct;
1761 return 0; // return error
1764 // return page to free list
1765 // page must be delete & write locked
1766 // and have no keys pointing to it.
1768 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1770 // lock allocation page
1772 bt_mutexlock (mgr->lock);
1776 memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1777 bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1778 set->latch->dirty = 1;
1779 set->page->free = 1;
1781 // decrement active page count
1783 mgr->pagezero->activepages--;
1785 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1786 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1788 // unlock released page
1789 // and unlock allocation page
1791 bt_unlockpage (BtLockDelete, set->latch);
1792 bt_unlockpage (BtLockWrite, set->latch);
1793 bt_unpinlatch (mgr, set->latch);
1794 bt_releasemutex (mgr->lock);
1797 // a fence key was deleted from a page
1798 // push new fence value upwards
1800 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1802 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1803 unsigned char value[BtId];
1807 // remove the old fence value
1809 ptr = keyptr(set->page, set->page->cnt);
1810 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1811 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1812 set->latch->dirty = 1;
1814 // cache new fence value
1816 ptr = keyptr(set->page, set->page->cnt);
1817 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1819 bt_lockpage (BtLockParent, set->latch, thread_no);
1820 bt_unlockpage (BtLockWrite, set->latch);
1822 // insert new (now smaller) fence key
1824 bt_putid (value, set->latch->page_no);
1825 ptr = (BtKey*)leftkey;
1827 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1830 // now delete old fence key
1832 ptr = (BtKey*)rightkey;
1834 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1837 bt_unlockpage (BtLockParent, set->latch);
1838 bt_unpinlatch(mgr, set->latch);
1842 // root has a single child
1843 // collapse a level from the tree
1845 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1852 // find the child entry and promote as new root contents
1855 for( idx = 0; idx++ < root->page->cnt; )
1856 if( !slotptr(root->page, idx)->dead )
1859 val = valptr(root->page, idx);
1861 if( val->len == BtId )
1862 page_no = bt_getid (valptr(root->page, idx)->value);
1864 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1866 if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1867 child->page = bt_mappage (mgr, child->latch);
1871 bt_lockpage (BtLockDelete, child->latch, thread_no);
1872 bt_lockpage (BtLockWrite, child->latch, thread_no);
1874 memcpy (root->page, child->page, mgr->page_size);
1875 root->latch->dirty = 1;
1877 bt_freepage (mgr, child);
1879 } while( root->page->lvl > 1 && root->page->act == 1 );
1881 bt_unlockpage (BtLockWrite, root->latch);
1882 bt_unpinlatch (mgr, root->latch);
1886 // delete a page and manage keys
1887 // call with page writelocked
1889 // returns the right page pool entry for freeing
1890 // or zero on error.
1892 uint bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, BtLock mode)
1894 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1895 unsigned char value[BtId];
1896 uint lvl = set->page->lvl;
1901 // cache copy of fence key
1902 // to remove in parent
1904 ptr = keyptr(set->page, set->page->cnt);
1905 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1907 // obtain lock on right page
1909 page_no = bt_getid(set->page->right);
1911 if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1912 right->page = bt_mappage (mgr, right->latch);
1916 bt_lockpage (mode, right->latch, thread_no);
1918 // cache copy of key to update
1920 ptr = keyptr(right->page, right->page->cnt);
1921 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1923 if( right->page->kill )
1924 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1926 // pull contents of right peer into our empty page
1928 bt_lockpage (BtLockLink, set->latch, thread_no);
1929 memcpy (right->page->left, set->page->left, BtId);
1930 memcpy (set->page, right->page, mgr->page_size);
1931 set->page->page_no = set->latch->page_no;
1932 set->latch->dirty = 1;
1933 bt_unlockpage (BtLockLink, set->latch);
1935 // mark right page deleted and point it to left page
1936 // until we can post parent updates that remove access
1937 // to the deleted page.
1939 bt_putid (right->page->right, set->latch->page_no);
1940 right->latch->dirty = 1;
1941 right->page->kill = 1;
1943 bt_lockpage (BtLockParent, right->latch, thread_no);
1944 bt_unlockpage (mode, right->latch);
1946 bt_lockpage (BtLockParent, set->latch, thread_no);
1947 bt_unlockpage (BtLockWrite, set->latch);
1949 // redirect higher key directly to our new node contents
1951 bt_putid (value, set->latch->page_no);
1952 ptr = (BtKey*)higherfence;
1954 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1957 // delete old lower key to our node
1959 ptr = (BtKey*)lowerfence;
1961 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1964 bt_unlockpage (BtLockParent, set->latch);
1965 return right->latch - mgr->latchsets;
1968 // find and delete key on page by marking delete flag bit
1969 // if page becomes empty, delete it from the btree
1971 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1973 uint slot, idx, found, fence, entry;
1974 BtPageSet set[1], right[1];
1979 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1980 node = slotptr(set->page, slot);
1981 ptr = keyptr(set->page, slot);
1985 // if librarian slot, advance to real slot
1987 if( node->type == Librarian ) {
1988 ptr = keyptr(set->page, ++slot);
1989 node = slotptr(set->page, slot);
1992 fence = slot == set->page->cnt;
1994 // delete the key, ignore request if already dead
1996 if( found = !keycmp (ptr, key, len) )
1997 if( found = node->dead == 0 ) {
1998 val = valptr(set->page,slot);
1999 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2002 // mark node type as delete
2004 node->type = Delete;
2007 // collapse empty slots beneath the fence
2008 // on interiour nodes
2011 while( idx = set->page->cnt - 1 )
2012 if( slotptr(set->page, idx)->dead ) {
2013 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2014 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2022 // did we delete a fence key in an upper level?
2024 if( lvl && set->page->act && fence )
2025 if( bt_fixfence (mgr, set, lvl, thread_no) )
2030 // do we need to collapse root?
2032 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2033 if( bt_collapseroot (mgr, set, thread_no) )
2038 // delete empty page
2040 if( !set->page->act ) {
2041 if( entry = bt_deletepage (mgr, set, thread_no, BtLockWrite) )
2042 right->latch = mgr->latchsets + entry;
2046 // obtain delete and write locks to right node
2048 bt_unlockpage (BtLockParent, right->latch);
2049 right->page = bt_mappage (mgr, right->latch);
2050 bt_lockpage (BtLockDelete, right->latch, thread_no);
2051 bt_lockpage (BtLockWrite, right->latch, thread_no);
2052 bt_freepage (mgr, right);
2054 bt_unpinlatch (mgr, set->latch);
2058 set->latch->dirty = 1;
2059 bt_unlockpage(BtLockWrite, set->latch);
2060 bt_unpinlatch (mgr, set->latch);
2064 // advance to next slot
2066 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2068 BtLatchSet *prevlatch;
2071 if( slot < set->page->cnt )
2074 prevlatch = set->latch;
2076 if( page_no = bt_getid(set->page->right) )
2077 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2078 set->page = bt_mappage (bt->mgr, set->latch);
2082 return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2084 // obtain access lock using lock chaining with Access mode
2086 bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2088 bt_unlockpage(BtLockRead, prevlatch);
2089 bt_unpinlatch (bt->mgr, prevlatch);
2091 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2092 bt_unlockpage(BtLockAccess, set->latch);
2096 // find unique key == given key, or first duplicate key in
2097 // leaf level and return number of value bytes
2098 // or (-1) if not found.
2100 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2108 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2110 ptr = keyptr(set->page, slot);
2112 // skip librarian slot place holder
2114 if( slotptr(set->page, slot)->type == Librarian )
2115 ptr = keyptr(set->page, ++slot);
2117 // return actual key found
2119 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2122 if( slotptr(set->page, slot)->type == Duplicate )
2125 // not there if we reach the stopper key
2127 if( slot == set->page->cnt )
2128 if( !bt_getid (set->page->right) )
2131 // if key exists, return >= 0 value bytes copied
2132 // otherwise return (-1)
2134 if( slotptr(set->page, slot)->dead )
2138 if( !memcmp (ptr->key, key, len) ) {
2139 val = valptr (set->page,slot);
2140 if( valmax > val->len )
2142 memcpy (value, val->value, valmax);
2148 } while( slot = bt_findnext (bt, set, slot) );
2150 bt_unlockpage (BtLockRead, set->latch);
2151 bt_unpinlatch (bt->mgr, set->latch);
2155 // check page for space available,
2156 // clean if necessary and return
2157 // 0 - page needs splitting
2158 // >0 new slot value
2160 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2162 BtPage page = set->page, frame;
2163 uint nxt = mgr->page_size;
2164 uint cnt = 0, idx = 0;
2165 uint max = page->cnt;
2170 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2173 // skip cleanup and proceed to split
2174 // if there's not enough garbage
2177 if( page->garbage < nxt / 5 )
2180 frame = malloc (mgr->page_size);
2181 memcpy (frame, page, mgr->page_size);
2183 // skip page info and set rest of page to zero
2185 memset (page+1, 0, mgr->page_size - sizeof(*page));
2186 set->latch->dirty = 1;
2191 // clean up page first by
2192 // removing deleted keys
2194 while( cnt++ < max ) {
2198 if( cnt < max || frame->lvl )
2199 if( slotptr(frame,cnt)->dead )
2202 // copy the value across
2204 val = valptr(frame, cnt);
2205 nxt -= val->len + sizeof(BtVal);
2206 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2208 // copy the key across
2210 key = keyptr(frame, cnt);
2211 nxt -= key->len + sizeof(BtKey);
2212 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2214 // make a librarian slot
2216 slotptr(page, ++idx)->off = nxt;
2217 slotptr(page, idx)->type = Librarian;
2218 slotptr(page, idx)->dead = 1;
2222 slotptr(page, ++idx)->off = nxt;
2223 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2225 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2233 // see if page has enough space now, or does it need splitting?
2235 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2241 // split the root and raise the height of the btree
2243 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2245 unsigned char leftkey[BT_keyarray];
2246 uint nxt = mgr->page_size;
2247 unsigned char value[BtId];
2254 frame = malloc (mgr->page_size);
2255 memcpy (frame, root->page, mgr->page_size);
2257 // save left page fence key for new root
2259 ptr = keyptr(root->page, root->page->cnt);
2260 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2262 // Obtain an empty page to use, and copy the current
2263 // root contents into it, e.g. lower keys
2265 if( bt_newpage(mgr, left, frame, page_no) )
2268 left_page_no = left->latch->page_no;
2269 bt_unpinlatch (mgr, left->latch);
2272 // preserve the page info at the bottom
2273 // of higher keys and set rest to zero
2275 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2277 // insert stopper key at top of newroot page
2278 // and increase the root height
2280 nxt -= BtId + sizeof(BtVal);
2281 bt_putid (value, right->page_no);
2282 val = (BtVal *)((unsigned char *)root->page + nxt);
2283 memcpy (val->value, value, BtId);
2286 nxt -= 2 + sizeof(BtKey);
2287 slotptr(root->page, 2)->off = nxt;
2288 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2293 // insert lower keys page fence key on newroot page as first key
2295 nxt -= BtId + sizeof(BtVal);
2296 bt_putid (value, left_page_no);
2297 val = (BtVal *)((unsigned char *)root->page + nxt);
2298 memcpy (val->value, value, BtId);
2301 ptr = (BtKey *)leftkey;
2302 nxt -= ptr->len + sizeof(BtKey);
2303 slotptr(root->page, 1)->off = nxt;
2304 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2306 bt_putid(root->page->right, 0);
2307 root->page->min = nxt; // reset lowest used offset and key count
2308 root->page->cnt = 2;
2309 root->page->act = 2;
2312 mgr->pagezero->alloc->lvl = root->page->lvl;
2314 // release and unpin root pages
2316 bt_unlockpage(BtLockWrite, root->latch);
2317 bt_unpinlatch (mgr, root->latch);
2319 bt_unpinlatch (mgr, right);
2323 // split already locked full node
2325 // return pool entry for new right
2326 // page, pinned & unlocked
2328 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2330 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2331 BtPage frame = malloc (mgr->page_size);
2332 uint lvl = set->page->lvl;
2339 // split higher half of keys to frame
2341 memset (frame, 0, mgr->page_size);
2342 max = set->page->cnt;
2346 while( cnt++ < max ) {
2347 if( cnt < max || set->page->lvl )
2348 if( slotptr(set->page, cnt)->dead )
2351 src = valptr(set->page, cnt);
2352 nxt -= src->len + sizeof(BtVal);
2353 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2355 key = keyptr(set->page, cnt);
2356 nxt -= key->len + sizeof(BtKey);
2357 ptr = (BtKey*)((unsigned char *)frame + nxt);
2358 memcpy (ptr, key, key->len + sizeof(BtKey));
2360 // add librarian slot
2362 slotptr(frame, ++idx)->off = nxt;
2363 slotptr(frame, idx)->type = Librarian;
2364 slotptr(frame, idx)->dead = 1;
2368 slotptr(frame, ++idx)->off = nxt;
2369 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2371 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2375 frame->bits = mgr->page_bits;
2382 if( set->latch->page_no > ROOT_page )
2383 bt_putid (frame->right, bt_getid (set->page->right));
2385 // get new free page and write higher keys to it.
2387 if( bt_newpage(mgr, right, frame, thread_no) )
2390 // process lower keys
2392 memcpy (frame, set->page, mgr->page_size);
2393 memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2394 set->latch->dirty = 1;
2396 nxt = mgr->page_size;
2397 set->page->garbage = 0;
2403 if( slotptr(frame, max)->type == Librarian )
2406 // assemble page of smaller keys
2408 while( cnt++ < max ) {
2409 if( slotptr(frame, cnt)->dead )
2411 val = valptr(frame, cnt);
2412 nxt -= val->len + sizeof(BtVal);
2413 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2415 key = keyptr(frame, cnt);
2416 nxt -= key->len + sizeof(BtKey);
2417 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2419 // add librarian slot
2421 slotptr(set->page, ++idx)->off = nxt;
2422 slotptr(set->page, idx)->type = Librarian;
2423 slotptr(set->page, idx)->dead = 1;
2427 slotptr(set->page, ++idx)->off = nxt;
2428 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2432 bt_putid(set->page->right, right->latch->page_no);
2433 set->page->min = nxt;
2434 set->page->cnt = idx;
2437 return right->latch - mgr->latchsets;
2440 // fix keys for newly split page
2441 // call with both pages pinned & locked
2442 // return unlocked and unpinned
2444 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2446 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2447 unsigned char value[BtId];
2448 uint lvl = set->page->lvl;
2452 // if current page is the root page, split it
2454 if( set->latch->page_no == ROOT_page )
2455 return bt_splitroot (mgr, set, right, thread_no);
2457 ptr = keyptr(set->page, set->page->cnt);
2458 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2460 page = bt_mappage (mgr, right);
2462 ptr = keyptr(page, page->cnt);
2463 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2465 // insert new fences in their parent pages
2467 bt_lockpage (BtLockParent, right, thread_no);
2469 bt_lockpage (BtLockParent, set->latch, thread_no);
2470 bt_unlockpage (BtLockWrite, set->latch);
2472 // insert new fence for reformulated left block of smaller keys
2474 bt_putid (value, set->latch->page_no);
2475 ptr = (BtKey *)leftkey;
2477 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2480 // switch fence for right block of larger keys to new right page
2482 bt_putid (value, right->page_no);
2483 ptr = (BtKey *)rightkey;
2485 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2488 bt_unlockpage (BtLockParent, set->latch);
2489 bt_unpinlatch (mgr, set->latch);
2491 bt_unlockpage (BtLockParent, right);
2492 bt_unpinlatch (mgr, right);
2496 // install new key and value onto page
2497 // page must already be checked for
2500 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2502 uint idx, librarian;
2508 // if found slot > desired slot and previous slot
2509 // is a librarian slot, use it
2512 if( slotptr(set->page, slot-1)->type == Librarian )
2515 // copy value onto page
2517 set->page->min -= vallen + sizeof(BtVal);
2518 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2519 memcpy (val->value, value, vallen);
2522 // copy key onto page
2524 set->page->min -= keylen + sizeof(BtKey);
2525 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2526 memcpy (ptr->key, key, keylen);
2529 // find first empty slot
2531 for( idx = slot; idx < set->page->cnt; idx++ )
2532 if( slotptr(set->page, idx)->dead )
2535 // now insert key into array before slot,
2536 // adding as many librarian slots as
2539 if( idx == set->page->cnt ) {
2540 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2542 librarian = ++idx - slot;
2543 avail /= sizeof(BtSlot);
2548 if( librarian > avail )
2552 rate = (idx - slot) / librarian;
2553 set->page->cnt += librarian;
2558 librarian = 0, rate = 0;
2560 while( idx > slot ) {
2562 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2565 // add librarian slot per rate
2568 if( (idx - slot + 1)/2 <= librarian * rate ) {
2569 node = slotptr(set->page, idx--);
2570 node->off = node[1].off;
2571 node->type = Librarian;
2577 set->latch->dirty = 1;
2582 node = slotptr(set->page, slot);
2583 node->off = set->page->min;
2588 bt_unlockpage (BtLockWrite, set->latch);
2589 bt_unpinlatch (mgr, set->latch);
2595 // Insert new key into the btree at given level.
2596 // either add a new key or update/add an existing one
2598 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2600 uint slot, idx, len, entry;
2606 while( 1 ) { // find the page and slot for the current key
2607 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2608 node = slotptr(set->page, slot);
2609 ptr = keyptr(set->page, slot);
2612 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2616 // if librarian slot == found slot, advance to real slot
2618 if( node->type == Librarian )
2619 if( !keycmp (ptr, key, keylen) ) {
2620 ptr = keyptr(set->page, ++slot);
2621 node = slotptr(set->page, slot);
2624 // if inserting a duplicate key or unique
2625 // key that doesn't exist on the page,
2626 // check for adequate space on the page
2627 // and insert the new key before slot.
2632 if( keycmp (ptr, key, keylen) )
2633 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2634 return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2635 else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2637 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2642 // if key already exists, update value and return
2644 val = valptr(set->page, slot);
2646 if( val->len >= vallen ) {
2647 if( slotptr(set->page, slot)->dead )
2652 set->page->garbage += val->len - vallen;
2653 set->latch->dirty = 1;
2655 memcpy (val->value, value, vallen);
2656 bt_unlockpage(BtLockWrite, set->latch);
2657 bt_unpinlatch (mgr, set->latch);
2661 // new update value doesn't fit in existing value area
2662 // make sure page has room
2665 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2672 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2673 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2675 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2680 // copy key and value onto page and update slot
2682 set->page->min -= vallen + sizeof(BtVal);
2683 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2684 memcpy (val->value, value, vallen);
2687 set->latch->dirty = 1;
2688 set->page->min -= keylen + sizeof(BtKey);
2689 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2690 memcpy (ptr->key, key, keylen);
2693 node->off = set->page->min;
2694 bt_unlockpage(BtLockWrite, set->latch);
2695 bt_unpinlatch (mgr, set->latch);
2702 // determine actual page where key is located
2703 // return slot number
2705 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2707 BtKey *key = keyptr(source,src);
2708 uint slot = locks[src].slot;
2711 if( src > 1 && locks[src].reuse )
2712 entry = locks[src-1].entry, slot = 0;
2714 entry = locks[src].entry;
2717 set->latch = mgr->latchsets + entry;
2718 set->page = bt_mappage (mgr, set->latch);
2722 // is locks->reuse set? or was slot zeroed?
2723 // if so, find where our key is located
2724 // on current page or pages split on
2725 // same page txn operations.
2728 set->latch = mgr->latchsets + entry;
2729 set->page = bt_mappage (mgr, set->latch);
2731 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2732 if( slotptr(set->page, slot)->type == Librarian )
2734 if( locks[src].reuse )
2735 locks[src].entry = entry;
2738 } while( entry = set->latch->split );
2740 mgr->line = __LINE__, mgr->err = BTERR_atomic;
2744 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2746 BtKey *key = keyptr(source, src);
2747 BtVal *val = valptr(source, src);
2752 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2753 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2754 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2756 set->page->lsn = lsn;
2762 if( entry = bt_splitpage (mgr, set, thread_no) )
2763 latch = mgr->latchsets + entry;
2767 // splice right page into split chain
2770 bt_lockpage(BtLockWrite, latch, thread_no);
2771 latch->split = set->latch->split;
2772 set->latch->split = entry;
2773 locks[src].slot = 0;
2776 return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2779 // perform delete from smaller btree
2780 // insert a delete slot if not found there
2782 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2784 BtKey *key = keyptr(source, src);
2791 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2792 node = slotptr(set->page, slot);
2793 ptr = keyptr(set->page, slot);
2794 val = valptr(set->page, slot);
2796 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2798 // if slot is not found, insert a delete slot
2800 if( keycmp (ptr, key->key, key->len) )
2801 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2803 // if node is already dead,
2804 // ignore the request.
2809 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2810 set->latch->dirty = 1;
2811 set->page->lsn = lsn;
2815 __sync_fetch_and_add(&mgr->found, 1);
2819 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2821 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2822 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2824 return keycmp (key1, key2->key, key2->len);
2826 // atomic modification of a batch of keys.
2828 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2830 uint src, idx, slot, samepage, entry, que = 0;
2831 BtKey *key, *ptr, *key2;
2837 // stable sort the list of keys into order to
2838 // prevent deadlocks between threads.
2840 for( src = 1; src++ < source->cnt; ) {
2841 *temp = *slotptr(source,src);
2842 key = keyptr (source,src);
2844 for( idx = src; --idx; ) {
2845 key2 = keyptr (source,idx);
2846 if( keycmp (key, key2->key, key2->len) < 0 ) {
2847 *slotptr(source,idx+1) = *slotptr(source,idx);
2848 *slotptr(source,idx) = *temp;
2854 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2855 // add entries to redo log
2857 if( bt->mgr->pagezero->redopages )
2858 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2862 // perform the individual actions in the transaction
2864 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2865 return bt->mgr->err;
2867 // if number of active pages
2868 // is greater than the buffer pool
2869 // promote page into larger btree
2872 while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
2873 if( bt_txnpromote (bt) )
2874 return bt->mgr->err;
2881 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2883 uint src, idx, slot, samepage, entry, que = 0;
2884 BtPageSet set[1], prev[1], right[1];
2885 unsigned char value[BtId];
2893 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2895 // Load the leaf page for each key
2896 // group same page references with reuse bit
2897 // and determine any constraint violations
2899 for( src = 0; src++ < source->cnt; ) {
2900 key = keyptr(source, src);
2903 // first determine if this modification falls
2904 // on the same page as the previous modification
2905 // note that the far right leaf page is a special case
2907 if( samepage = src > 1 )
2908 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2909 slot = bt_findslot(set->page, key->key, key->len);
2912 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic + BtLockWrite, thread_no) )
2913 set->latch->split = 0;
2917 if( slotptr(set->page, slot)->type == Librarian )
2921 locks[src].entry = set->latch - mgr->latchsets;
2922 locks[src].slot = slot;
2923 locks[src].reuse = 0;
2925 locks[src].entry = 0;
2926 locks[src].slot = 0;
2927 locks[src].reuse = 1;
2930 // capture current lsn for master page
2932 locks[src].reqlsn = set->page->lsn;
2935 // insert or delete each key
2936 // process any splits or merges
2937 // run through txn list backwards
2939 samepage = source->cnt + 1;
2941 for( src = source->cnt; src; src-- ) {
2942 if( locks[src].reuse )
2945 // perform the txn operations
2946 // from smaller to larger on
2949 for( idx = src; idx < samepage; idx++ )
2950 switch( slotptr(source,idx)->type ) {
2952 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2958 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2963 bt_atomicpage (mgr, source, locks, idx, set);
2967 // after the same page operations have finished,
2968 // process master page for splits or deletion.
2970 latch = prev->latch = mgr->latchsets + locks[src].entry;
2971 prev->page = bt_mappage (mgr, prev->latch);
2974 // pick-up all splits from master page
2975 // each one is already pinned & WriteLocked.
2977 if( entry = latch->split ) do {
2978 set->latch = mgr->latchsets + entry;
2979 set->page = bt_mappage (mgr, set->latch);
2981 // delete empty master page by undoing its split
2982 // (this is potentially another empty page)
2984 if( !prev->page->act ) {
2985 memcpy (set->page->left, prev->page->left, BtId);
2986 memcpy (prev->page, set->page, mgr->page_size);
2987 bt_lockpage (BtLockDelete, set->latch, thread_no);
2988 prev->latch->split = set->latch->split;
2989 prev->latch->dirty = 1;
2990 bt_freepage (mgr, set);
2994 // remove empty split page from the split chain
2995 // and return it to the free list. No other
2996 // thread has its page number yet.
2998 if( !set->page->act ) {
2999 memcpy (prev->page->right, set->page->right, BtId);
3000 prev->latch->split = set->latch->split;
3002 bt_lockpage (BtLockDelete, set->latch, thread_no);
3003 bt_freepage (mgr, set);
3007 // update prev's fence key
3009 ptr = keyptr(prev->page,prev->page->cnt);
3010 bt_putid (value, prev->latch->page_no);
3012 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3015 // splice in the left link into the split page
3017 bt_putid (set->page->left, prev->latch->page_no);
3020 bt_syncpage (mgr, prev->page, prev->latch);
3022 // page is unlocked & unpinned below to avoid bt_txnpromote
3025 } while( entry = prev->latch->split );
3027 // update left pointer in next right page from last split page
3028 // (if all splits were reversed or none occurred, latch->split == 0)
3030 if( latch->split ) {
3031 // fix left pointer in master's original (now split)
3032 // far right sibling or set rightmost page in page zero
3034 if( right_page_no = bt_getid (prev->page->right) ) {
3035 if( set->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
3036 set->page = bt_mappage (mgr, set->latch);
3040 bt_lockpage (BtLockLink, set->latch, thread_no);
3041 bt_putid (set->page->left, prev->latch->page_no);
3042 set->latch->dirty = 1;
3044 bt_unlockpage (BtLockLink, set->latch);
3045 bt_unpinlatch (mgr, set->latch);
3046 } else { // prev is rightmost page
3047 bt_mutexlock (mgr->lock);
3048 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3049 bt_releasemutex(mgr->lock);
3052 // Process last page split in chain
3053 // by switching the key from the master
3054 // page to the last split.
3056 ptr = keyptr(prev->page,prev->page->cnt);
3057 bt_putid (value, prev->latch->page_no);
3059 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3063 bt_syncpage (mgr, prev->page, prev->latch);
3065 // unlock and unpin master page
3067 bt_unlockpage(BtLockAtomic, latch);
3068 bt_unlockpage(BtLockWrite, latch);
3069 bt_unpinlatch(mgr, latch);
3071 // go through the list of splits and
3072 // release the locks and unpin
3074 while( entry = latch->split ) {
3075 latch = mgr->latchsets + entry;
3076 bt_unlockpage(BtLockWrite, latch);
3077 bt_unpinlatch(mgr, latch);
3083 // since there are no splits, we're
3084 // finished if master page occupied
3086 bt_unlockpage(BtLockAtomic, prev->latch);
3088 if( prev->page->act ) {
3089 bt_unlockpage(BtLockWrite, prev->latch);
3092 bt_syncpage (mgr, prev->page, prev->latch);
3094 bt_unpinlatch(mgr, prev->latch);
3098 // any and all splits were reversed, and the
3099 // master page located in prev is empty, delete it
3101 if( entry = bt_deletepage (mgr, prev, thread_no, BtLockWrite) )
3102 right->latch = mgr->latchsets + entry;
3106 // obtain delete and write locks to right node
3108 bt_unlockpage (BtLockParent, right->latch);
3109 right->page = bt_mappage (mgr, right->latch);
3110 bt_lockpage (BtLockDelete, right->latch, thread_no);
3111 bt_lockpage (BtLockWrite, right->latch, thread_no);
3112 bt_freepage (mgr, right);
3114 bt_unpinlatch (mgr, prev->latch);
3121 // promote a page into the larger btree
3123 BTERR bt_txnpromote (BtDb *bt)
3125 BtPageSet set[1], right[1];
3126 uint entry, slot, idx;
3133 entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3135 entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3137 entry %= bt->mgr->latchtotal;
3142 set->latch = bt->mgr->latchsets + entry;
3144 if( !bt_mutextry(set->latch->modify) )
3147 // skip this entry if it is pinned
3149 if( set->latch->pin & ~CLOCK_bit ) {
3150 bt_releasemutex(set->latch->modify);
3154 set->page = bt_mappage (bt->mgr, set->latch);
3156 // entry never used or has no right sibling
3158 if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3159 bt_releasemutex(set->latch->modify);
3163 // entry interiour node or being killed
3165 if( set->page->lvl || set->page->free || set->page->kill ) {
3166 bt_releasemutex(set->latch->modify);
3170 // pin the page for our useage
3173 bt_releasemutex(set->latch->modify);
3174 bt_lockpage (BtLockAtomic | BtLockWrite, set->latch, bt->thread_no);
3175 memcpy (bt->frame, set->page, bt->mgr->page_size);
3177 if( !(set->latch->page_no % 100) )
3178 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
3180 if( entry = bt_deletepage (bt->mgr, set, bt->thread_no, BtLockAtomic | BtLockWrite) )
3181 right->latch = bt->mgr->latchsets + entry;
3183 return bt->mgr->err;
3185 // obtain delete and write locks to right node
3187 bt_unlockpage (BtLockParent, right->latch);
3188 right->page = bt_mappage (bt->mgr, right->latch);
3190 // release page with its new contents
3192 bt_unlockpage (BtLockAtomic, set->latch);
3193 bt_unpinlatch (bt->mgr, set->latch);
3195 // transfer slots in our selected page to larger btree
3197 if( bt_atomicexec (bt->main, bt->frame, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3198 return bt->main->err;
3200 // free the page we took over
3202 bt_lockpage (BtLockDelete, right->latch, bt->thread_no);
3203 bt_lockpage (BtLockWrite, right->latch, bt->thread_no);
3204 bt_freepage (bt->mgr, right);
3209 // set cursor to highest slot on highest page
3211 uint bt_lastkey (BtDb *bt)
3213 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3216 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3217 set->page = bt_mappage (bt->mgr, set->latch);
3221 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3222 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3223 bt_unlockpage(BtLockRead, set->latch);
3224 bt_unpinlatch (bt->mgr, set->latch);
3225 return bt->cursor->cnt;
3228 // return previous slot on cursor page
3230 uint bt_prevkey (BtDb *bt, uint slot)
3232 uid cursor_page = bt->cursor->page_no;
3233 uid ourright, next, us = cursor_page;
3239 ourright = bt_getid(bt->cursor->right);
3242 if( !(next = bt_getid(bt->cursor->left)) )
3248 if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3249 set->page = bt_mappage (bt->mgr, set->latch);
3253 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3254 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3255 bt_unlockpage(BtLockRead, set->latch);
3256 bt_unpinlatch (bt->mgr, set->latch);
3258 next = bt_getid (bt->cursor->right);
3260 if( bt->cursor->kill )
3264 if( next == ourright )
3269 return bt->cursor->cnt;
3272 // return next slot on cursor page
3273 // or slide cursor right into next page
3275 uint bt_nextkey (BtDb *bt, uint slot)
3281 right = bt_getid(bt->cursor->right);
3283 while( slot++ < bt->cursor->cnt )
3284 if( slotptr(bt->cursor,slot)->dead )
3286 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3294 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3295 set->page = bt_mappage (bt->mgr, set->latch);
3299 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3300 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3301 bt_unlockpage(BtLockRead, set->latch);
3302 bt_unpinlatch (bt->mgr, set->latch);
3307 return bt->mgr->err = 0;
3310 // cache page of keys into cursor and return starting slot for given key
3312 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3317 // cache page for retrieval
3319 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3320 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3324 bt_unlockpage(BtLockRead, set->latch);
3325 bt_unpinlatch (bt->mgr, set->latch);
3332 double getCpuTime(int type)
3335 FILETIME xittime[1];
3336 FILETIME systime[1];
3337 FILETIME usrtime[1];
3338 SYSTEMTIME timeconv[1];
3341 memset (timeconv, 0, sizeof(SYSTEMTIME));
3345 GetSystemTimeAsFileTime (xittime);
3346 FileTimeToSystemTime (xittime, timeconv);
3347 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3350 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3351 FileTimeToSystemTime (usrtime, timeconv);
3354 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3355 FileTimeToSystemTime (systime, timeconv);
3359 ans += (double)timeconv->wHour * 3600;
3360 ans += (double)timeconv->wMinute * 60;
3361 ans += (double)timeconv->wSecond;
3362 ans += (double)timeconv->wMilliseconds / 1000;
3367 #include <sys/resource.h>
3369 double getCpuTime(int type)
3371 struct rusage used[1];
3372 struct timeval tv[1];
3376 gettimeofday(tv, NULL);
3377 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3380 getrusage(RUSAGE_SELF, used);
3381 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3384 getrusage(RUSAGE_SELF, used);
3385 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3392 void bt_poolaudit (BtMgr *mgr)
3397 while( ++entry < mgr->latchtotal ) {
3398 latch = mgr->latchsets + entry;
3400 if( *latch->readwr->value )
3401 fprintf(stderr, "latchset %d wrtlocked for page %d\n", entry, latch->page_no);
3403 if( *latch->access->value )
3404 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3406 if( *latch->parent->value )
3407 fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3409 if( *latch->atomic->value )
3410 fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3412 if( *latch->modify->value )
3413 fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3415 if( latch->pin & ~CLOCK_bit )
3416 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3429 // standalone program to index file of keys
3430 // then list them onto std-out
3433 void *index_file (void *arg)
3435 uint __stdcall index_file (void *arg)
3438 int line = 0, found = 0, cnt = 0, idx;
3439 uid next, page_no = LEAF_page; // start on first page of leaves
3440 int ch, len = 0, slot, type = 0;
3441 unsigned char key[BT_maxkey];
3442 unsigned char txn[65536];
3443 ThreadArg *args = arg;
3452 bt = bt_open (args->mgr, args->main);
3455 if( args->idx < strlen (args->type) )
3456 ch = args->type[args->idx];
3458 ch = args->type[strlen(args->type) - 1];
3470 if( type == Delete )
3471 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3473 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3475 if( type == Delete )
3476 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3478 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3480 if( in = fopen (args->infile, "rb") )
3481 while( ch = getc(in), ch != EOF )
3487 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3488 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3494 memcpy (txn + nxt, key + 10, len - 10);
3496 txn[nxt] = len - 10;
3498 memcpy (txn + nxt, key, 10);
3501 slotptr(page,++cnt)->off = nxt;
3502 slotptr(page,cnt)->type = type;
3505 if( cnt < args->num )
3512 if( bt_atomictxn (bt, page) )
3513 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3518 else if( len < BT_maxkey )
3520 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3524 fprintf(stderr, "started indexing for %s\n", args->infile);
3525 if( in = fopen (args->infile, "r") )
3526 while( ch = getc(in), ch != EOF )
3531 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3532 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3535 else if( len < BT_maxkey )
3537 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3541 fprintf(stderr, "started finding keys for %s\n", args->infile);
3542 if( in = fopen (args->infile, "rb") )
3543 while( ch = getc(in), ch != EOF )
3547 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3549 else if( bt->mgr->err )
3550 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3553 else if( len < BT_maxkey )
3555 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3559 fprintf(stderr, "started scanning\n");
3562 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3563 set->page = bt_mappage (bt->mgr, set->latch);
3565 fprintf(stderr, "unable to obtain latch"), exit(1);
3567 bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3568 next = bt_getid (set->page->right);
3570 for( slot = 0; slot++ < set->page->cnt; )
3571 if( next || slot < set->page->cnt )
3572 if( !slotptr(set->page, slot)->dead ) {
3573 ptr = keyptr(set->page, slot);
3576 if( slotptr(set->page, slot)->type == Duplicate )
3579 fwrite (ptr->key, len, 1, stdout);
3580 val = valptr(set->page, slot);
3581 fwrite (val->value, val->len, 1, stdout);
3582 fputc ('\n', stdout);
3586 bt_unlockpage (BtLockRead, set->latch);
3587 bt_unpinlatch (bt->mgr, set->latch);
3588 } while( page_no = next );
3590 fprintf(stderr, " Total keys read %d\n", cnt);
3594 fprintf(stderr, "started reverse scan\n");
3595 if( slot = bt_lastkey (bt) )
3596 while( slot = bt_prevkey (bt, slot) ) {
3597 if( slotptr(bt->cursor, slot)->dead )
3600 ptr = keyptr(bt->cursor, slot);
3603 if( slotptr(bt->cursor, slot)->type == Duplicate )
3606 fwrite (ptr->key, len, 1, stdout);
3607 val = valptr(bt->cursor, slot);
3608 fwrite (val->value, val->len, 1, stdout);
3609 fputc ('\n', stdout);
3613 fprintf(stderr, " Total keys read %d\n", cnt);
3618 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3620 fprintf(stderr, "started counting\n");
3621 next = REDO_page + bt->mgr->pagezero->redopages;
3623 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3624 if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3627 if( !bt->cursor->free && !bt->cursor->lvl )
3628 cnt += bt->cursor->act;
3634 cnt--; // remove stopper key
3635 fprintf(stderr, " Total keys counted %d\n", cnt);
3647 typedef struct timeval timer;
3649 int main (int argc, char **argv)
3651 int idx, cnt, len, slot, err;
3652 int segsize, bits = 16;
3672 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3673 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3674 fprintf (stderr, " where main_file is the name of the main btree file\n");
3675 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3676 fprintf (stderr, " page_bits is the page size in bits for the cache btree\n");
3677 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3678 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3679 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3680 fprintf (stderr, " main_bits is the page size of the main btree in bits\n");
3681 fprintf (stderr, " main_pool is the number of main pages in the main buffer pool\n");
3682 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3686 start = getCpuTime(0);
3689 bits = atoi(argv[4]);
3692 poolsize = atoi(argv[5]);
3695 fprintf (stderr, "Warning: no mapped_pool\n");
3698 num = atoi(argv[6]);
3701 redopages = atoi(argv[7]);
3703 if( redopages + REDO_page > 65535 )
3704 fprintf (stderr, "Warning: Recovery buffer too large\n");
3707 mainbits = atoi(argv[8]);
3710 mainpool = atoi(argv[9]);
3714 threads = malloc (cnt * sizeof(pthread_t));
3716 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3718 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3720 mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3723 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3728 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3731 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3740 for( idx = 0; idx < cnt; idx++ ) {
3741 args[idx].infile = argv[idx + 10];
3742 args[idx].type = argv[3];
3743 args[idx].main = main;
3744 args[idx].mgr = mgr;
3745 args[idx].num = num;
3746 args[idx].idx = idx;
3748 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3749 fprintf(stderr, "Error creating thread %d\n", err);
3751 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3755 args[0].infile = argv[idx + 10];
3756 args[0].type = argv[3];
3757 args[0].main = main;
3764 // wait for termination
3767 for( idx = 0; idx < cnt; idx++ )
3768 pthread_join (threads[idx], NULL);
3770 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3772 for( idx = 0; idx < cnt; idx++ )
3773 CloseHandle(threads[idx]);
3780 fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3786 elapsed = getCpuTime(0) - start;
3787 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3788 elapsed = getCpuTime(1);
3789 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3790 elapsed = getCpuTime(2);
3791 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);