1 // btree version threadskv10b futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer locks,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // and LSM B-trees for write optimization
14 // author: karl malbrain, malbrain@cal.berkeley.edu
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
20 The orginal author is Karl Malbrain.
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
38 #include <linux/futex.h>
53 #define WIN32_LEAN_AND_MEAN
67 typedef unsigned long long uid;
68 typedef unsigned long long logseqno;
71 typedef unsigned long long off64_t;
72 typedef unsigned short ushort;
73 typedef unsigned int uint;
76 #define BT_ro 0x6f72 // ro
77 #define BT_rw 0x7772 // rw
79 #define BT_maxbits 26 // maximum page size in bits
80 #define BT_minbits 9 // minimum page size in bits
81 #define BT_minpage (1 << BT_minbits) // minimum page size
82 #define BT_maxpage (1 << BT_maxbits) // maximum page size
84 // BTree page number constants
85 #define ALLOC_page 0 // allocation page
86 #define ROOT_page 1 // root of the btree
87 #define LEAF_page 2 // first page of leaves
88 #define REDO_page 3 // first page of redo buffer
90 // Number of levels to create in a new BTree
95 There are six lock types for each node in four independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
117 volatile ushort xlock[1]; // one writer has exclusive lock
118 volatile ushort wrt[1]; // count of other writers waiting
127 // definition for reader/writer reentrant lock implementation
133 volatile ushort tid[1];
134 volatile ushort readers[1];
138 volatile ushort waitwrite[1];
139 volatile ushort waitread[1];
140 volatile ushort phase[1]; // phase == 1 for reading after write
141 volatile ushort dup[1]; // reentrant counter
144 // write only reentrant lock
150 volatile ushort tid[1];
151 volatile ushort dup[1];
155 volatile uint waiters[1];
158 // mode & definition for lite latch implementation
161 QueRd = 1, // reader queue
162 QueWr = 2 // writer queue
165 // hash table entries
168 uint entry; // Latch table entry at head of chain
169 BtMutexLatch latch[1];
172 // latch manager table structure
175 uid page_no; // latch set page number
176 RWLock readwr[1]; // read/write page lock
177 RWLock access[1]; // Access Intent/Page delete
178 RWLock parent[1]; // Posting of fence key in parent
179 RWLock atomic[1]; // Atomic update in progress
180 RWLock link[1]; // left link being updated
181 uint split; // right split page atomic insert
182 uint next; // next entry in hash table chain
183 uint prev; // prev entry in hash table chain
184 ushort pin; // number of accessing threads
185 unsigned char dirty; // page in cache is dirty (atomic setable)
186 BtMutexLatch modify[1]; // modify entry lite latch
189 // Define the length of the page record numbers
193 // Page key slot definition.
195 // Keys are marked dead, but remain on the page until
196 // it cleanup is called. The fence key (highest key) for
197 // a leaf page is always present, even after cleanup.
201 // In addition to the Unique keys that occupy slots
202 // there are Librarian and Duplicate key
203 // slots occupying the key slot array.
205 // The Librarian slots are dead keys that
206 // serve as filler, available to add new Unique
207 // or Dup slots that are inserted into the B-tree.
209 // The Duplicate slots have had their key bytes extended
210 // by 6 bytes to contain a binary duplicate key uniqueifier.
220 uint off:BT_maxbits; // page offset for key start
221 uint type:3; // type of slot
222 uint dead:1; // set for deleted slot
225 // The key structure occupies space at the upper end of
226 // each page. It's a length byte followed by the key
230 unsigned char len; // this can be changed to a ushort or uint
231 unsigned char key[0];
234 // the value structure also occupies space at the upper
235 // end of the page. Each key is immediately followed by a value.
238 unsigned char len; // this can be changed to a ushort or uint
239 unsigned char value[0];
242 #define BT_maxkey 255 // maximum number of bytes in a key
243 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
245 // The first part of an index page.
246 // It is immediately followed
247 // by the BtSlot array of keys.
249 // note that this structure size
250 // must be a multiple of 8 bytes
251 // in order to place PageZero correctly.
253 typedef struct BtPage_ {
254 uint cnt; // count of keys in page
255 uint act; // count of active keys
256 uint min; // next key offset
257 uint garbage; // page garbage in bytes
258 unsigned char bits:7; // page size in bits
259 unsigned char free:1; // page is on free chain
260 unsigned char lvl:7; // level of page
261 unsigned char kill:1; // page is being deleted
262 unsigned char right[BtId]; // page number to right
263 unsigned char left[BtId]; // page number to left
264 unsigned char filler[2]; // padding to multiple of 8
265 logseqno lsn; // log sequence number applied
266 uid page_no; // this page number
269 // The loadpage interface object
272 BtPage page; // current page pointer
273 BtLatchSet *latch; // current page latch set
276 // structure for latch manager on ALLOC_page
279 struct BtPage_ alloc[1]; // next page_no in right ptr
280 unsigned char freechain[BtId]; // head of free page_nos chain
281 unsigned long long activepages; // number of active pages
282 uint redopages; // number of redo pages in file
285 // The object structure for Btree access
288 uint page_size; // page size
289 uint page_bits; // page size in bits
295 BtPageZero *pagezero; // mapped allocation page
296 BtHashEntry *hashtable; // the buffer pool hash table entries
297 BtLatchSet *latchsets; // mapped latch set from buffer pool
298 unsigned char *pagepool; // mapped to the buffer pool pages
299 unsigned char *redobuff; // mapped recovery buffer pointer
300 logseqno lsn, flushlsn; // current & first lsn flushed
301 BtMutexLatch redo[1]; // redo area lite latch
302 BtMutexLatch lock[1]; // allocation area lite latch
303 BtMutexLatch maps[1]; // mapping segments lite latch
304 ushort thread_no[1]; // next thread number
305 uint nlatchpage; // number of latch pages at BT_latch
306 uint latchtotal; // number of page latch entries
307 uint latchhash; // number of latch hash table slots
308 uint latchvictim; // next latch entry to examine
309 uint latchpromote; // next latch entry to promote
310 uint redolast; // last msync size of recovery buff
311 uint redoend; // eof/end element in recovery buff
312 int err; // last error
313 int line; // last error line no
314 int found; // number of keys found by delete
315 int reads, writes; // number of reads and writes
317 HANDLE halloc; // allocation handle
318 HANDLE hpool; // buffer pool handle
320 uint segments; // number of memory mapped segments
321 unsigned char *pages[64000];// memory mapped segments of b-tree
325 BtMgr *mgr; // buffer manager for entire process
326 BtMgr *main; // buffer manager for main btree
327 BtPage frame; // cached page frame for promote
328 BtPage cursor; // cached page frame for start/next
329 ushort thread_no; // thread number
330 unsigned char key[BT_keyarray]; // last found complete key
333 // atomic txn structures
336 logseqno reqlsn; // redo log seq no required
337 uint entry; // latch table entry number
338 uint slot:31; // page slot number
339 uint reuse:1; // reused previous page
342 // Catastrophic errors
356 #define CLOCK_bit 0x8000
358 // recovery manager entry types
361 BTRM_eof = 0, // rest of buffer is emtpy
362 BTRM_add, // add a unique key-value to btree
363 BTRM_dup, // add a duplicate key-value to btree
364 BTRM_del, // delete a key-value from btree
365 BTRM_upd, // update a key with a new value
366 BTRM_new, // allocate a new empty page
367 BTRM_old // reuse an old empty page
370 // recovery manager entry
371 // structure followed by BtKey & BtVal
374 logseqno reqlsn; // log sequence number required
375 logseqno lsn; // log sequence number for entry
376 uint len; // length of entry
377 unsigned char type; // type of entry
378 unsigned char lvl; // level of btree entry pertains to
383 extern void bt_close (BtDb *bt);
384 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
385 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
386 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
387 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
388 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
389 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
390 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
392 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
394 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
395 extern uint bt_nextkey (BtDb *bt, uint slot);
396 extern uint bt_prevkey (BtDb *db, uint slot);
397 extern uint bt_lastkey (BtDb *db);
400 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
401 extern void bt_mgrclose (BtMgr *mgr);
402 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
403 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
405 // atomic transaction functions
406 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
407 BTERR bt_txnpromote (BtDb *bt);
409 // The page is allocated from low and hi ends.
410 // The key slots are allocated from the bottom,
411 // while the text and value of the key
412 // are allocated from the top. When the two
413 // areas meet, the page is split into two.
415 // A key consists of a length byte, two bytes of
416 // index number (0 - 65535), and up to 253 bytes
419 // Associated with each key is a value byte string
420 // containing any value desired.
422 // The b-tree root is always located at page 1.
423 // The first leaf page of level zero is always
424 // located on page 2.
426 // The b-tree pages are linked with next
427 // pointers to facilitate enumerators,
428 // and provide for concurrency.
430 // When to root page fills, it is split in two and
431 // the tree height is raised by a new root at page
432 // one with two keys.
434 // Deleted keys are marked with a dead bit until
435 // page cleanup. The fence key for a leaf node is
438 // To achieve maximum concurrency one page is locked at a time
439 // as the tree is traversed to find leaf key in question. The right
440 // page numbers are used in cases where the page is being split,
443 // Page 0 is dedicated to lock for new page extensions,
444 // and chains empty pages together for reuse. It also
445 // contains the latch manager hash table.
447 // The ParentModification lock on a node is obtained to serialize posting
448 // or changing the fence key for a node.
450 // Empty pages are chained together through the ALLOC page and reused.
452 // Access macros to address slot and key values from the page
453 // Page slots use 1 based indexing.
455 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
456 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
457 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
459 void bt_putid(unsigned char *dest, uid id)
464 dest[i] = (unsigned char)id, id >>= 8;
467 uid bt_getid(unsigned char *src)
472 for( i = 0; i < BtId; i++ )
473 id <<= 8, id |= *src++;
478 // lite weight spin lock Latch Manager
480 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
482 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
485 void bt_mutexlock(BtMutexLatch *latch)
487 BtMutexLatch prev[1];
491 *prev->value = __sync_fetch_and_or(latch->value, XCL);
493 if( !*prev->bits->xlock ) { // did we set XCL?
495 __sync_fetch_and_sub(latch->value, WRT);
500 *prev->bits->wrt += 1;
501 __sync_fetch_and_add(latch->value, WRT);
504 sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
509 // try to obtain write lock
511 // return 1 if obtained,
514 int bt_mutextry(BtMutexLatch *latch)
516 BtMutexLatch prev[1];
518 *prev->value = __sync_fetch_and_or(latch->value, XCL);
520 // take write access if exclusive bit was clear
522 return !*prev->bits->xlock;
527 void bt_releasemutex(BtMutexLatch *latch)
529 BtMutexLatch prev[1];
531 *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
533 if( *prev->bits->wrt )
534 sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
537 // reentrant reader/writer lock implementation
539 void WriteLock (RWLock *lock, ushort tid)
545 bt_mutexlock(lock->xcl);
548 // is this a re-entrant request?
550 if( *prev->bits->tid == tid )
553 // wait if write already taken, or there are readers
555 else if( *prev->bits->tid || *prev->bits->readers ) {
557 waited++, *lock->waitwrite += 1;
559 // otherwise, we can take the lock
563 *lock->waitwrite -= 1;
565 *lock->bits->tid = tid;
566 *lock->phase = 0; // set writing phase
569 bt_releasemutex(lock->xcl);
571 if( *lock->bits->tid == tid )
574 sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr );
578 void WriteRelease (RWLock *lock)
580 bt_mutexlock(lock->xcl);
582 // were we reentrant?
586 bt_releasemutex(lock->xcl);
590 // release write lock and
591 // set reading after write phase
593 *lock->bits->tid = 0;
595 // were readers waiting for a write cycle?
597 if( *lock->waitread ) {
599 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueRd );
601 // otherwise were writers waiting
603 } else if( *lock->waitwrite ) {
605 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
608 bt_releasemutex(lock->xcl);
611 void ReadLock (RWLock *lock, ushort tid)
613 uint xit, waited = 0;
617 bt_mutexlock(lock->xcl);
621 // wait if a write lock is currenty active
622 // or we are not in a new read cycle and
623 // writers are waiting.
625 if( *prev->bits->tid || !*prev->phase && *prev->waitwrite ) {
627 waited++, *lock->waitread += 1;
629 // else we can take the lock
633 *lock->waitread -= 1;
635 *lock->bits->readers += 1;
639 bt_releasemutex(lock->xcl);
641 // did we increment readers?
646 sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueRd );
650 void ReadRelease (RWLock *lock)
654 bt_mutexlock(lock->xcl);
657 *prev->bits->readers = *lock->bits->readers -= 1;
659 if( !*lock->waitread && *lock->waitwrite )
660 *prev->phase = *lock->phase = 0; // stop accepting new readers
662 bt_releasemutex(lock->xcl);
664 // were writers waiting for a read cycle to finish?
666 if( !*prev->phase && !*prev->bits->readers )
667 if( *prev->waitwrite )
668 sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
671 // recovery manager -- flush dirty pages
673 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
675 uint cnt3 = 0, cnt2 = 0, cnt = 0;
680 // flush dirty pool pages to the btree
682 fprintf(stderr, "Start flushlsn ");
683 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
684 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
685 latch = mgr->latchsets + entry;
686 bt_mutexlock (latch->modify);
687 bt_lockpage(BtLockRead, latch, thread_no);
690 bt_writepage(mgr, page, latch->page_no);
691 latch->dirty = 0, cnt++;
693 if( latch->pin & ~CLOCK_bit )
695 bt_unlockpage(BtLockRead, latch);
696 bt_releasemutex (latch->modify);
698 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
699 fprintf(stderr, "begin sync");
700 for( segment = 0; segment < mgr->segments; segment++ )
701 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
702 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
703 fprintf(stderr, " end sync\n");
706 // recovery manager -- process current recovery buff on startup
707 // this won't do much if previous session was properly closed.
709 BTERR bt_recoveryredo (BtMgr *mgr)
716 hdr = (BtLogHdr *)mgr->redobuff;
717 mgr->flushlsn = hdr->lsn;
720 hdr = (BtLogHdr *)(mgr->redobuff + offset);
721 switch( hdr->type ) {
725 case BTRM_add: // add a unique key-value to btree
727 case BTRM_dup: // add a duplicate key-value to btree
728 case BTRM_del: // delete a key-value from btree
729 case BTRM_upd: // update a key with a new value
730 case BTRM_new: // allocate a new empty page
731 case BTRM_old: // reuse an old empty page
737 // recovery manager -- append new entry to recovery log
738 // flush dirty pages to disk when it overflows.
740 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
742 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
743 uint amt = sizeof(BtLogHdr);
747 bt_mutexlock (mgr->redo);
750 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
752 // see if new entry fits in buffer
753 // flush and reset if it doesn't
755 if( amt > size - mgr->redoend ) {
756 mgr->flushlsn = mgr->lsn;
757 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
758 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
761 bt_flushlsn(mgr, thread_no);
764 // fill in new entry & either eof or end block
766 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
771 hdr->lsn = ++mgr->lsn;
775 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
776 memset (eof, 0, sizeof(BtLogHdr));
778 // fill in key and value
781 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
782 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
785 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
786 memset (eof, 0, sizeof(BtLogHdr));
789 last = mgr->redolast & ~0xfff;
792 if( end - last + sizeof(BtLogHdr) >= 32768 )
793 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
794 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
798 bt_releasemutex(mgr->redo);
802 // recovery manager -- append transaction to recovery log
803 // flush dirty pages to disk when it overflows.
805 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
807 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
808 uint amt = 0, src, type;
815 // determine amount of redo recovery log space required
817 for( src = 0; src++ < source->cnt; ) {
818 key = keyptr(source,src);
819 val = valptr(source,src);
820 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
821 amt += sizeof(BtLogHdr);
824 bt_mutexlock (mgr->redo);
826 // see if new entry fits in buffer
827 // flush and reset if it doesn't
829 if( amt > size - mgr->redoend ) {
830 mgr->flushlsn = mgr->lsn;
831 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
832 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
835 bt_flushlsn (mgr, thread_no);
838 // assign new lsn to transaction
842 // fill in new entries
844 for( src = 0; src++ < source->cnt; ) {
845 key = keyptr(source, src);
846 val = valptr(source, src);
848 switch( slotptr(source, src)->type ) {
860 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
861 amt += sizeof(BtLogHdr);
863 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
869 // fill in key and value
871 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
872 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
877 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
878 memset (eof, 0, sizeof(BtLogHdr));
881 last = mgr->redolast & ~0xfff;
884 if( end - last + sizeof(BtLogHdr) >= 32768 )
885 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
886 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
890 bt_releasemutex(mgr->redo);
894 // sync a single btree page to disk
896 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
898 uint segment = latch->page_no >> 16;
901 if( bt_writepage (mgr, page, latch->page_no) )
904 perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
906 if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
907 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
913 // read page into buffer pool from permanent location in Btree file
915 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
917 int flag = PROT_READ | PROT_WRITE;
918 uint segment = page_no >> 16;
922 if( segment < mgr->segments ) {
923 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
925 memcpy (page, perm, mgr->page_size);
930 bt_mutexlock (mgr->maps);
932 if( segment < mgr->segments ) {
933 bt_releasemutex (mgr->maps);
937 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
940 bt_releasemutex (mgr->maps);
944 // write page to permanent location in Btree file
945 // clear the dirty bit
947 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
949 int flag = PROT_READ | PROT_WRITE;
950 uint segment = page_no >> 16;
954 if( segment < mgr->segments ) {
955 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
957 memcpy (perm, page, mgr->page_size);
962 bt_mutexlock (mgr->maps);
964 if( segment < mgr->segments ) {
965 bt_releasemutex (mgr->maps);
969 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
970 bt_releasemutex (mgr->maps);
975 // set CLOCK bit in latch
976 // decrement pin count
978 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
980 bt_mutexlock(latch->modify);
981 latch->pin |= CLOCK_bit;
984 bt_releasemutex(latch->modify);
987 // return the btree cached page address
989 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
991 uid entry = latch - mgr->latchsets;
992 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
997 // return next available latch entry
998 // and with latch entry locked
1000 uint bt_availnext (BtMgr *mgr)
1007 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1009 entry = _InterlockedIncrement (&mgr->latchvictim);
1011 entry %= mgr->latchtotal;
1016 latch = mgr->latchsets + entry;
1018 if( !bt_mutextry(latch->modify) )
1021 // return this entry if it is not pinned
1026 // if the CLOCK bit is set
1027 // reset it to zero.
1029 latch->pin &= ~CLOCK_bit;
1030 bt_releasemutex(latch->modify);
1034 // pin page in buffer pool
1035 // return with latchset pinned
1037 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1039 uint hashidx = page_no % mgr->latchhash;
1044 // try to find our entry
1046 bt_mutexlock(mgr->hashtable[hashidx].latch);
1048 if( entry = mgr->hashtable[hashidx].entry ) do
1050 latch = mgr->latchsets + entry;
1051 if( page_no == latch->page_no )
1053 } while( entry = latch->next );
1055 // found our entry: increment pin
1058 latch = mgr->latchsets + entry;
1059 bt_mutexlock(latch->modify);
1060 latch->pin |= CLOCK_bit;
1063 bt_releasemutex(latch->modify);
1064 bt_releasemutex(mgr->hashtable[hashidx].latch);
1068 // find and reuse unpinned entry
1072 entry = bt_availnext (mgr);
1073 latch = mgr->latchsets + entry;
1075 idx = latch->page_no % mgr->latchhash;
1077 // if latch is on a different hash chain
1078 // unlink from the old page_no chain
1080 if( latch->page_no )
1081 if( idx != hashidx ) {
1083 // skip over this entry if latch not available
1085 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1086 bt_releasemutex(latch->modify);
1091 mgr->latchsets[latch->prev].next = latch->next;
1093 mgr->hashtable[idx].entry = latch->next;
1096 mgr->latchsets[latch->next].prev = latch->prev;
1098 bt_releasemutex (mgr->hashtable[idx].latch);
1101 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1103 // update permanent page area in btree from buffer pool
1104 // no read-lock is required since page is not pinned.
1107 if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1108 return mgr->line = __LINE__, NULL;
1113 memcpy (page, contents, mgr->page_size);
1115 } else if( bt_readpage (mgr, page, page_no) )
1116 return mgr->line = __LINE__, NULL;
1118 // link page as head of hash table chain
1119 // if this is a never before used entry,
1120 // or it was previously on a different
1121 // hash table chain. Otherwise, just
1122 // leave it in its current hash table
1125 if( !latch->page_no || hashidx != idx ) {
1126 if( latch->next = mgr->hashtable[hashidx].entry )
1127 mgr->latchsets[latch->next].prev = entry;
1129 mgr->hashtable[hashidx].entry = entry;
1133 // fill in latch structure
1135 latch->pin = CLOCK_bit | 1;
1136 latch->page_no = page_no;
1139 bt_releasemutex (latch->modify);
1140 bt_releasemutex (mgr->hashtable[hashidx].latch);
1144 void bt_mgrclose (BtMgr *mgr)
1152 // flush previously written dirty pages
1153 // and write recovery buffer to disk
1155 fdatasync (mgr->idx);
1157 if( mgr->redoend ) {
1158 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1159 memset (eof, 0, sizeof(BtLogHdr));
1162 // write remaining dirty pool pages to the btree
1164 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1165 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1166 latch = mgr->latchsets + slot;
1168 if( latch->dirty ) {
1169 bt_writepage(mgr, page, latch->page_no);
1170 latch->dirty = 0, num++;
1174 // clear redo recovery buffer on disk.
1176 if( mgr->pagezero->redopages ) {
1177 eof = (BtLogHdr *)mgr->redobuff;
1178 memset (eof, 0, sizeof(BtLogHdr));
1179 eof->lsn = mgr->lsn;
1180 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1181 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1184 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1187 while( mgr->segments )
1188 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1190 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1191 munmap (mgr->pagezero, mgr->page_size);
1193 FlushViewOfFile(mgr->pagezero, 0);
1194 UnmapViewOfFile(mgr->pagezero);
1195 UnmapViewOfFile(mgr->pagepool);
1196 CloseHandle(mgr->halloc);
1197 CloseHandle(mgr->hpool);
1203 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1204 FlushFileBuffers(mgr->idx);
1205 CloseHandle(mgr->idx);
1210 // close and release memory
1212 void bt_close (BtDb *bt)
1221 VirtualFree (bt->frame, 0, MEM_RELEASE);
1223 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1228 // open/create new btree buffer manager
1230 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1231 // size of page pool (e.g. 262144)
1233 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1235 uint lvl, attr, last, slot, idx;
1236 uint nlatchpage, latchhash;
1237 unsigned char value[BtId];
1238 int flag, initit = 0;
1239 BtPageZero *pagezero;
1247 // determine sanity of page size and buffer pool
1249 if( bits > BT_maxbits )
1251 else if( bits < BT_minbits )
1254 mgr = calloc (1, sizeof(BtMgr));
1256 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1258 if( mgr->idx == -1 ) {
1259 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1260 return free(mgr), NULL;
1263 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1264 attr = FILE_ATTRIBUTE_NORMAL;
1265 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1267 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1268 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1269 return GlobalFree(mgr), NULL;
1274 pagezero = valloc (BT_maxpage);
1277 // read minimum page size to get root info
1278 // to support raw disk partition files
1279 // check if bits == 0 on the disk.
1281 if( size = lseek (mgr->idx, 0L, 2) )
1282 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1283 if( pagezero->alloc->bits )
1284 bits = pagezero->alloc->bits;
1288 return free(mgr), free(pagezero), NULL;
1292 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1293 size = GetFileSize(mgr->idx, amt);
1295 if( size || *amt ) {
1296 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1297 return bt_mgrclose (mgr), NULL;
1298 bits = pagezero->alloc->bits;
1303 mgr->page_size = 1 << bits;
1304 mgr->page_bits = bits;
1306 // calculate number of latch hash table entries
1308 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1310 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1311 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1312 mgr->latchtotal = nodemax;
1317 // initialize an empty b-tree with latch page, root page, page of leaves
1318 // and page(s) of latches and page pool cache
1320 memset (pagezero, 0, 1 << bits);
1321 pagezero->alloc->lvl = MIN_lvl - 1;
1322 pagezero->alloc->bits = mgr->page_bits;
1323 pagezero->redopages = redopages;
1325 bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1326 pagezero->activepages = 2;
1328 // initialize left-most LEAF page in
1329 // alloc->left and count of active leaf pages.
1331 bt_putid (pagezero->alloc->left, LEAF_page);
1332 ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1334 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1335 fprintf (stderr, "Unable to create btree page zero\n");
1336 return bt_mgrclose (mgr), NULL;
1339 memset (pagezero, 0, 1 << bits);
1340 pagezero->alloc->bits = mgr->page_bits;
1342 for( lvl=MIN_lvl; lvl--; ) {
1343 BtSlot *node = slotptr(pagezero->alloc, 1);
1344 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1345 key = keyptr(pagezero->alloc, 1);
1346 key->len = 2; // create stopper key
1350 bt_putid(value, MIN_lvl - lvl + 1);
1351 val = valptr(pagezero->alloc, 1);
1352 val->len = lvl ? BtId : 0;
1353 memcpy (val->value, value, val->len);
1355 pagezero->alloc->min = node->off;
1356 pagezero->alloc->lvl = lvl;
1357 pagezero->alloc->cnt = 1;
1358 pagezero->alloc->act = 1;
1359 pagezero->alloc->page_no = MIN_lvl - lvl;
1361 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1362 fprintf (stderr, "Unable to create btree page\n");
1363 return bt_mgrclose (mgr), NULL;
1371 VirtualFree (pagezero, 0, MEM_RELEASE);
1374 // mlock the first segment of 64K pages
1376 flag = PROT_READ | PROT_WRITE;
1377 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1380 if( mgr->pages[0] == MAP_FAILED ) {
1381 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1382 return bt_mgrclose (mgr), NULL;
1385 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1386 mlock (mgr->pagezero, mgr->page_size);
1388 mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1389 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1391 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1392 if( mgr->pagepool == MAP_FAILED ) {
1393 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1394 return bt_mgrclose (mgr), NULL;
1397 flag = PAGE_READWRITE;
1398 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1399 if( !mgr->halloc ) {
1400 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1401 return bt_mgrclose (mgr), NULL;
1404 flag = FILE_MAP_WRITE;
1405 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1406 if( !mgr->pagezero ) {
1407 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1408 return bt_mgrclose (mgr), NULL;
1411 flag = PAGE_READWRITE;
1412 size = (uid)mgr->nlatchpage << mgr->page_bits;
1413 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1415 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1416 return bt_mgrclose (mgr), NULL;
1419 flag = FILE_MAP_WRITE;
1420 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1421 if( !mgr->pagepool ) {
1422 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1423 return bt_mgrclose (mgr), NULL;
1427 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1428 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1429 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1434 // open BTree access method
1435 // based on buffer manager
1437 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1439 BtDb *bt = malloc (sizeof(*bt));
1441 memset (bt, 0, sizeof(*bt));
1445 bt->cursor = valloc (mgr->page_size);
1446 bt->frame = valloc (mgr->page_size);
1448 bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1449 bt->frame = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1452 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1454 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1459 // compare two keys, return > 0, = 0, or < 0
1460 // =0: keys are same
1463 // as the comparison value
1465 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1467 uint len1 = key1->len;
1470 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1481 // place write, read, or parent lock on requested page_no.
1483 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1487 ReadLock (latch->readwr, thread_no);
1490 WriteLock (latch->readwr, thread_no);
1493 ReadLock (latch->access, thread_no);
1496 WriteLock (latch->access, thread_no);
1499 WriteLock (latch->parent, thread_no);
1502 WriteLock (latch->atomic, thread_no);
1504 case BtLockAtomic | BtLockRead:
1505 WriteLock (latch->atomic, thread_no);
1506 ReadLock (latch->readwr, thread_no);
1508 case BtLockAtomic | BtLockWrite:
1509 WriteLock (latch->atomic, thread_no);
1510 WriteLock (latch->readwr, thread_no);
1513 WriteLock (latch->link, thread_no);
1518 // remove write, read, or parent lock on requested page
1520 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1524 ReadRelease (latch->readwr);
1527 WriteRelease (latch->readwr);
1530 ReadRelease (latch->access);
1533 WriteRelease (latch->access);
1536 WriteRelease (latch->parent);
1539 WriteRelease (latch->atomic);
1541 case BtLockAtomic | BtLockRead:
1542 WriteRelease (latch->atomic);
1543 ReadRelease (latch->readwr);
1545 case BtLockAtomic | BtLockWrite:
1546 WriteRelease (latch->atomic);
1547 WriteRelease (latch->readwr);
1550 WriteRelease (latch->link);
1555 // allocate a new page
1556 // return with page latched, but unlocked.
1558 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1563 // lock allocation page
1565 bt_mutexlock(mgr->lock);
1567 // use empty chain first
1568 // else allocate new page
1570 if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1571 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1572 set->page = bt_mappage (mgr, set->latch);
1574 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1576 mgr->pagezero->activepages++;
1577 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1579 // the page is currently free and this
1580 // will keep bt_txnpromote out.
1582 // contents will replace this bit
1583 // and pin will keep bt_txnpromote out
1585 contents->page_no = page_no;
1586 set->latch->dirty = 1;
1588 memcpy (set->page, contents, mgr->page_size);
1590 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1591 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1593 bt_releasemutex(mgr->lock);
1597 page_no = bt_getid(mgr->pagezero->alloc->right);
1598 bt_putid(mgr->pagezero->alloc->right, page_no+1);
1600 // unlock allocation latch and
1601 // extend file into new page.
1603 mgr->pagezero->activepages++;
1604 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1605 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1606 bt_releasemutex(mgr->lock);
1608 // keep bt_txnpromote out of this page
1611 contents->page_no = page_no;
1612 pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1614 // don't load cache from btree page, load it from contents
1616 if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1617 set->page = bt_mappage (mgr, set->latch);
1621 // now pin will keep bt_txnpromote out
1623 set->page->free = 0;
1627 // find slot in page for given key at a given level
1629 int bt_findslot (BtPage page, unsigned char *key, uint len)
1631 uint diff, higher = page->cnt, low = 1, slot;
1634 // make stopper key an infinite fence value
1636 if( bt_getid (page->right) )
1641 // low is the lowest candidate.
1642 // loop ends when they meet
1644 // higher is already
1645 // tested as .ge. the passed key.
1647 while( diff = higher - low ) {
1648 slot = low + ( diff >> 1 );
1649 if( keycmp (keyptr(page, slot), key, len) < 0 )
1652 higher = slot, good++;
1655 // return zero if key is on right link page
1657 return good ? higher : 0;
1660 // find and load page at given level for given key
1661 // leave page rd or wr locked as requested
1663 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1665 uid page_no = ROOT_page, prevpage_no = 0;
1666 uint drill = 0xff, slot;
1667 BtLatchSet *prevlatch;
1668 uint mode, prevmode;
1672 // start at root of btree and drill down
1675 // determine lock mode of drill level
1676 mode = (drill == lvl) ? lock : BtLockRead;
1678 if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1681 // obtain access lock using lock chaining with Access mode
1683 if( page_no > ROOT_page )
1684 bt_lockpage(BtLockAccess, set->latch, thread_no);
1686 set->page = bt_mappage (mgr, set->latch);
1688 // release & unpin parent or left sibling page
1691 bt_unlockpage(prevmode, prevlatch);
1692 bt_unpinlatch (mgr, prevlatch);
1696 // obtain mode lock using lock chaining through AccessLock
1698 bt_lockpage(mode, set->latch, thread_no);
1700 if( set->page->free )
1701 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1703 if( page_no > ROOT_page )
1704 bt_unlockpage(BtLockAccess, set->latch);
1706 // re-read and re-lock root after determining actual level of root
1708 if( set->page->lvl != drill) {
1709 if( set->latch->page_no != ROOT_page )
1710 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1712 drill = set->page->lvl;
1714 if( lock != BtLockRead && drill == lvl ) {
1715 bt_unlockpage(mode, set->latch);
1716 bt_unpinlatch (mgr, set->latch);
1721 prevpage_no = set->latch->page_no;
1722 prevlatch = set->latch;
1723 prevpage = set->page;
1726 // find key on page at this level
1727 // and descend to requested level
1729 if( !set->page->kill )
1730 if( slot = bt_findslot (set->page, key, len) ) {
1734 // find next non-dead slot -- the fence key if nothing else
1736 while( slotptr(set->page, slot)->dead )
1737 if( slot++ < set->page->cnt )
1740 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1742 val = valptr(set->page, slot);
1744 if( val->len == BtId )
1745 page_no = bt_getid(valptr(set->page, slot)->value);
1747 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1753 // slide right into next page
1755 page_no = bt_getid(set->page->right);
1758 // return error on end of right chain
1760 mgr->line = __LINE__, mgr->err = BTERR_struct;
1761 return 0; // return error
1764 // return page to free list
1765 // page must be delete & write locked
1766 // and have no keys pointing to it.
1768 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1770 // lock allocation page
1772 bt_mutexlock (mgr->lock);
1776 memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1777 bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1778 set->latch->dirty = 1;
1779 set->page->free = 1;
1781 // decrement active page count
1783 mgr->pagezero->activepages--;
1785 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1786 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1788 // unlock released page
1789 // and unlock allocation page
1791 bt_unlockpage (BtLockDelete, set->latch);
1792 bt_unlockpage (BtLockWrite, set->latch);
1793 bt_unpinlatch (mgr, set->latch);
1794 bt_releasemutex (mgr->lock);
1797 // a fence key was deleted from a page
1798 // push new fence value upwards
1800 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1802 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1803 unsigned char value[BtId];
1807 // remove the old fence value
1809 ptr = keyptr(set->page, set->page->cnt);
1810 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1811 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1812 set->latch->dirty = 1;
1814 // cache new fence value
1816 ptr = keyptr(set->page, set->page->cnt);
1817 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1819 bt_lockpage (BtLockParent, set->latch, thread_no);
1820 bt_unlockpage (BtLockWrite, set->latch);
1822 // insert new (now smaller) fence key
1824 bt_putid (value, set->latch->page_no);
1825 ptr = (BtKey*)leftkey;
1827 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1830 // now delete old fence key
1832 ptr = (BtKey*)rightkey;
1834 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1837 bt_unlockpage (BtLockParent, set->latch);
1838 bt_unpinlatch(mgr, set->latch);
1842 // root has a single child
1843 // collapse a level from the tree
1845 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1852 // find the child entry and promote as new root contents
1855 for( idx = 0; idx++ < root->page->cnt; )
1856 if( !slotptr(root->page, idx)->dead )
1859 val = valptr(root->page, idx);
1861 if( val->len == BtId )
1862 page_no = bt_getid (valptr(root->page, idx)->value);
1864 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1866 if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1867 child->page = bt_mappage (mgr, child->latch);
1871 bt_lockpage (BtLockDelete, child->latch, thread_no);
1872 bt_lockpage (BtLockWrite, child->latch, thread_no);
1874 memcpy (root->page, child->page, mgr->page_size);
1875 root->latch->dirty = 1;
1877 bt_freepage (mgr, child);
1879 } while( root->page->lvl > 1 && root->page->act == 1 );
1881 bt_unlockpage (BtLockWrite, root->latch);
1882 bt_unpinlatch (mgr, root->latch);
1886 // delete a page and manage keys
1887 // call with page writelocked
1889 // returns the right page pool entry for freeing
1890 // or zero on error.
1892 uint bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, BtLock mode)
1894 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1895 unsigned char value[BtId];
1896 uint lvl = set->page->lvl;
1901 // cache copy of fence key
1902 // to remove in parent
1904 ptr = keyptr(set->page, set->page->cnt);
1905 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1907 // obtain lock on right page
1909 page_no = bt_getid(set->page->right);
1911 if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1912 right->page = bt_mappage (mgr, right->latch);
1916 bt_lockpage (mode, right->latch, thread_no);
1918 // cache copy of key to update
1920 ptr = keyptr(right->page, right->page->cnt);
1921 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1923 if( right->page->kill )
1924 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1926 // pull contents of right peer into our empty page
1928 bt_lockpage (BtLockLink, set->latch, thread_no);
1929 memcpy (right->page->left, set->page->left, BtId);
1930 memcpy (set->page, right->page, mgr->page_size);
1931 set->page->page_no = set->latch->page_no;
1932 set->latch->dirty = 1;
1933 bt_unlockpage (BtLockLink, set->latch);
1935 // mark right page deleted and point it to left page
1936 // until we can post parent updates that remove access
1937 // to the deleted page.
1939 bt_putid (right->page->right, set->latch->page_no);
1940 right->latch->dirty = 1;
1941 right->page->kill = 1;
1943 bt_lockpage (BtLockParent, right->latch, thread_no);
1944 bt_unlockpage (mode, right->latch);
1946 bt_lockpage (BtLockParent, set->latch, thread_no);
1947 bt_unlockpage (BtLockWrite, set->latch);
1949 // redirect higher key directly to our new node contents
1951 bt_putid (value, set->latch->page_no);
1952 ptr = (BtKey*)higherfence;
1954 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1957 // delete old lower key to our node
1959 ptr = (BtKey*)lowerfence;
1961 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1964 bt_unlockpage (BtLockParent, set->latch);
1965 return right->latch - mgr->latchsets;
1968 // find and delete key on page by marking delete flag bit
1969 // if page becomes empty, delete it from the btree
1971 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1973 uint slot, idx, found, fence, entry;
1974 BtPageSet set[1], right[1];
1979 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1980 node = slotptr(set->page, slot);
1981 ptr = keyptr(set->page, slot);
1985 // if librarian slot, advance to real slot
1987 if( node->type == Librarian ) {
1988 ptr = keyptr(set->page, ++slot);
1989 node = slotptr(set->page, slot);
1992 fence = slot == set->page->cnt;
1994 // delete the key, ignore request if already dead
1996 if( found = !keycmp (ptr, key, len) )
1997 if( found = node->dead == 0 ) {
1998 val = valptr(set->page,slot);
1999 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2002 // mark node type as delete
2004 node->type = Delete;
2007 // collapse empty slots beneath the fence
2008 // on interiour nodes
2011 while( idx = set->page->cnt - 1 )
2012 if( slotptr(set->page, idx)->dead ) {
2013 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2014 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2022 // did we delete a fence key in an upper level?
2024 if( lvl && set->page->act && fence )
2025 if( bt_fixfence (mgr, set, lvl, thread_no) )
2030 // do we need to collapse root?
2032 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2033 if( bt_collapseroot (mgr, set, thread_no) )
2038 // delete empty page
2040 if( !set->page->act ) {
2041 if( entry = bt_deletepage (mgr, set, thread_no, BtLockWrite) )
2042 right->latch = mgr->latchsets + entry;
2046 // obtain delete and write locks to right node
2048 bt_unlockpage (BtLockParent, right->latch);
2049 right->page = bt_mappage (mgr, right->latch);
2050 bt_lockpage (BtLockDelete, right->latch, thread_no);
2051 bt_lockpage (BtLockWrite, right->latch, thread_no);
2052 bt_freepage (mgr, right);
2054 bt_unpinlatch (mgr, set->latch);
2058 set->latch->dirty = 1;
2059 bt_unlockpage(BtLockWrite, set->latch);
2060 bt_unpinlatch (mgr, set->latch);
2064 // advance to next slot
2066 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2068 BtLatchSet *prevlatch;
2071 if( slot < set->page->cnt )
2074 prevlatch = set->latch;
2076 if( page_no = bt_getid(set->page->right) )
2077 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2078 set->page = bt_mappage (bt->mgr, set->latch);
2082 return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2084 // obtain access lock using lock chaining with Access mode
2086 bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2088 bt_unlockpage(BtLockRead, prevlatch);
2089 bt_unpinlatch (bt->mgr, prevlatch);
2091 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2092 bt_unlockpage(BtLockAccess, set->latch);
2096 // find unique key == given key, or first duplicate key in
2097 // leaf level and return number of value bytes
2098 // or (-1) if not found.
2100 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2108 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2110 ptr = keyptr(set->page, slot);
2112 // skip librarian slot place holder
2114 if( slotptr(set->page, slot)->type == Librarian )
2115 ptr = keyptr(set->page, ++slot);
2117 // return actual key found
2119 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2122 if( slotptr(set->page, slot)->type == Duplicate )
2125 // not there if we reach the stopper key
2127 if( slot == set->page->cnt )
2128 if( !bt_getid (set->page->right) )
2131 // if key exists, return >= 0 value bytes copied
2132 // otherwise return (-1)
2134 if( slotptr(set->page, slot)->dead )
2138 if( !memcmp (ptr->key, key, len) ) {
2139 val = valptr (set->page,slot);
2140 if( valmax > val->len )
2142 memcpy (value, val->value, valmax);
2148 } while( slot = bt_findnext (bt, set, slot) );
2150 bt_unlockpage (BtLockRead, set->latch);
2151 bt_unpinlatch (bt->mgr, set->latch);
2155 // check page for space available,
2156 // clean if necessary and return
2157 // 0 - page needs splitting
2158 // >0 new slot value
2160 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2162 BtPage page = set->page, frame;
2163 uint nxt = mgr->page_size;
2164 uint cnt = 0, idx = 0;
2165 uint max = page->cnt;
2170 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2173 // skip cleanup and proceed to split
2174 // if there's not enough garbage
2177 if( page->garbage < nxt / 5 )
2180 frame = malloc (mgr->page_size);
2181 memcpy (frame, page, mgr->page_size);
2183 // skip page info and set rest of page to zero
2185 memset (page+1, 0, mgr->page_size - sizeof(*page));
2186 set->latch->dirty = 1;
2191 // clean up page first by
2192 // removing deleted keys
2194 while( cnt++ < max ) {
2198 if( cnt < max || frame->lvl )
2199 if( slotptr(frame,cnt)->dead )
2202 // copy the value across
2204 val = valptr(frame, cnt);
2205 nxt -= val->len + sizeof(BtVal);
2206 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2208 // copy the key across
2210 key = keyptr(frame, cnt);
2211 nxt -= key->len + sizeof(BtKey);
2212 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2214 // make a librarian slot
2216 slotptr(page, ++idx)->off = nxt;
2217 slotptr(page, idx)->type = Librarian;
2218 slotptr(page, idx)->dead = 1;
2222 slotptr(page, ++idx)->off = nxt;
2223 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2225 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2233 // see if page has enough space now, or does it need splitting?
2235 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2241 // split the root and raise the height of the btree
2243 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2245 unsigned char leftkey[BT_keyarray];
2246 uint nxt = mgr->page_size;
2247 unsigned char value[BtId];
2254 frame = malloc (mgr->page_size);
2255 memcpy (frame, root->page, mgr->page_size);
2257 // save left page fence key for new root
2259 ptr = keyptr(root->page, root->page->cnt);
2260 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2262 // Obtain an empty page to use, and copy the current
2263 // root contents into it, e.g. lower keys
2265 if( bt_newpage(mgr, left, frame, page_no) )
2268 left_page_no = left->latch->page_no;
2269 bt_unpinlatch (mgr, left->latch);
2272 // preserve the page info at the bottom
2273 // of higher keys and set rest to zero
2275 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2277 // insert stopper key at top of newroot page
2278 // and increase the root height
2280 nxt -= BtId + sizeof(BtVal);
2281 bt_putid (value, right->page_no);
2282 val = (BtVal *)((unsigned char *)root->page + nxt);
2283 memcpy (val->value, value, BtId);
2286 nxt -= 2 + sizeof(BtKey);
2287 slotptr(root->page, 2)->off = nxt;
2288 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2293 // insert lower keys page fence key on newroot page as first key
2295 nxt -= BtId + sizeof(BtVal);
2296 bt_putid (value, left_page_no);
2297 val = (BtVal *)((unsigned char *)root->page + nxt);
2298 memcpy (val->value, value, BtId);
2301 ptr = (BtKey *)leftkey;
2302 nxt -= ptr->len + sizeof(BtKey);
2303 slotptr(root->page, 1)->off = nxt;
2304 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2306 bt_putid(root->page->right, 0);
2307 root->page->min = nxt; // reset lowest used offset and key count
2308 root->page->cnt = 2;
2309 root->page->act = 2;
2312 mgr->pagezero->alloc->lvl = root->page->lvl;
2314 // release and unpin root pages
2316 bt_unlockpage(BtLockWrite, root->latch);
2317 bt_unpinlatch (mgr, root->latch);
2319 bt_unpinlatch (mgr, right);
2323 // split already locked full node
2325 // return pool entry for new right
2326 // page, pinned & unlocked
2328 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2330 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2331 BtPage frame = malloc (mgr->page_size);
2332 uint lvl = set->page->lvl;
2339 // split higher half of keys to frame
2341 memset (frame, 0, mgr->page_size);
2342 max = set->page->cnt;
2346 while( cnt++ < max ) {
2347 if( cnt < max || set->page->lvl )
2348 if( slotptr(set->page, cnt)->dead )
2351 src = valptr(set->page, cnt);
2352 nxt -= src->len + sizeof(BtVal);
2353 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2355 key = keyptr(set->page, cnt);
2356 nxt -= key->len + sizeof(BtKey);
2357 ptr = (BtKey*)((unsigned char *)frame + nxt);
2358 memcpy (ptr, key, key->len + sizeof(BtKey));
2360 // add librarian slot
2362 slotptr(frame, ++idx)->off = nxt;
2363 slotptr(frame, idx)->type = Librarian;
2364 slotptr(frame, idx)->dead = 1;
2368 slotptr(frame, ++idx)->off = nxt;
2369 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2371 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2375 frame->bits = mgr->page_bits;
2382 if( set->latch->page_no > ROOT_page )
2383 bt_putid (frame->right, bt_getid (set->page->right));
2385 // get new free page and write higher keys to it.
2387 if( bt_newpage(mgr, right, frame, thread_no) )
2390 // process lower keys
2392 memcpy (frame, set->page, mgr->page_size);
2393 memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2394 set->latch->dirty = 1;
2396 nxt = mgr->page_size;
2397 set->page->garbage = 0;
2403 if( slotptr(frame, max)->type == Librarian )
2406 // assemble page of smaller keys
2408 while( cnt++ < max ) {
2409 if( slotptr(frame, cnt)->dead )
2411 val = valptr(frame, cnt);
2412 nxt -= val->len + sizeof(BtVal);
2413 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2415 key = keyptr(frame, cnt);
2416 nxt -= key->len + sizeof(BtKey);
2417 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2419 // add librarian slot
2421 slotptr(set->page, ++idx)->off = nxt;
2422 slotptr(set->page, idx)->type = Librarian;
2423 slotptr(set->page, idx)->dead = 1;
2427 slotptr(set->page, ++idx)->off = nxt;
2428 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2432 bt_putid(set->page->right, right->latch->page_no);
2433 set->page->min = nxt;
2434 set->page->cnt = idx;
2437 return right->latch - mgr->latchsets;
2440 // fix keys for newly split page
2441 // call with both pages pinned & locked
2442 // return unlocked and unpinned
2444 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2446 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2447 unsigned char value[BtId];
2448 uint lvl = set->page->lvl;
2452 // if current page is the root page, split it
2454 if( set->latch->page_no == ROOT_page )
2455 return bt_splitroot (mgr, set, right, thread_no);
2457 ptr = keyptr(set->page, set->page->cnt);
2458 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2460 page = bt_mappage (mgr, right);
2462 ptr = keyptr(page, page->cnt);
2463 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2465 // insert new fences in their parent pages
2467 bt_lockpage (BtLockParent, right, thread_no);
2469 bt_lockpage (BtLockParent, set->latch, thread_no);
2470 bt_unlockpage (BtLockWrite, set->latch);
2472 // insert new fence for reformulated left block of smaller keys
2474 bt_putid (value, set->latch->page_no);
2475 ptr = (BtKey *)leftkey;
2477 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2480 // switch fence for right block of larger keys to new right page
2482 bt_putid (value, right->page_no);
2483 ptr = (BtKey *)rightkey;
2485 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2488 bt_unlockpage (BtLockParent, set->latch);
2489 bt_unpinlatch (mgr, set->latch);
2491 bt_unlockpage (BtLockParent, right);
2492 bt_unpinlatch (mgr, right);
2496 // install new key and value onto page
2497 // page must already be checked for
2500 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2502 uint idx, librarian;
2508 // if found slot > desired slot and previous slot
2509 // is a librarian slot, use it
2512 if( slotptr(set->page, slot-1)->type == Librarian )
2515 // copy value onto page
2517 set->page->min -= vallen + sizeof(BtVal);
2518 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2519 memcpy (val->value, value, vallen);
2522 // copy key onto page
2524 set->page->min -= keylen + sizeof(BtKey);
2525 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2526 memcpy (ptr->key, key, keylen);
2529 // find first empty slot
2531 for( idx = slot; idx < set->page->cnt; idx++ )
2532 if( slotptr(set->page, idx)->dead )
2535 // now insert key into array before slot,
2536 // adding as many librarian slots as
2539 if( idx == set->page->cnt ) {
2540 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2542 librarian = ++idx - slot;
2543 avail /= sizeof(BtSlot);
2548 if( librarian > avail )
2552 rate = (idx - slot) / librarian;
2553 set->page->cnt += librarian;
2558 librarian = 0, rate = 0;
2560 while( idx > slot ) {
2562 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2565 // add librarian slot per rate
2568 if( (idx - slot + 1)/2 <= librarian * rate ) {
2569 // if( rate && !(idx % rate) ) {
2570 node = slotptr(set->page, idx--);
2571 node->off = node[1].off;
2572 node->type = Librarian;
2578 set->latch->dirty = 1;
2583 node = slotptr(set->page, slot);
2584 node->off = set->page->min;
2589 bt_unlockpage (BtLockWrite, set->latch);
2590 bt_unpinlatch (mgr, set->latch);
2596 // Insert new key into the btree at given level.
2597 // either add a new key or update/add an existing one
2599 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2601 uint slot, idx, len, entry;
2607 while( 1 ) { // find the page and slot for the current key
2608 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2609 node = slotptr(set->page, slot);
2610 ptr = keyptr(set->page, slot);
2613 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2617 // if librarian slot == found slot, advance to real slot
2619 if( node->type == Librarian )
2620 if( !keycmp (ptr, key, keylen) ) {
2621 ptr = keyptr(set->page, ++slot);
2622 node = slotptr(set->page, slot);
2625 // if inserting a duplicate key or unique
2626 // key that doesn't exist on the page,
2627 // check for adequate space on the page
2628 // and insert the new key before slot.
2633 if( keycmp (ptr, key, keylen) )
2634 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2635 return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2636 else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2638 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2643 // if key already exists, update value and return
2645 val = valptr(set->page, slot);
2647 if( val->len >= vallen ) {
2648 if( slotptr(set->page, slot)->dead )
2653 set->page->garbage += val->len - vallen;
2654 set->latch->dirty = 1;
2656 memcpy (val->value, value, vallen);
2657 bt_unlockpage(BtLockWrite, set->latch);
2658 bt_unpinlatch (mgr, set->latch);
2662 // new update value doesn't fit in existing value area
2663 // make sure page has room
2666 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2673 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2674 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2676 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2681 // copy key and value onto page and update slot
2683 set->page->min -= vallen + sizeof(BtVal);
2684 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2685 memcpy (val->value, value, vallen);
2688 set->latch->dirty = 1;
2689 set->page->min -= keylen + sizeof(BtKey);
2690 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2691 memcpy (ptr->key, key, keylen);
2694 node->off = set->page->min;
2695 bt_unlockpage(BtLockWrite, set->latch);
2696 bt_unpinlatch (mgr, set->latch);
2703 // determine actual page where key is located
2704 // return slot number
2706 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2708 BtKey *key = keyptr(source,src);
2709 uint slot = locks[src].slot;
2712 if( src > 1 && locks[src].reuse )
2713 entry = locks[src-1].entry, slot = 0;
2715 entry = locks[src].entry;
2718 set->latch = mgr->latchsets + entry;
2719 set->page = bt_mappage (mgr, set->latch);
2723 // is locks->reuse set? or was slot zeroed?
2724 // if so, find where our key is located
2725 // on current page or pages split on
2726 // same page txn operations.
2729 set->latch = mgr->latchsets + entry;
2730 set->page = bt_mappage (mgr, set->latch);
2732 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2733 if( slotptr(set->page, slot)->type == Librarian )
2735 if( locks[src].reuse )
2736 locks[src].entry = entry;
2739 } while( entry = set->latch->split );
2741 mgr->line = __LINE__, mgr->err = BTERR_atomic;
2745 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2747 BtKey *key = keyptr(source, src);
2748 BtVal *val = valptr(source, src);
2753 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2754 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2755 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2757 set->page->lsn = lsn;
2763 if( entry = bt_splitpage (mgr, set, thread_no) )
2764 latch = mgr->latchsets + entry;
2768 // splice right page into split chain
2771 bt_lockpage(BtLockWrite, latch, thread_no);
2772 latch->split = set->latch->split;
2773 set->latch->split = entry;
2774 locks[src].slot = 0;
2777 return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2780 // perform delete from smaller btree
2781 // insert a delete slot if not found there
2783 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2785 BtKey *key = keyptr(source, src);
2792 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2793 node = slotptr(set->page, slot);
2794 ptr = keyptr(set->page, slot);
2795 val = valptr(set->page, slot);
2797 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2799 // if slot is not found, insert a delete slot
2801 if( keycmp (ptr, key->key, key->len) )
2802 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2804 // if node is already dead,
2805 // ignore the request.
2810 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2811 set->latch->dirty = 1;
2812 set->page->lsn = lsn;
2816 __sync_fetch_and_add(&mgr->found, 1);
2820 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2822 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2823 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2825 return keycmp (key1, key2->key, key2->len);
2827 // atomic modification of a batch of keys.
2829 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2831 uint src, idx, slot, samepage, entry, que = 0;
2832 BtKey *key, *ptr, *key2;
2838 // stable sort the list of keys into order to
2839 // prevent deadlocks between threads.
2841 for( src = 1; src++ < source->cnt; ) {
2842 *temp = *slotptr(source,src);
2843 key = keyptr (source,src);
2845 for( idx = src; --idx; ) {
2846 key2 = keyptr (source,idx);
2847 if( keycmp (key, key2->key, key2->len) < 0 ) {
2848 *slotptr(source,idx+1) = *slotptr(source,idx);
2849 *slotptr(source,idx) = *temp;
2855 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2856 // add entries to redo log
2858 if( bt->mgr->pagezero->redopages )
2859 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2863 // perform the individual actions in the transaction
2865 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2866 return bt->mgr->err;
2868 // if number of active pages
2869 // is greater than the buffer pool
2870 // promote page into larger btree
2873 while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
2874 if( bt_txnpromote (bt) )
2875 return bt->mgr->err;
2882 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2884 uint src, idx, slot, samepage, entry, que = 0;
2885 BtPageSet set[1], prev[1], right[1];
2886 unsigned char value[BtId];
2894 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2896 // Load the leaf page for each key
2897 // group same page references with reuse bit
2898 // and determine any constraint violations
2900 for( src = 0; src++ < source->cnt; ) {
2901 key = keyptr(source, src);
2904 // first determine if this modification falls
2905 // on the same page as the previous modification
2906 // note that the far right leaf page is a special case
2908 if( samepage = src > 1 )
2909 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2910 slot = bt_findslot(set->page, key->key, key->len);
2913 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic + BtLockWrite, thread_no) )
2914 set->latch->split = 0;
2918 if( slotptr(set->page, slot)->type == Librarian )
2922 locks[src].entry = set->latch - mgr->latchsets;
2923 locks[src].slot = slot;
2924 locks[src].reuse = 0;
2926 locks[src].entry = 0;
2927 locks[src].slot = 0;
2928 locks[src].reuse = 1;
2931 // capture current lsn for master page
2933 locks[src].reqlsn = set->page->lsn;
2936 // insert or delete each key
2937 // process any splits or merges
2938 // run through txn list backwards
2940 samepage = source->cnt + 1;
2942 for( src = source->cnt; src; src-- ) {
2943 if( locks[src].reuse )
2946 // perform the txn operations
2947 // from smaller to larger on
2950 for( idx = src; idx < samepage; idx++ )
2951 switch( slotptr(source,idx)->type ) {
2953 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2959 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2964 bt_atomicpage (mgr, source, locks, idx, set);
2968 // after the same page operations have finished,
2969 // process master page for splits or deletion.
2971 latch = prev->latch = mgr->latchsets + locks[src].entry;
2972 prev->page = bt_mappage (mgr, prev->latch);
2975 // pick-up all splits from master page
2976 // each one is already pinned & WriteLocked.
2978 if( entry = latch->split ) do {
2979 set->latch = mgr->latchsets + entry;
2980 set->page = bt_mappage (mgr, set->latch);
2982 // delete empty master page by undoing its split
2983 // (this is potentially another empty page)
2985 if( !prev->page->act ) {
2986 memcpy (set->page->left, prev->page->left, BtId);
2987 memcpy (prev->page, set->page, mgr->page_size);
2988 bt_lockpage (BtLockDelete, set->latch, thread_no);
2989 prev->latch->split = set->latch->split;
2990 prev->latch->dirty = 1;
2991 bt_freepage (mgr, set);
2995 // remove empty split page from the split chain
2996 // and return it to the free list. No other
2997 // thread has its page number yet.
2999 if( !set->page->act ) {
3000 memcpy (prev->page->right, set->page->right, BtId);
3001 prev->latch->split = set->latch->split;
3003 bt_lockpage (BtLockDelete, set->latch, thread_no);
3004 bt_freepage (mgr, set);
3008 // update prev's fence key
3010 ptr = keyptr(prev->page,prev->page->cnt);
3011 bt_putid (value, prev->latch->page_no);
3013 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3016 // splice in the left link into the split page
3018 bt_putid (set->page->left, prev->latch->page_no);
3021 bt_syncpage (mgr, prev->page, prev->latch);
3023 // page is unlocked & unpinned below to avoid bt_txnpromote
3026 } while( entry = prev->latch->split );
3028 // update left pointer in next right page from last split page
3029 // (if all splits were reversed or none occurred, latch->split == 0)
3031 if( latch->split ) {
3032 // fix left pointer in master's original (now split)
3033 // far right sibling or set rightmost page in page zero
3035 if( right_page_no = bt_getid (prev->page->right) ) {
3036 if( set->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
3037 set->page = bt_mappage (mgr, set->latch);
3041 bt_lockpage (BtLockLink, set->latch, thread_no);
3042 bt_putid (set->page->left, prev->latch->page_no);
3043 set->latch->dirty = 1;
3045 bt_unlockpage (BtLockLink, set->latch);
3046 bt_unpinlatch (mgr, set->latch);
3047 } else { // prev is rightmost page
3048 bt_mutexlock (mgr->lock);
3049 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3050 bt_releasemutex(mgr->lock);
3053 // Process last page split in chain
3054 // by switching the key from the master
3055 // page to the last split.
3057 ptr = keyptr(prev->page,prev->page->cnt);
3058 bt_putid (value, prev->latch->page_no);
3060 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3064 bt_syncpage (mgr, prev->page, prev->latch);
3066 // unlock and unpin master page
3068 bt_unlockpage(BtLockAtomic, latch);
3069 bt_unlockpage(BtLockWrite, latch);
3070 bt_unpinlatch(mgr, latch);
3072 // go through the list of splits and
3073 // release the locks and unpin
3075 while( entry = latch->split ) {
3076 latch = mgr->latchsets + entry;
3077 bt_unlockpage(BtLockWrite, latch);
3078 bt_unpinlatch(mgr, latch);
3084 // since there are no splits, we're
3085 // finished if master page occupied
3087 bt_unlockpage(BtLockAtomic, prev->latch);
3089 if( prev->page->act ) {
3090 bt_unlockpage(BtLockWrite, prev->latch);
3093 bt_syncpage (mgr, prev->page, prev->latch);
3095 bt_unpinlatch(mgr, prev->latch);
3099 // any and all splits were reversed, and the
3100 // master page located in prev is empty, delete it
3102 if( entry = bt_deletepage (mgr, prev, thread_no, BtLockWrite) )
3103 right->latch = mgr->latchsets + entry;
3107 // obtain delete and write locks to right node
3109 bt_unlockpage (BtLockParent, right->latch);
3110 right->page = bt_mappage (mgr, right->latch);
3111 bt_lockpage (BtLockDelete, right->latch, thread_no);
3112 bt_lockpage (BtLockWrite, right->latch, thread_no);
3113 bt_freepage (mgr, right);
3115 bt_unpinlatch (mgr, prev->latch);
3122 // promote a page into the larger btree
3124 BTERR bt_txnpromote (BtDb *bt)
3126 BtPageSet set[1], right[1];
3127 uint entry, slot, idx;
3134 entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3136 entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3138 entry %= bt->mgr->latchtotal;
3143 set->latch = bt->mgr->latchsets + entry;
3145 if( !bt_mutextry(set->latch->modify) )
3148 // skip this entry if it is pinned
3150 if( set->latch->pin & ~CLOCK_bit ) {
3151 bt_releasemutex(set->latch->modify);
3155 set->page = bt_mappage (bt->mgr, set->latch);
3157 // entry never used or has no right sibling
3159 if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3160 bt_releasemutex(set->latch->modify);
3164 // entry interiour node or being killed
3166 if( set->page->lvl || set->page->free || set->page->kill ) {
3167 bt_releasemutex(set->latch->modify);
3171 // pin the page for our useage
3174 bt_releasemutex(set->latch->modify);
3175 bt_lockpage (BtLockAtomic | BtLockWrite, set->latch, bt->thread_no);
3176 memcpy (bt->frame, set->page, bt->mgr->page_size);
3178 if( !(set->latch->page_no % 100) )
3179 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
3181 if( entry = bt_deletepage (bt->mgr, set, bt->thread_no, BtLockAtomic | BtLockWrite) )
3182 right->latch = bt->mgr->latchsets + entry;
3184 return bt->mgr->err;
3186 // obtain delete and write locks to right node
3188 bt_unlockpage (BtLockParent, right->latch);
3189 right->page = bt_mappage (bt->mgr, right->latch);
3191 // release page with its new contents
3193 bt_unlockpage (BtLockAtomic, set->latch);
3194 bt_unpinlatch (bt->mgr, set->latch);
3196 // transfer slots in our selected page to larger btree
3198 if( bt_atomicexec (bt->main, bt->frame, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3199 return bt->main->err;
3201 // free the page we took over
3203 bt_lockpage (BtLockDelete, right->latch, bt->thread_no);
3204 bt_lockpage (BtLockWrite, right->latch, bt->thread_no);
3205 bt_freepage (bt->mgr, right);
3210 // set cursor to highest slot on highest page
3212 uint bt_lastkey (BtDb *bt)
3214 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3217 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3218 set->page = bt_mappage (bt->mgr, set->latch);
3222 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3223 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3224 bt_unlockpage(BtLockRead, set->latch);
3225 bt_unpinlatch (bt->mgr, set->latch);
3226 return bt->cursor->cnt;
3229 // return previous slot on cursor page
3231 uint bt_prevkey (BtDb *bt, uint slot)
3233 uid cursor_page = bt->cursor->page_no;
3234 uid ourright, next, us = cursor_page;
3240 ourright = bt_getid(bt->cursor->right);
3243 if( !(next = bt_getid(bt->cursor->left)) )
3249 if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3250 set->page = bt_mappage (bt->mgr, set->latch);
3254 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3255 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3256 bt_unlockpage(BtLockRead, set->latch);
3257 bt_unpinlatch (bt->mgr, set->latch);
3259 next = bt_getid (bt->cursor->right);
3261 if( bt->cursor->kill )
3265 if( next == ourright )
3270 return bt->cursor->cnt;
3273 // return next slot on cursor page
3274 // or slide cursor right into next page
3276 uint bt_nextkey (BtDb *bt, uint slot)
3282 right = bt_getid(bt->cursor->right);
3284 while( slot++ < bt->cursor->cnt )
3285 if( slotptr(bt->cursor,slot)->dead )
3287 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3295 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3296 set->page = bt_mappage (bt->mgr, set->latch);
3300 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3301 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3302 bt_unlockpage(BtLockRead, set->latch);
3303 bt_unpinlatch (bt->mgr, set->latch);
3308 return bt->mgr->err = 0;
3311 // cache page of keys into cursor and return starting slot for given key
3313 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3318 // cache page for retrieval
3320 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3321 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3325 bt_unlockpage(BtLockRead, set->latch);
3326 bt_unpinlatch (bt->mgr, set->latch);
3333 double getCpuTime(int type)
3336 FILETIME xittime[1];
3337 FILETIME systime[1];
3338 FILETIME usrtime[1];
3339 SYSTEMTIME timeconv[1];
3342 memset (timeconv, 0, sizeof(SYSTEMTIME));
3346 GetSystemTimeAsFileTime (xittime);
3347 FileTimeToSystemTime (xittime, timeconv);
3348 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3351 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3352 FileTimeToSystemTime (usrtime, timeconv);
3355 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3356 FileTimeToSystemTime (systime, timeconv);
3360 ans += (double)timeconv->wHour * 3600;
3361 ans += (double)timeconv->wMinute * 60;
3362 ans += (double)timeconv->wSecond;
3363 ans += (double)timeconv->wMilliseconds / 1000;
3368 #include <sys/resource.h>
3370 double getCpuTime(int type)
3372 struct rusage used[1];
3373 struct timeval tv[1];
3377 gettimeofday(tv, NULL);
3378 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3381 getrusage(RUSAGE_SELF, used);
3382 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3385 getrusage(RUSAGE_SELF, used);
3386 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3393 void bt_poolaudit (BtMgr *mgr)
3398 while( ++entry < mgr->latchtotal ) {
3399 latch = mgr->latchsets + entry;
3401 if( *latch->readwr->value )
3402 fprintf(stderr, "latchset %d wrtlocked for page %d\n", entry, latch->page_no);
3404 if( *latch->access->value )
3405 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3407 if( *latch->parent->value )
3408 fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3410 if( *latch->atomic->value )
3411 fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3413 if( *latch->modify->value )
3414 fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3416 if( latch->pin & ~CLOCK_bit )
3417 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3430 // standalone program to index file of keys
3431 // then list them onto std-out
3434 void *index_file (void *arg)
3436 uint __stdcall index_file (void *arg)
3439 int line = 0, found = 0, cnt = 0, idx;
3440 uid next, page_no = LEAF_page; // start on first page of leaves
3441 int ch, len = 0, slot, type = 0;
3442 unsigned char key[BT_maxkey];
3443 unsigned char txn[65536];
3444 ThreadArg *args = arg;
3453 bt = bt_open (args->mgr, args->main);
3456 if( args->idx < strlen (args->type) )
3457 ch = args->type[args->idx];
3459 ch = args->type[strlen(args->type) - 1];
3471 if( type == Delete )
3472 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3474 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3476 if( type == Delete )
3477 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3479 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3481 if( in = fopen (args->infile, "rb") )
3482 while( ch = getc(in), ch != EOF )
3488 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3489 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3495 memcpy (txn + nxt, key + 10, len - 10);
3497 txn[nxt] = len - 10;
3499 memcpy (txn + nxt, key, 10);
3502 slotptr(page,++cnt)->off = nxt;
3503 slotptr(page,cnt)->type = type;
3506 if( cnt < args->num )
3513 if( bt_atomictxn (bt, page) )
3514 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3519 else if( len < BT_maxkey )
3521 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3525 fprintf(stderr, "started indexing for %s\n", args->infile);
3526 if( in = fopen (args->infile, "r") )
3527 while( ch = getc(in), ch != EOF )
3532 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3533 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3536 else if( len < BT_maxkey )
3538 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3542 fprintf(stderr, "started finding keys for %s\n", args->infile);
3543 if( in = fopen (args->infile, "rb") )
3544 while( ch = getc(in), ch != EOF )
3548 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3550 else if( bt->mgr->err )
3551 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3554 else if( len < BT_maxkey )
3556 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3560 fprintf(stderr, "started scanning\n");
3563 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3564 set->page = bt_mappage (bt->mgr, set->latch);
3566 fprintf(stderr, "unable to obtain latch"), exit(1);
3568 bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3569 next = bt_getid (set->page->right);
3571 for( slot = 0; slot++ < set->page->cnt; )
3572 if( next || slot < set->page->cnt )
3573 if( !slotptr(set->page, slot)->dead ) {
3574 ptr = keyptr(set->page, slot);
3577 if( slotptr(set->page, slot)->type == Duplicate )
3580 fwrite (ptr->key, len, 1, stdout);
3581 val = valptr(set->page, slot);
3582 fwrite (val->value, val->len, 1, stdout);
3583 fputc ('\n', stdout);
3587 bt_unlockpage (BtLockRead, set->latch);
3588 bt_unpinlatch (bt->mgr, set->latch);
3589 } while( page_no = next );
3591 fprintf(stderr, " Total keys read %d\n", cnt);
3595 fprintf(stderr, "started reverse scan\n");
3596 if( slot = bt_lastkey (bt) )
3597 while( slot = bt_prevkey (bt, slot) ) {
3598 if( slotptr(bt->cursor, slot)->dead )
3601 ptr = keyptr(bt->cursor, slot);
3604 if( slotptr(bt->cursor, slot)->type == Duplicate )
3607 fwrite (ptr->key, len, 1, stdout);
3608 val = valptr(bt->cursor, slot);
3609 fwrite (val->value, val->len, 1, stdout);
3610 fputc ('\n', stdout);
3614 fprintf(stderr, " Total keys read %d\n", cnt);
3619 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3621 fprintf(stderr, "started counting\n");
3622 next = REDO_page + bt->mgr->pagezero->redopages;
3624 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3625 if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3628 if( !bt->cursor->free && !bt->cursor->lvl )
3629 cnt += bt->cursor->act;
3635 cnt--; // remove stopper key
3636 fprintf(stderr, " Total keys counted %d\n", cnt);
3648 typedef struct timeval timer;
3650 int main (int argc, char **argv)
3652 int idx, cnt, len, slot, err;
3653 int segsize, bits = 16;
3673 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3674 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3675 fprintf (stderr, " where main_file is the name of the main btree file\n");
3676 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3677 fprintf (stderr, " page_bits is the page size in bits for the cache btree\n");
3678 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3679 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3680 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3681 fprintf (stderr, " main_bits is the page size of the main btree in bits\n");
3682 fprintf (stderr, " main_pool is the number of main pages in the main buffer pool\n");
3683 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3687 start = getCpuTime(0);
3690 bits = atoi(argv[4]);
3693 poolsize = atoi(argv[5]);
3696 fprintf (stderr, "Warning: no mapped_pool\n");
3699 num = atoi(argv[6]);
3702 redopages = atoi(argv[7]);
3704 if( redopages + REDO_page > 65535 )
3705 fprintf (stderr, "Warning: Recovery buffer too large\n");
3708 mainbits = atoi(argv[8]);
3711 mainpool = atoi(argv[9]);
3715 threads = malloc (cnt * sizeof(pthread_t));
3717 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3719 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3721 mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3724 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3729 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3732 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3741 for( idx = 0; idx < cnt; idx++ ) {
3742 args[idx].infile = argv[idx + 10];
3743 args[idx].type = argv[3];
3744 args[idx].main = main;
3745 args[idx].mgr = mgr;
3746 args[idx].num = num;
3747 args[idx].idx = idx;
3749 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3750 fprintf(stderr, "Error creating thread %d\n", err);
3752 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3756 args[0].infile = argv[idx + 10];
3757 args[0].type = argv[3];
3758 args[0].main = main;
3765 // wait for termination
3768 for( idx = 0; idx < cnt; idx++ )
3769 pthread_join (threads[idx], NULL);
3771 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3773 for( idx = 0; idx < cnt; idx++ )
3774 CloseHandle(threads[idx]);
3781 fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3787 elapsed = getCpuTime(0) - start;
3788 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3789 elapsed = getCpuTime(1);
3790 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3791 elapsed = getCpuTime(2);
3792 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);