1 // btree version threadskv10 futex version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // and LSM B-trees for write optimization
14 // author: karl malbrain, malbrain@cal.berkeley.edu
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
20 The original author is Karl Malbrain.
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
38 #include <linux/futex.h>
53 #define WIN32_LEAN_AND_MEAN
67 typedef unsigned long long uid;
68 typedef unsigned long long logseqno;
71 typedef unsigned long long off64_t;
72 typedef unsigned short ushort;
73 typedef unsigned int uint;
76 #define BT_ro 0x6f72 // ro
77 #define BT_rw 0x7772 // rw
79 #define BT_maxbits 26 // maximum page size in bits
80 #define BT_minbits 9 // minimum page size in bits
81 #define BT_minpage (1 << BT_minbits) // minimum page size
82 #define BT_maxpage (1 << BT_maxbits) // maximum page size
84 // BTree page number constants
85 #define ALLOC_page 0 // allocation page
86 #define ROOT_page 1 // root of the btree
87 #define LEAF_page 2 // first page of leaves
88 #define REDO_page 3 // first page of redo buffer
90 // Number of levels to create in a new BTree
95 There are six lock types for each node in four independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
116 volatile ushort xlock; // one writer has exclusive lock
117 volatile ushort wrt; // count of other writers waiting
126 // definition for phase-fair reader/writer lock implementation
129 volatile ushort rin[1];
130 volatile ushort rout[1];
131 volatile ushort ticket[1];
132 volatile ushort serving[1];
139 volatile ushort tid[1];
140 volatile ushort dup[1];
148 // mode & definition for lite latch implementation
151 QueRd = 1, // reader queue
152 QueWr = 2 // writer queue
155 // hash table entries
158 uint entry; // Latch table entry at head of chain
159 BtMutexLatch latch[1];
162 // latch manager table structure
165 uid page_no; // latch set page number
166 RWLock readwr[1]; // read/write page lock
167 RWLock access[1]; // Access Intent/Page delete
168 WOLock parent[1]; // Posting of fence key in parent
169 WOLock atomic[1]; // Atomic update in progress
170 uint split; // right split page atomic insert
171 uint next; // next entry in hash table chain
172 uint prev; // prev entry in hash table chain
173 ushort pin; // number of accessing threads
174 unsigned char dirty; // page in cache is dirty (atomic set)
175 BtMutexLatch modify[1]; // modify entry lite latch
178 // Define the length of the page record numbers
182 // Page key slot definition.
184 // Keys are marked dead, but remain on the page until
185 // cleanup is called. The fence key (highest key) for
186 // a leaf page is always present, even after cleanup.
190 // In addition to the Unique keys that occupy slots
191 // there are Librarian and Duplicate key
192 // slots occupying the key slot array.
194 // The Librarian slots are dead keys that
195 // serve as filler, available to add new Unique
196 // or Dup slots that are inserted into the B-tree.
198 // The Duplicate slots have had their key bytes extended
199 // by 6 bytes to contain a binary duplicate key uniqueifier.
209 uint off:BT_maxbits; // page offset for key start
210 uint type:3; // type of slot
211 uint dead:1; // set for deleted slot
214 // The key structure occupies space at the upper end of
215 // each page. It's a length byte followed by the key
219 unsigned char len; // this can be changed to a ushort or uint
220 unsigned char key[0];
223 // the value structure also occupies space at the upper
224 // end of the page. Each key is immediately followed by a value.
227 unsigned char len; // this can be changed to a ushort or uint
228 unsigned char value[0];
231 #define BT_maxkey 255 // maximum number of bytes in a key
232 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
234 // The first part of an index page.
235 // It is immediately followed
236 // by the BtSlot array of keys.
238 // note that this structure size
239 // must be a multiple of 8 bytes
240 // in order to place PageZero correctly.
242 typedef struct BtPage_ {
243 uint cnt; // count of keys in page
244 uint act; // count of active keys
245 uint min; // next key offset
246 uint garbage; // page garbage in bytes
247 unsigned char bits:7; // page size in bits
248 unsigned char free:1; // page is on free chain
249 unsigned char lvl:7; // level of page
250 unsigned char kill:1; // page is being deleted
251 unsigned char right[BtId]; // page number to right
252 unsigned char left[BtId]; // page number to left
253 unsigned char filler[2]; // padding to multiple of 8
254 logseqno lsn; // log sequence number applied
255 uid page_no; // this page number
258 // The loadpage interface object
261 BtPage page; // current page pointer
262 BtLatchSet *latch; // current page latch set
265 // structure for latch manager on ALLOC_page
268 struct BtPage_ alloc[1]; // next page_no in right ptr
269 unsigned char freechain[BtId]; // head of free page_nos chain
270 unsigned long long activepages; // number of active pages
271 uint redopages; // number of redo pages in file
274 // The object structure for Btree access
277 uint page_size; // page size
278 uint page_bits; // page size in bits
284 BtPageZero *pagezero; // mapped allocation page
285 BtHashEntry *hashtable; // the buffer pool hash table entries
286 BtLatchSet *latchsets; // mapped latch set from buffer pool
287 unsigned char *pagepool; // mapped to the buffer pool pages
288 unsigned char *redobuff; // mapped recovery buffer pointer
289 logseqno lsn, flushlsn; // current & first lsn flushed
290 BtMutexLatch redo[1]; // redo area lite latch
291 BtMutexLatch lock[1]; // allocation area lite latch
292 BtMutexLatch maps[1]; // mapping segments lite latch
293 ushort thread_no[1]; // next thread number
294 uint nlatchpage; // number of latch pages at BT_latch
295 uint latchtotal; // number of page latch entries
296 uint latchhash; // number of latch hash table slots
297 uint latchvictim; // next latch entry to examine
298 uint latchpromote; // next latch entry to promote
299 uint redolast; // last msync size of recovery buff
300 uint redoend; // eof/end element in recovery buff
301 int err; // last error
302 int line; // last error line no
303 int found; // number of keys found by delete
304 int reads, writes; // number of reads and writes
306 HANDLE halloc; // allocation handle
307 HANDLE hpool; // buffer pool handle
309 uint segments; // number of memory mapped segments
310 unsigned char *pages[64000];// memory mapped segments of b-tree
314 BtMgr *mgr; // buffer manager for entire process
315 BtMgr *main; // buffer manager for main btree
316 BtPage cursor; // cached page frame for start/next
317 ushort thread_no; // thread number
318 unsigned char key[BT_keyarray]; // last found complete key
321 // atomic txn structures
324 logseqno reqlsn; // redo log seq no required
325 uint entry; // latch table entry number
326 uint slot:31; // page slot number
327 uint reuse:1; // reused previous page
331 uid page_no; // page number for split leaf
332 void *next; // next key to insert
333 uint entry:29; // latch table entry number
334 uint type:2; // 0 == insert, 1 == delete, 2 == free
335 uint nounlock:1; // don't unlock ParentModification
336 unsigned char leafkey[BT_keyarray];
339 // Catastrophic errors
353 #define CLOCK_bit 0x8000
355 // recovery manager entry types
358 BTRM_eof = 0, // rest of buffer is empty
359 BTRM_add, // add a unique key-value to btree
360 BTRM_dup, // add a duplicate key-value to btree
361 BTRM_del, // delete a key-value from btree
362 BTRM_upd, // update a key with a new value
363 BTRM_new, // allocate a new empty page
364 BTRM_old // reuse an old empty page
367 // recovery manager entry
368 // structure followed by BtKey & BtVal
371 logseqno reqlsn; // log sequence number required
372 logseqno lsn; // log sequence number for entry
373 uint len; // length of entry
374 unsigned char type; // type of entry
375 unsigned char lvl; // level of btree entry pertains to
380 extern void bt_close (BtDb *bt);
381 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
382 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
383 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
384 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
385 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
386 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
387 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
389 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
391 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
392 extern uint bt_nextkey (BtDb *bt, uint slot);
393 extern uint bt_prevkey (BtDb *db, uint slot);
394 extern uint bt_lastkey (BtDb *db);
397 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
398 extern void bt_mgrclose (BtMgr *mgr);
399 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
400 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
402 // atomic transaction functions
403 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
404 BTERR bt_txnpromote (BtDb *bt);
406 // The page is allocated from low and hi ends.
407 // The key slots are allocated from the bottom,
408 // while the text and value of the key
409 // are allocated from the top. When the two
410 // areas meet, the page is split into two.
412 // A key consists of a length byte, two bytes of
413 // index number (0 - 65535), and up to 253 bytes
416 // Associated with each key is a value byte string
417 // containing any value desired.
419 // The b-tree root is always located at page 1.
420 // The first leaf page of level zero is always
421 // located on page 2.
423 // The b-tree pages are linked with next
424 // pointers to facilitate enumerators,
425 // and provide for concurrency.
427 // When the root page fills, it is split in two and
428 // the tree height is raised by a new root at page
429 // one with two keys.
431 // Deleted keys are marked with a dead bit until
432 // page cleanup. The fence key for a leaf node is
435 // To achieve maximum concurrency one page is locked at a time
436 // as the tree is traversed to find leaf key in question. The right
437 // page numbers are used in cases where the page is being split,
440 // Page 0 is dedicated to lock for new page extensions,
441 // and chains empty pages together for reuse. It also
442 // contains the latch manager hash table.
444 // The ParentModification lock on a node is obtained to serialize posting
445 // or changing the fence key for a node.
447 // Empty pages are chained together through the ALLOC page and reused.
449 // Access macros to address slot and key values from the page
450 // Page slots use 1 based indexing.
452 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
453 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
454 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
456 void bt_putid(unsigned char *dest, uid id)
461 dest[i] = (unsigned char)id, id >>= 8;
464 uid bt_getid(unsigned char *src)
469 for( i = 0; i < BtId; i++ )
470 id <<= 8, id |= *src++;
475 // lite weight spin lock Latch Manager
477 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
479 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
482 void bt_mutexlock(BtMutexLatch *latch)
484 BtMutexLatch prev[1];
488 *prev->value = __sync_fetch_and_or(latch->value, XCL);
490 if( !prev->bits->xlock ) { // did we set XCL?
492 __sync_fetch_and_sub(latch->value, WRT);
498 __sync_fetch_and_add(latch->value, WRT);
501 sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
506 // try to obtain write lock
508 // return 1 if obtained,
511 int bt_mutextry(BtMutexLatch *latch)
513 BtMutexLatch prev[1];
515 *prev->value = __sync_fetch_and_or(latch->value, XCL);
517 // take write access if exclusive bit was clear
519 return !prev->bits->xlock;
524 void bt_releasemutex(BtMutexLatch *latch)
526 BtMutexLatch prev[1];
528 *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
530 if( prev->bits->wrt )
531 sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
534 // Write-Only Queue Lock
536 void WriteOLock (WOLock *lock, ushort tid)
539 bt_mutexlock(lock->xcl);
541 if( *lock->tid == tid ) {
543 bt_releasemutex(lock->xcl);
548 bt_releasemutex(lock->xcl);
551 bt_releasemutex(lock->xcl);
556 void WriteORelease (WOLock *lock)
566 // Phase-Fair reader/writer lock implementation
568 void WriteLock (RWLock *lock, ushort tid)
573 tix = __sync_fetch_and_add (lock->ticket, 1);
575 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
577 // wait for our ticket to come up
579 while( tix != lock->serving[0] )
586 w = PRES | (tix & PHID);
588 r = __sync_fetch_and_add (lock->rin, w);
590 r = _InterlockedExchangeAdd16 (lock->rin, w);
592 while( r != *lock->rout )
600 void WriteRelease (RWLock *lock)
603 __sync_fetch_and_and (lock->rin, ~MASK);
605 _InterlockedAnd16 (lock->rin, ~MASK);
610 // try to obtain read lock
611 // return 1 if successful
613 int ReadTry (RWLock *lock, ushort tid)
618 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
620 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
623 if( w == (*lock->rin & MASK) ) {
625 __sync_fetch_and_add (lock->rin, -RINC);
627 _InterlockedExchangeAdd16 (lock->rin, -RINC);
635 void ReadLock (RWLock *lock, ushort tid)
640 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
642 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
645 while( w == (*lock->rin & MASK) )
653 void ReadRelease (RWLock *lock)
656 __sync_fetch_and_add (lock->rout, RINC);
658 _InterlockedExchangeAdd16 (lock->rout, RINC);
662 // recovery manager -- flush dirty pages
664 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
666 uint cnt3 = 0, cnt2 = 0, cnt = 0;
671 // flush dirty pool pages to the btree
673 fprintf(stderr, "Start flushlsn ");
674 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
675 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
676 latch = mgr->latchsets + entry;
677 bt_mutexlock (latch->modify);
678 bt_lockpage(BtLockRead, latch, thread_no);
681 bt_writepage(mgr, page, latch->page_no);
682 latch->dirty = 0, cnt++;
684 if( latch->pin & ~CLOCK_bit )
686 bt_unlockpage(BtLockRead, latch);
687 bt_releasemutex (latch->modify);
689 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
690 fprintf(stderr, "begin sync");
691 for( segment = 0; segment < mgr->segments; segment++ )
692 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
693 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
694 fprintf(stderr, " end sync\n");
697 // recovery manager -- process current recovery buff on startup
698 // this won't do much if previous session was properly closed.
700 BTERR bt_recoveryredo (BtMgr *mgr)
707 hdr = (BtLogHdr *)mgr->redobuff;
708 mgr->flushlsn = hdr->lsn;
711 hdr = (BtLogHdr *)(mgr->redobuff + offset);
712 switch( hdr->type ) {
716 case BTRM_add: // add a unique key-value to btree
718 case BTRM_dup: // add a duplicate key-value to btree
719 case BTRM_del: // delete a key-value from btree
720 case BTRM_upd: // update a key with a new value
721 case BTRM_new: // allocate a new empty page
722 case BTRM_old: // reuse an old empty page
728 // recovery manager -- append new entry to recovery log
729 // flush dirty pages to disk when it overflows.
731 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
733 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
734 uint amt = sizeof(BtLogHdr);
738 bt_mutexlock (mgr->redo);
741 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
743 // see if new entry fits in buffer
744 // flush and reset if it doesn't
746 if( amt > size - mgr->redoend ) {
747 mgr->flushlsn = mgr->lsn;
748 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
749 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
752 bt_flushlsn(mgr, thread_no);
755 // fill in new entry & either eof or end block
757 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
762 hdr->lsn = ++mgr->lsn;
766 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
767 memset (eof, 0, sizeof(BtLogHdr));
769 // fill in key and value
772 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
773 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
776 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
777 memset (eof, 0, sizeof(BtLogHdr));
780 last = mgr->redolast & ~0xfff;
783 if( end - last + sizeof(BtLogHdr) >= 32768 )
784 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
785 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
789 bt_releasemutex(mgr->redo);
793 // recovery manager -- append transaction to recovery log
794 // flush dirty pages to disk when it overflows.
796 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
798 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
799 uint amt = 0, src, type;
806 // determine amount of redo recovery log space required
808 for( src = 0; src++ < source->cnt; ) {
809 key = keyptr(source,src);
810 val = valptr(source,src);
811 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
812 amt += sizeof(BtLogHdr);
815 bt_mutexlock (mgr->redo);
817 // see if new entry fits in buffer
818 // flush and reset if it doesn't
820 if( amt > size - mgr->redoend ) {
821 mgr->flushlsn = mgr->lsn;
822 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
823 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
826 bt_flushlsn (mgr, thread_no);
829 // assign new lsn to transaction
833 // fill in new entries
835 for( src = 0; src++ < source->cnt; ) {
836 key = keyptr(source, src);
837 val = valptr(source, src);
839 switch( slotptr(source, src)->type ) {
851 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
852 amt += sizeof(BtLogHdr);
854 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
860 // fill in key and value
862 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
863 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
868 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
869 memset (eof, 0, sizeof(BtLogHdr));
872 last = mgr->redolast & ~0xfff;
875 if( end - last + sizeof(BtLogHdr) >= 32768 )
876 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
877 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
881 bt_releasemutex(mgr->redo);
885 // sync a single btree page to disk
887 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
889 uint segment = latch->page_no >> 16;
892 if( bt_writepage (mgr, page, latch->page_no) )
895 perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
897 if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
898 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
904 // read page into buffer pool from permanent location in Btree file
906 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
908 int flag = PROT_READ | PROT_WRITE;
909 uint segment = page_no >> 16;
913 if( segment < mgr->segments ) {
914 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
915 if( perm->page_no != page_no )
917 memcpy (page, perm, mgr->page_size);
922 bt_mutexlock (mgr->maps);
924 if( segment < mgr->segments ) {
925 bt_releasemutex (mgr->maps);
929 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
932 bt_releasemutex (mgr->maps);
936 // write page to permanent location in Btree file
937 // clear the dirty bit
939 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
941 int flag = PROT_READ | PROT_WRITE;
942 uint segment = page_no >> 16;
946 if( segment < mgr->segments ) {
947 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
948 if( page_no > LEAF_page && perm->page_no != page_no)
950 memcpy (perm, page, mgr->page_size);
955 bt_mutexlock (mgr->maps);
957 if( segment < mgr->segments ) {
958 bt_releasemutex (mgr->maps);
962 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
963 bt_releasemutex (mgr->maps);
968 // set CLOCK bit in latch
969 // decrement pin count
971 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
973 bt_mutexlock(latch->modify);
974 latch->pin |= CLOCK_bit;
977 bt_releasemutex(latch->modify);
980 // return the btree cached page address
982 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
984 uid entry = latch - mgr->latchsets;
985 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
990 // return the next available latch entry,
991 // with that latch entry locked
993 uint bt_availnext (BtMgr *mgr)
1000 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1002 entry = _InterlockedIncrement (&mgr->latchvictim);
1004 entry %= mgr->latchtotal;
1009 latch = mgr->latchsets + entry;
1011 if( !bt_mutextry(latch->modify) )
1014 // return this entry if it is not pinned
1019 // if the CLOCK bit is set
1020 // reset it to zero.
1022 latch->pin &= ~CLOCK_bit;
1023 bt_releasemutex(latch->modify);
1027 // pin page in buffer pool
1028 // return with latchset pinned
1030 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1032 uint hashidx = page_no % mgr->latchhash;
1037 // try to find our entry
1039 bt_mutexlock(mgr->hashtable[hashidx].latch);
1041 if( entry = mgr->hashtable[hashidx].entry ) do
1043 latch = mgr->latchsets + entry;
1044 if( page_no == latch->page_no )
1046 } while( entry = latch->next );
1048 // found our entry: increment pin
1051 latch = mgr->latchsets + entry;
1052 bt_mutexlock(latch->modify);
1053 latch->pin |= CLOCK_bit;
1057 bt_releasemutex(latch->modify);
1058 bt_releasemutex(mgr->hashtable[hashidx].latch);
1062 // find and reuse unpinned entry
1066 entry = bt_availnext (mgr);
1067 latch = mgr->latchsets + entry;
1069 idx = latch->page_no % mgr->latchhash;
1071 // if latch is on a different hash chain
1072 // unlink from the old page_no chain
1074 if( latch->page_no )
1075 if( idx != hashidx ) {
1077 // skip over this entry if latch not available
1079 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1080 bt_releasemutex(latch->modify);
1085 mgr->latchsets[latch->prev].next = latch->next;
1087 mgr->hashtable[idx].entry = latch->next;
1090 mgr->latchsets[latch->next].prev = latch->prev;
1092 bt_releasemutex (mgr->hashtable[idx].latch);
1095 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1097 // update permanent page area in btree from buffer pool
1098 // no read-lock is required since page is not pinned.
1101 if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1102 return mgr->line = __LINE__, NULL;
1107 memcpy (page, contents, mgr->page_size);
1109 } else if( bt_readpage (mgr, page, page_no) )
1110 return mgr->line = __LINE__, NULL;
1112 // link page as head of hash table chain
1113 // if this is a never before used entry,
1114 // or it was previously on a different
1115 // hash table chain. Otherwise, just
1116 // leave it in its current hash table
1119 if( !latch->page_no || hashidx != idx ) {
1120 if( latch->next = mgr->hashtable[hashidx].entry )
1121 mgr->latchsets[latch->next].prev = entry;
1123 mgr->hashtable[hashidx].entry = entry;
1127 // fill in latch structure
1129 latch->pin = CLOCK_bit | 1;
1130 latch->page_no = page_no;
1133 bt_releasemutex (latch->modify);
1134 bt_releasemutex (mgr->hashtable[hashidx].latch);
1138 void bt_mgrclose (BtMgr *mgr)
1146 // flush previously written dirty pages
1147 // and write recovery buffer to disk
1149 fdatasync (mgr->idx);
1151 if( mgr->redoend ) {
1152 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1153 memset (eof, 0, sizeof(BtLogHdr));
1156 // write remaining dirty pool pages to the btree
1158 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1159 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1160 latch = mgr->latchsets + slot;
1162 if( latch->dirty ) {
1163 bt_writepage(mgr, page, latch->page_no);
1164 latch->dirty = 0, num++;
1168 // clear redo recovery buffer on disk.
1170 if( mgr->pagezero->redopages ) {
1171 eof = (BtLogHdr *)mgr->redobuff;
1172 memset (eof, 0, sizeof(BtLogHdr));
1173 eof->lsn = mgr->lsn;
1174 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1175 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1178 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1181 while( mgr->segments )
1182 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1184 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1185 munmap (mgr->pagezero, mgr->page_size);
1187 FlushViewOfFile(mgr->pagezero, 0);
1188 UnmapViewOfFile(mgr->pagezero);
1189 UnmapViewOfFile(mgr->pagepool);
1190 CloseHandle(mgr->halloc);
1191 CloseHandle(mgr->hpool);
1197 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1198 FlushFileBuffers(mgr->idx);
1199 CloseHandle(mgr->idx);
1204 // close and release memory
1206 void bt_close (BtDb *bt)
1213 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1218 // open/create new btree buffer manager
1220 // call with file_name, bits in page size (e.g. 16),
1221 // size of page pool (e.g. 262144), and number of redo pages
1223 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1225 uint lvl, attr, last, slot, idx;
1226 uint nlatchpage, latchhash;
1227 unsigned char value[BtId];
1228 int flag, initit = 0;
1229 BtPageZero *pagezero;
1237 // determine sanity of page size and buffer pool
1239 if( bits > BT_maxbits )
1241 else if( bits < BT_minbits )
1244 mgr = calloc (1, sizeof(BtMgr));
1246 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1248 if( mgr->idx == -1 ) {
1249 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1250 return free(mgr), NULL;
1253 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1254 attr = FILE_ATTRIBUTE_NORMAL;
1255 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1257 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1258 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1259 return GlobalFree(mgr), NULL;
1264 pagezero = valloc (BT_maxpage);
1267 // read minimum page size to get root info
1268 // to support raw disk partition files
1269 // check if bits == 0 on the disk.
1271 if( size = lseek (mgr->idx, 0L, 2) )
1272 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1273 if( pagezero->alloc->bits )
1274 bits = pagezero->alloc->bits;
1278 return free(mgr), free(pagezero), NULL;
1282 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1283 size = GetFileSize(mgr->idx, amt);
1285 if( size || *amt ) {
1286 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1287 return bt_mgrclose (mgr), NULL;
1288 bits = pagezero->alloc->bits;
1293 mgr->page_size = 1 << bits;
1294 mgr->page_bits = bits;
1296 // calculate number of latch hash table entries
1298 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1300 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1301 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1302 mgr->latchtotal = nodemax;
1307 // initialize an empty b-tree with latch page, root page, page of leaves
1308 // and page(s) of latches and page pool cache
1310 memset (pagezero, 0, 1 << bits);
1311 pagezero->alloc->lvl = MIN_lvl - 1;
1312 pagezero->alloc->bits = mgr->page_bits;
1313 pagezero->redopages = redopages;
1315 bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1316 pagezero->activepages = 2;
1318 // initialize left-most LEAF page in
1319 // alloc->left and count of active leaf pages.
1321 bt_putid (pagezero->alloc->left, LEAF_page);
1322 ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1324 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1325 fprintf (stderr, "Unable to create btree page zero\n");
1326 return bt_mgrclose (mgr), NULL;
1329 memset (pagezero, 0, 1 << bits);
1330 pagezero->alloc->bits = mgr->page_bits;
1332 for( lvl=MIN_lvl; lvl--; ) {
1333 BtSlot *node = slotptr(pagezero->alloc, 1);
1334 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1335 key = keyptr(pagezero->alloc, 1);
1336 key->len = 2; // create stopper key
1340 bt_putid(value, MIN_lvl - lvl + 1);
1341 val = valptr(pagezero->alloc, 1);
1342 val->len = lvl ? BtId : 0;
1343 memcpy (val->value, value, val->len);
1345 pagezero->alloc->min = node->off;
1346 pagezero->alloc->lvl = lvl;
1347 pagezero->alloc->cnt = 1;
1348 pagezero->alloc->act = 1;
1349 pagezero->alloc->page_no = MIN_lvl - lvl;
1351 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1352 fprintf (stderr, "Unable to create btree page\n");
1353 return bt_mgrclose (mgr), NULL;
1361 VirtualFree (pagezero, 0, MEM_RELEASE);
1364 // mlock the first segment of 64K pages
1366 flag = PROT_READ | PROT_WRITE;
1367 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1370 if( mgr->pages[0] == MAP_FAILED ) {
1371 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1372 return bt_mgrclose (mgr), NULL;
1375 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1376 mlock (mgr->pagezero, mgr->page_size);
1378 mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1379 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1381 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1382 if( mgr->pagepool == MAP_FAILED ) {
1383 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1384 return bt_mgrclose (mgr), NULL;
1387 flag = PAGE_READWRITE;
1388 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1389 if( !mgr->halloc ) {
1390 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1391 return bt_mgrclose (mgr), NULL;
1394 flag = FILE_MAP_WRITE;
1395 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1396 if( !mgr->pagezero ) {
1397 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1398 return bt_mgrclose (mgr), NULL;
1401 flag = PAGE_READWRITE;
1402 size = (uid)mgr->nlatchpage << mgr->page_bits;
1403 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1405 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1406 return bt_mgrclose (mgr), NULL;
1409 flag = FILE_MAP_WRITE;
1410 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1411 if( !mgr->pagepool ) {
1412 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1413 return bt_mgrclose (mgr), NULL;
1417 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1418 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1419 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1424 // open BTree access method
1425 // based on buffer manager
// Allocates a BtDb handle with malloc, zero-fills it, gives it a cursor
// buffer of mgr->page_size bytes, and assigns it a thread number derived
// from an atomic increment of mgr->thread_no.
// Two alternative forms appear for the cursor allocation (valloc /
// VirtualAlloc) and for the thread-number increment (__sync / _Interlocked);
// presumably they are selected per platform by preprocessor lines that are
// not visible in this listing.
// NOTE(review): no NULL check on the malloc result is visible before the
// memset -- confirm.
1427 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1429 BtDb *bt = malloc (sizeof(*bt));
1431 memset (bt, 0, sizeof(*bt));
1435 bt->cursor = valloc (mgr->page_size);
1437 bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
// fetch-and-add yields the old counter value, hence the + 1
1440 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1442 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1447 // compare two keys, return > 0, = 0, or < 0
1448 // =0: keys are same
1451 // as the comparison value
// key1 is a stored BtKey (length in key1->len, bytes in key1->key);
// key2/len2 is a raw byte string. The bytes are compared with memcmp over
// the shorter of the two lengths first; a non-zero memcmp result is
// captured in ans.
1453 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1455 uint len1 = key1->len;
// compare only the common prefix
1458 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1469 // place write, read, or parent lock on requested page_no.
// Acquires one of the latch set's locks on behalf of thread_no.
// The visible calls cover: read/write on latch->readwr, read/write on
// latch->access, and WriteOLock on latch->parent and latch->atomic.
// NOTE(review): only the combined BtLockAtomic | BtLockRead case label is
// visible here; the mode-to-call pairing of the other calls should be
// confirmed against the full switch.
1471 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1475 ReadLock (latch->readwr, thread_no);
1478 WriteLock (latch->readwr, thread_no);
1481 ReadLock (latch->access, thread_no);
1484 WriteLock (latch->access, thread_no);
1487 WriteOLock (latch->parent, thread_no);
1490 WriteOLock (latch->atomic, thread_no);
// combined mode: take the atomic lock first, then a read lock on readwr
1492 case BtLockAtomic | BtLockRead:
1493 WriteOLock (latch->atomic, thread_no);
1494 ReadLock (latch->readwr, thread_no);
1499 // remove write, read, or parent lock on requested page
// Mirror of bt_lockpage: releases read/write on latch->readwr or
// latch->access, or the WriteO lock on latch->parent or latch->atomic.
// No thread number is needed for release.
// NOTE(review): only the combined BtLockAtomic | BtLockRead case label is
// visible here; confirm the mode-to-call pairing of the other releases.
1501 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1505 ReadRelease (latch->readwr);
1508 WriteRelease (latch->readwr);
1511 ReadRelease (latch->access);
1514 WriteRelease (latch->access);
1517 WriteORelease (latch->parent);
1520 WriteORelease (latch->atomic);
// combined mode: release atomic, then the read lock on readwr
1522 case BtLockAtomic | BtLockRead:
1523 WriteORelease (latch->atomic);
1524 ReadRelease (latch->readwr);
1529 // allocate a new page
1530 // return with page latched, but unlocked.
// Under mgr->lock, either reuses the head of pagezero->freechain or takes
// the next page number from pagezero->alloc->right. Both visible paths
// increment pagezero->activepages and msync page zero before releasing the
// mutex. The caller's contents page is stamped with the chosen page_no and
// becomes the new page's image; set->latch / set->page receive the pinned
// latch and mapped page.
1532 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1537 // lock allocation page
1539 bt_mutexlock(mgr->lock);
1541 // use empty chain first
1542 // else allocate empty page
1544 if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1545 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1546 set->page = bt_mappage (mgr, set->latch);
// NOTE(review): no bt_releasemutex is visible before this error return --
// confirm mgr->lock is not left held on this path.
1548 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1550 mgr->pagezero->activepages++;
// the reused page's right link becomes the new free-chain head
// (bt_freepage stores the previous chain head there)
1551 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
// msync failure is reported on stderr but not returned to the caller
1552 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1553 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1555 bt_releasemutex(mgr->lock);
// overwrite the recycled page with the caller's image
1557 contents->page_no = page_no;
1558 memcpy (set->page, contents, mgr->page_size);
1559 set->latch->dirty = 1;
// no free page available: take the next page number and advance it
1563 page_no = bt_getid(mgr->pagezero->alloc->right);
1564 bt_putid(mgr->pagezero->alloc->right, page_no+1);
1566 // unlock allocation latch and
1567 // extend file into new page.
1569 mgr->pagezero->activepages++;
1570 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1571 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1572 bt_releasemutex(mgr->lock);
1574 contents->page_no = page_no;
// write the image at byte offset page_no << page_bits
// NOTE(review): the pwrite return value is not checked here.
1575 pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1577 // don't load cache from btree page, load it from contents
1579 if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1580 set->page = bt_mappage (mgr, set->latch);
1587 // find slot in page for given key at a given level
// Binary search over slots low (starting at 1) .. higher (starting at
// page->cnt), comparing each probed slot's key against key/len with keycmp.
// Returns higher when the good counter is non-zero, otherwise 0.
1589 int bt_findslot (BtPage page, unsigned char *key, uint len)
1591 uint diff, higher = page->cnt, low = 1, slot;
1594 // make stopper key an infinite fence value
// a non-zero right link means this page has a right sibling
1596 if( bt_getid (page->right) )
1601 // low is the lowest candidate.
1602 // loop ends when they meet
1604 // higher is already
1605 // tested as .ge. the passed key.
1607 while( diff = higher - low ) {
// probe the midpoint of the remaining range
1608 slot = low + ( diff >> 1 );
1609 if( keycmp (keyptr(page, slot), key, len) < 0 )
// probed key is >= the search key: it is a valid upper candidate
1612 higher = slot, good++;
1615 // return zero if key is on right link page
1617 return good ? higher : 0;
1620 // find and load page at given level for given key
1621 // leave page rd or wr locked as requested
// Descends from ROOT_page towards level lvl. drill starts at 0xff and is
// replaced by the root page's lvl once the root has been read. Each visited
// page is pinned, Access-locked (except the root), mapped, then locked in
// the chosen mode; the previous page is unlocked and unpinned in between.
// On each page the key is located with bt_findslot and the child page
// number is read from the slot's value, or the search slides to the right
// sibling. The error paths visible here set mgr->err = BTERR_struct, record
// mgr->line, and return 0.
1623 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1625 uid page_no = ROOT_page, prevpage = 0;
1626 uint drill = 0xff, slot;
1627 BtLatchSet *prevlatch;
1628 uint mode, prevmode;
1631 // start at root of btree and drill down
1634 // determine lock mode of drill level
// only the target level gets the caller's lock; levels above it are read-locked
1635 mode = (drill == lvl) ? lock : BtLockRead;
1637 if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1640 // obtain access lock using lock chaining with Access mode
1642 if( page_no > ROOT_page )
1643 bt_lockpage(BtLockAccess, set->latch, thread_no);
1645 set->page = bt_mappage (mgr, set->latch);
1647 // release & unpin parent or left sibling page
1650 bt_unlockpage(prevmode, prevlatch);
1651 bt_unpinlatch (mgr, prevlatch);
1655 // obtain mode lock using lock chaining through AccessLock
1657 bt_lockpage(mode, set->latch, thread_no);
// a page flagged free must not be reachable from the tree
1659 if( set->page->free )
1660 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1662 if( page_no > ROOT_page )
1663 bt_unlockpage(BtLockAccess, set->latch);
1665 // re-read and re-lock root after determining actual level of root
// a level mismatch is only tolerated on the root page
1667 if( set->page->lvl != drill) {
1668 if( set->latch->page_no != ROOT_page )
1669 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1671 drill = set->page->lvl;
// the root turned out to be the target level but was locked for read:
// drop it here (presumably to retry with the requested lock; the retry
// statement is not visible in this listing)
1673 if( lock != BtLockRead && drill == lvl ) {
1674 bt_unlockpage(mode, set->latch);
1675 bt_unpinlatch (mgr, set->latch);
1680 prevpage = set->latch->page_no;
1681 prevlatch = set->latch;
1684 // find key on page at this level
1685 // and descend to requested level
// pages marked kill are not searched
1687 if( !set->page->kill )
1688 if( slot = bt_findslot (set->page, key, len) ) {
1692 // find next non-dead slot -- the fence key if nothing else
1694 while( slotptr(set->page, slot)->dead )
1695 if( slot++ < set->page->cnt )
1698 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1700 val = valptr(set->page, slot);
// the slot's value must be exactly a BtId-sized child page number
1702 if( val->len == BtId )
1703 page_no = bt_getid(valptr(set->page, slot)->value);
1705 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1711 // slide right into next page
1713 page_no = bt_getid(set->page->right);
1716 // return error on end of right chain
1718 mgr->line = __LINE__, mgr->err = BTERR_struct;
1719 return 0; // return error
1722 // return page to free list
1723 // page must be delete & write locked
// Under mgr->lock: pushes the page onto the head of pagezero->freechain,
// marks it free and its latch dirty, decrements pagezero->activepages and
// msyncs page zero. Then releases the page's Delete and Write locks,
// unpins its latch, and releases the mutex.
1725 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1727 // lock allocation page
1729 bt_mutexlock (mgr->lock);
// the freed page's right link takes the old chain head; the page becomes the new head
1733 memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1734 bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1735 set->latch->dirty = 1;
1736 set->page->free = 1;
1738 // decrement active page count
1740 mgr->pagezero->activepages--;
// msync failure is reported on stderr only
1741 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1742 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1744 // unlock released page
1745 // and unlock allocation page
1747 bt_unlockpage (BtLockDelete, set->latch);
1748 bt_unlockpage (BtLockWrite, set->latch);
1749 bt_unpinlatch (mgr, set->latch);
1750 bt_releasemutex (mgr->lock);
1753 // a fence key was deleted from a page
1754 // push new fence value upwards
// Saves the page's last key as rightkey, clears that slot and shrinks cnt,
// then saves the new last key as leftkey. The Write lock is traded for a
// Parent lock, leftkey is inserted at lvl+1 pointing at this page, and
// rightkey is deleted at lvl+1. Finally the Parent lock is released and
// the latch unpinned.
// NOTE(review): the bodies of the two error checks are not visible here.
1756 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1758 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1759 unsigned char value[BtId];
1763 // remove the old fence value
1765 ptr = keyptr(set->page, set->page->cnt);
1766 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
// zero the last slot and drop it from the slot count
1767 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1768 set->latch->dirty = 1;
1770 // cache new fence value
1772 ptr = keyptr(set->page, set->page->cnt);
1773 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// take the Parent lock before giving up the Write lock
1775 bt_lockpage (BtLockParent, set->latch, thread_no);
1776 bt_unlockpage (BtLockWrite, set->latch);
1778 // insert new (now smaller) fence key
1780 bt_putid (value, set->latch->page_no);
1781 ptr = (BtKey*)leftkey;
1783 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1786 // now delete old fence key
1788 ptr = (BtKey*)rightkey;
1790 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1793 bt_unlockpage (BtLockParent, set->latch);
1794 bt_unpinlatch(mgr, set->latch);
1798 // root has a single child
1799 // collapse a level from the tree
// Repeatedly: scans the root's slots for one that is not dead, reads the
// child page number from that slot's value, pins and Delete+Write locks
// the child, copies the child's page image over the root, and frees the
// child. Repeats while the root's lvl is above 1 and it has exactly one
// active key. Ends by releasing the root's Write lock and unpinning it.
1801 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1808 // find the child entry and promote as new root contents
// idx is incremented in the loop test, so slots are examined starting at 1
1811 for( idx = 0; idx++ < root->page->cnt; )
1812 if( !slotptr(root->page, idx)->dead )
1815 val = valptr(root->page, idx);
// the value must be exactly a BtId-sized page number
1817 if( val->len == BtId )
1818 page_no = bt_getid (valptr(root->page, idx)->value);
1820 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1822 if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1823 child->page = bt_mappage (mgr, child->latch);
// bt_freepage expects the page to be Delete and Write locked
1827 bt_lockpage (BtLockDelete, child->latch, thread_no);
1828 bt_lockpage (BtLockWrite, child->latch, thread_no);
1830 memcpy (root->page, child->page, mgr->page_size);
1831 root->latch->dirty = 1;
1833 bt_freepage (mgr, child);
1835 } while( root->page->lvl > 1 && root->page->act == 1 );
1837 bt_unlockpage (BtLockWrite, root->latch);
1838 bt_unpinlatch (mgr, root->latch);
1842 // delete a page and manage keys
1843 // call with page writelocked
1845 // returns with page removed
1846 // from the page pool.
// Removes an emptied page by pulling its right sibling's contents into it:
// the sibling's image is copied over this page, the sibling is marked kill
// and its right link pointed back at this page, then the parent level
// (lvl+1) is updated so the sibling's fence key refers to this page and
// this page's old fence key is deleted. The sibling is finally freed.
// NOTE(review): the bodies of the pin-failure and insert/delete error
// checks are not visible here.
1848 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1850 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1851 unsigned char value[BtId];
1852 uint lvl = set->page->lvl;
1858 // cache copy of fence key
1859 // to remove in parent
1861 ptr = keyptr(set->page, set->page->cnt);
1862 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1864 // obtain lock on right page
1866 page_no = bt_getid(set->page->right);
1868 if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1869 right->page = bt_mappage (mgr, right->latch);
1873 bt_lockpage (BtLockWrite, right->latch, thread_no);
1875 // cache copy of key to update
1877 ptr = keyptr(right->page, right->page->cnt);
1878 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
// a right sibling already marked kill is treated as a structure error
// NOTE(review): no unlock/unpin of right or set is visible on this path.
1880 if( right->page->kill )
1881 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1883 // pull contents of right peer into our empty page
1885 memcpy (set->page, right->page, mgr->page_size);
// the copy carried the sibling's page_no; restore our own
1886 set->page->page_no = set->latch->page_no;
1887 set->latch->dirty = 1;
1889 // mark right page deleted and point it to left page
1890 // until we can post parent updates that remove access
1891 // to the deleted page.
1893 bt_putid (right->page->right, set->latch->page_no);
1894 right->latch->dirty = 1;
1895 right->page->kill = 1;
// on both pages, take the Parent lock before dropping the Write lock
1897 bt_lockpage (BtLockParent, right->latch, thread_no);
1898 bt_unlockpage (BtLockWrite, right->latch);
1900 bt_lockpage (BtLockParent, set->latch, thread_no);
1901 bt_unlockpage (BtLockWrite, set->latch);
1903 // redirect higher key directly to our new node contents
1905 bt_putid (value, set->latch->page_no);
1906 ptr = (BtKey*)higherfence;
1908 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1911 // delete old lower key to our node
1913 ptr = (BtKey*)lowerfence;
1915 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1918 // obtain delete and write locks to right node
1920 bt_unlockpage (BtLockParent, right->latch);
1921 bt_lockpage (BtLockDelete, right->latch, thread_no);
1922 bt_lockpage (BtLockWrite, right->latch, thread_no);
1923 bt_freepage (mgr, right);
1925 bt_unlockpage (BtLockParent, set->latch);
1926 bt_unpinlatch (mgr, set->latch);
1930 // find and delete key on page by marking delete flag bit
1931 // if page becomes empty, delete it from the btree
// Loads the page for key at level lvl under a Write lock, steps past a
// Librarian slot, and if the slot's key equals the request and is not
// already dead, accounts its key+value bytes as garbage and marks the slot
// as Delete. Follow-up work visible here: collapsing dead slots under the
// fence, pushing a new fence upward (bt_fixfence), collapsing the root
// (bt_collapseroot), or deleting the now-empty page (bt_deletepage).
// Otherwise the page is marked dirty, unlocked and unpinned.
1933 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1935 uint slot, idx, found, fence;
1941 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1942 node = slotptr(set->page, slot);
1943 ptr = keyptr(set->page, slot);
1947 // if librarian slot, advance to real slot
1949 if( node->type == Librarian ) {
1950 ptr = keyptr(set->page, ++slot);
1951 node = slotptr(set->page, slot);
// remember whether the target is the page's last (fence) slot
1954 fence = slot == set->page->cnt;
1956 // delete the key, ignore request if already dead
1958 if( found = !keycmp (ptr, key, len) )
1959 if( found = node->dead == 0 ) {
1960 val = valptr(set->page,slot);
// the key and value bytes become reclaimable garbage
1961 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1964 // mark node type as delete
1966 node->type = Delete;
1969 // collapse empty slots beneath the fence
1970 // on interior nodes
1973 while( idx = set->page->cnt - 1 )
1974 if( slotptr(set->page, idx)->dead ) {
// move the last slot down over the dead one, then clear the vacated last slot
1975 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1976 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1981 // did we delete a fence key in an upper level?
1983 if( found && lvl && set->page->act && fence )
1984 if( bt_fixfence (mgr, set, lvl, thread_no) )
1989 // do we need to collapse root?
1991 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1992 if( bt_collapseroot (mgr, set, thread_no) )
1997 // delete empty page
1999 if( !set->page->act ) {
2000 if( bt_deletepage (mgr, set, thread_no) )
2003 set->latch->dirty = 1;
2004 bt_unlockpage(BtLockWrite, set->latch);
2005 bt_unpinlatch (mgr, set->latch);
2011 // advance to next slot
// When slot is the last one on the page, moves set to the right sibling:
// pins and maps it, Access-locks it, releases the Read lock and pin on the
// previous page, then Read-locks the new page and drops the Access lock.
// Failure to obtain the sibling sets mgr->err = BTERR_struct and returns 0.
// NOTE(review): the statement taken when slot < cnt is not visible here
// (presumably it returns the next slot number) -- confirm.
2013 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2015 BtLatchSet *prevlatch;
2018 if( slot < set->page->cnt )
2021 prevlatch = set->latch;
// a zero right link means there is no sibling to move to
2023 if( page_no = bt_getid(set->page->right) )
2024 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2025 set->page = bt_mappage (bt->mgr, set->latch);
2029 return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2031 // obtain access lock using lock chaining with Access mode
2033 bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2035 bt_unlockpage(BtLockRead, prevlatch);
2036 bt_unpinlatch (bt->mgr, prevlatch);
2038 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2039 bt_unlockpage(BtLockAccess, set->latch);
2043 // find unique key == given key, or first duplicate key in
2044 // leaf level and return number of value bytes
2045 // or (-1) if not found.
// Loads the level-0 page for key under a Read lock and walks slots (via
// bt_findnext) until a decision is made. The key actually found is copied
// into bt->key. On a match, up to valmax bytes of the stored value are
// copied into value. The page is Read-unlocked and unpinned at the end.
2047 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2055 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2057 ptr = keyptr(set->page, slot);
2059 // skip librarian slot place holder
2061 if( slotptr(set->page, slot)->type == Librarian )
2062 ptr = keyptr(set->page, ++slot);
2064 // return actual key found
2066 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
// NOTE(review): the statement guarded by this Duplicate test is not
// visible here -- confirm how duplicate keys adjust the compared length.
2069 if( slotptr(set->page, slot)->type == Duplicate )
2072 // not there if we reach the stopper key
// last slot on a page with no right sibling
2074 if( slot == set->page->cnt )
2075 if( !bt_getid (set->page->right) )
2078 // if key exists, return >= 0 value bytes copied
2079 // otherwise return (-1)
2081 if( slotptr(set->page, slot)->dead )
// NOTE(review): len is not a parameter; presumably a local set on a line
// not visible in this listing.
2085 if( !memcmp (ptr->key, key, len) ) {
2086 val = valptr (set->page,slot);
2087 if( valmax > val->len )
2089 memcpy (value, val->value, valmax);
2095 } while( slot = bt_findnext (bt, set, slot) );
2097 bt_unlockpage (BtLockRead, set->latch);
2098 bt_unpinlatch (bt->mgr, set->latch);
2102 // check page for space available,
2103 // clean if necessary and return
2104 // 0 - page needs splitting
2105 // >0 new slot value
// First tests whether the page already has room for one more key of keylen
// bytes and value of vallen bytes plus slot overhead. If not, and the page
// holds at least a fifth of a page of garbage, the page is copied to a
// scratch frame, cleared past its header, and rebuilt from the frame with
// a Librarian slot written ahead of each copied entry. The space test is
// then repeated against the rebuilt slot count.
// NOTE(review): no NULL check on the malloc result and no free of frame
// are visible in this listing -- confirm.
2107 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2109 BtPage page = set->page, frame;
2110 uint nxt = mgr->page_size;
2111 uint cnt = 0, idx = 0;
2112 uint max = page->cnt;
// enough room already: slot array (with two spare slots) plus header plus new key and value fit below page->min
2117 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2120 // skip cleanup and proceed to split
2121 // if there's not enough garbage
2124 if( page->garbage < nxt / 5 )
2127 frame = malloc (mgr->page_size);
2128 memcpy (frame, page, mgr->page_size);
2130 // skip page info and set rest of page to zero
2132 memset (page+1, 0, mgr->page_size - sizeof(*page));
2133 set->latch->dirty = 1;
2138 // clean up page first by
2139 // removing deleted keys
2141 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level-0 page
2145 if( cnt < max || frame->lvl )
2146 if( slotptr(frame,cnt)->dead )
2149 // copy the value across
// entries are packed downward from the end of the page
2151 val = valptr(frame, cnt);
2152 nxt -= val->len + sizeof(BtVal);
2153 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2155 // copy the key across
2157 key = keyptr(frame, cnt);
2158 nxt -= key->len + sizeof(BtKey);
2159 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2161 // make a librarian slot
// the librarian slot shares the offset of the entry that follows it and is flagged dead
2163 slotptr(page, ++idx)->off = nxt;
2164 slotptr(page, idx)->type = Librarian;
2165 slotptr(page, idx)->dead = 1;
// the real slot for the copied entry
2169 slotptr(page, ++idx)->off = nxt;
2170 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2172 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2180 // see if page has enough space now, or does it need splitting?
2182 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2188 // split the root and raise the height of the btree
// Copies the current root image to a newly allocated page (the lower-keys
// child), clears the root past its header, and rebuilds it with two
// entries: slot 1 holds the saved left fence key pointing at the new left
// page, slot 2 holds a 2-byte stopper key pointing at right->page_no.
// The root's right link is zeroed, cnt and act are set to 2, and the
// root's lvl is mirrored into pagezero->alloc->lvl. Ends by releasing the
// root's Write lock and unpinning both the root and the right latch.
// NOTE(review): the parameter is named page_no but is forwarded as
// bt_newpage's thread id, and bt_splitkeys passes thread_no for it.
// NOTE(review): no NULL check on the malloc result and no free of frame
// are visible in this listing -- confirm.
2190 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2192 unsigned char leftkey[BT_keyarray];
2193 uint nxt = mgr->page_size;
2194 unsigned char value[BtId];
2201 frame = malloc (mgr->page_size);
2202 memcpy (frame, root->page, mgr->page_size);
2204 // save left page fence key for new root
2206 ptr = keyptr(root->page, root->page->cnt);
2207 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2209 // Obtain an empty page to use, and copy the current
2210 // root contents into it, e.g. lower keys
2212 if( bt_newpage(mgr, left, frame, page_no) )
// only the page number is needed from here on
2215 left_page_no = left->latch->page_no;
2216 bt_unpinlatch (mgr, left->latch);
2219 // preserve the page info at the bottom
2220 // of higher keys and set rest to zero
2222 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2224 // insert stopper key at top of newroot page
2225 // and increase the root height
// entries are laid out downward from the end of the page
2227 nxt -= BtId + sizeof(BtVal);
2228 bt_putid (value, right->page_no);
2229 val = (BtVal *)((unsigned char *)root->page + nxt);
2230 memcpy (val->value, value, BtId);
// reserve room for the 2-byte stopper key
2233 nxt -= 2 + sizeof(BtKey);
2234 slotptr(root->page, 2)->off = nxt;
2235 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2240 // insert lower keys page fence key on newroot page as first key
2242 nxt -= BtId + sizeof(BtVal);
2243 bt_putid (value, left_page_no);
2244 val = (BtVal *)((unsigned char *)root->page + nxt);
2245 memcpy (val->value, value, BtId);
2248 ptr = (BtKey *)leftkey;
2249 nxt -= ptr->len + sizeof(BtKey);
2250 slotptr(root->page, 1)->off = nxt;
2251 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root never has a right sibling
2253 bt_putid(root->page->right, 0);
2254 root->page->min = nxt; // reset lowest used offset and key count
2255 root->page->cnt = 2;
2256 root->page->act = 2;
2259 mgr->pagezero->alloc->lvl = root->page->lvl;
2261 // release and unpin root pages
2263 bt_unlockpage(BtLockWrite, root->latch);
2264 bt_unpinlatch (mgr, root->latch);
2266 bt_unpinlatch (mgr, right);
2270 // split already locked full node
2272 // return pool entry for new right
// Builds the higher-keys half in a zeroed scratch frame, allocates a new
// right page from it with bt_newpage, then rebuilds the original page from
// a saved copy so it holds only the lower keys, with its right link set to
// the new page. Both halves are written with a Librarian slot ahead of
// each copied entry. Returns the new right latch's index into
// mgr->latchsets.
// NOTE(review): the starting values of cnt/max for each copy loop are on
// lines not visible here, as are the malloc check and any free of frame.
2275 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2277 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2278 BtPage frame = malloc (mgr->page_size);
2279 uint lvl = set->page->lvl;
2286 // split higher half of keys to frame
2288 memset (frame, 0, mgr->page_size);
2289 max = set->page->cnt;
2293 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level-0 page
2294 if( cnt < max || set->page->lvl )
2295 if( slotptr(set->page, cnt)->dead )
// copy value then key, packing downward from the end of the frame
2298 src = valptr(set->page, cnt);
2299 nxt -= src->len + sizeof(BtVal);
2300 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2302 key = keyptr(set->page, cnt);
2303 nxt -= key->len + sizeof(BtKey);
2304 ptr = (BtKey*)((unsigned char *)frame + nxt);
2305 memcpy (ptr, key, key->len + sizeof(BtKey));
2307 // add librarian slot
2309 slotptr(frame, ++idx)->off = nxt;
2310 slotptr(frame, idx)->type = Librarian;
2311 slotptr(frame, idx)->dead = 1;
// the real slot for the copied entry
2315 slotptr(frame, ++idx)->off = nxt;
2316 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2318 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2322 frame->bits = mgr->page_bits;
// a non-root page hands its right link to the new right page
2329 if( set->latch->page_no > ROOT_page )
2330 bt_putid (frame->right, bt_getid (set->page->right));
2332 // get new free page and write higher keys to it.
2334 if( bt_newpage(mgr, right, frame, thread_no) )
2337 // process lower keys
// reuse frame as a snapshot of the original page, then clear the page body
2339 memcpy (frame, set->page, mgr->page_size);
2340 memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2341 set->latch->dirty = 1;
2343 nxt = mgr->page_size;
2344 set->page->garbage = 0;
2350 if( slotptr(frame, max)->type == Librarian )
2353 // assemble page of smaller keys
2355 while( cnt++ < max ) {
2356 if( slotptr(frame, cnt)->dead )
2358 val = valptr(frame, cnt);
2359 nxt -= val->len + sizeof(BtVal);
2360 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2362 key = keyptr(frame, cnt);
2363 nxt -= key->len + sizeof(BtKey);
2364 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2366 // add librarian slot
2368 slotptr(set->page, ++idx)->off = nxt;
2369 slotptr(set->page, idx)->type = Librarian;
2370 slotptr(set->page, idx)->dead = 1;
2374 slotptr(set->page, ++idx)->off = nxt;
2375 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
// link the lower half to the new right page and record the rebuilt layout
2379 bt_putid(set->page->right, right->latch->page_no);
2380 set->page->min = nxt;
2381 set->page->cnt = idx;
// pointer difference gives the latch's index in the latch set array
2384 return right->latch - mgr->latchsets;
2387 // fix keys for newly split page
2388 // call with page locked, return
// After a split, posts fence keys to the parent level (lvl+1): the left
// page's last key is inserted pointing at the left page, and the right
// page's last key is inserted pointing at the new right page. A root page
// is delegated to bt_splitroot instead. Both pages are held under Parent
// locks during the parent updates and are unlocked and unpinned at the end.
// NOTE(review): the bodies of the two insert error checks are not visible.
2391 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2393 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2394 unsigned char value[BtId];
2395 uint lvl = set->page->lvl;
2399 // if current page is the root page, split it
2401 if( set->latch->page_no == ROOT_page )
2402 return bt_splitroot (mgr, set, right, thread_no);
// cache the fence (last) key of the left page
2404 ptr = keyptr(set->page, set->page->cnt);
2405 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2407 page = bt_mappage (mgr, right);
// cache the fence (last) key of the right page
2409 ptr = keyptr(page, page->cnt);
2410 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2412 // insert new fences in their parent pages
2414 bt_lockpage (BtLockParent, right, thread_no);
// on the left page, take the Parent lock before dropping the Write lock
2416 bt_lockpage (BtLockParent, set->latch, thread_no);
2417 bt_unlockpage (BtLockWrite, set->latch);
2419 // insert new fence for reformulated left block of smaller keys
2421 bt_putid (value, set->latch->page_no);
2422 ptr = (BtKey *)leftkey;
2424 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2427 // switch fence for right block of larger keys to new right page
2429 bt_putid (value, right->page_no);
2430 ptr = (BtKey *)rightkey;
2432 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2435 bt_unlockpage (BtLockParent, set->latch);
2436 bt_unpinlatch (mgr, set->latch);
2438 bt_unlockpage (BtLockParent, right);
2439 bt_unpinlatch (mgr, right);
2443 // install new key and value onto page
2444 // page must already be checked for
// Writes the value and then the key into the page's free area by lowering
// page->min, finds the first dead slot at or after the insertion point,
// shifts slots up to open a position (optionally interleaving Librarian
// slots when the slot array has to grow), and points the chosen slot at
// the new key. The latch is marked dirty. A Write unlock and unpin appear
// at the end.
// NOTE(review): the unlock/unpin is presumably conditional on the release
// argument; the test itself is not visible in this listing.
2447 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2449 uint idx, librarian;
2455 // if found slot > desired slot and previous slot
2456 // is a librarian slot, use it
2459 if( slotptr(set->page, slot-1)->type == Librarian )
2462 // copy value onto page
2464 set->page->min -= vallen + sizeof(BtVal);
2465 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2466 memcpy (val->value, value, vallen);
2469 // copy key onto page
2471 set->page->min -= keylen + sizeof(BtKey);
2472 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2473 memcpy (ptr->key, key, keylen);
2476 // find first empty slot
2478 for( idx = slot; idx < set->page->cnt; idx++ )
2479 if( slotptr(set->page, idx)->dead )
2482 // now insert key into array before slot,
2483 // adding as many librarian slots as
// no dead slot was found: the slot array must grow
2486 if( idx == set->page->cnt ) {
// budget, in bytes, for extra slots; page->cnt is incremented inside this expression
2487 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2489 librarian = ++idx - slot;
// convert the byte budget into a slot count
2490 avail /= sizeof(BtSlot);
2495 if( librarian > avail )
2499 rate = (idx - slot) / librarian;
2500 set->page->cnt += librarian;
// a dead slot will be reused: no librarian slots are added
2505 librarian = 0, rate = 0;
2507 while( idx > slot ) {
// shift a slot upward, skipping over the positions reserved for librarians
2509 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2512 // add librarian slot per rate
2515 if( (idx - slot + 1)/2 <= librarian * rate ) {
2516 // if( rate && !(idx % rate) ) {
// the librarian slot shares the offset of the slot above it
2517 node = slotptr(set->page, idx--);
2518 node->off = node[1].off;
2519 node->type = Librarian;
2526 set->latch->dirty = 1;
// point the opened slot at the key just written at page->min
2531 node = slotptr(set->page, slot);
2532 node->off = set->page->min;
2537 bt_unlockpage (BtLockWrite, set->latch);
2538 bt_unpinlatch (mgr, set->latch);
2544 // Insert new key into the btree at given level.
2545 // either add a new key or update/add an existing one
// Loops: loads the target page at lvl under a Write lock and inspects the
// slot found. If the key is not present, bt_cleanpage is asked for room
// and bt_insertslot installs the entry; when there is no room the page is
// split (bt_splitpage + bt_splitkeys). If the key is present and the new
// value fits in the old value area, the value is overwritten in place.
// Otherwise the old entry is accounted as garbage, room is obtained by
// cleaning or splitting, and a fresh key+value is written at page->min
// with the slot repointed to it.
// NOTE(review): the continue/return statements that follow the split
// calls are not visible in this listing.
2547 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2549 uint slot, idx, len, entry;
2555 while( 1 ) { // find the page and slot for the current key
2556 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2557 node = slotptr(set->page, slot);
2558 ptr = keyptr(set->page, slot);
2561 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2565 // if librarian slot == found slot, advance to real slot
2567 if( node->type == Librarian )
2568 if( !keycmp (ptr, key, keylen) ) {
2569 ptr = keyptr(set->page, ++slot);
2570 node = slotptr(set->page, slot);
2573 // if inserting a duplicate key or unique
2574 // key that doesn't exist on the page,
2575 // check for adequate space on the page
2576 // and insert the new key before slot.
// non-zero keycmp: the key at slot differs from the key being inserted
2581 if( keycmp (ptr, key, keylen) )
2582 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2583 return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2584 else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
// entry is the new right page's index into mgr->latchsets
2586 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2591 // if key already exists, update value and return
2593 val = valptr(set->page, slot);
// the new value fits inside the existing value area
2595 if( val->len >= vallen ) {
2596 if( slotptr(set->page, slot)->dead )
// the unused tail of the old value area becomes garbage
2601 set->page->garbage += val->len - vallen;
2602 set->latch->dirty = 1;
2604 memcpy (val->value, value, vallen);
2605 bt_unlockpage(BtLockWrite, set->latch);
2606 bt_unpinlatch (mgr, set->latch);
2610 // new update value doesn't fit in existing value area
2611 // make sure page has room
// the whole old entry (key and value) becomes garbage
2614 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2621 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2622 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2624 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2629 // copy key and value onto page and update slot
2631 set->page->min -= vallen + sizeof(BtVal);
2632 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2633 memcpy (val->value, value, vallen);
2636 set->latch->dirty = 1;
2637 set->page->min -= keylen + sizeof(BtKey);
2638 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2639 memcpy (ptr->key, key, keylen);
// repoint the slot at the freshly written key
2642 node->off = set->page->min;
2643 bt_unlockpage(BtLockWrite, set->latch);
2644 bt_unpinlatch (mgr, set->latch);
2651 // determine actual page where key is located
2652 // return slot number
// Resolves source key number src to a latch/page in set and a slot on it.
// The starting latch entry comes from locks[src], or from locks[src-1]
// when locks[src].reuse is set (in which case the cached slot is discarded).
// When the slot is not known, the pages reachable through latch->split
// are searched with bt_findslot. If the chain is exhausted, mgr->err is
// set to BTERR_atomic.
2654 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2656 BtKey *key = keyptr(source,src);
2657 uint slot = locks[src].slot;
// a reused entry shares the previous key's latch and must re-find its slot
2660 if( src > 1 && locks[src].reuse )
2661 entry = locks[src-1].entry, slot = 0;
2663 entry = locks[src].entry;
2666 set->latch = mgr->latchsets + entry;
2667 set->page = bt_mappage (mgr, set->latch);
2671 // is locks->reuse set? or was slot zeroed?
2672 // if so, find where our key is located
2673 // on current page or pages split on
2674 // same page txn operations.
2677 set->latch = mgr->latchsets + entry;
2678 set->page = bt_mappage (mgr, set->latch);
2680 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2681 if( slotptr(set->page, slot)->type == Librarian )
// remember which latch entry the key was actually found on
2683 if( locks[src].reuse )
2684 locks[src].entry = entry;
// follow the split chain to the next page created by a split
2687 } while( entry = set->latch->split );
2689 mgr->line = __LINE__, mgr->err = BTERR_atomic;
// Inserts source key number src (with its value and slot type) into the
// page located by bt_atomicpage. When bt_cleanpage finds room, the entry
// is installed with bt_insertslot (release argument 0) and the page lsn
// is set. Otherwise the page is split, the new right page is Write-locked
// and spliced into the latch split chain, the cached slot is cleared, and
// the loop retries. Leaving the loop yields BTERR_atomic.
2693 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2695 BtKey *key = keyptr(source, src);
2696 BtVal *val = valptr(source, src);
2701 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2702 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2703 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2705 set->page->lsn = lsn;
// no room even after cleaning: split and retry
2709 if( entry = bt_splitpage (mgr, set, thread_no) )
2710 latch = mgr->latchsets + entry;
2714 // splice right page into split chain
2715 // and WriteLock it.
2717 bt_lockpage(BtLockWrite, latch, thread_no);
2718 latch->split = set->latch->split;
2719 set->latch->split = entry;
// force bt_atomicpage to search for the key's slot again
2720 locks[src].slot = 0;
2723 return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2726 // perform delete from smaller btree
2727 // insert a delete slot if not found there
// Locates source key number src with bt_atomicpage. If the key at the
// located slot differs, a Delete-type slot with an empty value is inserted
// via bt_insertslot. Otherwise the entry's key+value bytes are counted as
// garbage, the latch is marked dirty, the page lsn is set, and mgr->found
// is atomically incremented.
// NOTE(review): the statements that mark the slot dead and adjust the
// active count are not visible in this listing.
2729 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2731 BtKey *key = keyptr(source, src);
2738 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2739 node = slotptr(set->page, slot);
2740 ptr = keyptr(set->page, slot);
2741 val = valptr(set->page, slot);
2743 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2745 // if slot is not found, insert a delete slot
2747 if( keycmp (ptr, key->key, key->len) )
2748 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2750 // if node is already dead,
2751 // ignore the request.
2756 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2757 set->latch->dirty = 1;
2758 set->page->lsn = lsn;
2762 __sync_fetch_and_add(&mgr->found, 1);
2766 // delete an empty master page for a transaction
2768 // note that the far right page never empties because
2769 // it always contains (at least) the infinite stopper key
2770 // and that all pages that don't contain any keys are
2771 // deleted, or are being held under Atomic lock.
// Write-locks prev, pins and locks its right sibling (Atomic then Write),
// and copies the sibling's image over prev while keeping prev's left link.
// The sibling is marked kill with its right link pointing back at prev,
// and the sibling's fence key is re-posted at level 1 pointing at prev.
// The left link of the page now to the right of prev (or
// pagezero->alloc->left when there is none) is set to prev. The old
// sibling is then re-locked Delete+Write and freed.
// NOTE(review): the bodies of the pin-failure and insert error checks are
// not visible in this listing.
2773 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2775 BtPageSet right[1], temp[1];
2776 unsigned char value[BtId];
2780 bt_lockpage(BtLockWrite, prev->latch, thread_no);
2782 // grab the right sibling
2784 if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2785 right->page = bt_mappage (mgr, right->latch);
2789 bt_lockpage(BtLockAtomic, right->latch, thread_no);
2790 bt_lockpage(BtLockWrite, right->latch, thread_no);
2792 // and pull contents over empty page
2793 // while preserving master's left link
// the left link is written into the sibling first so the page copy carries it over
2795 memcpy (right->page->left, prev->page->left, BtId);
2796 memcpy (prev->page, right->page, mgr->page_size);
2798 // forward seekers to old right sibling
2799 // to new page location in set
2801 bt_putid (right->page->right, prev->latch->page_no);
2802 right->latch->dirty = 1;
2803 right->page->kill = 1;
2805 // remove pointer to right page for searchers by
2806 // changing right fence key to point to the master page
2808 ptr = keyptr(right->page,right->page->cnt);
2809 bt_putid (value, prev->latch->page_no);
2811 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
2814 // now that master page is in good shape we can
2815 // remove its locks.
// NOTE(review): only the Write lock on prev is taken in this function;
// presumably the caller already holds prev's Atomic lock -- confirm.
2817 bt_unlockpage (BtLockAtomic, prev->latch);
2818 bt_unlockpage (BtLockWrite, prev->latch);
2820 // fix master's right sibling's left pointer
2821 // to remove scanner's pointer to the right page
2823 if( right_page_no = bt_getid (prev->page->right) ) {
2824 if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2825 temp->page = bt_mappage (mgr, temp->latch);
2829 bt_lockpage (BtLockWrite, temp->latch, thread_no);
2830 bt_putid (temp->page->left, prev->latch->page_no);
2831 temp->latch->dirty = 1;
2833 bt_unlockpage (BtLockWrite, temp->latch);
2834 bt_unpinlatch (mgr, temp->latch);
2835 } else { // master is now the far right page
// record prev in pagezero->alloc->left under the allocation mutex
2836 bt_mutexlock (mgr->lock);
2837 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2838 bt_releasemutex(mgr->lock);
2841 // now that there are no pointers to the right page
2842 // we can delete it after the last read access occurs
2844 bt_unlockpage (BtLockWrite, right->latch);
2845 bt_unlockpage (BtLockAtomic, right->latch);
// bt_freepage expects the page to be Delete and Write locked
2846 bt_lockpage (BtLockDelete, right->latch, thread_no);
2847 bt_lockpage (BtLockWrite, right->latch, thread_no);
2848 bt_freepage (mgr, right);
2852 // bt_atomictxn: atomic modification of a batch of keys held in the source page.
2854 // return -1 if BTERR is set
2855 // otherwise return slot number
2856 // causing the key constraint violation
2857 // or zero on successful completion. NOTE(review): the visible error paths return bt->mgr->err, not -1 -- confirm which contract holds.
2859 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2861 uint src, idx, slot, samepage, entry, que = 0;
2862 BtKey *key, *ptr, *key2;
2868 // stable sort the list of keys into order to
2869 // prevent deadlocks between threads.
2871 for( src = 1; src++ < source->cnt; ) { // body runs with src = 2..cnt (insertion sort over the slot array)
2872 *temp = *slotptr(source,src); // save the slot being placed
2873 key = keyptr (source,src);
2875 for( idx = src; --idx; ) { // walk left from src-1 down to 1
2876 key2 = keyptr (source,idx);
2877 if( keycmp (key, key2->key, key2->len) < 0 ) { // saved key sorts before slot idx: move idx one place right
2878 *slotptr(source,idx+1) = *slotptr(source,idx);
2879 *slotptr(source,idx) = *temp;
2885 // add entries to redo log
2887 if( bt->mgr->pagezero->redopages ) // skipped when redopages is zero
2888 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2892 // perform the individual actions in the transaction
2894 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) ) // lsm argument is 0 on this path
2895 return bt->mgr->err;
2897 // if number of active pages
2898 // is greater than the buffer pool
2899 // promote page into larger btree
2902 while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) // threshold is latchtotal minus 10
2903 if( bt_txnpromote (bt) )
2904 return bt->mgr->err;
2911 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no) // execute the txn slots in source against mgr; lsm is 0 from bt_atomictxn and 1 from bt_txnpromote
2913 uint src, idx, slot, samepage, entry, que = 0;
2914 AtomicKey *head, *tail, *leaf;
2915 BtPageSet set[1], prev[1];
2916 unsigned char value[BtId];
2924 locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); // one entry per source slot, indexed 1..cnt (hence +1); NOTE(review): no NULL check is visible here
2929 // Load the leaf page for each key
2930 // group same page references with reuse bit
2931 // and determine any constraint violations
2933 for( src = 0; src++ < source->cnt; ) { // body runs with src = 1..cnt
2934 key = keyptr(source, src);
2937 // first determine if this modification falls
2938 // on the same page as the previous modification
2939 // note that the far right leaf page is a special case
2941 if( samepage = src > 1 ) // the first key never has a previous page
2942 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) // same page if there is no right sibling or the page's last key >= this key
2943 slot = bt_findslot(set->page, key->key, key->len);
2945 bt_unlockpage(BtLockRead, set->latch);
2948 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, thread_no) ) // load leaf (level 0) with Read and Atomic locks
2949 set->latch->split = 0; // reset the latch's split-chain link
2953 if( slotptr(set->page, slot)->type == Librarian ) // step past a Librarian slot to the following slot
2954 ptr = keyptr(set->page, ++slot);
2956 ptr = keyptr(set->page, slot);
2959 locks[src].entry = set->latch - mgr->latchsets; // latch index within mgr->latchsets
2960 locks[src].slot = slot;
2961 locks[src].reuse = 0; // reuse == 0: entry carries its own latch index and slot
2963 locks[src].entry = 0;
2964 locks[src].slot = 0;
2965 locks[src].reuse = 1; // reuse == 1: entry has no latch/slot of its own (presumably the same-page branch; guard not visible here)
2968 // capture current lsn for master page
2970 locks[src].reqlsn = set->page->lsn;
2973 // unlock last loadpage lock
2976 bt_unlockpage(BtLockRead, set->latch);
2978 // obtain write lock for each master page
2979 // sync flushed pages to disk
2981 for( src = 0; src++ < source->cnt; ) {
2982 if( locks[src].reuse ) // reuse entries are tested here; the action taken is not visible in this listing
2985 set->latch = mgr->latchsets + locks[src].entry;
2986 bt_lockpage (BtLockWrite, set->latch, thread_no);
2989 // insert or delete each key
2990 // process any splits or merges
2991 // release Write & Atomic latches
2992 // set ParentModifications and build
2993 // queue of keys to insert for split pages
2994 // or delete for deleted pages.
2996 // run through txn list backwards
2998 samepage = source->cnt + 1; // exclusive upper bound for the idx loop below
3000 for( src = source->cnt; src; src-- ) {
3001 if( locks[src].reuse )
3004 // perform the txn operations
3005 // from smaller to larger on
3008 for( idx = src; idx < samepage; idx++ )
3009 switch( slotptr(source,idx)->type ) { // dispatch on the source slot's type (case labels not visible here)
3011 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3017 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
3022 bt_atomicpage (mgr, source, locks, idx, set);
3026 // after the same page operations have finished,
3027 // process master page for splits or deletion.
3029 latch = prev->latch = mgr->latchsets + locks[src].entry; // latch keeps the master's latch while prev may advance along the split chain
3030 prev->page = bt_mappage (mgr, prev->latch);
3033 // pick-up all splits from master page
3034 // each one is already WriteLocked.
3036 entry = prev->latch->split; // first entry in the master's split chain
3039 set->latch = mgr->latchsets + entry;
3040 set->page = bt_mappage (mgr, set->latch);
3041 entry = set->latch->split; // next link in the chain
3043 // delete empty master page by undoing its split
3044 // (this is potentially another empty page)
3045 // note that there are no new left pointers yet
3047 if( !prev->page->act ) { // prev has no active keys
3048 memcpy (set->page->left, prev->page->left, BtId);
3049 memcpy (prev->page, set->page, mgr->page_size); // pull the split page's contents back over prev
3050 bt_lockpage (BtLockDelete, set->latch, thread_no);
3051 bt_freepage (mgr, set);
3053 prev->latch->split = set->latch->split;
3054 prev->latch->dirty = 1;
3058 // remove empty page from the split chain
3059 // and return it to the free list.
3061 if( !set->page->act ) { // split page has no active keys
3062 memcpy (prev->page->right, set->page->right, BtId); // unlink set: prev's right link takes set's right link
3063 prev->latch->split = set->latch->split;
3064 bt_lockpage (BtLockDelete, set->latch, thread_no);
3065 bt_freepage (mgr, set);
3069 // schedule prev fence key update
3071 ptr = keyptr(prev->page,prev->page->cnt); // last key on prev
3072 leaf = calloc (sizeof(AtomicKey), 1), que++; // new queue node; que counts nodes. NOTE(review): no NULL check is visible here
3074 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); // copy the BtKey header together with its key bytes
3075 leaf->entry = prev->latch - mgr->latchsets;
3076 leaf->page_no = prev->latch->page_no;
3086 // splice in the left link into the split page
3088 bt_putid (set->page->left, prev->latch->page_no);
3089 bt_lockpage(BtLockParent, prev->latch, thread_no); // take Parent lock before dropping Write
3090 bt_unlockpage(BtLockWrite, prev->latch);
3092 bt_syncpage (mgr, prev->page, prev->latch);
3096 // update left pointer in next right page from last split page
3097 // (if all splits were reversed, latch->split == 0)
3099 if( latch->split ) {
3100 // fix left pointer in master's original (now split)
3101 // far right sibling or set rightmost page in page zero
3103 if( right = bt_getid (prev->page->right) ) {
3104 if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) )
3105 set->page = bt_mappage (mgr, set->latch);
3109 bt_lockpage (BtLockWrite, set->latch, thread_no);
3110 bt_putid (set->page->left, prev->latch->page_no);
3111 set->latch->dirty = 1;
3112 bt_unlockpage (BtLockWrite, set->latch);
3113 bt_unpinlatch (mgr, set->latch);
3114 } else { // prev is rightmost page
3115 bt_mutexlock (mgr->lock);
3116 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3117 bt_releasemutex(mgr->lock);
3120 // Process last page split in chain
3122 ptr = keyptr(prev->page,prev->page->cnt);
3123 leaf = calloc (sizeof(AtomicKey), 1), que++; // NOTE(review): no NULL check is visible here
3125 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3126 leaf->entry = prev->latch - mgr->latchsets;
3127 leaf->page_no = prev->latch->page_no;
3137 bt_lockpage(BtLockParent, prev->latch, thread_no);
3138 bt_unlockpage(BtLockWrite, prev->latch);
3141 bt_syncpage (mgr, prev->page, prev->latch);
3143 // remove atomic lock on master page
3145 bt_unlockpage(BtLockAtomic, latch);
3149 // finished if prev page occupied (either master or final split)
3151 if( prev->page->act ) {
3152 bt_unlockpage(BtLockWrite, latch);
3153 bt_unlockpage(BtLockAtomic, latch);
3156 bt_syncpage (mgr, prev->page, latch);
3158 bt_unpinlatch(mgr, latch);
3162 // any and all splits were reversed, and the
3163 // master page located in prev is empty, delete it
3164 // by pulling over master's right sibling.
3166 // Remove empty master's fence key
3168 ptr = keyptr(prev->page,prev->page->cnt);
3170 if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) ) // level argument is 1
3173 // perform the remainder of the delete
3174 // from the FIFO queue
3176 leaf = calloc (sizeof(AtomicKey), 1), que++; // NOTE(review): no NULL check is visible here
3178 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3179 leaf->entry = prev->latch - mgr->latchsets;
3180 leaf->page_no = prev->latch->page_no;
3191 // leave atomic lock in place until
3192 // deletion completes in next phase.
3194 bt_unlockpage(BtLockWrite, prev->latch);
3197 bt_syncpage (mgr, prev->page, prev->latch);
3201 // add & delete keys for any pages split or merged during transaction
3205 set->latch = mgr->latchsets + leaf->entry;
3206 set->page = bt_mappage (mgr, set->latch);
3208 bt_putid (value, leaf->page_no); // value holds the page number stored with the queued key
3209 ptr = (BtKey *)leaf->leafkey;
3211 switch( leaf->type ) {
3212 case 0: // insert key
3213 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3218 case 1: // delete key
3219 if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
3224 case 2: // free page
3225 if( bt_atomicfree (mgr, set, thread_no) )
3231 if( !leaf->nounlock ) // Parent lock is released unless the node is flagged nounlock
3232 bt_unlockpage (BtLockParent, set->latch);
3234 bt_unpinlatch (mgr, set->latch);
3237 } while( leaf = tail ); // continue with tail until it is NULL (how tail is advanced is not visible here)
3243 // bt_txnpromote: promote a page from bt->mgr into the larger btree (bt->main)
3245 BTERR bt_txnpromote (BtDb *bt)
3247 uint entry, slot, idx;
3255 entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1); // take the pre-increment value of the shared promote counter
3257 entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1; // alternative intrinsic form yielding the same pre-increment value
3259 entry %= bt->mgr->latchtotal; // wrap into the latch table
3264 set->latch = bt->mgr->latchsets + entry;
3265 idx = set->latch->page_no % bt->mgr->latchhash;
3267 if( !bt_mutextry(set->latch->modify) ) // could not take the latch's modify mutex (action taken is not visible here)
3270 // skip this entry if it is pinned
3272 if( set->latch->pin & ~CLOCK_bit ) { // any pin bits other than CLOCK_bit
3273 bt_releasemutex(set->latch->modify);
3277 set->page = bt_mappage (bt->mgr, set->latch);
3279 // entry never used or has no right sibling
3281 if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3282 bt_releasemutex(set->latch->modify);
3286 bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
3287 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3289 // entry is an interior node, or is being or was killed
3291 if( set->page->lvl || set->page->free || set->page->kill ) {
3292 bt_releasemutex(set->latch->modify);
3293 bt_unlockpage(BtLockAtomic, set->latch);
3294 bt_unlockpage(BtLockWrite, set->latch);
3298 // pin the page for our usage
3301 bt_releasemutex(set->latch->modify);
3303 // transfer slots in our selected page to larger btree
3304 if( !(set->latch->page_no % 100) ) // progress message only for page numbers divisible by 100
3305 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act); // NOTE(review): if page_no is a uid (unsigned long long), %d does not match its type -- confirm
3307 if( bt_atomicexec (bt->main, set->page, 0, 1, bt->thread_no) ) // run the page's slots against bt->main with lsn 0 and lsm 1
3308 return bt->main->err;
3310 bt_unlockpage(BtLockAtomic, set->latch);
3312 // now delete the page
3314 if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3315 return bt->mgr->err;
3321 // bt_lastkey: set cursor to highest slot on highest page
3323 uint bt_lastkey (BtDb *bt)
3325 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left); // page zero's alloc->left holds the far right page number
3328 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3329 set->page = bt_mappage (bt->mgr, set->latch);
3333 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3334 memcpy (bt->cursor, set->page, bt->mgr->page_size); // copy the page into the cursor under the read lock
3335 bt_unlockpage(BtLockRead, set->latch);
3336 bt_unpinlatch (bt->mgr, set->latch);
3337 return bt->cursor->cnt; // slot count of the copied page
3340 // bt_prevkey: return previous slot on cursor page, loading the left sibling into the cursor as needed
3342 uint bt_prevkey (BtDb *bt, uint slot)
3344 uid cursor_page = bt->cursor->page_no;
3345 uid ourright, next, us = cursor_page;
3351 ourright = bt_getid(bt->cursor->right); // remember our right link before moving left
3354 if( !(next = bt_getid(bt->cursor->left)) ) // zero left link: there is no left sibling
3360 if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3361 set->page = bt_mappage (bt->mgr, set->latch);
3365 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3366 memcpy (bt->cursor, set->page, bt->mgr->page_size); // replace the cursor with a copy of page `next`
3367 bt_unlockpage(BtLockRead, set->latch);
3368 bt_unpinlatch (bt->mgr, set->latch);
3370 next = bt_getid (bt->cursor->right); // right link of the page just loaded
3372 if( bt->cursor->kill ) // loaded page is flagged killed (action taken is not visible here)
3376 if( next == ourright ) // loaded page's right link equals our old right link (action taken is not visible here)
3381 return bt->cursor->cnt;
3384 // bt_nextkey: return next slot on cursor page
3385 // or slide cursor right into next page
3387 uint bt_nextkey (BtDb *bt, uint slot)
3393 right = bt_getid(bt->cursor->right);
3395 while( slot++ < bt->cursor->cnt ) // advance through the remaining slots on the cursor page
3396 if( slotptr(bt->cursor,slot)->dead ) // dead slots are tested first (action taken is not visible here)
3398 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3406 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3407 set->page = bt_mappage (bt->mgr, set->latch);
3411 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3412 memcpy (bt->cursor, set->page, bt->mgr->page_size); // replace the cursor with a copy of the right sibling
3413 bt_unlockpage(BtLockRead, set->latch);
3414 bt_unpinlatch (bt->mgr, set->latch);
3419 return bt->mgr->err = 0; // clears mgr->err and returns 0
3422 // bt_startkey: cache page of keys into cursor and return starting slot for given key
3424 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3429 // cache page for retrieval
3431 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) ) // load the level 0 page for key under a read lock
3432 memcpy (bt->cursor, set->page, bt->mgr->page_size); // copy it into the cursor
3436 bt_unlockpage(BtLockRead, set->latch);
3437 bt_unpinlatch (bt->mgr, set->latch);
3444 double getCpuTime(int type) // Win32-API variant; main calls it with type 0, 1, 2 and prints the results as real, user, sys
3447 FILETIME xittime[1];
3448 FILETIME systime[1];
3449 FILETIME usrtime[1];
3450 SYSTEMTIME timeconv[1];
3453 memset (timeconv, 0, sizeof(SYSTEMTIME));
3457 GetSystemTimeAsFileTime (xittime); // current system time
3458 FileTimeToSystemTime (xittime, timeconv);
3459 ans = (double)timeconv->wDayOfWeek * 3600 * 24; // day-of-week contributes whole days in seconds
3462 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3463 FileTimeToSystemTime (usrtime, timeconv); // user time converted for the sums below
3466 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3467 FileTimeToSystemTime (systime, timeconv); // system time converted for the sums below
3471 ans += (double)timeconv->wHour * 3600; // accumulate hours, minutes, seconds, milliseconds as seconds
3472 ans += (double)timeconv->wMinute * 60;
3473 ans += (double)timeconv->wSecond;
3474 ans += (double)timeconv->wMilliseconds / 1000;
3479 #include <sys/resource.h>
3481 double getCpuTime(int type) // gettimeofday/getrusage variant; main calls it with type 0, 1, 2 and prints the results as real, user, sys
3483 struct rusage used[1];
3484 struct timeval tv[1];
3488 gettimeofday(tv, NULL);
3489 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000; // wall-clock time in seconds
3492 getrusage(RUSAGE_SELF, used);
3493 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000; // user CPU time in seconds
3496 getrusage(RUSAGE_SELF, used);
3497 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000; // system CPU time in seconds
3504 void bt_poolaudit (BtMgr *mgr) // report to stderr every latch set that still looks locked or pinned
3509 while( ++entry < mgr->latchtotal ) { // pre-increment: the starting entry value itself is not examined
3510 latch = mgr->latchsets + entry;
3512 if( *latch->readwr->rin & MASK ) // readwr lock has MASK bits set in rin
3513 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3515 if( *latch->access->rin & MASK ) // access lock has MASK bits set in rin
3516 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3518 // if( *latch->parent->xcl->value )
3519 // fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3521 // if( *latch->atomic->xcl->value )
3522 // fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3524 // if( *latch->modify->value )
3525 // fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3527 if( latch->pin & ~CLOCK_bit ) // pin bits other than CLOCK_bit
3528 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3541 // index_file: thread entry for the standalone program to index file of keys
3542 // then list them onto std-out
3545 void *index_file (void *arg)
3547 uint __stdcall index_file (void *arg)
3550 int line = 0, found = 0, cnt = 0, idx;
3551 uid next, page_no = LEAF_page; // start on first page of leaves
3552 int ch, len = 0, slot, type = 0;
3553 unsigned char key[BT_maxkey];
3554 unsigned char txn[65536]; // NOTE(review): no bounds check on writes into txn is visible in this listing
3555 ThreadArg *args = arg;
3564 bt = bt_open (args->mgr, args->main);
3567 if( args->idx < strlen (args->type) ) // pick this thread's command character from the cmds string
3568 ch = args->type[args->idx];
3570 ch = args->type[strlen(args->type) - 1]; // otherwise fall back to the last command character
3582 if( type == Delete )
3583 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3585 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3587 if( type == Delete )
3588 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3590 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3592 if( in = fopen (args->infile, "rb") )
3593 while( ch = getc(in), ch != EOF )
3599 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) ) // first 10 bytes are the key, the remainder is the value
3600 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0); // NOTE(review): exits with status 0 on an error path
3606 memcpy (txn + nxt, key + 10, len - 10); // value bytes (everything after the 10-byte key)
3608 txn[nxt] = len - 10; // value length byte
3610 memcpy (txn + nxt, key, 10); // 10-byte key
3613 slotptr(page,++cnt)->off = nxt; // new slot refers to offset nxt
3614 slotptr(page,cnt)->type = type;
3617 if( cnt < args->num ) // compares the slot count with the configured txn size (action taken is not visible here)
3624 if( bt_atomictxn (bt, page) )
3625 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0); // NOTE(review): exits with status 0 on an error path
3630 else if( len < BT_maxkey )
3632 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3636 fprintf(stderr, "started indexing for %s\n", args->infile);
3637 if( in = fopen (args->infile, "r") )
3638 while( ch = getc(in), ch != EOF )
3643 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) ) // whole line is the key; no value
3644 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0); // NOTE(review): exits with status 0 on an error path
3647 else if( len < BT_maxkey )
3649 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3653 fprintf(stderr, "started finding keys for %s\n", args->infile);
3654 if( in = fopen (args->infile, "rb") )
3655 while( ch = getc(in), ch != EOF )
3659 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3661 else if( bt->mgr->err )
3662 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0); // NOTE(review): exits with status 0 on an error path
3665 else if( len < BT_maxkey )
3667 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3671 fprintf(stderr, "started scanning\n");
3674 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3675 set->page = bt_mappage (bt->mgr, set->latch);
3677 fprintf(stderr, "unable to obtain latch"), exit(1);
3679 bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3680 next = bt_getid (set->page->right); // right sibling; zero ends the scan loop
3682 for( slot = 0; slot++ < set->page->cnt; )
3683 if( next || slot < set->page->cnt ) // on the page with no right sibling the last slot is skipped
3684 if( !slotptr(set->page, slot)->dead ) {
3685 ptr = keyptr(set->page, slot);
3688 if( slotptr(set->page, slot)->type == Duplicate )
3691 fwrite (ptr->key, len, 1, stdout); // emit key, then value, then newline
3692 val = valptr(set->page, slot);
3693 fwrite (val->value, val->len, 1, stdout);
3694 fputc ('\n', stdout);
3698 bt_unlockpage (BtLockRead, set->latch);
3699 bt_unpinlatch (bt->mgr, set->latch);
3700 } while( page_no = next );
3702 fprintf(stderr, " Total keys read %d\n", cnt);
3706 fprintf(stderr, "started reverse scan\n");
3707 if( slot = bt_lastkey (bt) )
3708 while( slot = bt_prevkey (bt, slot) ) {
3709 if( slotptr(bt->cursor, slot)->dead )
3712 ptr = keyptr(bt->cursor, slot);
3715 if( slotptr(bt->cursor, slot)->type == Duplicate )
3718 fwrite (ptr->key, len, 1, stdout); // emit key, then value, then newline
3719 val = valptr(bt->cursor, slot);
3720 fwrite (val->value, val->len, 1, stdout);
3721 fputc ('\n', stdout);
3725 fprintf(stderr, " Total keys read %d\n", cnt);
3730 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); // advise sequential access over the whole index file
3732 fprintf(stderr, "started counting\n");
3733 next = REDO_page + bt->mgr->pagezero->redopages; // page number just past the redo pages
3735 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) { // up to the bound stored in page zero's alloc->right
3736 if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3739 if( !bt->cursor->free && !bt->cursor->lvl ) // count only non-free pages at level 0
3740 cnt += bt->cursor->act;
3746 cnt--; // remove stopper key
3747 fprintf(stderr, " Total keys counted %d\n", cnt);
3759 typedef struct timeval timer;
3761 int main (int argc, char **argv) // command-line driver: opens both btrees, runs one index_file thread per source file, then prints counters and timings
3763 int idx, cnt, len, slot, err;
3764 int segsize, bits = 16; // default page size is 16 bits unless argv[4] overrides it
3784 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3785 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3786 fprintf (stderr, " where main_file is the name of the main btree file\n");
3787 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3788 fprintf (stderr, " page_bits is the page size in bits for the cache btree\n");
3789 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3790 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3791 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3792 fprintf (stderr, " main_bits is the page size of the main btree in bits\n");
3793 fprintf (stderr, " main_pool is the number of main pages in the main buffer pool\n");
3794 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3798 start = getCpuTime(0); // starting timestamp for the elapsed "real" figure
3801 bits = atoi(argv[4]); // NOTE(review): atoi gives no error indication for malformed input
3804 poolsize = atoi(argv[5]);
3807 fprintf (stderr, "Warning: no mapped_pool\n");
3810 num = atoi(argv[6]); // txn_size
3813 redopages = atoi(argv[7]);
3815 if( redopages + REDO_page > 65535 )
3816 fprintf (stderr, "Warning: Recovery buffer too large\n");
3819 mainbits = atoi(argv[8]);
3822 mainpool = atoi(argv[9]);
3826 threads = malloc (cnt * sizeof(pthread_t)); // one thread handle per source file; NOTE(review): no NULL check is visible here
3828 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3830 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3832 mgr = bt_mgr (argv[1], bits, poolsize, redopages); // manager for the cache btree file (argv[1])
3835 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3840 main = bt_mgr (argv[2], mainbits, mainpool, 0); // manager for the main btree file (argv[2]), opened with zero redo pages
3843 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3852 for( idx = 0; idx < cnt; idx++ ) {
3853 args[idx].infile = argv[idx + 10]; // source files start at argv[10]
3854 args[idx].type = argv[3]; // the cmds string is shared by every thread
3855 args[idx].main = main;
3856 args[idx].mgr = mgr;
3857 args[idx].num = num;
3858 args[idx].idx = idx;
3860 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3861 fprintf(stderr, "Error creating thread %d\n", err);
3863 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3867 args[0].infile = argv[idx + 10];
3868 args[0].type = argv[3];
3869 args[0].main = main;
3876 // wait for termination
3879 for( idx = 0; idx < cnt; idx++ )
3880 pthread_join (threads[idx], NULL);
3882 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3884 for( idx = 0; idx < cnt; idx++ )
3885 CloseHandle(threads[idx]);
3892 fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
3898 elapsed = getCpuTime(0) - start;
3899 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60); // whole minutes, then remaining seconds
3900 elapsed = getCpuTime(1);
3901 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3902 elapsed = getCpuTime(2);
3903 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);