1 // btree version threadskv9c sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // and redo log for failure recovery
13 // author: karl malbrain, malbrain@cal.berkeley.edu
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
19 The original author is Karl Malbrain.
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
37 #include <linux/futex.h>
52 #define WIN32_LEAN_AND_MEAN
66 typedef unsigned long long uid;
67 typedef unsigned long long logseqno;
70 typedef unsigned long long off64_t;
71 typedef unsigned short ushort;
72 typedef unsigned int uint;
75 #define BT_ro 0x6f72 // ro
76 #define BT_rw 0x7772 // rw
78 #define BT_maxbits 24 // maximum page size in bits
79 #define BT_minbits 9 // minimum page size in bits
80 #define BT_minpage (1 << BT_minbits) // minimum page size
81 #define BT_maxpage (1 << BT_maxbits) // maximum page size
83 // BTree page number constants
84 #define ALLOC_page 0 // allocation page
85 #define ROOT_page 1 // root of the btree
86 #define LEAF_page 2 // first page of leaves
87 #define REDO_page 3 // first page of redo buffer
89 // Number of levels to create in a new BTree
94 There are six lock types for each node in four independent sets:
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
112 // definition for phase-fair reader/writer lock implementation
115 volatile ushort rin[1];
116 volatile ushort rout[1];
117 volatile ushort ticket[1];
118 volatile ushort serving[1];
126 volatile ushort ticket[1];
127 volatile ushort serving[1];
139 // exclusive is set for write access
142 volatile unsigned char exclusive[1];
143 unsigned char filler;
148 // mode & definition for lite latch implementation
151 QueRd = 1, // reader queue
152 QueWr = 2 // writer queue
155 // hash table entries
158 uint entry; // Latch table entry at head of chain
159 BtMutexLatch latch[1];
162 // latch manager table structure
165 uid page_no; // latch set page number
166 RWLock readwr[1]; // read/write page lock
167 RWLock access[1]; // Access Intent/Page delete
168 WOLock parent[1]; // Posting of fence key in parent
169 WOLock atomic[1]; // Atomic update in progress
170 uint split; // right split page atomic insert
171 uint entry; // entry slot in latch table
172 uint next; // next entry in hash table chain
173 uint prev; // prev entry in hash table chain
174 BtMutexLatch modify[1]; // modify entry lite latch
175 volatile ushort pin; // number of accessing threads
176 volatile unsigned char dirty; // page in cache is dirty (atomic set)
177 volatile unsigned char avail; // page is an available entry
180 // Define the length of the page record numbers
184 // Page key slot definition.
186 // Keys are marked dead, but remain on the page until
187 // cleanup is called. The fence key (highest key) for
188 // a leaf page is always present, even after cleanup.
192 // In addition to the Unique keys that occupy slots
193 // there are Librarian and Duplicate key
194 // slots occupying the key slot array.
196 // The Librarian slots are dead keys that
197 // serve as filler, available to add new Unique
198 // or Dup slots that are inserted into the B-tree.
200 // The Duplicate slots have had their key bytes extended
201 // by 6 bytes to contain a binary duplicate key uniqueifier.
212 uint off:BT_maxbits; // page offset for key start
213 uint type:3; // type of slot
214 uint dead:1; // set for deleted slot
217 // The key structure occupies space at the upper end of
218 // each page. It's a length byte followed by the key
222 unsigned char len; // this can be changed to a ushort or uint
223 unsigned char key[0];
226 // the value structure also occupies space at the upper
227 // end of the page. Each key is immediately followed by a value.
230 unsigned char len; // this can be changed to a ushort or uint
231 unsigned char value[0];
234 #define BT_maxkey 255 // maximum number of bytes in a key
235 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
237 // The first part of an index page.
238 // It is immediately followed
239 // by the BtSlot array of keys.
241 // note that this structure size
242 // must be a multiple of 8 bytes
243 // in order to place dups correctly.
245 typedef struct BtPage_ {
246 uint cnt; // count of keys in page
247 uint act; // count of active keys
248 uint min; // next key offset
249 uint garbage; // page garbage in bytes
250 unsigned char bits:7; // page size in bits
251 unsigned char free:1; // page is on free chain
252 unsigned char lvl:7; // level of page
253 unsigned char kill:1; // page is being deleted
254 unsigned char right[BtId]; // page number to right
255 unsigned char left[BtId]; // page number to left
256 unsigned char filler[2]; // padding to multiple of 8
257 logseqno lsn; // log sequence number applied
260 // The loadpage interface object
263 BtPage page; // current page pointer
264 BtLatchSet *latch; // current page latch set
267 // structure for latch manager on ALLOC_page
270 struct BtPage_ alloc[1]; // next page_no in right ptr
271 unsigned long long dups[1]; // global duplicate key uniqueifier
272 unsigned char chain[BtId]; // head of free page_nos chain
275 // The object structure for Btree access
278 uint page_size; // page size
279 uint page_bits; // page size in bits
285 BtPageZero *pagezero; // mapped allocation page
286 BtHashEntry *hashtable; // the buffer pool hash table entries
287 BtLatchSet *latchsets; // mapped latch set from buffer pool
288 unsigned char *pagepool; // mapped to the buffer pool pages
289 unsigned char *redobuff; // mapped recovery buffer pointer
290 logseqno lsn, flushlsn; // current & first lsn flushed
291 BtMutexLatch dump[1]; // redo dump lite latch
292 BtMutexLatch redo[1]; // redo area lite latch
293 BtMutexLatch lock[1]; // allocation area lite latch
294 ushort thread_no[1]; // next thread number
295 uint nlatchpage; // number of latch pages at BT_latch
296 uint latchtotal; // number of page latch entries
297 uint latchhash; // number of latch hash table slots
298 uint latchvictim; // next latch entry to examine
299 uint latchavail; // next available latch entry
300 uint availlock[1]; // latch available chain commitments
301 uint available; // size of latch available chain
302 uint redopages; // size of recovery buff in pages
303 uint redoend; // eof/end element in recovery buff
305 HANDLE halloc; // allocation handle
306 HANDLE hpool; // buffer pool handle
311 BtMgr *mgr; // buffer manager for thread
312 BtPage cursor; // cached frame for start/next (never mapped)
313 BtPage frame; // spare frame for the page split (never mapped)
314 uid cursor_page; // current cursor page number
315 unsigned char *mem; // frame, cursor, page memory buffer
316 unsigned char key[BT_keyarray]; // last found complete key
317 int found; // last delete or insert was found
318 int err; // last error
319 int line; // last error line no
320 int reads, writes; // number of reads and writes from the btree
321 ushort thread_no; // thread number
324 // Catastrophic errors
339 #define CLOCK_bit 0x8000
341 // recovery manager entry types
344 BTRM_eof = 0, // rest of buffer is empty
345 BTRM_add, // add a unique key-value to btree
346 BTRM_dup, // add a duplicate key-value to btree
347 BTRM_del, // delete a key-value from btree
348 BTRM_upd, // update a key with a new value
349 BTRM_new, // allocate a new empty page
350 BTRM_old // reuse an old empty page
353 // recovery manager entry
354 // structure followed by BtKey & BtVal
357 logseqno reqlsn; // log sequence number required
358 logseqno lsn; // log sequence number for entry
359 uint len; // length of entry
360 unsigned char type; // type of entry
361 unsigned char lvl; // level of btree entry pertains to
366 extern void bt_close (BtDb *bt);
367 extern BtDb *bt_open (BtMgr *mgr);
368 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
369 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
370 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
371 extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
372 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
373 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
374 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
375 extern BtKey *bt_foundkey (BtDb *bt);
376 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
377 extern uint bt_nextkey (BtDb *bt, uint slot);
380 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
381 extern void bt_mgrclose (BtMgr *mgr);
382 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
383 extern logseqno bt_txnredo (BtDb *bt, BtPage page);
385 // Helper functions to return slot values
386 // from the cursor page.
388 extern BtKey *bt_key (BtDb *bt, uint slot);
389 extern BtVal *bt_val (BtDb *bt, uint slot);
391 // The page is allocated from low and hi ends.
392 // The key slots are allocated from the bottom,
393 // while the text and value of the key
394 // are allocated from the top. When the two
395 // areas meet, the page is split into two.
397 // A key consists of a length byte, two bytes of
398 // index number (0 - 65535), and up to 253 bytes
401 // Associated with each key is a value byte string
402 // containing any value desired.
404 // The b-tree root is always located at page 1.
405 // The first leaf page of level zero is always
406 // located on page 2.
408 // The b-tree pages are linked with next
409 // pointers to facilitate enumerators,
410 // and provide for concurrency.
412 // When the root page fills, it is split in two and
413 // the tree height is raised by a new root at page
414 // one with two keys.
416 // Deleted keys are marked with a dead bit until
417 // page cleanup. The fence key for a leaf node is
420 // To achieve maximum concurrency one page is locked at a time
421 // as the tree is traversed to find leaf key in question. The right
422 // page numbers are used in cases where the page is being split,
425 // Page 0 is dedicated to lock for new page extensions,
426 // and chains empty pages together for reuse. It also
427 // contains the latch manager hash table.
429 // The ParentModification lock on a node is obtained to serialize posting
430 // or changing the fence key for a node.
432 // Empty pages are chained together through the ALLOC page and reused.
434 // Access macros to address slot and key values from the page
435 // Page slots use 1 based indexing.
437 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
438 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
439 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
441 void bt_putid(unsigned char *dest, uid id)
446 dest[i] = (unsigned char)id, id >>= 8;
449 uid bt_getid(unsigned char *src)
454 for( i = 0; i < BtId; i++ )
455 id <<= 8, id |= *src++;
// Atomically increment the global duplicate-key counter stored in
// pagezero->dups and return the incremented value.
// The two return statements are presumably alternatives under a
// platform conditional that is not shown here.
460 uid bt_newdup (BtDb *bt)
// fetch_and_add returns the value before the add, hence the "+ 1"
463 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
// NOTE(review): _InterlockedIncrement64 is documented as taking a single
// pointer argument; the extra ", 1" looks like a leftover from an
// ExchangeAdd-style call -- confirm this compiles on MSVC.
465 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
469 // Write-Only Queue Lock
471 void WriteOLock (WOLock *lock, ushort tid)
475 if( lock->tid == tid ) {
480 tix = __sync_fetch_and_add (lock->ticket, 1);
482 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
484 // wait for our ticket to come up
486 while( tix != lock->serving[0] )
495 void WriteORelease (WOLock *lock)
506 // Phase-Fair reader/writer lock implementation
508 void WriteLock (RWLock *lock, ushort tid)
512 if( lock->tid == tid ) {
517 tix = __sync_fetch_and_add (lock->ticket, 1);
519 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
521 // wait for our ticket to come up
523 while( tix != lock->serving[0] )
530 w = PRES | (tix & PHID);
532 r = __sync_fetch_and_add (lock->rin, w);
534 r = _InterlockedExchangeAdd16 (lock->rin, w);
536 while( r != *lock->rout )
545 void WriteRelease (RWLock *lock)
554 __sync_fetch_and_and (lock->rin, ~MASK);
556 _InterlockedAnd16 (lock->rin, ~MASK);
561 // try to obtain read lock
562 // return 1 if successful
564 int ReadTry (RWLock *lock, ushort tid)
568 // OK if write lock already held by same thread
570 if( lock->tid == tid ) {
575 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
577 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
580 if( w == (*lock->rin & MASK) ) {
582 __sync_fetch_and_add (lock->rin, -RINC);
584 _InterlockedExchangeAdd16 (lock->rin, -RINC);
592 void ReadLock (RWLock *lock, ushort tid)
595 if( lock->tid == tid ) {
600 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
602 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
605 while( w == (*lock->rin & MASK) )
613 void ReadRelease (RWLock *lock)
621 __sync_fetch_and_add (lock->rout, RINC);
623 _InterlockedExchangeAdd16 (lock->rout, RINC);
627 // lite weight spin lock Latch Manager
629 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
631 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
634 void bt_mutexlock(BtMutexLatch *latch)
640 prev = __sync_fetch_and_or(latch->exclusive, XCL);
642 prev = _InterlockedOr8(latch->exclusive, XCL);
647 } while( sched_yield(), 1 );
649 } while( SwitchToThread(), 1 );
653 // try to obtain write lock
655 // return 1 if obtained,
658 int bt_mutextry(BtMutexLatch *latch)
663 prev = __sync_fetch_and_or(latch->exclusive, XCL);
665 prev = _InterlockedOr8(latch->exclusive, XCL);
667 // take write access if exclusive bit is clear
669 return !(prev & XCL);
674 void bt_releasemutex(BtMutexLatch *latch)
676 *latch->exclusive = 0;
679 // recovery manager -- flush dirty pages
681 void bt_flushlsn (BtDb *bt)
683 uint cnt3 = 0, cnt2 = 0, cnt = 0;
688 // flush dirty pool pages to the btree that were
689 // dirty before the start of this redo recovery buffer
690 fprintf(stderr, "Start flushlsn\n");
691 for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
692 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
693 latch = bt->mgr->latchsets + entry;
694 bt_mutexlock (latch->modify);
695 bt_lockpage(bt, BtLockRead, latch);
698 bt_writepage(bt->mgr, page, latch->page_no);
699 latch->dirty = 0, bt->writes++, cnt++;
703 if( latch->pin & ~CLOCK_bit )
705 bt_unlockpage(bt, BtLockRead, latch);
706 bt_releasemutex (latch->modify);
708 fprintf(stderr, "End flushlsn %d pages %d pinned %d available\n", cnt, cnt2, cnt3);
711 // recovery manager -- process current recovery buff on startup
712 // this won't do much if previous session was properly closed.
// Read the on-disk redo area into mgr->redobuff, take flushlsn from the
// first log header, then examine log entries by switching on hdr->type.
714 BTERR bt_recoveryredo (BtMgr *mgr)
// NOTE(review): both shift counts below use mgr->page_size (the page size
// in bytes, at least 1 << BT_minbits). bt_dumpredo writes this same area
// at REDO_page << page_bits, and bt_newredo sizes the buffer as
// page_size * redopages. The shifts here almost certainly should use
// mgr->page_bits; as written the shift count exceeds the operand width.
// NOTE(review): the pread result is not checked, so a short or failed
// read goes unnoticed before hdr->lsn is consumed.
721 pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
// the first header in the buffer carries the lsn used as flushlsn
723 hdr = (BtLogHdr *)mgr->redobuff;
724 mgr->flushlsn = hdr->lsn;
// presumably executed repeatedly while offset advances through the buffer -- TODO confirm
727 hdr = (BtLogHdr *)(mgr->redobuff + offset);
728 switch( hdr->type ) {
732 case BTRM_add: // add a unique key-value to btree
734 case BTRM_dup: // add a duplicate key-value to btree
735 case BTRM_del: // delete a key-value from btree
736 case BTRM_upd: // update a key with a new value
737 case BTRM_new: // allocate a new empty page
738 case BTRM_old: // reuse an old empty page
744 // recovery manager -- dump current recovery buff & flush dirty pages
745 // in preparation for next recovery buffer.
// Terminate the in-memory redo buffer with a zeroed BtLogHdr at redoend,
// sync the index file, write the buffer to the REDO_page area of the file,
// then reset the buffer so its first header carries the current lsn.
// The callers visible in this file acquire mgr->dump before calling.
747 BTERR bt_dumpredo (BtDb *bt)
750 fprintf(stderr, "Flush pages ");
// zeroed header marks end-of-log (BTRM_eof == 0)
752 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
753 memset (eof, 0, sizeof(BtLogHdr));
755 // flush pages written at beginning of this redo buffer
756 // then write the redo buffer out to disk
758 fdatasync (bt->mgr->idx);
760 fprintf(stderr, "Dump ReDo: %d bytes\n", bt->mgr->redoend);
// NOTE(review): the pwrite result is ignored although this function
// returns BTERR; a short or failed write of the log is not reported.
761 pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
// NOTE(review): per sync_file_range(2), SYNC_FILE_RANGE_WAIT_AFTER on its
// own only waits for write-out already in progress; it does not initiate
// write-out of dirty pages (SYNC_FILE_RANGE_WRITE does) -- confirm this
// provides the durability intended for the redo log.
763 sync_file_range (bt->mgr->idx, REDO_page << bt->mgr->page_bits, bt->mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
// start a fresh buffer: everything up to the current lsn is now on disk
765 bt->mgr->flushlsn = bt->mgr->lsn;
766 bt->mgr->redoend = 0;
768 eof = (BtLogHdr *)(bt->mgr->redobuff);
769 memset (eof, 0, sizeof(BtLogHdr));
770 eof->lsn = bt->mgr->lsn;
774 // recovery manager -- append new entry to recovery log
775 // flush to disk when it overflows.
777 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
779 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
780 uint amt = sizeof(BtLogHdr);
784 bt_mutexlock (bt->mgr->redo);
787 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
789 // see if new entry fits in buffer
790 // flush and reset if it doesn't
792 if( flush = amt > size - bt->mgr->redoend ) {
793 bt_mutexlock (bt->mgr->dump);
795 if( bt_dumpredo (bt) )
799 // fill in new entry & either eof or end block
801 hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
806 hdr->lsn = ++bt->mgr->lsn;
808 bt->mgr->redoend += amt;
810 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
811 memset (eof, 0, sizeof(BtLogHdr));
813 // fill in key and value
816 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
817 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
820 bt_releasemutex(bt->mgr->redo);
824 bt_releasemutex(bt->mgr->dump);
830 // recovery manager -- append transaction to recovery log
831 // flush to disk when it overflows.
833 logseqno bt_txnredo (BtDb *bt, BtPage source)
835 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
836 uint amt = 0, src, type;
843 // determine amount of redo recovery log space required
845 for( src = 0; src++ < source->cnt; ) {
846 key = keyptr(source,src);
847 val = valptr(source,src);
848 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
849 amt += sizeof(BtLogHdr);
852 bt_mutexlock (bt->mgr->redo);
854 // see if new entry fits in buffer
855 // flush and reset if it doesn't
857 if( flush = amt > size - bt->mgr->redoend ) {
858 bt_mutexlock (bt->mgr->dump);
860 if( bt_dumpredo (bt) )
864 // assign new lsn to transaction
866 lsn = ++bt->mgr->lsn;
868 // fill in new entries
870 for( src = 0; src++ < source->cnt; ) {
871 key = keyptr(source, src);
872 val = valptr(source, src);
874 switch( slotptr(source, src)->type ) {
889 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
890 amt += sizeof(BtLogHdr);
892 hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
898 // fill in key and value
900 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
901 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
903 bt->mgr->redoend += amt;
906 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
907 memset (eof, 0, sizeof(BtLogHdr));
909 bt_releasemutex(bt->mgr->redo);
913 bt_releasemutex(bt->mgr->dump);
919 // read page into buffer pool from permanent location in Btree file
// Read one page of mgr->page_size bytes from file offset
// page_no << page_bits into the caller-supplied page buffer.
// A short read is reported on stderr in both the pread and ReadFile variants.
921 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
923 off64_t off = page_no << mgr->page_bits;
// NOTE(review): the offset is recomputed here instead of using `off`
// (bt_writepage passes `off` to pwrite); same value, just inconsistent.
926 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
// NOTE(review): page_no is a uid (unsigned long long) but is printed with
// %d here and below, and with %.8x in the last message; the specifier
// does not match the argument type (%llu / %llx would).
927 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
934 memset (ovl, 0, sizeof(OVERLAPPED));
936 ovl->OffsetHigh = off >> 32;
938 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
939 fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
942 if( *amt < mgr->page_size ) {
943 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
950 // write page to permanent location in Btree file
951 // clear the dirty bit
953 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
955 off64_t off = page_no << mgr->page_bits;
958 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
964 memset (ovl, 0, sizeof(OVERLAPPED));
966 ovl->OffsetHigh = off >> 32;
968 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
971 if( *amt < mgr->page_size )
977 // set CLOCK bit in latch
978 // decrement pin count
980 void bt_unpinlatch (BtLatchSet *latch)
982 bt_mutexlock(latch->modify);
983 latch->pin |= CLOCK_bit;
985 bt_releasemutex(latch->modify);
988 // return the btree cached page address
989 // if page is dirty and has not yet been
990 // flushed to disk for the current redo
991 // recovery buffer, write it out.
993 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
995 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1000 // return next available latch entry
1001 // and with latch entry locked
1003 uint bt_availnext (BtDb *bt)
1010 entry = __sync_fetch_and_add (&bt->mgr->latchavail, 1) + 1;
1012 entry = _InterlockedIncrement (&bt->mgr->latchavail);
1014 entry %= bt->mgr->latchtotal;
1019 latch = bt->mgr->latchsets + entry;
1024 bt_mutexlock(latch->modify);
1026 if( !latch->avail ) {
1027 bt_releasemutex(latch->modify);
1035 // find and add the next available latch entry
1038 BTERR bt_availlatch (BtDb *bt)
1046 // find and reuse previous entry on victim
1048 startattempt = bt->mgr->latchvictim;
1052 entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
1054 entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
1056 // skip entry if it has outstanding pins
1058 entry %= bt->mgr->latchtotal;
1063 // only go around one time before
1064 // flushing redo recovery buffer,
1065 // and the buffer pool to free up entries.
1067 if( bt->mgr->redopages )
1068 if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
1069 if( bt_mutextry (bt->mgr->dump) ) {
1070 if( bt_dumpredo (bt) )
1073 // synchronize the various threads running into this condition
1074 // so that only one thread does the dump and flush
1076 bt_mutexlock(bt->mgr->dump);
1078 startattempt = bt->mgr->latchvictim;
1079 bt_releasemutex(bt->mgr->dump);
1082 latch = bt->mgr->latchsets + entry;
1087 bt_mutexlock(latch->modify);
1089 // skip if already an available entry
1091 if( latch->avail ) {
1092 bt_releasemutex(latch->modify);
1096 // skip this entry if it is pinned
1097 // if the CLOCK bit is set
1098 // reset it to zero.
1101 latch->pin &= ~CLOCK_bit;
1102 bt_releasemutex(latch->modify);
1106 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1108 // if dirty page has lsn >= last redo recovery buffer
1109 // then hold page in the buffer pool until next redo
1110 // recovery buffer is being written to disk.
1113 if( page->lsn >= bt->mgr->flushlsn ) {
1114 bt_releasemutex(latch->modify);
1118 // entry is available
1120 __sync_fetch_and_add (&bt->mgr->available, 1);
1122 _InterlockedIncrement(&bt->mgr->available);
1125 bt_releasemutex(latch->modify);
1130 // release available latch requests
1132 void bt_availrelease (BtDb *bt, uint count)
1135 __sync_fetch_and_add(bt->mgr->availlock, -count);
1137 _InterlockedAdd(bt->mgr->availlock, -count);
1141 // commit available chain entries
1142 // find available entries as required
1144 void bt_availrequest (BtDb *bt, uint count)
1147 __sync_fetch_and_add(bt->mgr->availlock, count);
1149 _InterlockedAdd(bt->mgr->availlock, count);
1152 while( *bt->mgr->availlock > bt->mgr->available )
1156 // find available latchset
1157 // return with latchset pinned
1159 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, BtPage loadit)
1161 uint hashidx = page_no % bt->mgr->latchhash;
1166 // try to find our entry
1168 bt_mutexlock(bt->mgr->hashtable[hashidx].latch);
1170 if( entry = bt->mgr->hashtable[hashidx].entry ) do
1172 latch = bt->mgr->latchsets + entry;
1173 if( page_no == latch->page_no )
1175 } while( entry = latch->next );
1177 // found our entry: increment pin
1178 // remove from available status
1181 latch = bt->mgr->latchsets + entry;
1182 bt_mutexlock(latch->modify);
1185 __sync_fetch_and_add (&bt->mgr->available, -1);
1187 _InterlockedDecrement(&bt->mgr->available);
1190 latch->pin |= CLOCK_bit;
1193 bt_releasemutex(latch->modify);
1194 bt_releasemutex(bt->mgr->hashtable[hashidx].latch);
1198 // find and reuse entry from available set
1202 if( entry = bt_availnext (bt) )
1203 latch = bt->mgr->latchsets + entry;
1205 return bt->line = __LINE__, bt->err = BTERR_avail, NULL;
1207 idx = latch->page_no % bt->mgr->latchhash;
1209 // if latch is on a different hash chain
1210 // unlink from the old page_no chain
1212 if( latch->page_no )
1213 if( idx != hashidx ) {
1215 // skip over this entry if latch not available
1217 if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
1218 bt_releasemutex(latch->modify);
1223 bt->mgr->latchsets[latch->prev].next = latch->next;
1225 bt->mgr->hashtable[idx].entry = latch->next;
1228 bt->mgr->latchsets[latch->next].prev = latch->prev;
1230 bt_releasemutex (bt->mgr->hashtable[idx].latch);
1233 // remove available status
1237 __sync_fetch_and_add (&bt->mgr->available, -1);
1239 _InterlockedDecrement(&bt->mgr->available);
1241 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1244 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1245 return bt->line = __LINE__, NULL;
1247 latch->dirty = 0, bt->writes++;
1250 memcpy (page, loadit, bt->mgr->page_size);
1253 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
1254 return bt->line = __LINE__, NULL;
1258 // link page as head of hash table chain
1259 // if this is a never before used entry,
1260 // or it was previously on a different
1261 // hash table chain. Otherwise, just
1262 // leave it in its current hash table
1265 if( !latch->page_no || hashidx != idx ) {
1266 if( latch->next = bt->mgr->hashtable[hashidx].entry )
1267 bt->mgr->latchsets[latch->next].prev = entry;
1269 bt->mgr->hashtable[hashidx].entry = entry;
1273 // fill in latch structure
1275 latch->pin = CLOCK_bit | 1;
1276 latch->page_no = page_no;
1277 latch->entry = entry;
1280 bt_releasemutex (latch->modify);
1281 bt_releasemutex (bt->mgr->hashtable[hashidx].latch);
// Shut down the buffer manager: write out any pending redo buffer, write
// every dirty buffer-pool page back to the btree file, reset the on-disk
// redo area to a single header carrying the current lsn, then unmap and
// release the mappings, the redo buffer and the file handle.
// NOTE(review): bt_mgr calls this on several early failure paths, before
// the buffer pool is mapped but after latchtotal has been set. In that
// state the dirty-page loop below would index mgr->latchsets and
// mgr->pagepool while they are presumably still NULL -- verify, and
// consider guarding the loop and the unmap calls.
1285 void bt_mgrclose (BtMgr *mgr)
1293 // flush previously written dirty pages
1294 // and write recovery buffer to disk
1296 fdatasync (mgr->idx);
1298 if( mgr->redoend ) {
// terminate the log with a zeroed header before writing it out
1299 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1300 memset (eof, 0, sizeof(BtLogHdr));
// NOTE(review): pwrite result ignored here and below
1302 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1305 // write remaining dirty pool pages to the btree
// entry 0 is not visited; the loop starts at slot 1
1307 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1308 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1309 latch = mgr->latchsets + slot;
1311 if( latch->dirty ) {
// NOTE(review): a bt_writepage failure is not reported; dirty is cleared regardless
1312 bt_writepage(mgr, page, latch->page_no);
1313 latch->dirty = 0, num++;
1317 // flush last batch to disk and clear
1318 // redo recovery buffer on disk.
1320 fdatasync (mgr->idx);
1322 if( mgr->redopages ) {
// an all-zero header (type BTRM_eof) with the current lsn marks an empty log
1323 eof = (BtLogHdr *)mgr->redobuff;
1324 memset (eof, 0, sizeof(BtLogHdr));
1325 eof->lsn = mgr->lsn;
1327 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1329 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1332 fprintf(stderr, "%d buffer pool pages flushed\n", num);
// release the anonymous buffer pool mapping and the page-zero file mapping
1335 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1336 munmap (mgr->pagezero, mgr->page_size);
1338 FlushViewOfFile(mgr->pagezero, 0);
1339 UnmapViewOfFile(mgr->pagezero);
1340 UnmapViewOfFile(mgr->pagepool);
1341 CloseHandle(mgr->halloc);
1342 CloseHandle(mgr->hpool);
// redobuff is only allocated when redopages is non-zero
1345 if( mgr->redopages )
1346 free (mgr->redobuff);
1350 if( mgr->redopages )
1351 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1352 FlushFileBuffers(mgr->idx);
1353 CloseHandle(mgr->idx);
1358 // close and release memory
1360 void bt_close (BtDb *bt)
1367 VirtualFree (bt->mem, 0, MEM_RELEASE);
1372 // open/create new btree buffer manager
1374 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1375 // size of page pool (e.g. 262144)
1377 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1379 uint lvl, attr, last, slot, idx;
1380 uint nlatchpage, latchhash;
1381 unsigned char value[BtId];
1382 int flag, initit = 0;
1383 BtPageZero *pagezero;
1391 // determine sanity of page size and buffer pool
1393 if( bits > BT_maxbits )
1395 else if( bits < BT_minbits )
1398 if( nodemax < 16 ) {
1399 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
1404 mgr = calloc (1, sizeof(BtMgr));
1406 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1408 if( mgr->idx == -1 ) {
1409 fprintf (stderr, "Unable to open btree file\n");
1410 return free(mgr), NULL;
1413 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1414 attr = FILE_ATTRIBUTE_NORMAL;
1415 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1417 if( mgr->idx == INVALID_HANDLE_VALUE )
1418 return GlobalFree(mgr), NULL;
1422 pagezero = valloc (BT_maxpage);
1425 // read minimum page size to get root info
1426 // to support raw disk partition files
1427 // check if bits == 0 on the disk.
1429 if( size = lseek (mgr->idx, 0L, 2) )
1430 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1431 if( pagezero->alloc->bits )
1432 bits = pagezero->alloc->bits;
1436 return free(mgr), free(pagezero), NULL;
1440 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1441 size = GetFileSize(mgr->idx, amt);
1443 if( size || *amt ) {
1444 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1445 return bt_mgrclose (mgr), NULL;
1446 bits = pagezero->alloc->bits;
1451 mgr->page_size = 1 << bits;
1452 mgr->page_bits = bits;
1454 // calculate number of latch hash table entries
1456 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1458 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1459 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1460 mgr->latchtotal = nodemax;
1465 // initialize an empty b-tree with latch page, root page, page of leaves
1466 // and page(s) of latches and page pool cache
1468 memset (pagezero, 0, 1 << bits);
1469 pagezero->alloc->bits = mgr->page_bits;
1470 bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1472 // initialize left-most LEAF page in
1475 bt_putid (pagezero->alloc->left, LEAF_page);
1477 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1478 fprintf (stderr, "Unable to create btree page zero\n");
1479 return bt_mgrclose (mgr), NULL;
1482 memset (pagezero, 0, 1 << bits);
1483 pagezero->alloc->bits = mgr->page_bits;
1485 for( lvl=MIN_lvl; lvl--; ) {
1486 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1487 key = keyptr(pagezero->alloc, 1);
1488 key->len = 2; // create stopper key
1492 bt_putid(value, MIN_lvl - lvl + 1);
1493 val = valptr(pagezero->alloc, 1);
1494 val->len = lvl ? BtId : 0;
1495 memcpy (val->value, value, val->len);
1497 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1498 pagezero->alloc->lvl = lvl;
1499 pagezero->alloc->cnt = 1;
1500 pagezero->alloc->act = 1;
1502 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1503 fprintf (stderr, "Unable to create btree page zero\n");
1504 return bt_mgrclose (mgr), NULL;
1512 VirtualFree (pagezero, 0, MEM_RELEASE);
1515 // mlock the pagezero page
1517 flag = PROT_READ | PROT_WRITE;
1518 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1519 if( mgr->pagezero == MAP_FAILED ) {
1520 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1521 return bt_mgrclose (mgr), NULL;
1523 mlock (mgr->pagezero, mgr->page_size);
1525 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1526 if( mgr->pagepool == MAP_FAILED ) {
1527 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1528 return bt_mgrclose (mgr), NULL;
1530 if( mgr->redopages = redopages )
1531 mgr->redobuff = valloc (redopages * mgr->page_size);
1533 flag = PAGE_READWRITE;
1534 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1535 if( !mgr->halloc ) {
1536 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1537 return bt_mgrclose (mgr), NULL;
1540 flag = FILE_MAP_WRITE;
1541 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1542 if( !mgr->pagezero ) {
1543 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1544 return bt_mgrclose (mgr), NULL;
1547 flag = PAGE_READWRITE;
1548 size = (uid)mgr->nlatchpage << mgr->page_bits;
1549 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1551 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1552 return bt_mgrclose (mgr), NULL;
1555 flag = FILE_MAP_WRITE;
1556 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1557 if( !mgr->pagepool ) {
1558 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1559 return bt_mgrclose (mgr), NULL;
1561 if( mgr->redopages = redopages )
1562 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1565 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1566 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1567 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1569 // mark all pool entries as available
1571 for( idx = 1; idx < mgr->latchtotal; idx++ ) {
1572 latch = mgr->latchsets + idx;
1580 // open BTree access method
1581 // based on buffer manager
// bt_open: allocate and zero a BtDb handle for the given buffer manager.
// The handle gets a two-page work area: bt->frame is the first page and
// bt->cursor the second.  bt->thread_no is drawn from the manager's shared
// counter; both variants yield the value after the increment.
// NOTE(review): the preprocessor lines that choose between the valloc/__sync
// variant and the VirtualAlloc/_Interlocked variant are not visible in this
// listing, and neither is the return statement.
// NOTE(review): the malloc result is passed to memset without a NULL check.
1583 BtDb *bt_open (BtMgr *mgr)
1585 BtDb *bt = malloc (sizeof(*bt));
1587 memset (bt, 0, sizeof(*bt));
1590 bt->mem = valloc (2 *mgr->page_size);
1592 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1594 bt->frame = (BtPage)bt->mem;
1595 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1597 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
// _InterlockedIncrement16 takes only the address of the counter and returns
// the incremented value, which matches the fetch_and_add(...) + 1 form above
1599 bt->thread_no = _InterlockedIncrement16(mgr->thread_no);
1604 // compare two keys, return > 0, = 0, or < 0
1605 // =0: keys are same
1608 // as the comparison value
// keycmp: compare the stored key key1 against the raw byte string key2 of
// length len2.  The visible code compares the common prefix with memcmp and
// keeps the result in ans; the length tie-break and the return statements
// are not visible in this listing.
1610 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1612 uint len1 = key1->len;
1615 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) ) // compare only min(len1, len2) bytes
1626 // place write, read, or parent lock on requested page_no.
// bt_lockpage: acquire the lock(s) selected by mode on the given latch set,
// passing bt->thread_no to every lock primitive.
// The visible calls are a read or write lock on latch->readwr, a read or
// write lock on latch->access, and WriteOLock on latch->parent or
// latch->atomic.  Only the combined BtLockAtomic | BtLockRead label is
// visible in this listing; the switch and the other case labels are not shown.
1628 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1632 ReadLock (latch->readwr, bt->thread_no);
1635 WriteLock (latch->readwr, bt->thread_no);
1638 ReadLock (latch->access, bt->thread_no);
1641 WriteLock (latch->access, bt->thread_no);
1644 WriteOLock (latch->parent, bt->thread_no);
1647 WriteOLock (latch->atomic, bt->thread_no);
1649 case BtLockAtomic | BtLockRead:
1650 WriteOLock (latch->atomic, bt->thread_no); // combined mode: atomic lock first,
1651 ReadLock (latch->readwr, bt->thread_no); // then the shared readwr lock
1656 // remove write, read, or parent lock on requested page
// bt_unlockpage: release the lock(s) selected by mode on the given latch set.
// Mirrors bt_lockpage: readwr and access are released with ReadRelease or
// WriteRelease, parent and atomic with WriteORelease.  Only the combined
// BtLockAtomic | BtLockRead label is visible in this listing; the switch and
// the other case labels are not shown.
1658 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1662 ReadRelease (latch->readwr);
1665 WriteRelease (latch->readwr);
1668 ReadRelease (latch->access);
1671 WriteRelease (latch->access);
1674 WriteORelease (latch->parent);
1677 WriteORelease (latch->atomic);
1679 case BtLockAtomic | BtLockRead:
1680 WriteORelease (latch->atomic); // combined mode: drop the atomic lock,
1681 ReadRelease (latch->readwr); // then the shared readwr lock
1686 // allocate a new page
1687 // return with page latched, but unlocked.
// bt_newpage: obtain a page number for a new page and fill it from contents.
// Under bt->mgr->lock the head of the free chain (pagezero->chain) is tried
// first: if it is non-zero that page is pinned and mapped, the chain head is
// advanced to the page's right link, the mutex is released and contents is
// copied over the page.  Otherwise the next page number is taken from
// pagezero->alloc->right, which is incremented, the mutex is released and the
// latch is pinned with contents supplied to bt_pinlatch.
// In both paths set->latch and set->page are filled in and the latch is
// marked dirty.  The visible error path sets bt->err = BTERR_struct and
// yields -1; the success return is not visible in this listing.
// NOTE(review): the error return at 1705 appears to happen while
// bt->mgr->lock is still held (the first visible release is at 1708) - confirm.
1689 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1694 // lock allocation page
1696 bt_mutexlock(bt->mgr->lock);
1698 // use empty chain first
1699 // else allocate empty page
1701 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1702 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
1703 set->page = bt_mappage (bt, set->latch);
1705 return bt->err = BTERR_struct, bt->line = __LINE__, -1;
1707 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right)); // pop the reused page off the free chain
1708 bt_releasemutex(bt->mgr->lock);
1710 memcpy (set->page, contents, bt->mgr->page_size);
1711 set->latch->dirty = 1;
1715 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1716 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1); // bump the next-page counter
1718 // unlock allocation latch and
1719 // extend file into new page.
1721 bt_releasemutex(bt->mgr->lock);
1723 // don't load cache from btree page
1725 if( set->latch = bt_pinlatch (bt, page_no, contents) )
1726 set->page = bt_mappage (bt, set->latch);
1730 set->latch->dirty = 1;
1734 // find slot in page for given key at a given level
// bt_findslot: binary search of a page's slot array for key (len bytes).
// The search runs over slots low = 1 .. higher = page->cnt and narrows until
// the two bounds meet, comparing with keycmp.  Returns higher when good is
// non-zero, otherwise 0 (meaning the key belongs on the right-link page).
// The declaration and initial handling of good, and the branch that raises
// low, are not visible in this listing.
1736 int bt_findslot (BtPage page, unsigned char *key, uint len)
1738 uint diff, higher = page->cnt, low = 1, slot;
1741 // make stopper key an infinite fence value
1743 if( bt_getid (page->right) )
1748 // low is the lowest candidate.
1749 // loop ends when they meet
1751 // higher is already
1752 // tested as .ge. the passed key.
1754 while( diff = higher - low ) {
1755 slot = low + ( diff >> 1 ); // midpoint of the remaining range
1756 if( keycmp (keyptr(page, slot), key, len) < 0 )
1759 higher = slot, good++; // slot key is >= search key: keep it as the upper bound
1762 // return zero if key is on right link page
1764 return good ? higher : 0;
1767 // find and load page at given level for given key
1768 // leave page rd or wr locked as requested
// bt_loadpage: walk from ROOT_page toward the page at level lvl that should
// hold key, leaving that page locked with the caller's lock mode.
// Each visited page is pinned and mapped.  Non-root pages are first locked
// with BtLockAccess, the previous page is unlocked and unpinned, then the
// page's own mode lock is taken and the access lock dropped.  Pages above the
// target level are locked with BtLockRead.
// drill starts at 0xff and is replaced by the root's actual level once the
// root has been read.  If the caller wants a non-read lock at that level the
// root is unlocked and unpinned (the lines that follow are not visible here).
// On each page bt_findslot picks the slot, dead slots are skipped, and the
// next page number is read from the slot's value, or from the right link
// when the key is not on this page.
// Every visible error path sets bt->err = BTERR_struct and returns 0; the
// successful return is not visible in this listing.
1770 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1772 uid page_no = ROOT_page, prevpage = 0;
1773 uint drill = 0xff, slot;
1774 BtLatchSet *prevlatch;
1775 uint mode, prevmode;
1778 // start at root of btree and drill down
1781 // determine lock mode of drill level
1782 mode = (drill == lvl) ? lock : BtLockRead;
1784 if( !(set->latch = bt_pinlatch (bt, page_no, NULL)) )
1787 // obtain access lock using lock chaining with Access mode
1789 if( page_no > ROOT_page )
1790 bt_lockpage(bt, BtLockAccess, set->latch);
1792 set->page = bt_mappage (bt, set->latch);
1794 // release & unpin parent or left sibling page
1797 bt_unlockpage(bt, prevmode, prevlatch);
1798 bt_unpinlatch (prevlatch);
1802 // obtain mode lock using lock chaining through AccessLock
1804 bt_lockpage(bt, mode, set->latch);
1806 if( set->page->free ) // a page on the free list must never be reached by a search
1807 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1809 if( page_no > ROOT_page )
1810 bt_unlockpage(bt, BtLockAccess, set->latch);
1812 // re-read and re-lock root after determining actual level of root
1814 if( set->page->lvl != drill) {
1815 if( set->latch->page_no != ROOT_page ) // a level mismatch is only tolerated on the root
1816 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1818 drill = set->page->lvl;
1820 if( lock != BtLockRead && drill == lvl ) {
1821 bt_unlockpage(bt, mode, set->latch);
1822 bt_unpinlatch (set->latch);
1827 prevpage = set->latch->page_no;
1828 prevlatch = set->latch;
1831 // find key on page at this level
1832 // and descend to requested level
1834 if( !set->page->kill )
1835 if( slot = bt_findslot (set->page, key, len) ) {
1839 // find next non-dead slot -- the fence key if nothing else
1841 while( slotptr(set->page, slot)->dead )
1842 if( slot++ < set->page->cnt )
1845 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1847 val = valptr(set->page, slot);
1849 if( val->len == BtId ) // the value must be exactly one page id
1850 page_no = bt_getid(valptr(set->page, slot)->value);
1852 return bt->line = __LINE__, bt->err = BTERR_struct, 0;
1858 // slide right into next page
1860 page_no = bt_getid(set->page->right);
1863 // return error on end of right chain
1865 bt->line = __LINE__, bt->err = BTERR_struct;
1866 return 0; // return error
1869 // return page to free list
1870 // page must be delete & write locked
// bt_freepage: push the page in set onto the free chain kept in page zero.
// Under bt->mgr->lock the current chain head is copied into the page's right
// link, the chain head is set to this page's number, and the page is marked
// dirty and free.  The page's Delete and Write locks are then released and
// its latch unpinned before the allocation mutex is dropped.
1872 void bt_freepage (BtDb *bt, BtPageSet *set)
1874 // lock allocation page
1876 bt_mutexlock (bt->mgr->lock);
1880 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId); // old chain head becomes this page's successor
1881 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1882 set->latch->dirty = 1;
1883 set->page->free = 1;
1885 // unlock released page
1887 bt_unlockpage (bt, BtLockDelete, set->latch);
1888 bt_unlockpage (bt, BtLockWrite, set->latch);
1889 bt_unpinlatch (set->latch);
1891 // unlock allocation page
1893 bt_releasemutex (bt->mgr->lock);
1896 // a fence key was deleted from a page
1897 // push new fence value upwards
// bt_fixfence: repair the parent level after the last (fence) key of a page
// was deleted.
// The old fence key (the page's last slot) is saved in rightkey, its slot is
// zeroed and page->cnt is decremented.  The new last key is saved in leftkey.
// The page's Write lock is then exchanged for a Parent lock.  leftkey is
// inserted at lvl+1 with this page's number as its value, rightkey is deleted
// from lvl+1, and finally the Parent lock is released and the latch unpinned.
// The statements executed when bt_insertkey or bt_deletekey fails, and the
// final return, are not visible in this listing.
1899 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1901 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1902 unsigned char value[BtId];
1906 // remove the old fence value
1908 ptr = keyptr(set->page, set->page->cnt);
1909 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1910 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot)); // clear the last slot and shrink cnt
1911 set->latch->dirty = 1;
1913 // cache new fence value
1915 ptr = keyptr(set->page, set->page->cnt);
1916 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1918 bt_lockpage (bt, BtLockParent, set->latch);
1919 bt_unlockpage (bt, BtLockWrite, set->latch);
1921 // insert new (now smaller) fence key
1923 bt_putid (value, set->latch->page_no);
1924 ptr = (BtKey*)leftkey;
1926 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1929 // now delete old fence key
1931 ptr = (BtKey*)rightkey;
1933 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1936 bt_unlockpage (bt, BtLockParent, set->latch);
1937 bt_unpinlatch(set->latch);
1941 // root has a single child
1942 // collapse a level from the tree
// bt_collapseroot: shorten the tree while the root holds a single live child.
// Each pass scans the root for a slot that is not dead, reads the child page
// number from that slot's value (which must be BtId bytes, otherwise
// BTERR_struct is returned), pins and maps the child, takes its Delete and
// Write locks, copies the child's page image over the root and frees the
// child.  The loop repeats while the root's level is above 1 and it has one
// active key.  Afterwards the root's Write lock is released and its latch
// unpinned.
// The loop opening, the break out of the slot scan, the pin-failure path and
// the final return are not visible in this listing.
1944 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1951 // find the child entry and promote as new root contents
1954 for( idx = 0; idx++ < root->page->cnt; )
1955 if( !slotptr(root->page, idx)->dead )
1958 val = valptr(root->page, idx);
1960 if( val->len == BtId )
1961 page_no = bt_getid (valptr(root->page, idx)->value);
1963 return bt->line = __LINE__, bt->err = BTERR_struct;
1965 if( child->latch = bt_pinlatch (bt, page_no, NULL) )
1966 child->page = bt_mappage (bt, child->latch);
1970 bt_lockpage (bt, BtLockDelete, child->latch);
1971 bt_lockpage (bt, BtLockWrite, child->latch);
1973 memcpy (root->page, child->page, bt->mgr->page_size); // the child's image becomes the new root
1974 root->latch->dirty = 1;
1976 bt_freepage (bt, child);
1978 } while( root->page->lvl > 1 && root->page->act == 1 );
1980 bt_unlockpage (bt, BtLockWrite, root->latch);
1981 bt_unpinlatch (root->latch);
1985 // delete a page and manage keys
1986 // call with page writelocked
1987 // returns with page unpinned
// bt_deletepage: remove an empty page by absorbing its right sibling.
// The page's own fence key is saved in lowerfence.  The right sibling is
// pinned, mapped and Write locked, and its fence key is saved in higherfence.
// If the sibling is already marked kill, BTERR_struct is returned.  Otherwise
// the sibling's image is copied over this page, and the sibling is marked
// kill with its right link pointing back at this page.
// Both pages then exchange their Write lock for a Parent lock.  higherfence
// is inserted at lvl+1 with this page's number as its value and lowerfence is
// deleted from lvl+1.  The sibling's Parent lock is swapped for Delete and
// Write locks and it is handed to bt_freepage.  Last, this page's Parent lock
// is released and its latch unpinned.
// The pin-failure path, the statements run when the parent insert or delete
// fails, and the final return are not visible in this listing.
// NOTE(review): the kill check at 2020 returns with locks taken earlier in
// this function apparently still held - confirm against the full source.
1989 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1991 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1992 unsigned char value[BtId];
1993 uint lvl = set->page->lvl;
1998 // cache copy of fence key
1999 // to post in parent
2001 ptr = keyptr(set->page, set->page->cnt);
2002 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2004 // obtain lock on right page
2006 page_no = bt_getid(set->page->right);
2008 if( right->latch = bt_pinlatch (bt, page_no, NULL) )
2009 right->page = bt_mappage (bt, right->latch);
2013 bt_lockpage (bt, BtLockWrite, right->latch);
2015 // cache copy of key to update
2017 ptr = keyptr(right->page, right->page->cnt);
2018 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
2020 if( right->page->kill )
2021 return bt->line = __LINE__, bt->err = BTERR_struct;
2023 // pull contents of right peer into our empty page
2025 memcpy (set->page, right->page, bt->mgr->page_size);
2026 set->latch->dirty = 1;
2028 // mark right page deleted and point it to left page
2029 // until we can post parent updates that remove access
2030 // to the deleted page.
2032 bt_putid (right->page->right, set->latch->page_no);
2033 right->latch->dirty = 1;
2034 right->page->kill = 1;
2036 bt_lockpage (bt, BtLockParent, right->latch);
2037 bt_unlockpage (bt, BtLockWrite, right->latch);
2039 bt_lockpage (bt, BtLockParent, set->latch);
2040 bt_unlockpage (bt, BtLockWrite, set->latch);
2042 // redirect higher key directly to our new node contents
2044 bt_putid (value, set->latch->page_no);
2045 ptr = (BtKey*)higherfence;
2047 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2050 // delete old lower key to our node
2052 ptr = (BtKey*)lowerfence;
2054 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
2057 // obtain delete and write locks to right node
2059 bt_unlockpage (bt, BtLockParent, right->latch);
2060 bt_lockpage (bt, BtLockDelete, right->latch);
2061 bt_lockpage (bt, BtLockWrite, right->latch);
2062 bt_freepage (bt, right);
2064 bt_unlockpage (bt, BtLockParent, set->latch);
2065 bt_unpinlatch (set->latch);
2069 // find and delete key on page by marking delete flag bit
2070 // if page becomes empty, delete it from the btree
// bt_deletekey: locate key at level lvl and mark its slot dead.
// The page is loaded with a Write lock via bt_loadpage.  A Librarian slot at
// the found position is skipped.  fence records whether the slot is the
// page's last one.  When the key matches and the slot is not already dead,
// the slot is marked dead and the key and value bytes plus their headers are
// added to page->garbage.  Dead slots just below the fence are then folded
// away by copying the fence slot down and shrinking page->cnt.
// Follow-up work, in order: bt_fixfence when a fence key was deleted on a
// non-leaf page that still has active keys; bt_collapseroot when the root
// (lvl > 1) is left with one active key; bt_deletepage when the page has no
// active keys.  Otherwise the page is marked dirty, unlocked and unpinned.
// The bt_loadpage failure path, the active-count update, the break out of the
// collapse loop and the returns after the helper calls are not visible in
// this listing.
2072 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
2074 uint slot, idx, found, fence;
2079 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2080 ptr = keyptr(set->page, slot);
2084 // if librarian slot, advance to real slot
2086 if( slotptr(set->page, slot)->type == Librarian )
2087 ptr = keyptr(set->page, ++slot);
2089 fence = slot == set->page->cnt;
2091 // if key is found delete it, otherwise ignore request
2093 if( found = !keycmp (ptr, key, len) )
2094 if( found = slotptr(set->page, slot)->dead == 0 ) {
2095 val = valptr(set->page,slot);
2096 slotptr(set->page, slot)->dead = 1;
2097 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2100 // collapse empty slots beneath the fence
2102 while( idx = set->page->cnt - 1 )
2103 if( slotptr(set->page, idx)->dead ) {
2104 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1); // pull the fence slot down over the dead one
2105 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2110 // did we delete a fence key in an upper level?
2112 if( found && lvl && set->page->act && fence )
2113 if( bt_fixfence (bt, set, lvl) )
2118 // do we need to collapse root?
2120 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2121 if( bt_collapseroot (bt, set) )
2126 // delete empty page
2128 if( !set->page->act )
2129 return bt_deletepage (bt, set);
2131 set->latch->dirty = 1;
2132 bt_unlockpage(bt, BtLockWrite, set->latch);
2133 bt_unpinlatch (set->latch);
// bt_foundkey: return the handle's key buffer (bt->key) viewed as a BtKey.
// bt_findkey copies the key it located into that buffer.
2137 BtKey *bt_foundkey (BtDb *bt)
2139 return (BtKey*)(bt->key);
2142 // advance to next slot
// bt_findnext: step a read cursor to the slot after slot.
// While slot is below page->cnt the move stays on the current page (the
// statement taken in that case is not visible in this listing).  Otherwise
// the right sibling is pinned and mapped and locked with BtLockAccess, the
// previous page's Read lock is released and its latch unpinned, and the new
// page is Read locked before the Access lock is dropped.
// Returns 0 with bt->err = BTERR_struct on the visible error path; the
// returns for the successful cases are not visible in this listing.
2144 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2146 BtLatchSet *prevlatch;
2149 if( slot < set->page->cnt )
2152 prevlatch = set->latch;
2154 if( page_no = bt_getid(set->page->right) )
2155 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
2156 set->page = bt_mappage (bt, set->latch);
2160 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
2162 // obtain access lock using lock chaining with Access mode
2164 bt_lockpage(bt, BtLockAccess, set->latch);
2166 bt_unlockpage(bt, BtLockRead, prevlatch);
2167 bt_unpinlatch (prevlatch);
2169 bt_lockpage(bt, BtLockRead, set->latch);
2170 bt_unlockpage(bt, BtLockAccess, set->latch);
2174 // find unique key or first duplicate key in
2175 // leaf level and return number of value bytes
2176 // or (-1) if not found. Setup key for bt_foundkey
// bt_findkey: look up key at the leaf level (lvl 0) under a Read lock and
// copy its value into value.
// Starting from the slot returned by bt_loadpage, each candidate is handled
// in turn: a Librarian placeholder is skipped and the key found is copied
// into bt->key (see bt_foundkey).  Duplicate-type slots, the stopper key on
// the right-most page and dead slots each get special handling whose
// statements are not visible in this listing.  On a match the value is copied
// out with memcpy using valmax as the byte count.  bt_findnext supplies
// further slots until it returns 0.  The page is Read unlocked and unpinned
// before returning.
// The declaration and setting of len and the return value, the adjustment of
// valmax when it exceeds val->len, and the return statements are not visible
// in this listing.
2178 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2186 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
2188 ptr = keyptr(set->page, slot);
2190 // skip librarian slot place holder
2192 if( slotptr(set->page, slot)->type == Librarian )
2193 ptr = keyptr(set->page, ++slot);
2195 // return actual key found
2197 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2200 if( slotptr(set->page, slot)->type == Duplicate )
2203 // not there if we reach the stopper key
2205 if( slot == set->page->cnt )
2206 if( !bt_getid (set->page->right) )
2209 // if key exists, return >= 0 value bytes copied
2210 // otherwise return (-1)
2212 if( slotptr(set->page, slot)->dead )
2216 if( !memcmp (ptr->key, key, len) ) {
2217 val = valptr (set->page,slot);
2218 if( valmax > val->len )
2220 memcpy (value, val->value, valmax);
2226 } while( slot = bt_findnext (bt, set, slot) );
2228 bt_unlockpage (bt, BtLockRead, set->latch);
2229 bt_unpinlatch (set->latch);
2233 // check page for space available,
2234 // clean if necessary and return
2235 // 0 - page needs splitting
2236 // >0 new slot value
// bt_cleanpage: decide whether a key of keylen bytes and a value of vallen
// bytes fit on the page, compacting the page first when that can help.
// First test: page->min (lowest used offset) is compared with the space
// needed for the header, max+2 slots and the new key and value with their
// headers.  Second test: compaction is skipped when page->garbage is below
// one fifth of the page size.
// Compaction copies the page to bt->frame, zeroes everything after the page
// header and re-copies each surviving value and key from the top of the page
// downward.  Each copied entry gets a Librarian slot (marked dead) plus a
// real slot carrying the original type and dead flag.
// The fit test is then repeated with the rebuilt slot count.
// The return statements, the slot renumbering, the dead-slot skip and the
// updates of page->min, cnt and act are not visible in this listing.
2238 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
2240 uint nxt = bt->mgr->page_size;
2241 BtPage page = set->page;
2242 uint cnt = 0, idx = 0;
2243 uint max = page->cnt;
2248 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2251 // skip cleanup and proceed to split
2252 // if there's not enough garbage
2255 if( page->garbage < nxt / 5 )
2258 memcpy (bt->frame, page, bt->mgr->page_size);
2260 // skip page info and set rest of page to zero
2262 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2263 set->latch->dirty = 1;
2268 // clean up page first by
2269 // removing deleted keys
2271 while( cnt++ < max ) {
2275 if( cnt < max || bt->frame->lvl )
2276 if( slotptr(bt->frame,cnt)->dead )
2279 // copy the value across
2281 val = valptr(bt->frame, cnt);
2282 nxt -= val->len + sizeof(BtVal);
2283 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2285 // copy the key across
2287 key = keyptr(bt->frame, cnt);
2288 nxt -= key->len + sizeof(BtKey);
2289 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2291 // make a librarian slot
2293 slotptr(page, ++idx)->off = nxt;
2294 slotptr(page, idx)->type = Librarian;
2295 slotptr(page, idx)->dead = 1;
2299 slotptr(page, ++idx)->off = nxt;
2300 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2302 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2309 // see if page has enough space now, or does it need splitting?
2311 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2317 // split the root and raise the height of the btree
// bt_splitroot: grow the tree by one level after the root page was split.
// The root's current fence key is saved in leftkey and the root image is
// copied to a fresh page obtained with bt_newpage.  Everything in the root
// after the page header is then zeroed and two entries are built from the top
// of the page downward.  Slot 2 gets a 2-byte key whose value is the page
// number of right.  Slot 1 gets leftkey, whose value is the new left page's
// number.  The root's right link is cleared, min is set to the lowest used
// offset and cnt and act are set to 2.
// Finally the root is Write unlocked and unpinned, and right is unpinned.
// The bt_newpage failure path, the writes of val->len and the stopper key
// bytes, the level increment and the return are not visible in this listing.
2319 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2321 unsigned char leftkey[BT_keyarray];
2322 uint nxt = bt->mgr->page_size;
2323 unsigned char value[BtId];
2329 // save left page fence key for new root
2331 ptr = keyptr(root->page, root->page->cnt);
2332 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2334 // Obtain an empty page to use, and copy the current
2335 // root contents into it, e.g. lower keys
2337 if( bt_newpage(bt, left, root->page) )
2340 left_page_no = left->latch->page_no;
2341 bt_unpinlatch (left->latch);
2343 // preserve the page info at the bottom
2344 // of higher keys and set rest to zero
2346 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2348 // insert stopper key at top of newroot page
2349 // and increase the root height
2351 nxt -= BtId + sizeof(BtVal);
2352 bt_putid (value, right->page_no);
2353 val = (BtVal *)((unsigned char *)root->page + nxt);
2354 memcpy (val->value, value, BtId);
2357 nxt -= 2 + sizeof(BtKey);
2358 slotptr(root->page, 2)->off = nxt;
2359 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2364 // insert lower keys page fence key on newroot page as first key
2366 nxt -= BtId + sizeof(BtVal);
2367 bt_putid (value, left_page_no);
2368 val = (BtVal *)((unsigned char *)root->page + nxt);
2369 memcpy (val->value, value, BtId);
2372 ptr = (BtKey *)leftkey;
2373 nxt -= ptr->len + sizeof(BtKey);
2374 slotptr(root->page, 1)->off = nxt;
2375 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2377 bt_putid(root->page->right, 0);
2378 root->page->min = nxt; // reset lowest used offset and key count
2379 root->page->cnt = 2;
2380 root->page->act = 2;
2383 // release and unpin root pages
2385 bt_unlockpage(bt, BtLockWrite, root->latch);
2386 bt_unpinlatch (root->latch);
2388 bt_unpinlatch (right);
2392 // split already locked full node
2394 // return pool entry for new right
// bt_splitpage: split a full, already locked page into two.
// Part one builds the new right page in bt->frame.  The frame is zeroed and
// entries from set->page are copied in from the top of the page downward,
// skipping dead slots (a dead last slot is kept on non-leaf pages).  Each
// copied entry gets a Librarian slot (marked dead) plus a real slot carrying
// the original type and dead flag.  The frame's bits, min, cnt and lvl are
// filled in, and for non-root pages the right link is inherited from
// set->page.  bt_newpage then writes the frame to a newly allocated page.
// Part two rebuilds the left page.  set->page is copied to bt->frame, its
// body is zeroed, and the lower keys are re-copied the same way with
// Librarian slots added.  Its right link is set to the new page and min and
// cnt are updated.
// Returns the latch table entry of the new right page.
// The split-point computation, the dead-slot skips, the act counters and the
// bt_newpage failure path are not visible in this listing.
2397 uint bt_splitpage (BtDb *bt, BtPageSet *set)
2399 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2400 uint lvl = set->page->lvl;
2407 // split higher half of keys to bt->frame
2409 memset (bt->frame, 0, bt->mgr->page_size);
2410 max = set->page->cnt;
2414 while( cnt++ < max ) {
2415 if( cnt < max || set->page->lvl )
2416 if( slotptr(set->page, cnt)->dead )
2419 src = valptr(set->page, cnt);
2420 nxt -= src->len + sizeof(BtVal);
2421 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2423 key = keyptr(set->page, cnt);
2424 nxt -= key->len + sizeof(BtKey);
2425 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2426 memcpy (ptr, key, key->len + sizeof(BtKey));
2428 // add librarian slot
2430 slotptr(bt->frame, ++idx)->off = nxt;
2431 slotptr(bt->frame, idx)->type = Librarian;
2432 slotptr(bt->frame, idx)->dead = 1;
2436 slotptr(bt->frame, ++idx)->off = nxt;
2437 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2439 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2443 bt->frame->bits = bt->mgr->page_bits;
2444 bt->frame->min = nxt;
2445 bt->frame->cnt = idx;
2446 bt->frame->lvl = lvl;
2450 if( set->latch->page_no > ROOT_page )
2451 bt_putid (bt->frame->right, bt_getid (set->page->right));
2453 // get new free page and write higher keys to it.
2455 if( bt_newpage(bt, right, bt->frame) )
2458 // process lower keys
2460 memcpy (bt->frame, set->page, bt->mgr->page_size);
2461 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2462 set->latch->dirty = 1;
2464 nxt = bt->mgr->page_size;
2465 set->page->garbage = 0;
2471 if( slotptr(bt->frame, max)->type == Librarian )
2474 // assemble page of smaller keys
2476 while( cnt++ < max ) {
2477 if( slotptr(bt->frame, cnt)->dead )
2479 val = valptr(bt->frame, cnt);
2480 nxt -= val->len + sizeof(BtVal);
2481 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2483 key = keyptr(bt->frame, cnt);
2484 nxt -= key->len + sizeof(BtKey);
2485 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2487 // add librarian slot
2489 slotptr(set->page, ++idx)->off = nxt;
2490 slotptr(set->page, idx)->type = Librarian;
2491 slotptr(set->page, idx)->dead = 1;
2495 slotptr(set->page, ++idx)->off = nxt;
2496 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2500 bt_putid(set->page->right, right->latch->page_no);
2501 set->page->min = nxt;
2502 set->page->cnt = idx;
2504 return right->latch->entry;
2507 // fix keys for newly split page
2508 // call with page locked, return
// bt_splitkeys: post the fence keys of a freshly split page pair to the
// parent level.
// A split of the root is delegated to bt_splitroot.  Otherwise the last key
// of the left page (set) is saved in leftkey and the last key of the right
// page in rightkey.  Both pages take a Parent lock and the left page's Write
// lock is released.  leftkey is inserted at lvl+1 pointing at the left page,
// then rightkey is inserted at lvl+1 pointing at the right page.  Finally
// both Parent locks are released and both latches unpinned.
// The statements executed when either bt_insertkey fails, and the final
// return, are not visible in this listing.
2511 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2513 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2514 unsigned char value[BtId];
2515 uint lvl = set->page->lvl;
2519 // if current page is the root page, split it
2521 if( set->latch->page_no == ROOT_page )
2522 return bt_splitroot (bt, set, right);
2524 ptr = keyptr(set->page, set->page->cnt);
2525 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2527 page = bt_mappage (bt, right);
2529 ptr = keyptr(page, page->cnt);
2530 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2532 // insert new fences in their parent pages
2534 bt_lockpage (bt, BtLockParent, right);
2536 bt_lockpage (bt, BtLockParent, set->latch);
2537 bt_unlockpage (bt, BtLockWrite, set->latch);
2539 // insert new fence for reformulated left block of smaller keys
2541 bt_putid (value, set->latch->page_no);
2542 ptr = (BtKey *)leftkey;
2544 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2547 // switch fence for right block of larger keys to new right page
2549 bt_putid (value, right->page_no);
2550 ptr = (BtKey *)rightkey;
2552 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2555 bt_unlockpage (bt, BtLockParent, set->latch);
2556 bt_unpinlatch (set->latch);
2558 bt_unlockpage (bt, BtLockParent, right);
2559 bt_unpinlatch (right);
2563 // install new key and value onto page
2564 // page must already be checked for
// bt_insertslot: place a new key and value on a page that has already been
// checked for room.
// If the slot before the requested one is a Librarian placeholder it is
// considered for reuse.  The value and then the key are copied into the free
// area just below page->min, which is lowered accordingly.  Starting at slot,
// the slot array is scanned for the first dead entry.  When none is found
// before page->cnt the array is extended by two and librarian is set to 2.
// Slots are then shifted up by librarian positions, a Librarian slot is
// written first when librarian > 1, and the real slot is pointed at the new
// key.  The page is Write unlocked and unpinned at the end.
// The header-length writes, the non-extending branch of the slot search, the
// remaining slot fields, the test of release and the return are not visible
// in this listing.
2567 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2569 uint idx, librarian;
2574 // if found slot > desired slot and previous slot
2575 // is a librarian slot, use it
2578 if( slotptr(set->page, slot-1)->type == Librarian )
2581 // copy value onto page
2583 set->page->min -= vallen + sizeof(BtVal);
2584 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2585 memcpy (val->value, value, vallen);
2588 // copy key onto page
2590 set->page->min -= keylen + sizeof(BtKey);
2591 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2592 memcpy (ptr->key, key, keylen);
2595 // find first empty slot
2597 for( idx = slot; idx < set->page->cnt; idx++ )
2598 if( slotptr(set->page, idx)->dead )
2601 // now insert key into array before slot
2603 if( idx == set->page->cnt )
2604 idx += 2, set->page->cnt += 2, librarian = 2;
2608 set->latch->dirty = 1;
2611 while( idx > slot + librarian - 1 )
2612 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2614 // add librarian slot
2616 if( librarian > 1 ) {
2617 node = slotptr(set->page, slot++);
2618 node->off = set->page->min;
2619 node->type = Librarian;
2625 node = slotptr(set->page, slot);
2626 node->off = set->page->min;
2631 bt_unlockpage (bt, BtLockWrite, set->latch);
2632 bt_unpinlatch (set->latch);
2638 // Insert new key into the btree at given level.
2639 // either add a new key or update/add an existing one
// bt_insertkey: insert key/value at level lvl, or update the value when a
// unique key is already present.
// The key is first copied into a local BtKey (newkey).  In the non-unique
// case a sequence number from bt_newdup is appended after it.
// Each pass of the loop loads the target page with a Write lock and steps
// past a Librarian slot whose key equals the search key.
//  - New key (or non-unique insert): bt_cleanpage checks for room.  With no
//    room the page is split via bt_splitpage and bt_splitkeys.  With room the
//    insert is finished by bt_insertslot.
//  - Existing unique key whose value area is large enough: the value is
//    overwritten in place, the slot is revived, the size difference is added
//    to page->garbage, and the page is unlocked and unpinned.
//  - Existing unique key whose value area is too small: the old key and value
//    bytes are counted as garbage when the slot was live, room is obtained as
//    above, a fresh value and key are written below page->min, and the slot
//    offset is repointed before unlocking and unpinning.
// The visible bt_loadpage failure path sets bt->err = BTERR_ovflw.
// The header-length writes, the computation of len and type, the retry
// (continue) statements, the active-count updates and the returns are not
// visible in this listing.
2641 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2643 unsigned char newkey[BT_keyarray];
2644 uint slot, idx, len, entry;
2651 // set up the key we're working on
2653 ins = (BtKey*)newkey;
2654 memcpy (ins->key, key, keylen);
2657 // is this a non-unique index value?
2663 sequence = bt_newdup (bt);
2664 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2668 while( 1 ) { // find the page and slot for the current key
2669 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2670 ptr = keyptr(set->page, slot);
2673 bt->line = __LINE__, bt->err = BTERR_ovflw;
2677 // if librarian slot == found slot, advance to real slot
2679 if( slotptr(set->page, slot)->type == Librarian )
2680 if( !keycmp (ptr, key, keylen) )
2681 ptr = keyptr(set->page, ++slot);
2685 if( slotptr(set->page, slot)->type == Duplicate )
2688 // if inserting a duplicate key or unique key
2689 // check for adequate space on the page
2690 // and insert the new key before slot.
2692 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2693 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2694 if( !(entry = bt_splitpage (bt, set)) )
2696 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2701 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2704 // if key already exists, update value and return
2706 val = valptr(set->page, slot);
2708 if( val->len >= vallen ) {
2709 if( slotptr(set->page, slot)->dead )
2711 set->page->garbage += val->len - vallen;
2712 set->latch->dirty = 1;
2713 slotptr(set->page, slot)->dead = 0;
2715 memcpy (val->value, value, vallen);
2716 bt_unlockpage(bt, BtLockWrite, set->latch);
2717 bt_unpinlatch (set->latch);
2721 // new update value doesn't fit in existing value area
2723 if( !slotptr(set->page, slot)->dead )
2724 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2726 slotptr(set->page, slot)->dead = 0;
2730 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2731 if( !(entry = bt_splitpage (bt, set)) )
2733 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2738 set->page->min -= vallen + sizeof(BtVal);
2739 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2740 memcpy (val->value, value, vallen);
2743 set->latch->dirty = 1;
2744 set->page->min -= keylen + sizeof(BtKey);
2745 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2746 memcpy (ptr->key, key, keylen);
2749 slotptr(set->page, slot)->off = set->page->min;
2750 bt_unlockpage(bt, BtLockWrite, set->latch);
2751 bt_unpinlatch (set->latch);
// Field lists of two bookkeeping records used by the atomic (batched) update
// code.  The struct/typedef opening and closing lines are not visible in this
// listing.
// First record (2758-2762): presumably AtomicTxn, since bt_atomicpage and
// bt_atomicinsert read locks[src].slot, .reuse, .entry and .lsn through an
// AtomicTxn pointer - confirm against the full source.
2758 logseqno reqlsn; // redo log seq no required
2759 logseqno lsn; // redo log sequence number
2760 uint entry; // latch table entry number
2761 uint slot:31; // page slot number
2762 uint reuse:1; // reused previous page
// Second record (2766-2771): its type name is not visible in this listing.
// The three bit-fields below total 29 + 2 + 1 = 32 bits.
2766 uid page_no; // page number for split leaf
2767 void *next; // next key to insert
2768 uint entry:29; // latch table entry number
2769 uint type:2; // 0 == insert, 1 == delete, 2 == free
2770 uint nounlock:1; // don't unlock ParentModification
2771 unsigned char leafkey[BT_keyarray];
2774 // determine actual page where key is located
2775 // return slot number
// bt_atomicpage: find the page and slot that hold source key number src
// during an atomic transaction.
// The starting latch entry is taken from locks[src], or from locks[src-1]
// when locks[src].reuse is set (in which case slot is forced to 0).  set is
// pointed at that latch and its mapped page.
// When the slot has to be located again, the code walks the chain of pages
// linked through latch->split, running bt_findslot on each until the key is
// found.  A Librarian slot at the found position is handled specially, and
// for reuse entries locks[src].entry is updated to the entry where the key
// was found.
// If the chain ends without a match bt->err is set to BTERR_atomic.
// The declaration of entry, the early return for a known slot, the do-loop
// opening and the return statements are not visible in this listing.
2777 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2779 BtKey *key = keyptr(source,src);
2780 uint slot = locks[src].slot;
2783 if( src > 1 && locks[src].reuse )
2784 entry = locks[src-1].entry, slot = 0;
2786 entry = locks[src].entry;
2789 set->latch = bt->mgr->latchsets + entry;
2790 set->page = bt_mappage (bt, set->latch);
2794 // is locks->reuse set? or was slot zeroed?
2795 // if so, find where our key is located
2796 // on current page or pages split on
2797 // same page txn operations.
2800 set->latch = bt->mgr->latchsets + entry;
2801 set->page = bt_mappage (bt, set->latch);
2803 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2804 if( slotptr(set->page, slot)->type == Librarian )
2806 if( locks[src].reuse )
2807 locks[src].entry = entry;
2810 } while( entry = set->latch->split ); // follow the split chain to the next page
2812 bt->line = __LINE__, bt->err = BTERR_atomic;
// bt_atomicinsert: apply the insert described by source slot src as part of
// an atomic transaction.
// Loop: bt_atomicpage locates the target page and slot.  If bt_cleanpage
// reports room, bt_insertslot is called with release = 0 (the page stays
// locked) and the page's lsn is set from locks[src].lsn.  Otherwise the page
// is split with bt_splitpage.  The new right page is Write locked and spliced
// into the latch split chain directly after the current page, and
// locks[src].slot is zeroed so the next pass searches for the key again.
// Leaving the loop without inserting yields BTERR_atomic.
// The declarations of slot, entry, latch and set, the bt_insertslot and
// bt_splitpage failure paths and the success return are not visible in this
// listing.
2816 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2818 BtKey *key = keyptr(source, src);
2819 BtVal *val = valptr(source, src);
2824 while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2825 if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
2826 if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2828 set->page->lsn = locks[src].lsn;
2832 if( entry = bt_splitpage (bt, set) )
2833 latch = bt->mgr->latchsets + entry;
2837 // splice right page into split chain
2838 // and WriteLock it.
2840 bt_lockpage(bt, BtLockWrite, latch);
2841 latch->split = set->latch->split;
2842 set->latch->split = entry;
2843 locks[src].slot = 0; // force bt_atomicpage to search for the key again
2846 return bt->line = __LINE__, bt->err = BTERR_atomic;
// bt_atomicdelete: apply the delete described by source slot src as part of
// an atomic transaction.
// bt_atomicpage locates the page and slot; failure returns BTERR_struct.
// When the key at that slot equals the source key and the slot is live, the
// slot is marked dead.  The key and value bytes with their headers are added
// to page->garbage, the latch is marked dirty and the page's lsn is set from
// locks[src].lsn.
// The declarations of slot, ptr, val and set, the branches taken when the key
// differs or is already dead, the active-count update and the final return
// are not visible in this listing.
2849 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2851 BtKey *key = keyptr(source, src);
2857 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2858 ptr = keyptr(set->page, slot);
2860 return bt->line = __LINE__, bt->err = BTERR_struct;
2862 if( !keycmp (ptr, key->key, key->len) )
2863 if( !slotptr(set->page, slot)->dead )
2864 slotptr(set->page, slot)->dead = 1;
2870 val = valptr(set->page, slot);
2871 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2872 set->latch->dirty = 1;
2873 set->page->lsn = locks[src].lsn;
2879 // delete an empty master page for a transaction
2881 // note that the far right page never empties because
2882 // it always contains (at least) the infinite stopper key
2883 // and that all pages that don't contain any keys are
2884 // deleted, or are being held under Atomic lock.
// bt_atomicfree: replace the empty master page in `prev` with the
// contents of its right sibling, redirect the sibling's fence key to the
// master page, repair the neighbouring left link, then free the sibling.
//
//   bt   - btree handle
//   prev - page set (latch + mapped page) of the empty master page
//
// NOTE(review): the embedded line numbers skip values, so declarations,
// braces, else-branches and returns are missing from this listing.
2886 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2888 BtPageSet right[1], temp[1];
2889 unsigned char value[BtId];
2893 bt_lockpage(bt, BtLockWrite, prev->latch);
2895 // grab the right sibling
2897 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), NULL) )
2898 right->page = bt_mappage (bt, right->latch);
// Atomic lock first, then Write lock, on the sibling
2902 bt_lockpage(bt, BtLockAtomic, right->latch);
2903 bt_lockpage(bt, BtLockWrite, right->latch);
2905 // and pull contents over empty page
2906 // while preserving master's left link
2908 memcpy (right->page->left, prev->page->left, BtId);
2909 memcpy (prev->page, right->page, bt->mgr->page_size);
2911 // forward seekers to old right sibling
2912 // to new page location in set
2914 bt_putid (right->page->right, prev->latch->page_no);
2915 right->latch->dirty = 1;
2916 right->page->kill = 1;
2918 // remove pointer to right page for searchers by
2919 // changing right fence key to point to the master page
// the fence key is the last slot (cnt) of the old right page; the value
// inserted for it is the master page number
2921 ptr = keyptr(right->page,right->page->cnt);
2922 bt_putid (value, prev->latch->page_no);
2924 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2927 // now that master page is in good shape we can
2928 // remove its locks.
2930 bt_unlockpage (bt, BtLockAtomic, prev->latch);
2931 bt_unlockpage (bt, BtLockWrite, prev->latch);
2933 // fix master's right sibling's left pointer
2934 // to remove scanner's pointer to the right page
2936 if( right_page_no = bt_getid (prev->page->right) ) {
2937 if( temp->latch = bt_pinlatch (bt, right_page_no, NULL) )
2938 temp->page = bt_mappage (bt, temp->latch);
2940 bt_lockpage (bt, BtLockWrite, temp->latch);
2941 bt_putid (temp->page->left, prev->latch->page_no);
2942 temp->latch->dirty = 1;
2944 bt_unlockpage (bt, BtLockWrite, temp->latch);
2945 bt_unpinlatch (temp->latch);
2946 } else { // master is now the far right page
// page zero's alloc->left is updated under the manager mutex
2947 bt_mutexlock (bt->mgr->lock);
2948 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2949 bt_releasemutex(bt->mgr->lock);
2952 // now that there are no pointers to the right page
2953 // we can delete it after the last read access occurs
// drop Write and Atomic, then take Delete and Write before freeing
2955 bt_unlockpage (bt, BtLockWrite, right->latch);
2956 bt_unlockpage (bt, BtLockAtomic, right->latch);
2957 bt_lockpage (bt, BtLockDelete, right->latch);
2958 bt_lockpage (bt, BtLockWrite, right->latch);
2959 bt_freepage (bt, right);
2963 // atomic modification of a batch of keys.
2965 // return -1 if BTERR is set
2966 // otherwise return slot number
2967 // causing the key constraint violation
2968 // or zero on successful completion.
//   bt     - btree handle
//   source - page whose slots 1..cnt carry the transaction's keys; each
//            slot's type selects the operation, and the slots are
//            re-ordered in place by the sort below
//
// NOTE(review): the embedded line numbers skip values, so declarations,
// braces, case labels, else-branches and returns are missing from this
// listing; comments below describe only the statements that are shown.
2970 int bt_atomictxn (BtDb *bt, BtPage source)
2972 uint src, idx, slot, samepage, entry, avail, que = 0;
2973 AtomicKey *head, *tail, *leaf;
2974 BtPageSet set[1], prev[1];
2975 unsigned char value[BtId];
2976 BtKey *key, *ptr, *key2;
// one zero-filled AtomicTxn per source slot; slots are numbered from 1,
// hence cnt + 1 elements
// NOTE(review): no check of the calloc result is visible here
2987 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2991 // stable sort the list of keys into order to
2992 // prevent deadlocks between threads.
// insertion sort: the body sees src = 2..cnt and shifts that slot left
// while its key compares below the key at idx
2994 for( src = 1; src++ < source->cnt; ) {
2995 *temp = *slotptr(source,src);
2996 key = keyptr (source,src);
2998 for( idx = src; --idx; ) {
2999 key2 = keyptr (source,idx);
3000 if( keycmp (key, key2->key, key2->len) < 0 ) {
3001 *slotptr(source,idx+1) = *slotptr(source,idx);
3002 *slotptr(source,idx) = *temp;
3008 // reserve enough buffer pool entries
// this reservation is handed back by bt_availrelease (bt, avail) below
3010 avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
3011 bt_availrequest (bt, avail);
3013 // Load the leaf page for each key
3014 // group same page references with reuse bit
3015 // and determine any constraint violations
3017 for( src = 0; src++ < source->cnt; ) {
3018 key = keyptr(source, src);
3021 // first determine if this modification falls
3022 // on the same page as the previous modification
3023 // note that the far right leaf page is a special case
// samepage holds for any key after the first when the loaded page has
// no right link or its last key (slot cnt) is >= this key
3025 if( samepage = src > 1 )
3026 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3027 slot = bt_findslot(set->page, key->key, key->len);
// release the Read lock on the previously loaded page
3029 bt_unlockpage(bt, BtLockRead, set->latch);
// load the leaf for this key with Read and Atomic locks and reset the
// latch's split chain
3032 if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
3033 set->latch->split = 0;
// step over a Librarian slot to the slot that follows it
3037 if( slotptr(set->page, slot)->type == Librarian )
3038 ptr = keyptr(set->page, ++slot);
3040 ptr = keyptr(set->page, slot);
// record the page's latch entry and the slot found, reuse cleared
3043 locks[src].entry = set->latch->entry;
3044 locks[src].slot = slot;
3045 locks[src].reuse = 0;
// alternative record: no entry or slot, only the reuse flag
// (the condition choosing between the two records is not shown)
3047 locks[src].entry = 0;
3048 locks[src].slot = 0;
3049 locks[src].reuse = 1;
3052 // capture current lsn for master page
3054 locks[src].reqlsn = set->page->lsn;
3056 // perform constraint checks
3058 switch( slotptr(source, src)->type ) {
// a live slot holding an equal key counts, except the last slot of a
// page that has no right link
3061 if( !slotptr(set->page, slot)->dead )
3062 if( slot < set->page->cnt || bt_getid (set->page->right) )
3063 if( !keycmp (ptr, key->key, key->len) ) {
3065 // return constraint violation if key already exists
3067 bt_unlockpage(bt, BtLockRead, set->latch);
// for a recorded latch entry: drop its Atomic lock and its pin
// (the loop around this test is not shown)
3071 if( locks[src].entry ) {
3072 set->latch = bt->mgr->latchsets + locks[src].entry;
3073 bt_unlockpage(bt, BtLockAtomic, set->latch);
3074 bt_unpinlatch (set->latch);
3085 // unlock last loadpage lock
3088 bt_unlockpage(bt, BtLockRead, set->latch);
3090 // and add entries to redo log
// only when redo pages are configured; a non-zero lsn from bt_txnredo
// is copied into every slot's lock record
3092 if( bt->mgr->redopages )
3093 if( lsn = bt_txnredo (bt, source) )
3094 for( src = 0; src++ < source->cnt; )
3095 locks[src].lsn = lsn;
3099 // obtain write lock for each master page
3101 for( src = 0; src++ < source->cnt; ) {
// (the statement guarded by this test is not shown)
3102 if( locks[src].reuse )
3105 bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
3108 // insert or delete each key
3109 // process any splits or merges
3110 // release Write & Atomic latches
3111 // set ParentModifications and build
3112 // queue of keys to insert for split pages
3113 // or delete for deleted pages.
3115 // run through txn list backwards
// samepage is the exclusive upper bound of the idx loop below
3117 samepage = source->cnt + 1;
3119 for( src = source->cnt; src; src-- ) {
// (the statement guarded by this test is not shown)
3120 if( locks[src].reuse )
3123 // perform the txn operations
3124 // from smaller to larger on
3127 for( idx = src; idx < samepage; idx++ )
3128 switch( slotptr(source,idx)->type ) {
3130 if( bt_atomicdelete (bt, source, locks, idx) )
3136 if( bt_atomicinsert (bt, source, locks, idx) )
3141 // after the same page operations have finished,
3142 // process master page for splits or deletion.
// latch remembers the master page's latch for the unlocks further down
3144 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3145 prev->page = bt_mappage (bt, prev->latch);
3148 // pick-up all splits from master page
3149 // each one is already WriteLocked.
3151 entry = prev->latch->split;
// map the chained page and fetch the link to the one after it
3154 set->latch = bt->mgr->latchsets + entry;
3155 set->page = bt_mappage (bt, set->latch);
3156 entry = set->latch->split;
3158 // delete empty master page by undoing its split
3159 // (this is potentially another empty page)
3160 // note that there are no new left pointers yet
3162 if( !prev->page->act ) {
3163 memcpy (set->page->left, prev->page->left, BtId);
3164 memcpy (prev->page, set->page, bt->mgr->page_size);
3165 bt_lockpage (bt, BtLockDelete, set->latch);
3166 bt_freepage (bt, set);
3168 prev->latch->split = set->latch->split;
3169 prev->latch->dirty = 1;
3173 // remove empty page from the split chain
3174 // and return it to the free list.
3176 if( !set->page->act ) {
3177 memcpy (prev->page->right, set->page->right, BtId);
3178 prev->latch->split = set->latch->split;
3179 bt_lockpage (bt, BtLockDelete, set->latch);
3180 bt_freepage (bt, set);
3184 // schedule prev fence key update
// allocate a zeroed queue entry and count it in que; the entry receives
// a copy of the last key on prev (header plus key bytes)
// NOTE(review): no check of the calloc result is visible here
3186 ptr = keyptr(prev->page,prev->page->cnt);
3187 leaf = calloc (sizeof(AtomicKey), 1), que++;
3189 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3190 leaf->page_no = prev->latch->page_no;
3191 leaf->entry = prev->latch->entry;
3201 // splice in the left link into the split page
// prev trades its Write lock for a Parent lock
3203 bt_putid (set->page->left, prev->latch->page_no);
3204 bt_lockpage(bt, BtLockParent, prev->latch);
3205 bt_unlockpage(bt, BtLockWrite, prev->latch);
3209 // update left pointer in next right page from last split page
3210 // (if all splits were reversed, latch->split == 0)
3212 if( latch->split ) {
3213 // fix left pointer in master's original (now split)
3214 // far right sibling or set rightmost page in page zero
3216 if( right = bt_getid (prev->page->right) ) {
3217 if( set->latch = bt_pinlatch (bt, right, NULL) )
3218 set->page = bt_mappage (bt, set->latch);
3222 bt_lockpage (bt, BtLockWrite, set->latch);
3223 bt_putid (set->page->left, prev->latch->page_no);
3224 set->latch->dirty = 1;
3225 bt_unlockpage (bt, BtLockWrite, set->latch);
3226 bt_unpinlatch (set->latch);
3227 } else { // prev is rightmost page
// page zero's alloc->left is updated under the manager mutex
3228 bt_mutexlock (bt->mgr->lock);
3229 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3230 bt_releasemutex(bt->mgr->lock);
3233 // Process last page split in chain
// queue another entry carrying prev's last key, counted in que
// NOTE(review): no check of the calloc result is visible here
3235 ptr = keyptr(prev->page,prev->page->cnt);
3236 leaf = calloc (sizeof(AtomicKey), 1), que++;
3238 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3239 leaf->page_no = prev->latch->page_no;
3240 leaf->entry = prev->latch->entry;
3250 bt_lockpage(bt, BtLockParent, prev->latch);
3251 bt_unlockpage(bt, BtLockWrite, prev->latch);
3253 // remove atomic lock on master page
3255 bt_unlockpage(bt, BtLockAtomic, latch);
3259 // finished if prev page occupied (either master or final split)
3261 if( prev->page->act ) {
3262 bt_unlockpage(bt, BtLockWrite, latch);
3263 bt_unlockpage(bt, BtLockAtomic, latch);
3264 bt_unpinlatch(latch);
3268 // any and all splits were reversed, and the
3269 // master page located in prev is empty, delete it
3270 // by pulling over master's right sibling.
3272 // Remove empty master's fence key
3274 ptr = keyptr(prev->page,prev->page->cnt);
3276 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3279 // perform the remainder of the delete
3280 // from the FIFO queue
// NOTE(review): no check of the calloc result is visible here
3282 leaf = calloc (sizeof(AtomicKey), 1), que++;
3284 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3285 leaf->page_no = prev->latch->page_no;
3286 leaf->entry = prev->latch->entry;
3297 // leave atomic lock in place until
3298 // deletion completes in next phase.
3300 bt_unlockpage(bt, BtLockWrite, prev->latch);
// give back the first reservation, then reserve que * alloc->lvl
// entries for the queued key updates
3303 bt_availrelease (bt, avail);
3305 que *= bt->mgr->pagezero->alloc->lvl;
3306 bt_availrequest (bt, que);
3308 // add & delete keys for any pages split or merged during transaction
// each queue entry names its page by latch entry and page number; value
// carries the page number for the insert case
3312 set->latch = bt->mgr->latchsets + leaf->entry;
3313 set->page = bt_mappage (bt, set->latch);
3315 bt_putid (value, leaf->page_no);
3316 ptr = (BtKey *)leaf->leafkey;
3318 switch( leaf->type ) {
3319 case 0: // insert key
3320 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3325 case 1: // delete key
3326 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3331 case 2: // free page
3332 if( bt_atomicfree (bt, set) )
// the Parent lock is dropped unless the entry is flagged nounlock
3338 if( !leaf->nounlock )
3339 bt_unlockpage (bt, BtLockParent, set->latch);
3341 bt_unpinlatch (set->latch);
// continue with the entry held in tail (its assignment is not shown)
3344 } while( leaf = tail );
3346 bt_availrelease (bt, que);
3356 // set cursor to highest slot on highest page
// bt_lastkey: copy the page named by page zero's alloc->left into the
// cursor and return that page's slot count.
//
//   bt - btree handle; bt->cursor and bt->cursor_page are overwritten
//
// NOTE(review): the embedded line numbers skip values, so declarations,
// braces and the failure branch of the pin are missing from this listing.
3358 uint bt_lastkey (BtDb *bt)
// elsewhere in this file alloc->left is written when a page becomes the
// rightmost page
3360 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3363 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3364 set->page = bt_mappage (bt, set->latch);
// snapshot the whole page under a Read lock, then drop the lock and pin
3368 bt_lockpage(bt, BtLockRead, set->latch);
3369 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3370 bt_unlockpage(bt, BtLockRead, set->latch);
3371 bt_unpinlatch (set->latch);
3373 bt->cursor_page = page_no;
3374 return bt->cursor->cnt;
3377 // return previous slot on cursor page
// bt_prevkey: step the cursor backwards; the statements shown here load
// the left sibling of the cursor page into the cursor.
//
//   bt   - btree handle; bt->cursor and bt->cursor_page are updated
//   slot - current slot on the cursor page
//
// NOTE(review): the embedded line numbers skip values; the handling of
// `slot`, the early returns and the actions taken under the tests on
// `kill` and `ourright` are missing from this listing.
3379 uint bt_prevkey (BtDb *bt, uint slot)
3381 uid ourright, next, us = bt->cursor_page;
// remember the current page's right link for the comparison further down
3387 ourright = bt_getid(bt->cursor->right);
// a zero left link means there is no page to move to
3390 if( !(next = bt_getid(bt->cursor->left)) )
3394 bt->cursor_page = next;
3396 if( set->latch = bt_pinlatch (bt, next, NULL) )
3397 set->page = bt_mappage (bt, set->latch);
// snapshot the left page under a Read lock, then drop the lock and pin
3401 bt_lockpage(bt, BtLockRead, set->latch);
3402 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3403 bt_unlockpage(bt, BtLockRead, set->latch);
3404 bt_unpinlatch (set->latch);
// examine the snapshot: its right link, whether it is flagged kill, and
// whether its right link equals the previous page's right link
3406 next = bt_getid (bt->cursor->right);
3408 if( bt->cursor->kill )
3412 if( next == ourright )
3417 return bt->cursor->cnt;
3420 // return next slot on cursor page
3421 // or slide cursor right into next page
//   bt   - btree handle; bt->cursor and bt->cursor_page are updated when
//          the cursor moves to the right page
//   slot - current slot on the cursor page
//
// NOTE(review): the embedded line numbers skip values, so declarations,
// braces and every return are missing from this listing.
3423 uint bt_nextkey (BtDb *bt, uint slot)
3429 right = bt_getid(bt->cursor->right);
// scan forward over the slots that follow; dead slots are tested first
3431 while( slot++ < bt->cursor->cnt )
3432 if( slotptr(bt->cursor,slot)->dead )
3434 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
// move to the right sibling and snapshot it under a Read lock
3442 bt->cursor_page = right;
3444 if( set->latch = bt_pinlatch (bt, right, NULL) )
3445 set->page = bt_mappage (bt, set->latch);
3449 bt_lockpage(bt, BtLockRead, set->latch);
3451 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3453 bt_unlockpage(bt, BtLockRead, set->latch);
3454 bt_unpinlatch (set->latch);
3462 // cache page of keys into cursor and return starting slot for given key
//   bt  - btree handle; bt->cursor and bt->cursor_page are overwritten
//   key - key bytes to position on
//   len - length of key in bytes
//
// NOTE(review): the embedded line numbers skip values, so declarations,
// the failure branch and the return are missing from this listing.
3464 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3469 // cache page for retrieval
// bt_loadpage is asked for a Read lock; a non-zero slot means a page was
// loaded into set
3471 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3472 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3476 bt->cursor_page = set->latch->page_no;
// the snapshot is complete: drop the Read lock and the pin
3478 bt_unlockpage(bt, BtLockRead, set->latch);
3479 bt_unpinlatch (set->latch);
// bt_key: return a pointer to the key at `slot` of the cursor page copy
// held in bt->cursor.
3483 BtKey *bt_key(BtDb *bt, uint slot)
3485 return keyptr(bt->cursor, slot);
// bt_val: return a pointer to the value at `slot` of the cursor page copy
// held in bt->cursor.
3488 BtVal *bt_val(BtDb *bt, uint slot)
3490 return valptr(bt->cursor,slot);
// getCpuTime (Win32 API variant): return a time reading in seconds.
//
//   type - selects the reading; main() prints getCpuTime(0) as "real",
//          getCpuTime(1) as "user" and getCpuTime(2) as "sys". The case
//          labels are not shown in this listing.
//
// NOTE(review): the embedded line numbers skip values, so the switch,
// some declarations and the return are missing from this listing.
3496 double getCpuTime(int type)
3499 FILETIME xittime[1];
3500 FILETIME systime[1];
3501 FILETIME usrtime[1];
3502 SYSTEMTIME timeconv[1];
3505 memset (timeconv, 0, sizeof(SYSTEMTIME));
// current system time; the day-of-week term makes this reading restart
// from zero once a week
// NOTE(review): an interval spanning that restart would come out negative
3509 GetSystemTimeAsFileTime (xittime);
3510 FileTimeToSystemTime (xittime, timeconv);
3511 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// process user time
3514 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3515 FileTimeToSystemTime (usrtime, timeconv);
// process system time
3518 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3519 FileTimeToSystemTime (systime, timeconv);
// fold hours, minutes, seconds and milliseconds into seconds
3523 ans += (double)timeconv->wHour * 3600;
3524 ans += (double)timeconv->wMinute * 60;
3525 ans += (double)timeconv->wSecond;
3526 ans += (double)timeconv->wMilliseconds / 1000;
3531 #include <sys/resource.h>
// getCpuTime (gettimeofday/getrusage variant): return a time reading in
// seconds, with microseconds as the fraction.
//
//   type - selects the reading; main() prints getCpuTime(0) as "real",
//          getCpuTime(1) as "user" and getCpuTime(2) as "sys". The case
//          labels are not shown in this listing.
3533 double getCpuTime(int type)
3535 struct rusage used[1];
3536 struct timeval tv[1];
// time of day from gettimeofday
3540 gettimeofday(tv, NULL);
3541 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time of this process
3544 getrusage(RUSAGE_SELF, used);
3545 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time of this process
3548 getrusage(RUSAGE_SELF, used);
3549 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// bt_poolaudit: walk the manager's latch table and report on stderr every
// latch set that still shows a lock or a pin.
//
//   mgr - buffer pool manager whose latchsets[] are inspected
//
// NOTE(review): page_no is printed with %d; if the field is a uid
// (unsigned long long per the typedef at the top of the file) the
// specifier should be %llu - confirm the field type.
// NOTE(review): the embedded line numbers skip values, so declarations
// and braces are missing from this listing.
3556 void bt_poolaudit (BtMgr *mgr)
// entry is pre-incremented, so the first latch examined is one past its
// starting value (the initialisation is not shown)
3561 while( ++entry < mgr->latchtotal ) {
3562 latch = mgr->latchsets + entry;
// reader/writer locks: any MASK bit set in rin is reported
3564 if( *latch->readwr->rin & MASK )
3565 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3567 if( *latch->access->rin & MASK )
3568 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
// ticket-style locks: ticket differing from serving is reported
3570 if( *latch->parent->ticket != *latch->parent->serving )
3571 fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3573 if( *latch->atomic->ticket != *latch->atomic->serving )
3574 fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3576 if( *latch->modify->exclusive )
3577 fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
// the pin count is the pin field with CLOCK_bit masked off
3579 if( latch->pin & ~CLOCK_bit )
3580 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3592 // standalone program to index file of keys
3593 // then list them onto std-out
// index_file: thread entry point; main() starts one of these per input
// file, passing a ThreadArg.
//
//   arg - ThreadArg carrying the input file name (infile), the command
//         string (type), the manager (mgr), the transaction size (num)
//         and this thread's index (idx)
//
// The command character is args->type[args->idx], or the last character
// of the string when idx is past its end.
//
// NOTE(review): the two signatures below are alternatives (main() passes
// index_file to pthread_create and to _beginthreadex); the preprocessor
// lines selecting one are not shown.
// NOTE(review): several error paths call exit(0), which reports success
// to the parent process - confirm whether a non-zero status is intended.
// NOTE(review): the embedded line numbers skip values, so the command
// switch, case labels, braces and many statements are missing.
3596 void *index_file (void *arg)
3598 uint __stdcall index_file (void *arg)
3601 int line = 0, found = 0, cnt = 0, idx;
3602 uid next, page_no = LEAF_page; // start on first page of leaves
3603 int ch, len = 0, slot, type = 0;
3604 unsigned char key[BT_maxkey];
3605 unsigned char txn[65536];
3606 ThreadArg *args = arg;
3615 bt = bt_open (args->mgr);
// pick this thread's command character
3618 if( args->idx < strlen (args->type) )
3619 ch = args->type[args->idx];
3621 ch = args->type[strlen(args->type) - 1];
// pennysort: announce the mode (the test choosing the TXN wording is
// not shown)
3633 if( type == Delete )
3634 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3636 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3638 if( type == Delete )
3639 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3641 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3643 if( in = fopen (args->infile, "rb") )
3644 while( ch = getc(in), ch != EOF )
// direct insert: the first 10 bytes of the line are the key, the rest
// is the value
3650 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3651 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
// batched path: append value bytes, a length byte and the 10-byte key to
// the txn buffer and point a new slot at them (nxt is adjusted in lines
// not shown)
3657 memcpy (txn + nxt, key + 10, len - 10);
3659 txn[nxt] = len - 10;
3661 memcpy (txn + nxt, key, 10);
3664 slotptr(page,++cnt)->off = nxt;
3665 slotptr(page,cnt)->type = type;
// batch size test against args->num (guarded statement not shown)
3668 if( cnt < args->num )
// submit the batch as one atomic transaction
3675 if( bt_atomictxn (bt, page) )
3676 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3681 else if( len < BT_maxkey )
3683 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
// indexing: each input line is inserted as a key with no value
3687 fprintf(stderr, "started indexing for %s\n", args->infile);
3688 if( in = fopen (args->infile, "r") )
3689 while( ch = getc(in), ch != EOF )
3694 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3695 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3698 else if( len < BT_maxkey )
3700 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// finding: each input line is looked up with bt_findkey
3704 fprintf(stderr, "started finding keys for %s\n", args->infile);
3705 if( in = fopen (args->infile, "rb") )
3706 while( ch = getc(in), ch != EOF )
3710 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3713 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
3716 else if( len < BT_maxkey )
3718 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// scanning: follow right links from page_no, writing each live key and
// value to stdout
3722 fprintf(stderr, "started scanning\n");
3725 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3726 set->page = bt_mappage (bt, set->latch);
3728 fprintf(stderr, "unable to obtain latch"), exit(1);
3729 bt_lockpage (bt, BtLockRead, set->latch);
3730 next = bt_getid (set->page->right);
// the last slot of a page with no right link is left out
3732 for( slot = 0; slot++ < set->page->cnt; )
3733 if( next || slot < set->page->cnt )
3734 if( !slotptr(set->page, slot)->dead ) {
3735 ptr = keyptr(set->page, slot);
// Duplicate slots are tested here (the adjustment made is not shown)
3738 if( slotptr(set->page, slot)->type == Duplicate )
3741 fwrite (ptr->key, len, 1, stdout);
3742 val = valptr(set->page, slot);
3743 fwrite (val->value, val->len, 1, stdout);
3744 fputc ('\n', stdout);
3748 set->latch->avail = 1;
3749 bt_unlockpage (bt, BtLockRead, set->latch);
3750 bt_unpinlatch (set->latch);
3751 } while( page_no = next );
3753 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// reverse scan: start from bt_lastkey and step back with bt_prevkey,
// reading from the cursor copy
3757 fprintf(stderr, "started reverse scan\n");
3758 if( slot = bt_lastkey (bt) )
3759 while( slot = bt_prevkey (bt, slot) ) {
3760 if( slotptr(bt->cursor, slot)->dead )
3763 ptr = keyptr(bt->cursor, slot);
3766 if( slotptr(bt->cursor, slot)->type == Duplicate )
3769 fwrite (ptr->key, len, 1, stdout);
3770 val = valptr(bt->cursor, slot);
3771 fwrite (val->value, val->len, 1, stdout);
3772 fputc ('\n', stdout);
3776 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// counting: read pages below alloc->right and add up act for pages that
// are not free and have lvl 0
3781 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3783 fprintf(stderr, "started counting\n");
3784 next = LEAF_page + bt->mgr->redopages + 1;
3786 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3787 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3790 if( !bt->frame->free && !bt->frame->lvl )
3791 cnt += bt->frame->act;
3797 cnt--; // remove stopper key
3798 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3810 typedef struct timeval timer;
// main: command-line driver. It parses the arguments described in the
// usage text, opens the btree manager, starts one index_file thread per
// source file, waits for them and prints real/user/sys timings.
//
// NOTE(review): the embedded line numbers skip values, so the argument
// count tests, the computation of cnt, the preprocessor guards between
// the pthread and Win32 lines, and the end of the function are missing.
3812 int main (int argc, char **argv)
3814 int idx, cnt, len, slot, err;
3815 int segsize, bits = 16;
3833 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3834 fprintf (stderr, " where idx_file is the name of the btree file\n");
3835 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3836 fprintf (stderr, " page_bits is the page size in bits\n");
3837 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3838 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3839 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3840 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// timing baseline for the "real" figure printed further down
3844 start = getCpuTime(0);
// optional positional arguments, in the order listed in the usage text
// NOTE(review): atoi gives no indication of malformed input
3847 bits = atoi(argv[3]);
3850 poolsize = atoi(argv[4]);
3853 fprintf (stderr, "Warning: no mapped_pool\n");
3856 num = atoi(argv[5]);
3859 recovery = atoi(argv[6]);
// one thread handle and one ThreadArg per source file
// NOTE(review): no check of these allocation results is visible here
3863 threads = malloc (cnt * sizeof(pthread_t));
3865 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3867 args = malloc (cnt * sizeof(ThreadArg));
3869 mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3872 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// source files start at argv[7]; every thread shares the command string
// argv[2] and picks its own character through idx
3878 for( idx = 0; idx < cnt; idx++ ) {
3879 args[idx].infile = argv[idx + 7];
3880 args[idx].type = argv[2];
3881 args[idx].mgr = mgr;
3882 args[idx].num = num;
3883 args[idx].idx = idx;
3885 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3886 fprintf(stderr, "Error creating thread %d\n", err);
3888 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3892 // wait for termination
3895 for( idx = 0; idx < cnt; idx++ )
3896 pthread_join (threads[idx], NULL);
3898 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3900 for( idx = 0; idx < cnt; idx++ )
3901 CloseHandle(threads[idx]);
// report elapsed wall-clock time, then user and system time, each as
// whole minutes plus remaining seconds
3907 elapsed = getCpuTime(0) - start;
3908 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3909 elapsed = getCpuTime(1);
3910 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3911 elapsed = getCpuTime(2);
3912 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);