1 // btree version threadskv9c sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // and redo log for failure recovery
13 // author: karl malbrain, malbrain@cal.berkeley.edu
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
19 The original author is Karl Malbrain.
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
37 #include <linux/futex.h>
52 #define WIN32_LEAN_AND_MEAN
66 typedef unsigned long long uid;
67 typedef unsigned long long logseqno;
70 typedef unsigned long long off64_t;
71 typedef unsigned short ushort;
72 typedef unsigned int uint;
75 #define BT_ro 0x6f72 // ro
76 #define BT_rw 0x7772 // rw
78 #define BT_maxbits 24 // maximum page size in bits
79 #define BT_minbits 9 // minimum page size in bits
80 #define BT_minpage (1 << BT_minbits) // minimum page size
81 #define BT_maxpage (1 << BT_maxbits) // maximum page size
83 // BTree page number constants
84 #define ALLOC_page 0 // allocation page
85 #define ROOT_page 1 // root of the btree
86 #define LEAF_page 2 // first page of leaves
87 #define REDO_page 3 // first page of redo buffer
89 // Number of levels to create in a new BTree
94 There are six lock types for each node in four independent sets:
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
117 uint xlock:1; // one writer has exclusive lock
118 uint wrt:31; // count of other writers waiting
127 // mode & definition for lite latch implementation
130 QueRd = 1, // reader queue
131 QueWr = 2 // writer queue
134 // definition for phase-fair reader/writer lock implementation
137 volatile ushort rin[1];
138 volatile ushort rout[1];
139 volatile ushort ticket[1];
140 volatile ushort serving[1];
158 // hash table entries
161 uint entry; // Latch table entry at head of chain
162 BtMutexLatch latch[1];
165 // latch manager table structure
168 uid page_no; // latch set page number
169 RWLock readwr[1]; // read/write page lock
170 RWLock access[1]; // Access Intent/Page delete
171 WOLock parent[1]; // Posting of fence key in parent
172 WOLock atomic[1]; // Atomic update in progress
173 uint split; // right split page atomic insert
174 uint entry; // entry slot in latch table
175 uint next; // next entry in hash table chain
176 uint prev; // prev entry in hash table chain
177 BtMutexLatch modify[1]; // modify entry lite latch
178 volatile ushort pin; // number of accessing threads
179 volatile unsigned char dirty; // page in cache is dirty (atomic set)
180 volatile unsigned char avail; // page is an available entry
183 // Define the length of the page record numbers
187 // Page key slot definition.
189 // Keys are marked dead, but remain on the page until
190 // cleanup is called. The fence key (highest key) for
191 // a leaf page is always present, even after cleanup.
195 // In addition to the Unique keys that occupy slots
196 // there are Librarian and Duplicate key
197 // slots occupying the key slot array.
199 // The Librarian slots are dead keys that
200 // serve as filler, available to add new Unique
201 // or Dup slots that are inserted into the B-tree.
203 // The Duplicate slots have had their key bytes extended
204 // by 6 bytes to contain a binary duplicate key uniqueifier.
215 uint off:BT_maxbits; // page offset for key start
216 uint type:3; // type of slot
217 uint dead:1; // set for deleted slot
220 // The key structure occupies space at the upper end of
221 // each page. It's a length byte followed by the key
225 unsigned char len; // this can be changed to a ushort or uint
226 unsigned char key[0];
229 // the value structure also occupies space at the upper
230 // end of the page. Each key is immediately followed by a value.
233 unsigned char len; // this can be changed to a ushort or uint
234 unsigned char value[0];
237 #define BT_maxkey 255 // maximum number of bytes in a key
238 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
240 // The first part of an index page.
241 // It is immediately followed
242 // by the BtSlot array of keys.
244 // note that this structure size
245 // must be a multiple of 8 bytes
246 // in order to place dups correctly.
248 typedef struct BtPage_ {
249 uint cnt; // count of keys in page
250 uint act; // count of active keys
251 uint min; // next key offset
252 uint garbage; // page garbage in bytes
253 unsigned char bits:7; // page size in bits
254 unsigned char free:1; // page is on free chain
255 unsigned char lvl:7; // level of page
256 unsigned char kill:1; // page is being deleted
257 unsigned char right[BtId]; // page number to right
258 unsigned char left[BtId]; // page number to left
259 unsigned char filler[2]; // padding to multiple of 8
260 logseqno lsn; // log sequence number applied
263 // The loadpage interface object
266 BtPage page; // current page pointer
267 BtLatchSet *latch; // current page latch set
270 // structure for latch manager on ALLOC_page
273 struct BtPage_ alloc[1]; // next page_no in right ptr
274 unsigned long long dups[1]; // global duplicate key uniqueifier
275 unsigned char chain[BtId]; // head of free page_nos chain
278 // The object structure for Btree access
281 uint page_size; // page size
282 uint page_bits; // page size in bits
288 BtPageZero *pagezero; // mapped allocation page
289 BtHashEntry *hashtable; // the buffer pool hash table entries
290 BtLatchSet *latchsets; // mapped latch set from buffer pool
291 unsigned char *pagepool; // mapped to the buffer pool pages
292 unsigned char *redobuff; // mapped recovery buffer pointer
293 logseqno lsn, flushlsn; // current & first lsn flushed
294 BtMutexLatch dump[1]; // redo dump lite latch
295 BtMutexLatch redo[1]; // redo area lite latch
296 BtMutexLatch lock[1]; // allocation area lite latch
297 ushort thread_no[1]; // next thread number
298 uint nlatchpage; // number of latch pages at BT_latch
299 uint latchtotal; // number of page latch entries
300 uint latchhash; // number of latch hash table slots
301 uint latchvictim; // next latch entry to examine
302 uint latchavail; // next available latch entry
303 uint availlock[1]; // latch available chain commitments
304 uint available; // size of latch available chain
305 uint redopages; // size of recovery buff in pages
306 uint redoend; // eof/end element in recovery buff
308 HANDLE halloc; // allocation handle
309 HANDLE hpool; // buffer pool handle
314 BtMgr *mgr; // buffer manager for thread
315 BtPage cursor; // cached frame for start/next (never mapped)
316 BtPage frame; // spare frame for the page split (never mapped)
317 uid cursor_page; // current cursor page number
318 unsigned char *mem; // frame, cursor, page memory buffer
319 unsigned char key[BT_keyarray]; // last found complete key
320 int found; // last delete or insert was found
321 int err; // last error
322 int line; // last error line no
323 int reads, writes; // number of reads and writes from the btree
324 ushort thread_no; // thread number
327 // Catastrophic errors
342 #define CLOCK_bit 0x8000
344 // recovery manager entry types
// (enum header/footer elided in this listing; values tag BtLogHdr entries)
347 BTRM_eof = 0, // rest of buffer is empty
348 BTRM_add, // add a unique key-value to btree
349 BTRM_dup, // add a duplicate key-value to btree
350 BTRM_del, // delete a key-value from btree
351 BTRM_upd, // update a key with a new value
352 BTRM_new, // allocate a new empty page
353 BTRM_old // reuse an old empty page
356 // recovery manager entry
357 // structure followed by BtKey & BtVal
360 logseqno reqlsn; // log sequence number required
361 logseqno lsn; // log sequence number for entry
362 uint len; // length of entry
363 unsigned char type; // type of entry
364 unsigned char lvl; // level of btree entry pertains to
369 extern void bt_close (BtDb *bt);
370 extern BtDb *bt_open (BtMgr *mgr);
371 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
372 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
373 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
374 extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
375 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
376 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
377 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
378 extern BtKey *bt_foundkey (BtDb *bt);
379 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
380 extern uint bt_nextkey (BtDb *bt, uint slot);
383 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
384 extern void bt_mgrclose (BtMgr *mgr);
385 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
386 extern logseqno bt_txnredo (BtDb *bt, BtPage page);
388 // Helper functions to return slot values
389 // from the cursor page.
391 extern BtKey *bt_key (BtDb *bt, uint slot);
392 extern BtVal *bt_val (BtDb *bt, uint slot);
394 // The page is allocated from low and hi ends.
395 // The key slots are allocated from the bottom,
396 // while the text and value of the key
397 // are allocated from the top. When the two
398 // areas meet, the page is split into two.
400 // A key consists of a length byte, two bytes of
401 // index number (0 - 65535), and up to 253 bytes
404 // Associated with each key is a value byte string
405 // containing any value desired.
407 // The b-tree root is always located at page 1.
408 // The first leaf page of level zero is always
409 // located on page 2.
411 // The b-tree pages are linked with next
412 // pointers to facilitate enumerators,
413 // and provide for concurrency.
415 // When the root page fills, it is split in two and
416 // the tree height is raised by a new root at page
417 // one with two keys.
419 // Deleted keys are marked with a dead bit until
420 // page cleanup. The fence key for a leaf node is
423 // To achieve maximum concurrency one page is locked at a time
424 // as the tree is traversed to find leaf key in question. The right
425 // page numbers are used in cases where the page is being split,
428 // Page 0 is dedicated to lock for new page extensions,
429 // and chains empty pages together for reuse. It also
430 // contains the latch manager hash table.
432 // The ParentModification lock on a node is obtained to serialize posting
433 // or changing the fence key for a node.
435 // Empty pages are chained together through the ALLOC page and reused.
437 // Access macros to address slot and key values from the page
438 // Page slots use 1 based indexing.
440 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
441 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
442 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
// pack a page number (uid) into a BtId-byte array
// (the loop header is elided in this listing; bt_getid below implies
// the layout is big-endian, most-significant byte first -- confirm)
444 void bt_putid(unsigned char *dest, uid id)
449 dest[i] = (unsigned char)id, id >>= 8;
// reassemble a big-endian BtId-byte page number from a byte array
452 uid bt_getid(unsigned char *src)
457 for( i = 0; i < BtId; i++ )
458 id <<= 8, id |= *src++;
// atomically advance the global duplicate-key uniqueifier kept on ALLOC_page
463 uid bt_newdup (BtDb *bt)
466 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
// NOTE(review): _InterlockedIncrement64 takes a single argument; the extra
// ", 1" here would not compile on Windows -- confirm against the Win32
// Interlocked API and drop the second argument
468 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
472 // lite weight spin lock Latch Manager
// thin wrapper over the raw futex(2) system call (Linux only)
474 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
476 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
// acquire the lite latch exclusively; on contention register as a waiting
// writer and sleep on the futex (the surrounding retry loop and several
// statements are elided in this listing)
479 void bt_mutexlock(BtMutexLatch *latch)
481 BtMutexLatch prev[1];
// set the exclusive bit, snapshotting the prior latch word
485 *prev->value = __sync_fetch_and_or(latch->value, XCL);
487 if( !prev->bits->xlock ) { // did we set XCL bit?
489 __sync_fetch_and_sub(latch->value, WRT);
// lost the race: count ourselves among waiting writers, then block until
// the holder wakes the writer queue (QueWr bitset)
495 __sync_fetch_and_add(latch->value, WRT);
498 sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
503 // try to obtain write lock
505 // return 1 if obtained,
// (0 otherwise; non-blocking variant of bt_mutexlock)
508 int bt_mutextry(BtMutexLatch *latch)
510 BtMutexLatch prev[1];
512 *prev->value = __sync_fetch_and_or(latch->value, XCL);
514 // take write access if exclusive bit is clear
516 return !prev->bits->xlock;
// release the lite latch: clear the exclusive bit and, if any writers
// are queued, wake one of them via the futex writer bitset
521 void bt_releasemutex(BtMutexLatch *latch)
523 BtMutexLatch prev[1];
525 *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
527 if( prev->bits->wrt )
528 sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
531 // Write-Only Queue Lock
// acquire the write-only lock; appears re-entrant for the owning thread
// (the tid-match branch body and lock bookkeeping are elided here)
533 void WriteOLock (WOLock *lock, ushort tid)
535 if( lock->tid == tid ) {
540 bt_mutexlock(lock->xcl);
// release the write-only lock (recursion-count handling elided in listing)
544 void WriteORelease (WOLock *lock)
552 bt_releasemutex(lock->xcl);
555 // Phase-Fair reader/writer lock implementation
// writer acquire: take a ticket, wait to be served, then block new readers
// and wait for in-flight readers to drain (several lines elided)
557 void WriteLock (RWLock *lock, ushort tid)
// re-entrant for the thread already holding the write lock
561 if( lock->tid == tid ) {
566 tix = __sync_fetch_and_add (lock->ticket, 1);
568 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
570 // wait for our ticket to come up
572 while( tix != lock->serving[0] )
// publish writer-present plus the phase id so readers queue behind us
579 w = PRES | (tix & PHID);
581 r = __sync_fetch_and_add (lock->rin, w);
583 r = _InterlockedExchangeAdd16 (lock->rin, w);
// spin until all readers that entered before us have exited
585 while( r != *lock->rout )
// writer release: clear the writer-present/phase bits so queued readers
// may proceed (serving-ticket advance elided in this listing)
594 void WriteRelease (RWLock *lock)
603 __sync_fetch_and_and (lock->rin, ~MASK);
605 _InterlockedAnd16 (lock->rin, ~MASK);
610 // try to obtain read lock
611 // return 1 if successful
613 int ReadTry (RWLock *lock, ushort tid)
617 // OK if write lock already held by same thread
619 if( lock->tid == tid ) {
// optimistically enter as a reader, remembering any writer-present bits
624 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
626 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
// a writer from the same phase is still present: back out and fail
629 if( w == (*lock->rin & MASK) ) {
631 __sync_fetch_and_add (lock->rin, -RINC);
633 _InterlockedExchangeAdd16 (lock->rin, -RINC);
// blocking reader acquire; re-entrant when this thread holds the write lock
// (spin/yield body of the wait loop elided in this listing)
641 void ReadLock (RWLock *lock, ushort tid)
644 if( lock->tid == tid ) {
649 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
651 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
// wait while a writer from the captured phase remains present
654 while( w == (*lock->rin & MASK) )
// reader release: bump the reader-out counter so waiting writers can see
// this reader has drained
662 void ReadRelease (RWLock *lock)
670 __sync_fetch_and_add (lock->rout, RINC);
672 _InterlockedExchangeAdd16 (lock->rout, RINC);
// walk every buffer-pool latch entry and write dirty pages back to the
// btree file (several statements are elided in this listing; cnt2/cnt3
// are reported below but their update sites are not visible here)
676 void bt_flushlsn (BtDb *bt)
678 uint cnt3 = 0, cnt2 = 0, cnt = 0;
683 // flush dirty pool pages to the btree that were
684 // dirty before the start of this redo recovery buffer
685 fprintf(stderr, "Start flushlsn\n");
686 for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
687 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
688 latch = bt->mgr->latchsets + entry;
// serialize with other users of the entry, then read-lock the page itself
689 bt_mutexlock (latch->modify);
690 bt_lockpage(bt, BtLockRead, latch);
693 bt_writepage(bt->mgr, page, latch->page_no);
694 latch->dirty = 0, bt->writes++, cnt++;
698 if( latch->pin & ~CLOCK_bit )
700 bt_unlockpage(bt, BtLockRead, latch);
701 bt_releasemutex (latch->modify);
703 fprintf(stderr, "End flushlsn %d pages %d pinned %d available\n", cnt, cnt2, cnt3);
706 // recovery manager -- process current recovery buff on startup
707 // this won't do much if previous session was properly closed.
// NOTE(review): both shift expressions on the pread line below use
// mgr->page_size as the shift count, while every comparable offset
// computation in this file (e.g. bt_dumpredo, bt_mgrclose) shifts by
// mgr->page_bits.  Shifting by the byte size instead of the bit count
// looks like a bug -- confirm and change to page_bits.
709 BTERR bt_recoveryredo (BtMgr *mgr)
716 pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
718 hdr = (BtLogHdr *)mgr->redobuff;
719 mgr->flushlsn = hdr->lsn;
// walk successive BtLogHdr entries in the buffer (loop header elided);
// BTRM_eof terminates the scan
722 hdr = (BtLogHdr *)(mgr->redobuff + offset);
723 switch( hdr->type ) {
727 case BTRM_add: // add a unique key-value to btree
729 case BTRM_dup: // add a duplicate key-value to btree
730 case BTRM_del: // delete a key-value from btree
731 case BTRM_upd: // update a key with a new value
732 case BTRM_new: // allocate a new empty page
733 case BTRM_old: // reuse an old empty page
739 // recovery manager -- dump current recovery buff & flush dirty pages
740 // in preparation for next recovery buffer.
// caller is expected to hold the mgr->dump latch (see bt_newredo/bt_txnredo)
742 BTERR bt_dumpredo (BtDb *bt)
745 fprintf(stderr, "Flush pages ");
// stamp an eof header at the current end of the redo buffer
747 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
748 memset (eof, 0, sizeof(BtLogHdr));
750 // flush pages written at beginning of this redo buffer
751 // then write the redo buffer out to disk
// fdatasync orders the page writes ahead of the redo-buffer write
753 fdatasync (bt->mgr->idx);
755 fprintf(stderr, "Dump ReDo: %d bytes\n", bt->mgr->redoend);
756 pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
758 sync_file_range (bt->mgr->idx, REDO_page << bt->mgr->page_bits, bt->mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
// advance the flush horizon and restart the buffer with a fresh eof header
760 bt->mgr->flushlsn = bt->mgr->lsn;
761 bt->mgr->redoend = 0;
763 eof = (BtLogHdr *)(bt->mgr->redobuff);
764 memset (eof, 0, sizeof(BtLogHdr));
765 eof->lsn = bt->mgr->lsn;
769 // recovery manager -- append new entry to recovery log
770 // flush to disk when it overflows.
// returns the lsn assigned to the entry (elided lines include the
// declarations and early-out paths not visible in this listing)
772 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
774 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
775 uint amt = sizeof(BtLogHdr);
779 bt_mutexlock (bt->mgr->redo);
782 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
784 // see if new entry fits in buffer
785 // flush and reset if it doesn't
// assignment inside the condition is intentional: remember whether a
// flush happened so the dump latch can be released at the end
787 if( flush = amt > size - bt->mgr->redoend ) {
788 bt_mutexlock (bt->mgr->dump);
790 if( bt_dumpredo (bt) )
794 // fill in new entry & either eof or end block
796 hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
801 hdr->lsn = ++bt->mgr->lsn;
803 bt->mgr->redoend += amt;
// write a zeroed eof header just past the new entry
805 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
806 memset (eof, 0, sizeof(BtLogHdr));
808 // fill in key and value
811 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
812 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
815 bt_releasemutex(bt->mgr->redo);
819 bt_releasemutex(bt->mgr->dump);
825 // recovery manager -- append transaction to recovery log
826 // flush to disk when it overflows.
// logs every slot of the source page under one lsn; returns that lsn
828 logseqno bt_txnredo (BtDb *bt, BtPage source)
830 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
831 uint amt = 0, src, type;
838 // determine amount of redo recovery log space required
// note: slots are 1-based, hence the post-increment in the condition
840 for( src = 0; src++ < source->cnt; ) {
841 key = keyptr(source,src);
842 val = valptr(source,src);
843 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
844 amt += sizeof(BtLogHdr);
847 bt_mutexlock (bt->mgr->redo);
849 // see if new entry fits in buffer
850 // flush and reset if it doesn't
// assignment inside the condition is intentional (see bt_newredo)
852 if( flush = amt > size - bt->mgr->redoend ) {
853 bt_mutexlock (bt->mgr->dump);
855 if( bt_dumpredo (bt) )
859 // assign new lsn to transaction
861 lsn = ++bt->mgr->lsn;
863 // fill in new entries
865 for( src = 0; src++ < source->cnt; ) {
866 key = keyptr(source, src);
867 val = valptr(source, src);
// map the slot type to a BTRM_* entry type (case bodies elided here)
869 switch( slotptr(source, src)->type ) {
884 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
885 amt += sizeof(BtLogHdr);
887 hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
893 // fill in key and value
895 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
896 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
898 bt->mgr->redoend += amt;
// terminate the buffer with a zeroed eof header
901 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
902 memset (eof, 0, sizeof(BtLogHdr));
904 bt_releasemutex(bt->mgr->redo);
908 bt_releasemutex(bt->mgr->dump);
914 // read page into buffer pool from permanent location in Btree file
// NOTE(review): page_no is a 64-bit uid but is printed with %d below;
// should be a 64-bit conversion (e.g. %llx) -- confirm
916 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
918 off64_t off = page_no << mgr->page_bits;
// short read (or error) is treated as failure
921 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
922 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
// Windows path: positioned read via OVERLAPPED offset
929 memset (ovl, 0, sizeof(OVERLAPPED));
931 ovl->OffsetHigh = off >> 32;
933 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
934 fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
937 if( *amt < mgr->page_size ) {
938 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
945 // write page to permanent location in Btree file
946 // clear the dirty bit
948 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
950 off64_t off = page_no << mgr->page_bits;
// short write (or error) is treated as failure
953 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
// Windows path: positioned write via OVERLAPPED offset
959 memset (ovl, 0, sizeof(OVERLAPPED));
961 ovl->OffsetHigh = off >> 32;
963 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
966 if( *amt < mgr->page_size )
972 // set CLOCK bit in latch
973 // decrement pin count
// (the pin decrement itself is on a line elided from this listing)
975 void bt_unpinlatch (BtLatchSet *latch)
977 bt_mutexlock(latch->modify);
978 latch->pin |= CLOCK_bit;
980 bt_releasemutex(latch->modify);
983 // return the btree cached page address
984 // if page is dirty and has not yet been
985 // flushed to disk for the current redo
986 // recovery buffer, write it out.
988 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
// pool pages live at entry-number * page_size offsets within pagepool
990 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
995 // return next available latch entry
996 // and with latch entry locked
// scans the available chain round-robin (outer retry loop elided);
// returns with latch->modify held on success
998 uint bt_availnext (BtDb *bt)
1005 entry = __sync_fetch_and_add (&bt->mgr->latchavail, 1) + 1;
1007 entry = _InterlockedIncrement (&bt->mgr->latchavail);
1009 entry %= bt->mgr->latchtotal;
1014 latch = bt->mgr->latchsets + entry;
1019 bt_mutexlock(latch->modify);
// entry was grabbed by someone else in the meantime: release and retry
1021 if( !latch->avail ) {
1022 bt_releasemutex(latch->modify);
1030 // find and add the next available latch entry
// CLOCK-style victim scan over the latch table; marks eligible entries as
// available for reuse (many interior lines are elided from this listing)
1033 BTERR bt_availlatch (BtDb *bt)
1041 // find and reuse previous entry on victim
1043 startattempt = bt->mgr->latchvictim;
1047 entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
1049 entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
1051 // skip entry if it has outstanding pins
1053 entry %= bt->mgr->latchtotal;
1058 // only go around one time before
1059 // flushing redo recovery buffer,
1060 // and the buffer pool to free up entries.
1062 if( bt->mgr->redopages )
1063 if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
1064 if( bt_mutextry (bt->mgr->dump) ) {
1065 if( bt_dumpredo (bt) )
1068 // synchronize the various threads running into this condition
1069 // so that only one thread does the dump and flush
1071 bt_mutexlock(bt->mgr->dump);
1073 startattempt = bt->mgr->latchvictim;
1074 bt_releasemutex(bt->mgr->dump);
1077 latch = bt->mgr->latchsets + entry;
1082 bt_mutexlock(latch->modify);
1084 // skip if already an available entry
1086 if( latch->avail ) {
1087 bt_releasemutex(latch->modify);
1091 // skip this entry if it is pinned
1092 // if the CLOCK bit is set
1093 // reset it to zero.
1096 latch->pin &= ~CLOCK_bit;
1097 bt_releasemutex(latch->modify);
1101 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1103 // if dirty page has lsn >= last redo recovery buffer
1104 // then hold page in the buffer pool until next redo
1105 // recovery buffer is being written to disk.
1108 if( page->lsn >= bt->mgr->flushlsn ) {
1109 bt_releasemutex(latch->modify);
1113 // entry is available
1115 __sync_fetch_and_add (&bt->mgr->available, 1);
1117 _InterlockedIncrement(&bt->mgr->available);
1120 bt_releasemutex(latch->modify);
1125 // release available latch requests
// gives back `count` previously-committed available-chain reservations
1127 void bt_availrelease (BtDb *bt, uint count)
1130 __sync_fetch_and_add(bt->mgr->availlock, -count);
1132 _InterlockedAdd(bt->mgr->availlock, -count);
1136 // commit available chain entries
1137 // find available entries as required
1139 void bt_availrequest (BtDb *bt, uint count)
1142 __sync_fetch_and_add(bt->mgr->availlock, count);
1144 _InterlockedAdd(bt->mgr->availlock, count);
// spin until enough entries are on the available chain to cover all
// outstanding commitments (loop body, presumably bt_availlatch, elided)
1147 while( *bt->mgr->availlock > bt->mgr->available )
1151 // find available latchset
1152 // return with latchset pinned
// looks up page_no in the buffer-pool hash table; on miss, reuses an
// available entry, writes back its old contents if dirty, and either
// copies `loadit` into the frame or reads the page from disk
// (many interior lines are elided from this listing)
1154 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, BtPage loadit)
1156 uint hashidx = page_no % bt->mgr->latchhash;
1161 // try to find our entry
1163 bt_mutexlock(bt->mgr->hashtable[hashidx].latch);
1165 if( entry = bt->mgr->hashtable[hashidx].entry ) do
1167 latch = bt->mgr->latchsets + entry;
1168 if( page_no == latch->page_no )
1170 } while( entry = latch->next );
1172 // found our entry: increment pin
1173 // remove from available status
1176 latch = bt->mgr->latchsets + entry;
1177 bt_mutexlock(latch->modify);
1180 __sync_fetch_and_add (&bt->mgr->available, -1);
1182 _InterlockedDecrement(&bt->mgr->available);
1185 latch->pin |= CLOCK_bit;
1188 bt_releasemutex(latch->modify);
1189 bt_releasemutex(bt->mgr->hashtable[hashidx].latch);
1193 // find and reuse entry from available set
1197 if( entry = bt_availnext (bt) )
1198 latch = bt->mgr->latchsets + entry;
1200 return bt->line = __LINE__, bt->err = BTERR_avail, NULL;
1202 idx = latch->page_no % bt->mgr->latchhash;
1204 // if latch is on a different hash chain
1205 // unlink from the old page_no chain
1207 if( latch->page_no )
1208 if( idx != hashidx ) {
1210 // skip over this entry if latch not available
1212 if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
1213 bt_releasemutex(latch->modify);
1218 bt->mgr->latchsets[latch->prev].next = latch->next;
1220 bt->mgr->hashtable[idx].entry = latch->next;
1223 bt->mgr->latchsets[latch->next].prev = latch->prev;
1225 bt_releasemutex (bt->mgr->hashtable[idx].latch);
1228 // remove available status
1232 __sync_fetch_and_add (&bt->mgr->available, -1);
1234 _InterlockedDecrement(&bt->mgr->available);
1236 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
// write back the evicted page's contents before reusing the frame
1239 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1240 return bt->line = __LINE__, NULL;
1242 latch->dirty = 0, bt->writes++;
1245 memcpy (page, loadit, bt->mgr->page_size);
1248 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
1249 return bt->line = __LINE__, NULL;
1253 // link page as head of hash table chain
1254 // if this is a never before used entry,
1255 // or it was previously on a different
1256 // hash table chain. Otherwise, just
1257 // leave it in its current hash table
1260 if( !latch->page_no || hashidx != idx ) {
1261 if( latch->next = bt->mgr->hashtable[hashidx].entry )
1262 bt->mgr->latchsets[latch->next].prev = entry;
1264 bt->mgr->hashtable[hashidx].entry = entry;
1268 // fill in latch structure
1270 latch->pin = CLOCK_bit | 1;
1271 latch->page_no = page_no;
1272 latch->entry = entry;
1275 bt_releasemutex (latch->modify);
1276 bt_releasemutex (bt->mgr->hashtable[hashidx].latch);
// shut down the buffer manager: flush the redo buffer and all dirty pool
// pages, then unmap/close platform resources (lines elided in listing)
1280 void bt_mgrclose (BtMgr *mgr)
1288 // flush previously written dirty pages
1289 // and write recovery buffer to disk
1291 fdatasync (mgr->idx);
1293 if( mgr->redoend ) {
1294 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1295 memset (eof, 0, sizeof(BtLogHdr));
1297 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1300 // write remaining dirty pool pages to the btree
1302 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1303 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1304 latch = mgr->latchsets + slot;
1306 if( latch->dirty ) {
1307 bt_writepage(mgr, page, latch->page_no);
1308 latch->dirty = 0, num++;
1312 // flush last batch to disk and clear
1313 // redo recovery buffer on disk.
1315 fdatasync (mgr->idx);
// stamp a clean (empty) redo buffer so the next open replays nothing
1317 if( mgr->redopages ) {
1318 eof = (BtLogHdr *)mgr->redobuff;
1319 memset (eof, 0, sizeof(BtLogHdr));
1320 eof->lsn = mgr->lsn;
1322 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1324 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1327 fprintf(stderr, "%d buffer pool pages flushed\n", num);
// unix teardown: unmap pool and page zero
1330 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1331 munmap (mgr->pagezero, mgr->page_size);
// windows teardown: flush and unmap views, close mapping handles
1333 FlushViewOfFile(mgr->pagezero, 0);
1334 UnmapViewOfFile(mgr->pagezero);
1335 UnmapViewOfFile(mgr->pagepool);
1336 CloseHandle(mgr->halloc);
1337 CloseHandle(mgr->hpool);
1340 if( mgr->redopages )
1341 free (mgr->redobuff);
1345 if( mgr->redopages )
1346 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1347 FlushFileBuffers(mgr->idx);
1348 CloseHandle(mgr->idx);
1353 // close and release memory
// frees the per-thread BtDb working storage (unix free path elided)
1355 void bt_close (BtDb *bt)
1362 VirtualFree (bt->mem, 0, MEM_RELEASE);
1367 // open/create new btree buffer manager
1369 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1370 // size of page pool (e.g. 262144)
1372 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1374 uint lvl, attr, last, slot, idx;
1375 uint nlatchpage, latchhash;
1376 unsigned char value[BtId];
1377 int flag, initit = 0;
1378 BtPageZero *pagezero;
1386 // determine sanity of page size and buffer pool
1388 if( bits > BT_maxbits )
1390 else if( bits < BT_minbits )
1393 if( nodemax < 16 ) {
1394 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
1399 mgr = calloc (1, sizeof(BtMgr));
1401 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1403 if( mgr->idx == -1 ) {
1404 fprintf (stderr, "Unable to open btree file\n");
1405 return free(mgr), NULL;
1408 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1409 attr = FILE_ATTRIBUTE_NORMAL;
1410 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1412 if( mgr->idx == INVALID_HANDLE_VALUE )
1413 return GlobalFree(mgr), NULL;
1417 pagezero = valloc (BT_maxpage);
1420 // read minimum page size to get root info
1421 // to support raw disk partition files
1422 // check if bits == 0 on the disk.
1424 if( size = lseek (mgr->idx, 0L, 2) )
1425 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1426 if( pagezero->alloc->bits )
1427 bits = pagezero->alloc->bits;
1431 return free(mgr), free(pagezero), NULL;
1435 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1436 size = GetFileSize(mgr->idx, amt);
1438 if( size || *amt ) {
1439 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1440 return bt_mgrclose (mgr), NULL;
1441 bits = pagezero->alloc->bits;
1446 mgr->page_size = 1 << bits;
1447 mgr->page_bits = bits;
1449 // calculate number of latch hash table entries
1451 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1453 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1454 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1455 mgr->latchtotal = nodemax;
1460 // initialize an empty b-tree with latch page, root page, page of leaves
1461 // and page(s) of latches and page pool cache
1463 memset (pagezero, 0, 1 << bits);
1464 pagezero->alloc->bits = mgr->page_bits;
1465 bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1467 // initialize left-most LEAF page in
1470 bt_putid (pagezero->alloc->left, LEAF_page);
1472 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1473 fprintf (stderr, "Unable to create btree page zero\n");
1474 return bt_mgrclose (mgr), NULL;
1477 memset (pagezero, 0, 1 << bits);
1478 pagezero->alloc->bits = mgr->page_bits;
1480 for( lvl=MIN_lvl; lvl--; ) {
1481 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1482 key = keyptr(pagezero->alloc, 1);
1483 key->len = 2; // create stopper key
1487 bt_putid(value, MIN_lvl - lvl + 1);
1488 val = valptr(pagezero->alloc, 1);
1489 val->len = lvl ? BtId : 0;
1490 memcpy (val->value, value, val->len);
1492 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1493 pagezero->alloc->lvl = lvl;
1494 pagezero->alloc->cnt = 1;
1495 pagezero->alloc->act = 1;
1497 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1498 fprintf (stderr, "Unable to create btree page zero\n");
1499 return bt_mgrclose (mgr), NULL;
1507 VirtualFree (pagezero, 0, MEM_RELEASE);
1510 // mlock the pagezero page
1512 flag = PROT_READ | PROT_WRITE;
1513 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1514 if( mgr->pagezero == MAP_FAILED ) {
1515 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1516 return bt_mgrclose (mgr), NULL;
1518 mlock (mgr->pagezero, mgr->page_size);
1520 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1521 if( mgr->pagepool == MAP_FAILED ) {
1522 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1523 return bt_mgrclose (mgr), NULL;
1525 if( mgr->redopages = redopages )
1526 mgr->redobuff = valloc (redopages * mgr->page_size);
1528 flag = PAGE_READWRITE;
1529 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1530 if( !mgr->halloc ) {
1531 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1532 return bt_mgrclose (mgr), NULL;
1535 flag = FILE_MAP_WRITE;
1536 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1537 if( !mgr->pagezero ) {
1538 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1539 return bt_mgrclose (mgr), NULL;
1542 flag = PAGE_READWRITE;
1543 size = (uid)mgr->nlatchpage << mgr->page_bits;
1544 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1546 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1547 return bt_mgrclose (mgr), NULL;
1550 flag = FILE_MAP_WRITE;
1551 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1552 if( !mgr->pagepool ) {
1553 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1554 return bt_mgrclose (mgr), NULL;
1556 if( mgr->redopages = redopages )
1557 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1560 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1561 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1562 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1564 // mark all pool entries as available
1566 for( idx = 1; idx < mgr->latchtotal; idx++ ) {
1567 latch = mgr->latchsets + idx;
1575 // open BTree access method
1576 // based on buffer manager
// Allocates and zeroes a per-thread BtDb handle over the shared BtMgr,
// carves 2 pages of scratch memory into bt->frame and bt->cursor, and
// assigns a unique 1-based thread number used as lock owner id.
// NOTE(review): malloc/valloc results are not checked here -- OOM is UB.
1578 BtDb *bt_open (BtMgr *mgr)
1580 BtDb *bt = malloc (sizeof(*bt));
1582 memset (bt, 0, sizeof(*bt));
// unix branch uses page-aligned valloc; Windows branch uses VirtualAlloc
// (the #ifdef/#else lines are elided in this excerpt)
1585 bt->mem = valloc (2 *mgr->page_size);
1587 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1589 bt->frame = (BtPage)bt->mem;
1590 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
// +1 keeps thread numbers non-zero -- presumably 0 means "no owner" in the
// lock implementation; confirm against the latch code
1592 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1594 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1599 // compare two keys, return > 0, = 0, or < 0
1600 // =0: keys are same
1603 // as the comparison value
// Compares the common-length prefix with memcmp; the length tie-break and
// return statements that follow are elided in this excerpt.
1605 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1607 uint len1 = key1->len;
// assignment-in-condition is intentional: a nonzero memcmp decides the result
1610 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1621 // place write, read, or parent lock on requested page_no.
// Dispatches on the BtLock mode (the switch/case labels for the single
// modes are elided in this excerpt); each lock lives in its own set so
// modes on the same latch do not conflict across sets.
1623 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1627 ReadLock (latch->readwr, bt->thread_no);
1630 WriteLock (latch->readwr, bt->thread_no);
1633 ReadLock (latch->access, bt->thread_no);
1636 WriteLock (latch->access, bt->thread_no);
1639 WriteOLock (latch->parent, bt->thread_no);
1642 WriteOLock (latch->atomic, bt->thread_no);
// combined mode acquires atomic before readwr; bt_unlockpage releases
// in the same written order
1644 case BtLockAtomic | BtLockRead:
1645 WriteOLock (latch->atomic, bt->thread_no);
1646 ReadLock (latch->readwr, bt->thread_no);
1651 // remove write, read, or parent lock on requested page
// Mirror of bt_lockpage: releases the lock named by mode (single-mode
// case labels elided in this excerpt).
1653 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1657 ReadRelease (latch->readwr);
1660 WriteRelease (latch->readwr);
1663 ReadRelease (latch->access);
1666 WriteRelease (latch->access);
1669 WriteORelease (latch->parent);
1672 WriteORelease (latch->atomic);
1674 case BtLockAtomic | BtLockRead:
1675 WriteORelease (latch->atomic);
1676 ReadRelease (latch->readwr);
1681 // allocate a new page
1682 // return with page latched, but unlocked.
// Takes the allocation mutex, then either recycles the head of the free
// chain in page zero or bumps alloc->right to extend the file; in both
// cases the new page is filled from `contents` and marked dirty.
// Returns -1 with bt->err set on failure, under the mutex still held --
// NOTE(review): the error path's mutex release is elided in this excerpt.
1684 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1689 // lock allocation page
1691 bt_mutexlock(bt->mgr->lock);
1693 // use empty chain first
1694 // else allocate empty page
1696 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1697 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
1698 set->page = bt_mappage (bt, set->latch);
1700 return bt->err = BTERR_struct, bt->line = __LINE__, -1;
// unlink the recycled page: its right link holds the next free page
1702 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1703 bt_releasemutex(bt->mgr->lock);
1705 memcpy (set->page, contents, bt->mgr->page_size);
1706 set->latch->dirty = 1;
// no free page available: claim the next page number past end of file
1710 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1711 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1713 // unlock allocation latch and
1714 // extend file into new page.
1716 bt_releasemutex(bt->mgr->lock);
1718 // don't load cache from btree page
1720 if( set->latch = bt_pinlatch (bt, page_no, contents) )
1721 set->page = bt_mappage (bt, set->latch);
1725 set->latch->dirty = 1;
1729 // find slot in page for given key at a given level
// Binary search for the first slot whose key compares >= the given key.
// Returns 0 when the key belongs on the right-link sibling page
// (no slot on this page tested >= key).
1731 int bt_findslot (BtPage page, unsigned char *key, uint len)
1733 uint diff, higher = page->cnt, low = 1, slot;
1736 // make stopper key an infinite fence value
// if a right sibling exists the fence is finite (the branch bodies that
// adjust higher/good are elided in this excerpt)
1738 if( bt_getid (page->right) )
1743 // low is the lowest candidate.
1744 // loop ends when they meet
1746 // higher is already
1747 // tested as .ge. the passed key.
1749 while( diff = higher - low ) {
1750 slot = low + ( diff >> 1 );
1751 if( keycmp (keyptr(page, slot), key, len) < 0 )
1754 higher = slot, good++;
1757 // return zero if key is on right link page
1759 return good ? higher : 0;
1762 // find and load page at given level for given key
1763 // leave page rd or wr locked as requested
// Drills from the root down to `lvl`, holding BtLockRead on interior
// levels and the caller's `lock` mode on the target level. Uses lock
// chaining through the Access set so a page cannot be deleted between
// releasing the parent and locking the child. Returns the found slot,
// or 0 with bt->err/bt->line set on structural failure.
1765 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1767 uid page_no = ROOT_page, prevpage = 0;
// drill starts at 0xff so the first pass always reads the root's level
1768 uint drill = 0xff, slot;
1769 BtLatchSet *prevlatch;
1770 uint mode, prevmode;
1773 // start at root of btree and drill down
1776 // determine lock mode of drill level
1777 mode = (drill == lvl) ? lock : BtLockRead;
1779 if( !(set->latch = bt_pinlatch (bt, page_no, NULL)) )
1782 // obtain access lock using lock chaining with Access mode
// the root is never deleted, so Access chaining is skipped for it
1784 if( page_no > ROOT_page )
1785 bt_lockpage(bt, BtLockAccess, set->latch);
1787 set->page = bt_mappage (bt, set->latch);
1789 // release & unpin parent or left sibling page
1792 bt_unlockpage(bt, prevmode, prevlatch);
1793 bt_unpinlatch (prevlatch);
1797 // obtain mode lock using lock chaining through AccessLock
1799 bt_lockpage(bt, mode, set->latch);
// a page on the free list under us means the structure changed badly
1801 if( set->page->free )
1802 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1804 if( page_no > ROOT_page )
1805 bt_unlockpage(bt, BtLockAccess, set->latch);
1807 // re-read and re-lock root after determining actual level of root
1809 if( set->page->lvl != drill) {
1810 if( set->latch->page_no != ROOT_page )
1811 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1813 drill = set->page->lvl;
// if the root itself is the target level but we grabbed it read-only,
// drop it and restart so the requested lock mode is taken
1815 if( lock != BtLockRead && drill == lvl ) {
1816 bt_unlockpage(bt, mode, set->latch);
1817 bt_unpinlatch (set->latch);
1822 prevpage = set->latch->page_no;
1823 prevlatch = set->latch;
1826 // find key on page at this level
1827 // and descend to requested level
1829 if( !set->page->kill )
1830 if( slot = bt_findslot (set->page, key, len) ) {
1834 // find next non-dead slot -- the fence key if nothing else
1836 while( slotptr(set->page, slot)->dead )
1837 if( slot++ < set->page->cnt )
1840 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
// interior values must be page-id sized; anything else is corruption
1842 val = valptr(set->page, slot);
1844 if( val->len == BtId )
1845 page_no = bt_getid(valptr(set->page, slot)->value);
1847 return bt->line = __LINE__, bt->err = BTERR_struct, 0;
1853 // slide right into next page
// key was past our fence (or page killed): follow the right link
1855 page_no = bt_getid(set->page->right);
1858 // return error on end of right chain
1860 bt->line = __LINE__, bt->err = BTERR_struct;
1861 return 0; // return error
1864 // return page to free list
1865 // page must be delete & write locked
// Pushes the page onto the singly-linked free chain rooted in page zero
// (its right link becomes the old chain head), marks it free/dirty, then
// releases the page's Delete and Write locks and its latch pin.
1867 void bt_freepage (BtDb *bt, BtPageSet *set)
1869 // lock allocation page
1871 bt_mutexlock (bt->mgr->lock);
1875 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1876 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1877 set->latch->dirty = 1;
1878 set->page->free = 1;
1880 // unlock released page
1882 bt_unlockpage (bt, BtLockDelete, set->latch);
1883 bt_unlockpage (bt, BtLockWrite, set->latch);
1884 bt_unpinlatch (set->latch);
1886 // unlock allocation page
1888 bt_releasemutex (bt->mgr->lock);
1891 // a fence key was deleted from a page
1892 // push new fence value upwards
// Called with the page write-locked: removes the old fence slot, then
// posts the new (smaller) fence into the parent level and deletes the old
// fence there. Holds BtLockParent across the parent updates while
// releasing the write lock, so readers can proceed but the parent
// modification is serialized.
1894 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1896 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1897 unsigned char value[BtId];
1901 // remove the old fence value
1903 ptr = keyptr(set->page, set->page->cnt);
1904 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1905 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1906 set->latch->dirty = 1;
1908 // cache new fence value
1910 ptr = keyptr(set->page, set->page->cnt);
1911 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// ParentModification lock taken before dropping the write lock
1913 bt_lockpage (bt, BtLockParent, set->latch);
1914 bt_unlockpage (bt, BtLockWrite, set->latch);
1916 // insert new (now smaller) fence key
1918 bt_putid (value, set->latch->page_no);
1919 ptr = (BtKey*)leftkey;
1921 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1924 // now delete old fence key
1926 ptr = (BtKey*)rightkey;
1928 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1931 bt_unlockpage (bt, BtLockParent, set->latch);
1932 bt_unpinlatch(set->latch);
1936 // root has a single child
1937 // collapse a level from the tree
// Repeatedly copies the sole live child's contents over the root and
// frees the child, until the root is a leaf parent or has more than one
// active entry. Root keeps its page number, so no parent updates needed.
1939 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1946 // find the child entry and promote as new root contents
1949 for( idx = 0; idx++ < root->page->cnt; )
1950 if( !slotptr(root->page, idx)->dead )
// interior values must be BtId-sized page numbers
1953 val = valptr(root->page, idx);
1955 if( val->len == BtId )
1956 page_no = bt_getid (valptr(root->page, idx)->value);
1958 return bt->line = __LINE__, bt->err = BTERR_struct;
1960 if( child->latch = bt_pinlatch (bt, page_no, NULL) )
1961 child->page = bt_mappage (bt, child->latch);
// Delete then Write lock on the child before it is absorbed and freed
1965 bt_lockpage (bt, BtLockDelete, child->latch);
1966 bt_lockpage (bt, BtLockWrite, child->latch);
1968 memcpy (root->page, child->page, bt->mgr->page_size);
1969 root->latch->dirty = 1;
1971 bt_freepage (bt, child);
1973 } while( root->page->lvl > 1 && root->page->act == 1 );
1975 bt_unlockpage (bt, BtLockWrite, root->latch);
1976 bt_unpinlatch (root->latch);
1980 // delete a page and manage keys
1981 // call with page writelocked
1982 // returns with page unpinned
// Empties-page removal: pulls the right sibling's contents over this
// (empty) page so this page number survives, marks the right page killed
// and back-linked here for in-flight readers, then fixes the parent level
// (insert new fence, delete old one) and finally frees the right page.
1984 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1986 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1987 unsigned char value[BtId];
1988 uint lvl = set->page->lvl;
1993 // cache copy of fence key
1994 // to post in parent
1996 ptr = keyptr(set->page, set->page->cnt);
1997 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1999 // obtain lock on right page
2001 page_no = bt_getid(set->page->right);
2003 if( right->latch = bt_pinlatch (bt, page_no, NULL) )
2004 right->page = bt_mappage (bt, right->latch);
2008 bt_lockpage (bt, BtLockWrite, right->latch);
2010 // cache copy of key to update
2012 ptr = keyptr(right->page, right->page->cnt);
2013 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
// a concurrent deleter already claimed the right page: structure error
2015 if( right->page->kill )
2016 return bt->line = __LINE__, bt->err = BTERR_struct;
2018 // pull contents of right peer into our empty page
2020 memcpy (set->page, right->page, bt->mgr->page_size);
2021 set->latch->dirty = 1;
2023 // mark right page deleted and point it to left page
2024 // until we can post parent updates that remove access
2025 // to the deleted page.
2027 bt_putid (right->page->right, set->latch->page_no);
2028 right->latch->dirty = 1;
2029 right->page->kill = 1;
// take ParentModification on both pages before releasing write locks
2031 bt_lockpage (bt, BtLockParent, right->latch);
2032 bt_unlockpage (bt, BtLockWrite, right->latch);
2034 bt_lockpage (bt, BtLockParent, set->latch);
2035 bt_unlockpage (bt, BtLockWrite, set->latch);
2037 // redirect higher key directly to our new node contents
2039 bt_putid (value, set->latch->page_no);
2040 ptr = (BtKey*)higherfence;
2042 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2045 // delete old lower key to our node
2047 ptr = (BtKey*)lowerfence;
2049 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
2052 // obtain delete and write locks to right node
2054 bt_unlockpage (bt, BtLockParent, right->latch);
2055 bt_lockpage (bt, BtLockDelete, right->latch);
2056 bt_lockpage (bt, BtLockWrite, right->latch);
2057 bt_freepage (bt, right);
2059 bt_unlockpage (bt, BtLockParent, set->latch);
2060 bt_unpinlatch (set->latch);
2064 // find and delete key on page by marking delete flag bit
2065 // if page becomes empty, delete it from the btree
// Loads the page write-locked, marks the matching slot dead (logical
// delete), compacts dead slots below the fence, then handles the three
// structural follow-ups: fence-key promotion, root collapse, and
// whole-page deletion when no active slots remain.
2067 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
2069 uint slot, idx, found, fence;
2074 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2075 ptr = keyptr(set->page, slot);
2079 // if librarian slot, advance to real slot
2081 if( slotptr(set->page, slot)->type == Librarian )
2082 ptr = keyptr(set->page, ++slot);
// remember whether we are deleting the page's fence (last) key
2084 fence = slot == set->page->cnt;
2086 // if key is found delete it, otherwise ignore request
2088 if( found = !keycmp (ptr, key, len) )
2089 if( found = slotptr(set->page, slot)->dead == 0 ) {
2090 val = valptr(set->page,slot);
2091 slotptr(set->page, slot)->dead = 1;
// key+value bytes stay in place; account them as reclaimable garbage
2092 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2095 // collapse empty slots beneath the fence
2097 while( idx = set->page->cnt - 1 )
2098 if( slotptr(set->page, idx)->dead ) {
2099 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2100 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2105 // did we delete a fence key in an upper level?
2107 if( found && lvl && set->page->act && fence )
2108 if( bt_fixfence (bt, set, lvl) )
2113 // do we need to collapse root?
2115 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2116 if( bt_collapseroot (bt, set) )
2121 // delete empty page
2123 if( !set->page->act )
2124 return bt_deletepage (bt, set);
2126 set->latch->dirty = 1;
2127 bt_unlockpage(bt, BtLockWrite, set->latch);
2128 bt_unpinlatch (set->latch);
// accessor: returns the key cached in bt->key by the last bt_findkey call
2132 BtKey *bt_foundkey (BtDb *bt)
2134 return (BtKey*)(bt->key);
2137 // advance to next slot
// Steps the read-locked cursor to the next slot, following the right
// link to the sibling page when the current page is exhausted; uses
// Access-lock chaining before dropping the previous page's read lock.
// Returns the new slot, or 0 at end of chain / on error.
2139 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2141 BtLatchSet *prevlatch;
2144 if( slot < set->page->cnt )
2147 prevlatch = set->latch;
2149 if( page_no = bt_getid(set->page->right) )
2150 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
2151 set->page = bt_mappage (bt, set->latch);
2155 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
2157 // obtain access lock using lock chaining with Access mode
2159 bt_lockpage(bt, BtLockAccess, set->latch);
2161 bt_unlockpage(bt, BtLockRead, prevlatch);
2162 bt_unpinlatch (prevlatch);
2164 bt_lockpage(bt, BtLockRead, set->latch);
2165 bt_unlockpage(bt, BtLockAccess, set->latch);
2169 // find unique key or first duplicate key in
2170 // leaf level and return number of value bytes
2171 // or (-1) if not found. Setup key for bt_foundkey
// Loads the leaf read-locked, skips librarian placeholders, caches the
// found key into bt->key, and scans forward (across dead slots and page
// boundaries via bt_findnext) until a live matching key or the stopper.
2173 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2181 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
2183 ptr = keyptr(set->page, slot);
2185 // skip librarian slot place holder
2187 if( slotptr(set->page, slot)->type == Librarian )
2188 ptr = keyptr(set->page, ++slot);
2190 // return actual key found
2192 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
// duplicate keys carry a trailing sequence suffix -- presumably the
// compare length is shortened here; adjustment line elided in excerpt
2195 if( slotptr(set->page, slot)->type == Duplicate )
2198 // not there if we reach the stopper key
2200 if( slot == set->page->cnt )
2201 if( !bt_getid (set->page->right) )
2204 // if key exists, return >= 0 value bytes copied
2205 // otherwise return (-1)
2207 if( slotptr(set->page, slot)->dead )
2211 if( !memcmp (ptr->key, key, len) ) {
2212 val = valptr (set->page,slot);
// copy at most valmax bytes; caller learns the true length from the return
2213 if( valmax > val->len )
2215 memcpy (value, val->value, valmax);
2221 } while( slot = bt_findnext (bt, set, slot) );
2223 bt_unlockpage (bt, BtLockRead, set->latch);
2224 bt_unpinlatch (set->latch);
2228 // check page for space available,
2229 // clean if necessary and return
2230 // 0 - page needs splitting
2231 // >0 new slot value
// Garbage-collects a write-locked page in place: if the new key+value
// cannot fit and at least a fifth of the page is garbage, rebuilds the
// page from a copy in bt->frame, dropping dead slots (fence and interior
// keys are always kept) and re-adding librarian placeholder slots.
2233 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
2235 uint nxt = bt->mgr->page_size;
2236 BtPage page = set->page;
2237 uint cnt = 0, idx = 0;
2238 uint max = page->cnt;
// fast path: key/value storage grows down to page->min, slots grow up;
// +2 slots leaves room for the new entry plus its librarian slot
2243 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2246 // skip cleanup and proceed to split
2247 // if there's not enough garbage
2250 if( page->garbage < nxt / 5 )
2253 memcpy (bt->frame, page, bt->mgr->page_size);
2255 // skip page info and set rest of page to zero
2257 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2258 set->latch->dirty = 1;
2263 // clean up page first by
2264 // removing deleted keys
2266 while( cnt++ < max ) {
// the fence (last) slot on a leaf is kept even when dead
2270 if( cnt < max || bt->frame->lvl )
2271 if( slotptr(bt->frame,cnt)->dead )
2274 // copy the value across
2276 val = valptr(bt->frame, cnt);
2277 nxt -= val->len + sizeof(BtVal);
2278 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2280 // copy the key across
2282 key = keyptr(bt->frame, cnt);
2283 nxt -= key->len + sizeof(BtKey);
2284 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2286 // make a librarian slot
2288 slotptr(page, ++idx)->off = nxt;
2289 slotptr(page, idx)->type = Librarian;
2290 slotptr(page, idx)->dead = 1;
2294 slotptr(page, ++idx)->off = nxt;
2295 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2297 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2304 // see if page has enough space now, or does it need splitting?
2306 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2312 // split the root and raise the height of the btree
// Moves the current root contents into a fresh left page, then rebuilds
// the root with exactly two entries: slot 1 = old-root fence key pointing
// at the new left page, slot 2 = stopper key pointing at `right`. Values
// and keys are laid down from the top of the page (nxt grows downward).
2314 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2316 unsigned char leftkey[BT_keyarray];
2317 uint nxt = bt->mgr->page_size;
2318 unsigned char value[BtId];
2324 // save left page fence key for new root
2326 ptr = keyptr(root->page, root->page->cnt);
2327 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2329 // Obtain an empty page to use, and copy the current
2330 // root contents into it, e.g. lower keys
2332 if( bt_newpage(bt, left, root->page) )
2335 left_page_no = left->latch->page_no;
2336 bt_unpinlatch (left->latch);
2338 // preserve the page info at the bottom
2339 // of higher keys and set rest to zero
2341 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2343 // insert stopper key at top of newroot page
2344 // and increase the root height
2346 nxt -= BtId + sizeof(BtVal);
2347 bt_putid (value, right->page_no);
2348 val = (BtVal *)((unsigned char *)root->page + nxt);
2349 memcpy (val->value, value, BtId);
// stopper key is the fixed 2-byte sentinel (key bytes elided in excerpt)
2352 nxt -= 2 + sizeof(BtKey);
2353 slotptr(root->page, 2)->off = nxt;
2354 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2359 // insert lower keys page fence key on newroot page as first key
2361 nxt -= BtId + sizeof(BtVal);
2362 bt_putid (value, left_page_no);
2363 val = (BtVal *)((unsigned char *)root->page + nxt);
2364 memcpy (val->value, value, BtId);
2367 ptr = (BtKey *)leftkey;
2368 nxt -= ptr->len + sizeof(BtKey);
2369 slotptr(root->page, 1)->off = nxt;
2370 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// new root has no right sibling and exactly two active slots
2372 bt_putid(root->page->right, 0);
2373 root->page->min = nxt; // reset lowest used offset and key count
2374 root->page->cnt = 2;
2375 root->page->act = 2;
2378 // release and unpin root pages
2380 bt_unlockpage(bt, BtLockWrite, root->latch);
2381 bt_unpinlatch (root->latch);
2383 bt_unpinlatch (right);
2387 // split already locked full node
2389 // return pool entry for new right
// Two passes over the full page: first the upper half of the keys is
// assembled into bt->frame and written out as a brand-new right page,
// then the lower half is re-laid into the original page (which keeps its
// page number so the parent pointer stays valid). Librarian placeholder
// slots are interleaved ahead of each live slot in both halves.
2392 uint bt_splitpage (BtDb *bt, BtPageSet *set)
2394 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2395 uint lvl = set->page->lvl;
2402 // split higher half of keys to bt->frame
2404 memset (bt->frame, 0, bt->mgr->page_size);
2405 max = set->page->cnt;
// the loop start at the midpoint is elided in this excerpt
2409 while( cnt++ < max ) {
// keep the fence slot on leaves even if dead; skip other dead slots
2410 if( cnt < max || set->page->lvl )
2411 if( slotptr(set->page, cnt)->dead )
2414 src = valptr(set->page, cnt);
2415 nxt -= src->len + sizeof(BtVal);
2416 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2418 key = keyptr(set->page, cnt);
2419 nxt -= key->len + sizeof(BtKey);
2420 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2421 memcpy (ptr, key, key->len + sizeof(BtKey));
2423 // add librarian slot
2425 slotptr(bt->frame, ++idx)->off = nxt;
2426 slotptr(bt->frame, idx)->type = Librarian;
2427 slotptr(bt->frame, idx)->dead = 1;
2431 slotptr(bt->frame, ++idx)->off = nxt;
2432 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2434 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2438 bt->frame->bits = bt->mgr->page_bits;
2439 bt->frame->min = nxt;
2440 bt->frame->cnt = idx;
2441 bt->frame->lvl = lvl;
// link right half to our old right sibling (root has none to inherit)
2445 if( set->latch->page_no > ROOT_page )
2446 bt_putid (bt->frame->right, bt_getid (set->page->right));
2448 // get new free page and write higher keys to it.
2450 if( bt_newpage(bt, right, bt->frame) )
2453 // process lower keys
2455 memcpy (bt->frame, set->page, bt->mgr->page_size);
2456 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2457 set->latch->dirty = 1;
2459 nxt = bt->mgr->page_size;
2460 set->page->garbage = 0;
// don't end the lower half on a librarian placeholder
2466 if( slotptr(bt->frame, max)->type == Librarian )
2469 // assemble page of smaller keys
2471 while( cnt++ < max ) {
2472 if( slotptr(bt->frame, cnt)->dead )
2474 val = valptr(bt->frame, cnt);
2475 nxt -= val->len + sizeof(BtVal);
2476 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2478 key = keyptr(bt->frame, cnt);
2479 nxt -= key->len + sizeof(BtKey);
2480 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2482 // add librarian slot
2484 slotptr(set->page, ++idx)->off = nxt;
2485 slotptr(set->page, idx)->type = Librarian;
2486 slotptr(set->page, idx)->dead = 1;
2490 slotptr(set->page, ++idx)->off = nxt;
2491 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2495 bt_putid(set->page->right, right->latch->page_no);
2496 set->page->min = nxt;
2497 set->page->cnt = idx;
// caller continues with bt_splitkeys using this latch table entry
2499 return right->latch->entry;
2502 // fix keys for newly split page
2503 // call with page locked, return
// Posts the two new fence keys into the parent level after a split:
// the left half's fence maps to the original page number, the right
// half's fence is switched to the new right page. The root case is
// delegated to bt_splitroot. ParentModification locks are held on both
// pages across the parent inserts.
2506 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2508 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2509 unsigned char value[BtId];
2510 uint lvl = set->page->lvl;
2514 // if current page is the root page, split it
2516 if( set->latch->page_no == ROOT_page )
2517 return bt_splitroot (bt, set, right);
2519 ptr = keyptr(set->page, set->page->cnt);
2520 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2522 page = bt_mappage (bt, right);
2524 ptr = keyptr(page, page->cnt);
2525 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2527 // insert new fences in their parent pages
2529 bt_lockpage (bt, BtLockParent, right);
2531 bt_lockpage (bt, BtLockParent, set->latch);
2532 bt_unlockpage (bt, BtLockWrite, set->latch);
2534 // insert new fence for reformulated left block of smaller keys
2536 bt_putid (value, set->latch->page_no);
2537 ptr = (BtKey *)leftkey;
2539 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2542 // switch fence for right block of larger keys to new right page
2544 bt_putid (value, right->page_no);
2545 ptr = (BtKey *)rightkey;
2547 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2550 bt_unlockpage (bt, BtLockParent, set->latch);
2551 bt_unpinlatch (set->latch);
2553 bt_unlockpage (bt, BtLockParent, right);
2554 bt_unpinlatch (right);
2558 // install new key and value onto page
2559 // page must already be checked for
// Copies key and value into the page's downward-growing heap, then opens
// a gap in the slot array at `slot` (reusing the nearest dead slot or
// growing cnt) and installs the entry, optionally preceded by a dead
// librarian placeholder slot. When `release` is set, drops the write
// lock and unpins the latch before returning.
2562 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2564 uint idx, librarian;
2569 // if found slot > desired slot and previous slot
2570 // is a librarian slot, use it
2573 if( slotptr(set->page, slot-1)->type == Librarian )
2576 // copy value onto page
2578 set->page->min -= vallen + sizeof(BtVal);
2579 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2580 memcpy (val->value, value, vallen);
2583 // copy key onto page
2585 set->page->min -= keylen + sizeof(BtKey);
2586 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2587 memcpy (ptr->key, key, keylen);
2590 // find first empty slot
2592 for( idx = slot; idx < set->page->cnt; idx++ )
2593 if( slotptr(set->page, idx)->dead )
2596 // now insert key into array before slot
// no dead slot found: grow the array by 2 (entry + librarian slot)
2598 if( idx == set->page->cnt )
2599 idx += 2, set->page->cnt += 2, librarian = 2;
2603 set->latch->dirty = 1;
// shift slots right by `librarian` to open the gap
2606 while( idx > slot + librarian - 1 )
2607 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2609 // add librarian slot
2611 if( librarian > 1 ) {
2612 node = slotptr(set->page, slot++);
2613 node->off = set->page->min;
2614 node->type = Librarian;
2620 node = slotptr(set->page, slot);
2621 node->off = set->page->min;
2626 bt_unlockpage (bt, BtLockWrite, set->latch);
2627 bt_unpinlatch (set->latch);
2633 // Insert new key into the btree at given level.
2634 // either add a new key or update/add an existing one
// Builds a working copy of the key (non-unique inserts get a trailing
// duplicate-sequence suffix), then loops: load the target page
// write-locked, and either insert a new entry before the found slot, or
// update the existing value in place when it fits, or mark the old
// entry dead and re-append key+value when it doesn't. bt_cleanpage /
// bt_splitpage / bt_splitkeys handle out-of-space, then the loop retries.
2636 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2638 unsigned char newkey[BT_keyarray];
2639 uint slot, idx, len, entry;
2646 // set up the key we're working on
2648 ins = (BtKey*)newkey;
2649 memcpy (ins->key, key, keylen);
2652 // is this a non-unique index value?
// append a globally unique sequence so duplicate keys sort distinctly
2658 sequence = bt_newdup (bt);
2659 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2663 while( 1 ) { // find the page and slot for the current key
2664 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2665 ptr = keyptr(set->page, slot);
2668 bt->line = __LINE__, bt->err = BTERR_ovflw;
2672 // if librarian slot == found slot, advance to real slot
2674 if( slotptr(set->page, slot)->type == Librarian )
2675 if( !keycmp (ptr, key, keylen) )
2676 ptr = keyptr(set->page, ++slot);
// duplicate entries compare on the base key only -- presumably len is
// shortened by the suffix here; adjustment line elided in this excerpt
2680 if( slotptr(set->page, slot)->type == Duplicate )
2683 // if inserting a duplicate key or unique key
2684 // check for adequate space on the page
2685 // and insert the new key before slot.
2687 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2688 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2689 if( !(entry = bt_splitpage (bt, set)) )
2691 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2696 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2699 // if key already exists, update value and return
2701 val = valptr(set->page, slot);
// in-place update path: overwrite when the old value area is big enough
2703 if( val->len >= vallen ) {
2704 if( slotptr(set->page, slot)->dead )
2706 set->page->garbage += val->len - vallen;
2707 set->latch->dirty = 1;
2708 slotptr(set->page, slot)->dead = 0;
2710 memcpy (val->value, value, vallen);
2711 bt_unlockpage(bt, BtLockWrite, set->latch);
2712 bt_unpinlatch (set->latch);
2716 // new update value doesn't fit in existing value area
// retire the old key+value bytes as garbage and re-append both
2718 if( !slotptr(set->page, slot)->dead )
2719 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2721 slotptr(set->page, slot)->dead = 0;
2725 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2726 if( !(entry = bt_splitpage (bt, set)) )
2728 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2733 set->page->min -= vallen + sizeof(BtVal);
2734 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2735 memcpy (val->value, value, vallen);
2738 set->latch->dirty = 1;
2739 set->page->min -= keylen + sizeof(BtKey);
2740 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2741 memcpy (ptr->key, key, keylen);
2744 slotptr(set->page, slot)->off = set->page->min;
2745 bt_unlockpage(bt, BtLockWrite, set->latch);
2746 bt_unpinlatch (set->latch);
// per-source-key lock record built by the atomic-transaction code
// (struct declaration header elided in this excerpt -- presumably AtomicTxn)
2753 logseqno reqlsn; // redo log seq no required
2754 logseqno lsn; // redo log sequence number
2755 uint entry; // latch table entry number
2756 uint slot:31; // page slot number
2757 uint reuse:1; // reused previous page
// deferred leaf-key work record for atomic commits (declaration header
// elided here -- presumably AtomicKey)
2761 uid page_no; // page number for split leaf
2762 void *next; // next key to insert
2763 uint entry:29; // latch table entry number
2764 uint type:2; // 0 == insert, 1 == delete, 2 == free
2765 uint nounlock:1; // don't unlock ParentModification
2766 unsigned char leafkey[BT_keyarray];
2769 // determine actual page where key is located
2770 // return slot number
// Re-resolves where a transaction key lives after possible intra-txn
// splits: starts from the locked entry recorded in locks[src] (or the
// previous key's entry when reuse is set) and walks the latch `split`
// chain until the key's slot is found. Returns 0 with BTERR_atomic set
// when the key is on none of the chained pages.
2772 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2774 BtKey *key = keyptr(source,src);
2775 uint slot = locks[src].slot;
2778 if( src > 1 && locks[src].reuse )
2779 entry = locks[src-1].entry, slot = 0;
2781 entry = locks[src].entry;
// fast path: slot recorded earlier is still valid
2784 set->latch = bt->mgr->latchsets + entry;
2785 set->page = bt_mappage (bt, set->latch);
2789 // is locks->reuse set? or was slot zeroed?
2790 // if so, find where our key is located
2791 // on current page or pages split on
2792 // same page txn operations.
2795 set->latch = bt->mgr->latchsets + entry;
2796 set->page = bt_mappage (bt, set->latch);
2798 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2799 if( slotptr(set->page, slot)->type == Librarian )
2801 if( locks[src].reuse )
2802 locks[src].entry = entry;
2805 } while( entry = set->latch->split );
2807 bt->line = __LINE__, bt->err = BTERR_atomic;
// Atomic-transaction insert of source key src: locate the current page
// via bt_atomicpage, insert without releasing the write lock (release=0)
// and stamp the page's redo lsn; when the page is full, split it and
// splice the new right page into the latch `split` chain (keeping it
// write-locked) instead of posting parent keys, then retry.
2811 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2813 BtKey *key = keyptr(source, src);
2814 BtVal *val = valptr(source, src);
2819 while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2820 if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
2821 if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2823 set->page->lsn = locks[src].lsn;
2827 if( entry = bt_splitpage (bt, set) )
2828 latch = bt->mgr->latchsets + entry;
2832 // splice right page into split chain
2833 // and WriteLock it.
2835 bt_lockpage(bt, BtLockWrite, latch);
2836 latch->split = set->latch->split;
2837 set->latch->split = entry;
// zero the cached slot so bt_atomicpage re-searches the split chain
2838 locks[src].slot = 0;
2841 return bt->line = __LINE__, bt->err = BTERR_atomic;
// Atomic-transaction delete of source key src: finds the live slot via
// bt_atomicpage, marks it dead, accounts the key+value bytes as garbage,
// and stamps the page's redo lsn. Physical removal happens later.
2844 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2846 BtKey *key = keyptr(source, src);
2852 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2853 ptr = keyptr(set->page, slot);
2855 return bt->line = __LINE__, bt->err = BTERR_struct;
2857 if( !keycmp (ptr, key->key, key->len) )
2858 if( !slotptr(set->page, slot)->dead )
2859 slotptr(set->page, slot)->dead = 1;
2865 val = valptr(set->page, slot);
2866 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2867 set->latch->dirty = 1;
2868 set->page->lsn = locks[src].lsn;
2874 // delete an empty master page for a transaction
2876 // note that the far right page never empties because
2877 // it always contains (at least) the infinite stopper key
2878 // and that all pages that don't contain any keys are
2879 // deleted, or are being held under Atomic lock.
// Delete an empty master (leaf) page left over from an atomic txn by
// pulling its right sibling's contents over it, then unlinking and
// freeing the right sibling.  `prev` arrives Atomic-locked; this routine
// takes the remaining locks it needs.
// NOTE(review): intermediate original lines are missing from this excerpt.
2881 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2883 BtPageSet right[1], temp[1];
2884 unsigned char value[BtId];
2888 bt_lockpage(bt, BtLockWrite, prev->latch);
2890 // grab the right sibling
2892 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), NULL) )
2893 right->page = bt_mappage (bt, right->latch);
2897 bt_lockpage(bt, BtLockAtomic, right->latch);
2898 bt_lockpage(bt, BtLockWrite, right->latch);
2900 // and pull contents over empty page
2901 // while preserving master's left link
2903 memcpy (right->page->left, prev->page->left, BtId);
2904 memcpy (prev->page, right->page, bt->mgr->page_size);
2906 // forward seekers to old right sibling
2907 // to new page location in set
2909 bt_putid (right->page->right, prev->latch->page_no);
2910 right->latch->dirty = 1;
// mark the old right page as dying so readers redirect
2911 right->page->kill = 1;
2913 // remove pointer to right page for searchers by
2914 // changing right fence key to point to the master page
2916 ptr = keyptr(right->page,right->page->cnt);
2917 bt_putid (value, prev->latch->page_no);
2919 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2922 // now that master page is in good shape we can
2923 // remove its locks.
2925 bt_unlockpage (bt, BtLockAtomic, prev->latch);
2926 bt_unlockpage (bt, BtLockWrite, prev->latch);
2928 // fix master's right sibling's left pointer
2929 // to remove scanner's pointer to the right page
2931 if( right_page_no = bt_getid (prev->page->right) ) {
2932 if( temp->latch = bt_pinlatch (bt, right_page_no, NULL) )
2933 temp->page = bt_mappage (bt, temp->latch);
2935 bt_lockpage (bt, BtLockWrite, temp->latch);
2936 bt_putid (temp->page->left, prev->latch->page_no);
2937 temp->latch->dirty = 1;
2939 bt_unlockpage (bt, BtLockWrite, temp->latch);
2940 bt_unpinlatch (temp->latch);
2941 } else { // master is now the far right page
2942 bt_mutexlock (bt->mgr->lock);
// pagezero's alloc->left tracks the rightmost leaf page
2943 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2944 bt_releasemutex(bt->mgr->lock);
2947 // now that there are no pointers to the right page
2948 // we can delete it after the last read access occurs
2950 bt_unlockpage (bt, BtLockWrite, right->latch);
2951 bt_unlockpage (bt, BtLockAtomic, right->latch);
// Delete lock drains remaining readers before the page is freed
2952 bt_lockpage (bt, BtLockDelete, right->latch);
2953 bt_lockpage (bt, BtLockWrite, right->latch);
2954 bt_freepage (bt, right);
2958 // atomic modification of a batch of keys.
2960 // return -1 if BTERR is set
2961 // otherwise return slot number
2962 // causing the key constraint violation
2963 // or zero on successful completion.
// Atomically apply a batch of key insert/delete operations packed into
// the page-formatted buffer `source`.  Phases: (1) stable-sort the keys,
// (2) pin and Atomic/Read lock each distinct leaf page while checking
// unique-key constraints, (3) write the redo log, (4) Write-lock and
// apply each operation, resolving splits/empties, (5) drain a FIFO queue
// of deferred parent-level key inserts/deletes/page-frees.
// Returns 0 on success; see the header comment above for the error and
// constraint-violation return conventions.
// NOTE(review): many original lines are absent from this excerpt (the
// embedded listing numbers jump), so some control-flow details between
// the visible statements cannot be confirmed here.
2965 int bt_atomictxn (BtDb *bt, BtPage source)
2967 uint src, idx, slot, samepage, entry, avail, que = 0;
2968 AtomicKey *head, *tail, *leaf;
2969 BtPageSet set[1], prev[1];
2970 unsigned char value[BtId];
2971 BtKey *key, *ptr, *key2;
// one AtomicTxn lock record per batch key (1-based indexing)
2982 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2986 // stable sort the list of keys into order to
2987 // prevent deadlocks between threads.
// insertion sort over the source slots, keyed by keycmp
2989 for( src = 1; src++ < source->cnt; ) {
2990 *temp = *slotptr(source,src);
2991 key = keyptr (source,src);
2993 for( idx = src; --idx; ) {
2994 key2 = keyptr (source,idx);
2995 if( keycmp (key, key2->key, key2->len) < 0 ) {
2996 *slotptr(source,idx+1) = *slotptr(source,idx);
2997 *slotptr(source,idx) = *temp;
3003 // reserve enough buffer pool entries
3005 avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
3006 bt_availrequest (bt, avail);
3008 // Load the leaf page for each key
3009 // group same page references with reuse bit
3010 // and determine any constraint violations
3012 for( src = 0; src++ < source->cnt; ) {
3013 key = keyptr(source, src);
3016 // first determine if this modification falls
3017 // on the same page as the previous modification
3018 // note that the far right leaf page is a special case
3020 if( samepage = src > 1 )
3021 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3022 slot = bt_findslot(set->page, key->key, key->len);
3024 bt_unlockpage(bt, BtLockRead, set->latch);
// new page: load with Read + Atomic locks held together
3027 if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
3028 set->latch->split = 0;
// librarian slots are fillers; step past to the real key
3032 if( slotptr(set->page, slot)->type == Librarian )
3033 ptr = keyptr(set->page, ++slot);
3035 ptr = keyptr(set->page, slot);
// first key on this page: record its latch entry and slot
3038 locks[src].entry = set->latch->entry;
3039 locks[src].slot = slot;
3040 locks[src].reuse = 0;
// same page as previous key: mark for reuse of that entry's locks
3042 locks[src].entry = 0;
3043 locks[src].slot = 0;
3044 locks[src].reuse = 1;
3047 // capture current lsn for master page
3049 locks[src].reqlsn = set->page->lsn;
3051 // perform constraint checks
3053 switch( slotptr(source, src)->type ) {
3056 if( !slotptr(set->page, slot)->dead )
3057 if( slot < set->page->cnt || bt_getid (set->page->right) )
3058 if( !keycmp (ptr, key->key, key->len) ) {
3060 // return constraint violation if key already exists
3062 bt_unlockpage(bt, BtLockRead, set->latch);
// violation: back out all Atomic locks taken so far
3066 if( locks[src].entry ) {
3067 set->latch = bt->mgr->latchsets + locks[src].entry;
3068 bt_unlockpage(bt, BtLockAtomic, set->latch);
3069 bt_unpinlatch (set->latch);
3080 // unlock last loadpage lock
3083 bt_unlockpage(bt, BtLockRead, set->latch);
3085 // and add entries to redo log
3087 if( bt->mgr->redopages )
3088 if( lsn = bt_txnredo (bt, source) )
3089 for( src = 0; src++ < source->cnt; )
3090 locks[src].lsn = lsn;
3094 // obtain write lock for each master page
3096 for( src = 0; src++ < source->cnt; ) {
3097 if( locks[src].reuse )
3100 bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
3103 // insert or delete each key
3104 // process any splits or merges
3105 // release Write & Atomic latches
3106 // set ParentModifications and build
3107 // queue of keys to insert for split pages
3108 // or delete for deleted pages.
3110 // run through txn list backwards
3112 samepage = source->cnt + 1;
3114 for( src = source->cnt; src; src-- ) {
3115 if( locks[src].reuse )
3118 // perform the txn operations
3119 // from smaller to larger on
3122 for( idx = src; idx < samepage; idx++ )
3123 switch( slotptr(source,idx)->type ) {
3125 if( bt_atomicdelete (bt, source, locks, idx) )
3131 if( bt_atomicinsert (bt, source, locks, idx) )
3136 // after the same page operations have finished,
3137 // process master page for splits or deletion.
3139 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3140 prev->page = bt_mappage (bt, prev->latch);
3143 // pick-up all splits from master page
3144 // each one is already WriteLocked.
3146 entry = prev->latch->split;
3149 set->latch = bt->mgr->latchsets + entry;
3150 set->page = bt_mappage (bt, set->latch);
3151 entry = set->latch->split;
3153 // delete empty master page by undoing its split
3154 // (this is potentially another empty page)
3155 // note that there are no new left pointers yet
3157 if( !prev->page->act ) {
3158 memcpy (set->page->left, prev->page->left, BtId);
3159 memcpy (prev->page, set->page, bt->mgr->page_size);
3160 bt_lockpage (bt, BtLockDelete, set->latch);
3161 bt_freepage (bt, set);
3163 prev->latch->split = set->latch->split;
3164 prev->latch->dirty = 1;
3168 // remove empty page from the split chain
3169 // and return it to the free list.
3171 if( !set->page->act ) {
3172 memcpy (prev->page->right, set->page->right, BtId);
3173 prev->latch->split = set->latch->split;
3174 bt_lockpage (bt, BtLockDelete, set->latch);
3175 bt_freepage (bt, set);
3179 // schedule prev fence key update
// queue an AtomicKey work item for phase 5 (que counts them)
3181 ptr = keyptr(prev->page,prev->page->cnt);
3182 leaf = calloc (sizeof(AtomicKey), 1), que++;
3184 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3185 leaf->page_no = prev->latch->page_no;
3186 leaf->entry = prev->latch->entry;
3196 // splice in the left link into the split page
3198 bt_putid (set->page->left, prev->latch->page_no);
3199 bt_lockpage(bt, BtLockParent, prev->latch);
3200 bt_unlockpage(bt, BtLockWrite, prev->latch);
3204 // update left pointer in next right page from last split page
3205 // (if all splits were reversed, latch->split == 0)
3207 if( latch->split ) {
3208 // fix left pointer in master's original (now split)
3209 // far right sibling or set rightmost page in page zero
3211 if( right = bt_getid (prev->page->right) ) {
3212 if( set->latch = bt_pinlatch (bt, right, NULL) )
3213 set->page = bt_mappage (bt, set->latch);
3217 bt_lockpage (bt, BtLockWrite, set->latch);
3218 bt_putid (set->page->left, prev->latch->page_no);
3219 set->latch->dirty = 1;
3220 bt_unlockpage (bt, BtLockWrite, set->latch);
3221 bt_unpinlatch (set->latch);
3222 } else { // prev is rightmost page
3223 bt_mutexlock (bt->mgr->lock);
3224 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3225 bt_releasemutex(bt->mgr->lock);
3228 // Process last page split in chain
3230 ptr = keyptr(prev->page,prev->page->cnt);
3231 leaf = calloc (sizeof(AtomicKey), 1), que++;
3233 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3234 leaf->page_no = prev->latch->page_no;
3235 leaf->entry = prev->latch->entry;
3245 bt_lockpage(bt, BtLockParent, prev->latch);
3246 bt_unlockpage(bt, BtLockWrite, prev->latch);
3248 // remove atomic lock on master page
3250 bt_unlockpage(bt, BtLockAtomic, latch);
3254 // finished if prev page occupied (either master or final split)
3256 if( prev->page->act ) {
3257 bt_unlockpage(bt, BtLockWrite, latch);
3258 bt_unlockpage(bt, BtLockAtomic, latch);
3259 bt_unpinlatch(latch);
3263 // any and all splits were reversed, and the
3264 // master page located in prev is empty, delete it
3265 // by pulling over master's right sibling.
3267 // Remove empty master's fence key
3269 ptr = keyptr(prev->page,prev->page->cnt);
3271 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3274 // perform the remainder of the delete
3275 // from the FIFO queue
3277 leaf = calloc (sizeof(AtomicKey), 1), que++;
3279 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3280 leaf->page_no = prev->latch->page_no;
3281 leaf->entry = prev->latch->entry;
3292 // leave atomic lock in place until
3293 // deletion completes in next phase.
3295 bt_unlockpage(bt, BtLockWrite, prev->latch);
// release the phase-4 pool reservation; re-reserve for phase 5,
// scaled by tree height since each queued key may touch every level
3298 bt_availrelease (bt, avail);
3300 que *= bt->mgr->pagezero->alloc->lvl;
3301 bt_availrequest (bt, que);
3303 // add & delete keys for any pages split or merged during transaction
3307 set->latch = bt->mgr->latchsets + leaf->entry;
3308 set->page = bt_mappage (bt, set->latch);
3310 bt_putid (value, leaf->page_no);
3311 ptr = (BtKey *)leaf->leafkey;
3313 switch( leaf->type ) {
3314 case 0: // insert key
3315 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3320 case 1: // delete key
3321 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3326 case 2: // free page
3327 if( bt_atomicfree (bt, set) )
3333 if( !leaf->nounlock )
3334 bt_unlockpage (bt, BtLockParent, set->latch);
3336 bt_unpinlatch (set->latch);
3339 } while( leaf = tail );
3341 bt_availrelease (bt, que);
3351 // set cursor to highest slot on highest page
3353 uint bt_lastkey (BtDb *bt)
3355 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3358 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3359 set->page = bt_mappage (bt, set->latch);
3363 bt_lockpage(bt, BtLockRead, set->latch);
3364 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3365 bt_unlockpage(bt, BtLockRead, set->latch);
3366 bt_unpinlatch (set->latch);
3368 bt->cursor_page = page_no;
3369 return bt->cursor->cnt;
3372 // return previous slot on cursor page
3374 uint bt_prevkey (BtDb *bt, uint slot)
3376 uid ourright, next, us = bt->cursor_page;
3382 ourright = bt_getid(bt->cursor->right);
3385 if( !(next = bt_getid(bt->cursor->left)) )
3389 bt->cursor_page = next;
3391 if( set->latch = bt_pinlatch (bt, next, NULL) )
3392 set->page = bt_mappage (bt, set->latch);
3396 bt_lockpage(bt, BtLockRead, set->latch);
3397 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3398 bt_unlockpage(bt, BtLockRead, set->latch);
3399 bt_unpinlatch (set->latch);
3401 next = bt_getid (bt->cursor->right);
3403 if( bt->cursor->kill )
3407 if( next == ourright )
3412 return bt->cursor->cnt;
3415 // return next slot on cursor page
3416 // or slide cursor right into next page
3418 uint bt_nextkey (BtDb *bt, uint slot)
3424 right = bt_getid(bt->cursor->right);
3426 while( slot++ < bt->cursor->cnt )
3427 if( slotptr(bt->cursor,slot)->dead )
3429 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3437 bt->cursor_page = right;
3439 if( set->latch = bt_pinlatch (bt, right, NULL) )
3440 set->page = bt_mappage (bt, set->latch);
3444 bt_lockpage(bt, BtLockRead, set->latch);
3446 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3448 bt_unlockpage(bt, BtLockRead, set->latch);
3449 bt_unpinlatch (set->latch);
3457 // cache page of keys into cursor and return starting slot for given key
3459 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3464 // cache page for retrieval
3466 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3467 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3471 bt->cursor_page = set->latch->page_no;
3473 bt_unlockpage(bt, BtLockRead, set->latch);
3474 bt_unpinlatch (set->latch);
// Return the key stored at `slot` of the cursor's cached page.
3478 BtKey *bt_key(BtDb *bt, uint slot)
3480 return keyptr(bt->cursor, slot);
// Return the value stored at `slot` of the cursor's cached page.
3483 BtVal *bt_val(BtDb *bt, uint slot)
3485 return valptr(bt->cursor,slot);
// Windows variant (presumably under an #ifdef not visible here) of the
// CPU-time helper: type 0 = wall clock, 1 = user time, 2 = system time,
// all returned in seconds.  Uses GetProcessTimes/FileTimeToSystemTime;
// note wDayOfWeek is pressed into service as a day counter for long runs.
3491 double getCpuTime(int type)
3494 FILETIME xittime[1];
3495 FILETIME systime[1];
3496 FILETIME usrtime[1];
3497 SYSTEMTIME timeconv[1];
3500 memset (timeconv, 0, sizeof(SYSTEMTIME));
// type 0: wall-clock time of day
3504 GetSystemTimeAsFileTime (xittime);
3505 FileTimeToSystemTime (xittime, timeconv);
3506 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// type 1: process user time
3509 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3510 FileTimeToSystemTime (usrtime, timeconv);
// type 2: process kernel (system) time
3513 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3514 FileTimeToSystemTime (systime, timeconv);
// accumulate the selected SYSTEMTIME fields into seconds
3518 ans += (double)timeconv->wHour * 3600;
3519 ans += (double)timeconv->wMinute * 60;
3520 ans += (double)timeconv->wSecond;
3521 ans += (double)timeconv->wMilliseconds / 1000;
3526 #include <sys/resource.h>
// POSIX variant of the CPU-time helper: type 0 = wall clock
// (gettimeofday), 1 = user CPU time, 2 = system CPU time (getrusage),
// all returned in seconds as a double.
3528 double getCpuTime(int type)
3530 struct rusage used[1];
3531 struct timeval tv[1];
// type 0: wall clock
3535 gettimeofday(tv, NULL);
3536 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// type 1: user CPU time of this process
3539 getrusage(RUSAGE_SELF, used);
3540 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// type 2: system CPU time of this process
3543 getrusage(RUSAGE_SELF, used);
3544 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Shutdown audit of the buffer pool: walk every latchset and report to
// stderr any latch still read/write-, access-, parent-, atomic-, or
// modify-locked, and any entry still pinned (ignoring the CLOCK bit).
3551 void bt_poolaudit (BtMgr *mgr)
3556 while( ++entry < mgr->latchtotal ) {
3557 latch = mgr->latchsets + entry;
// phase-fair rw locks expose their state via the rin counter
3559 if( *latch->readwr->rin & MASK )
3560 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3562 if( *latch->access->rin & MASK )
3563 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3565 if( *latch->parent->xcl->value )
3566 fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3568 if( *latch->atomic->xcl->value )
3569 fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3571 if( *latch->modify->value )
3572 fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
// pin count excludes the clock-replacement reference bit
3574 if( latch->pin & ~CLOCK_bit )
3575 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3587 // standalone program to index file of keys
3588 // then list them onto std-out
// Worker-thread entry point for the standalone driver: each thread picks
// its command character from args->type (one per input file) and runs one
// of: pennysort insert/delete (plain or batched via bt_atomictxn), plain
// indexing, key lookup, forward scan, reverse scan, or a raw page count.
// NOTE(review): many original lines (command dispatch, loop braces,
// key-accumulation code) are missing from this excerpt.
3591 void *index_file (void *arg)
// Windows thread signature variant (presumably under #ifdef)
3593 uint __stdcall index_file (void *arg)
3596 int line = 0, found = 0, cnt = 0, idx;
3597 uid next, page_no = LEAF_page; // start on first page of leaves
3598 int ch, len = 0, slot, type = 0;
3599 unsigned char key[BT_maxkey];
// staging buffer formatted as a BtPage for batched (txn) operations
3600 unsigned char txn[65536];
3601 ThreadArg *args = arg;
3610 bt = bt_open (args->mgr);
// pick this thread's command char; reuse the last one if fewer
// commands than threads were supplied
3613 if( args->idx < strlen (args->type) )
3614 ch = args->type[args->idx];
3616 ch = args->type[strlen(args->type) - 1];
3628 if( type == Delete )
3629 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3631 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3633 if( type == Delete )
3634 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3636 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
// pennysort records: 10-byte key followed by the value
3638 if( in = fopen (args->infile, "rb") )
3639 while( ch = getc(in), ch != EOF )
3645 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3646 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
// batched path: pack value then key into the txn staging page
3652 memcpy (txn + nxt, key + 10, len - 10);
3654 txn[nxt] = len - 10;
3656 memcpy (txn + nxt, key, 10);
3659 slotptr(page,++cnt)->off = nxt;
3660 slotptr(page,cnt)->type = type;
// flush the batch once args->num keys have accumulated
3663 if( cnt < args->num )
3670 if( bt_atomictxn (bt, page) )
3671 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3676 else if( len < BT_maxkey )
3678 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
// plain indexing: newline-separated keys, no values
3682 fprintf(stderr, "started indexing for %s\n", args->infile);
3683 if( in = fopen (args->infile, "r") )
3684 while( ch = getc(in), ch != EOF )
3689 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3690 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3693 else if( len < BT_maxkey )
3695 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// find: look up each key, count hits
3699 fprintf(stderr, "started finding keys for %s\n", args->infile);
3700 if( in = fopen (args->infile, "rb") )
3701 while( ch = getc(in), ch != EOF )
3705 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3708 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
3711 else if( len < BT_maxkey )
3713 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// scan: walk leaf pages left-to-right via right links, dumping
// live key/value pairs to stdout
3717 fprintf(stderr, "started scanning\n");
3720 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3721 set->page = bt_mappage (bt, set->latch);
3723 fprintf(stderr, "unable to obtain latch"), exit(1);
3724 bt_lockpage (bt, BtLockRead, set->latch);
3725 next = bt_getid (set->page->right);
3727 for( slot = 0; slot++ < set->page->cnt; )
// skip the infinite stopper on the far right page
3728 if( next || slot < set->page->cnt )
3729 if( !slotptr(set->page, slot)->dead ) {
3730 ptr = keyptr(set->page, slot);
// duplicate keys carry a hidden suffix; shorten before printing
3733 if( slotptr(set->page, slot)->type == Duplicate )
3736 fwrite (ptr->key, len, 1, stdout);
3737 val = valptr(set->page, slot);
3738 fwrite (val->value, val->len, 1, stdout);
3739 fputc ('\n', stdout);
3743 set->latch->avail = 1;
3744 bt_unlockpage (bt, BtLockRead, set->latch);
3745 bt_unpinlatch (set->latch);
3746 } while( page_no = next );
3748 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// reverse scan: use the cursor API from the rightmost leaf backwards
3752 fprintf(stderr, "started reverse scan\n");
3753 if( slot = bt_lastkey (bt) )
3754 while( slot = bt_prevkey (bt, slot) ) {
3755 if( slotptr(bt->cursor, slot)->dead )
3758 ptr = keyptr(bt->cursor, slot);
3761 if( slotptr(bt->cursor, slot)->type == Duplicate )
3764 fwrite (ptr->key, len, 1, stdout);
3765 val = valptr(bt->cursor, slot);
3766 fwrite (val->value, val->len, 1, stdout);
3767 fputc ('\n', stdout);
3771 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// count: read raw pages sequentially, summing active slots on leaves
3776 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3778 fprintf(stderr, "started counting\n");
3779 next = LEAF_page + bt->mgr->redopages + 1;
3781 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3782 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3785 if( !bt->frame->free && !bt->frame->lvl )
3786 cnt += bt->frame->act;
3792 cnt--; // remove stopper key
3793 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3805 typedef struct timeval timer;
3807 int main (int argc, char **argv)
3809 int idx, cnt, len, slot, err;
3810 int segsize, bits = 16;
3828 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3829 fprintf (stderr, " where idx_file is the name of the btree file\n");
3830 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3831 fprintf (stderr, " page_bits is the page size in bits\n");
3832 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3833 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3834 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3835 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3839 start = getCpuTime(0);
3842 bits = atoi(argv[3]);
3845 poolsize = atoi(argv[4]);
3848 fprintf (stderr, "Warning: no mapped_pool\n");
3851 num = atoi(argv[5]);
3854 recovery = atoi(argv[6]);
3858 threads = malloc (cnt * sizeof(pthread_t));
3860 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3862 args = malloc (cnt * sizeof(ThreadArg));
3864 mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3867 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3873 for( idx = 0; idx < cnt; idx++ ) {
3874 args[idx].infile = argv[idx + 7];
3875 args[idx].type = argv[2];
3876 args[idx].mgr = mgr;
3877 args[idx].num = num;
3878 args[idx].idx = idx;
3880 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3881 fprintf(stderr, "Error creating thread %d\n", err);
3883 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3887 // wait for termination
3890 for( idx = 0; idx < cnt; idx++ )
3891 pthread_join (threads[idx], NULL);
3893 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3895 for( idx = 0; idx < cnt; idx++ )
3896 CloseHandle(threads[idx]);
3902 elapsed = getCpuTime(0) - start;
3903 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3904 elapsed = getCpuTime(1);
3905 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3906 elapsed = getCpuTime(2);
3907 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);