1 // btree version threadskv10 FUTEX version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // and dual B-trees for write optimization
14 // author: karl malbrain, malbrain@cal.berkeley.edu
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
20 The orginal author is Karl Malbrain.
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
38 #include <linux/futex.h>
53 #define WIN32_LEAN_AND_MEAN
67 typedef unsigned long long uid;
68 typedef unsigned long long logseqno;
71 typedef unsigned long long off64_t;
72 typedef unsigned short ushort;
73 typedef unsigned int uint;
76 #define BT_ro 0x6f72 // ro
77 #define BT_rw 0x7772 // rw
79 #define BT_maxbits 26 // maximum page size in bits
80 #define BT_minbits 9 // minimum page size in bits
81 #define BT_minpage (1 << BT_minbits) // minimum page size
82 #define BT_maxpage (1 << BT_maxbits) // maximum page size
84 // BTree page number constants
85 #define ALLOC_page 0 // allocation page
86 #define ROOT_page 1 // root of the btree
87 #define LEAF_page 2 // first page of leaves
88 #define REDO_page 3 // first page of redo buffer
90 // Number of levels to create in a new BTree
95 There are six lock types for each node in four independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
113 // definition for phase-fair reader/writer lock implementation
116 volatile ushort rin[1];
117 volatile ushort rout[1];
118 volatile ushort ticket[1];
119 volatile ushort serving[1];
127 volatile uint exclusive[1];
139 // exclusive is set for write access
142 volatile uint exclusive[1];
147 // mode & definition for lite latch implementation
150 QueRd = 1, // reader queue
151 QueWr = 2 // writer queue
154 // hash table entries
157 uint entry; // Latch table entry at head of chain
158 BtMutexLatch latch[1];
161 // latch manager table structure
164 uid page_no; // latch set page number
165 RWLock readwr[1]; // read/write page lock
166 RWLock access[1]; // Access Intent/Page delete
167 WOLock parent[1]; // Posting of fence key in parent
168 WOLock atomic[1]; // Atomic update in progress
169 uint split; // right split page atomic insert
170 uint entry; // entry slot in latch table
171 uint next; // next entry in hash table chain
172 uint prev; // prev entry in hash table chain
173 ushort pin; // number of accessing threads
174 unsigned char dirty; // page in cache is dirty (atomic set)
175 unsigned char avail; // page is an available entry
176 BtMutexLatch modify[1]; // modify entry lite latch
179 // Define the length of the page record numbers
183 // Page key slot definition.
185 // Keys are marked dead, but remain on the page until
186 // it cleanup is called. The fence key (highest key) for
187 // a leaf page is always present, even after cleanup.
191 // In addition to the Unique keys that occupy slots
192 // there are Librarian and Duplicate key
193 // slots occupying the key slot array.
195 // The Librarian slots are dead keys that
196 // serve as filler, available to add new Unique
197 // or Dup slots that are inserted into the B-tree.
199 // The Duplicate slots have had their key bytes extended
200 // by 6 bytes to contain a binary duplicate key uniqueifier.
211 uint off:BT_maxbits; // page offset for key start
212 uint type:3; // type of slot
213 uint dead:1; // set for deleted slot
216 // The key structure occupies space at the upper end of
217 // each page. It's a length byte followed by the key
221 unsigned char len; // this can be changed to a ushort or uint
222 unsigned char key[0];
225 // the value structure also occupies space at the upper
226 // end of the page. Each key is immediately followed by a value.
229 unsigned char len; // this can be changed to a ushort or uint
230 unsigned char value[0];
233 #define BT_maxkey 255 // maximum number of bytes in a key
234 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
236 // The first part of an index page.
237 // It is immediately followed
238 // by the BtSlot array of keys.
240 // note that this structure size
241 // must be a multiple of 8 bytes
242 // in order to place dups correctly.
244 typedef struct BtPage_ {
245 uint cnt; // count of keys in page
246 uint act; // count of active keys
247 uint min; // next key offset
248 uint garbage; // page garbage in bytes
249 unsigned char bits:7; // page size in bits
250 unsigned char free:1; // page is on free chain
251 unsigned char lvl:7; // level of page
252 unsigned char kill:1; // page is being deleted
253 unsigned char right[BtId]; // page number to right
254 unsigned char left[BtId]; // page number to left
255 unsigned char filler[2]; // padding to multiple of 8
256 logseqno lsn; // log sequence number applied
257 uid page_no; // this page number
260 // The loadpage interface object
263 BtPage page; // current page pointer
264 BtLatchSet *latch; // current page latch set
267 // structure for latch manager on ALLOC_page
270 struct BtPage_ alloc[1]; // next page_no in right ptr
271 unsigned long long dups[1]; // global duplicate key uniqueifier
272 unsigned char chain[BtId]; // head of free page_nos chain
275 // The object structure for Btree access
278 uint page_size; // page size
279 uint page_bits; // page size in bits
285 BtPageZero *pagezero; // mapped allocation page
286 BtHashEntry *hashtable; // the buffer pool hash table entries
287 BtLatchSet *latchsets; // mapped latch set from buffer pool
288 unsigned char *pagepool; // mapped to the buffer pool pages
289 unsigned char *redobuff; // mapped recovery buffer pointer
290 logseqno lsn, flushlsn; // current & first lsn flushed
291 BtMutexLatch dump[1]; // redo dump lite latch
292 BtMutexLatch redo[1]; // redo area lite latch
293 BtMutexLatch lock[1]; // allocation area lite latch
294 ushort thread_no[1]; // next thread number
295 uint nlatchpage; // number of latch pages at BT_latch
296 uint latchtotal; // number of page latch entries
297 uint latchhash; // number of latch hash table slots
298 uint latchvictim; // next latch entry to examine
299 uint latchavail; // next available latch entry
300 uint availlock[1]; // latch available commitments
301 uint available; // number of available latches
302 uint redopages; // size of recovery buff in pages
303 uint redoend; // eof/end element in recovery buff
304 int err; // last error
305 int line; // last error line no
306 int found; // number of keys found by delete
307 int reads, writes; // number of reads and writes
309 HANDLE halloc; // allocation handle
310 HANDLE hpool; // buffer pool handle
315 BtMgr *mgr; // buffer manager for entire process
316 BtMgr *main; // buffer manager for main btree
317 BtPage cursor; // cached page frame for start/next
318 ushort thread_no; // thread number
319 unsigned char key[BT_keyarray]; // last found complete key
322 // Catastrophic errors
337 #define CLOCK_bit 0x8000
339 // recovery manager entry types
342 BTRM_eof = 0, // rest of buffer is emtpy
343 BTRM_add, // add a unique key-value to btree
344 BTRM_dup, // add a duplicate key-value to btree
345 BTRM_del, // delete a key-value from btree
346 BTRM_upd, // update a key with a new value
347 BTRM_new, // allocate a new empty page
348 BTRM_old // reuse an old empty page
351 // recovery manager entry
352 // structure followed by BtKey & BtVal
355 logseqno reqlsn; // log sequence number required
356 logseqno lsn; // log sequence number for entry
357 uint len; // length of entry
358 unsigned char type; // type of entry
359 unsigned char lvl; // level of btree entry pertains to
364 extern void bt_close (BtDb *bt);
365 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
366 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
367 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
368 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
369 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
370 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update, ushort thread_no);
371 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
373 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
375 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
376 extern uint bt_nextkey (BtDb *bt, uint slot);
377 extern uint bt_prevkey (BtDb *db, uint slot);
378 extern uint bt_lastkey (BtDb *db);
381 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
382 extern void bt_mgrclose (BtMgr *mgr);
383 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
384 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
386 // The page is allocated from low and hi ends.
387 // The key slots are allocated from the bottom,
388 // while the text and value of the key
389 // are allocated from the top. When the two
390 // areas meet, the page is split into two.
392 // A key consists of a length byte, two bytes of
393 // index number (0 - 65535), and up to 253 bytes
396 // Associated with each key is a value byte string
397 // containing any value desired.
399 // The b-tree root is always located at page 1.
400 // The first leaf page of level zero is always
401 // located on page 2.
403 // The b-tree pages are linked with next
404 // pointers to facilitate enumerators,
405 // and provide for concurrency.
407 // When to root page fills, it is split in two and
408 // the tree height is raised by a new root at page
409 // one with two keys.
411 // Deleted keys are marked with a dead bit until
412 // page cleanup. The fence key for a leaf node is
415 // To achieve maximum concurrency one page is locked at a time
416 // as the tree is traversed to find leaf key in question. The right
417 // page numbers are used in cases where the page is being split,
420 // Page 0 is dedicated to lock for new page extensions,
421 // and chains empty pages together for reuse. It also
422 // contains the latch manager hash table.
424 // The ParentModification lock on a node is obtained to serialize posting
425 // or changing the fence key for a node.
427 // Empty pages are chained together through the ALLOC page and reused.
429 // Access macros to address slot and key values from the page
430 // Page slots use 1 based indexing.
432 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
433 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
434 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
436 void bt_putid(unsigned char *dest, uid id)
441 dest[i] = (unsigned char)id, id >>= 8;
444 uid bt_getid(unsigned char *src)
449 for( i = 0; i < BtId; i++ )
450 id <<= 8, id |= *src++;
455 uid bt_newdup (BtMgr *mgr)
458 return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
460 return _InterlockedIncrement64(mgr->pagezero->dups, 1);
464 // Write-Only Queue Lock
466 void WriteOLock (WOLock *lock, ushort tid)
470 if( lock->tid == tid ) {
477 prev = __sync_fetch_and_or (lock->exclusive, 1);
479 prev = _InterlockedExchangeOr (lock->exclusive, 1);
481 if( !(prev & XCL) ) {
486 sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
493 void WriteORelease (WOLock *lock)
500 *lock->exclusive = 0;
503 sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
507 // Phase-Fair reader/writer lock implementation
509 void WriteLock (RWLock *lock, ushort tid)
513 if( lock->tid == tid ) {
518 tix = __sync_fetch_and_add (lock->ticket, 1);
520 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
522 // wait for our ticket to come up
524 while( tix != lock->serving[0] )
531 w = PRES | (tix & PHID);
533 r = __sync_fetch_and_add (lock->rin, w);
535 r = _InterlockedExchangeAdd16 (lock->rin, w);
537 while( r != *lock->rout )
546 void WriteRelease (RWLock *lock)
555 __sync_fetch_and_and (lock->rin, ~MASK);
557 _InterlockedAnd16 (lock->rin, ~MASK);
562 // try to obtain read lock
563 // return 1 if successful
565 int ReadTry (RWLock *lock, ushort tid)
569 // OK if write lock already held by same thread
571 if( lock->tid == tid ) {
576 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
578 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
581 if( w == (*lock->rin & MASK) ) {
583 __sync_fetch_and_add (lock->rin, -RINC);
585 _InterlockedExchangeAdd16 (lock->rin, -RINC);
593 void ReadLock (RWLock *lock, ushort tid)
596 if( lock->tid == tid ) {
601 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
603 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
606 while( w == (*lock->rin & MASK) )
614 void ReadRelease (RWLock *lock)
622 __sync_fetch_and_add (lock->rout, RINC);
624 _InterlockedExchangeAdd16 (lock->rout, RINC);
628 // lite weight spin lock Latch Manager
630 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
632 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
635 void bt_mutexlock(BtMutexLatch *latch)
641 prev = __sync_fetch_and_or(latch->exclusive, XCL);
643 prev = _InterlockedOr(latch->exclusive, XCL);
648 sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
655 // try to obtain write lock
657 // return 1 if obtained,
660 int bt_mutextry(BtMutexLatch *latch)
665 prev = __sync_fetch_and_or(latch->exclusive, XCL);
667 prev = _InterlockedOr(latch->exclusive, XCL);
669 // take write access if exclusive bit is clear
671 return !(prev & XCL);
676 void bt_releasemutex(BtMutexLatch *latch)
678 *latch->exclusive = 0;
680 sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
684 // recovery manager -- flush dirty pages
686 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
688 uint cnt3 = 0, cnt2 = 0, cnt = 0;
693 // flush dirty pool pages to the btree
695 fprintf(stderr, "Start flushlsn\n");
696 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
697 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
698 latch = mgr->latchsets + entry;
699 bt_mutexlock (latch->modify);
700 bt_lockpage(BtLockRead, latch, thread_no);
703 bt_writepage(mgr, page, latch->page_no);
704 latch->dirty = 0, cnt++;
708 if( latch->pin & ~CLOCK_bit )
710 bt_unlockpage(BtLockRead, latch);
711 bt_releasemutex (latch->modify);
713 fprintf(stderr, "End flushlsn %d pages %d pinned %d available\n", cnt, cnt2, cnt3);
716 // recovery manager -- process current recovery buff on startup
717 // this won't do much if previous session was properly closed.
719 BTERR bt_recoveryredo (BtMgr *mgr)
726 pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
728 hdr = (BtLogHdr *)mgr->redobuff;
729 mgr->flushlsn = hdr->lsn;
732 hdr = (BtLogHdr *)(mgr->redobuff + offset);
733 switch( hdr->type ) {
737 case BTRM_add: // add a unique key-value to btree
739 case BTRM_dup: // add a duplicate key-value to btree
740 case BTRM_del: // delete a key-value from btree
741 case BTRM_upd: // update a key with a new value
742 case BTRM_new: // allocate a new empty page
743 case BTRM_old: // reuse an old empty page
749 // recovery manager -- dump current recovery buff & flush dirty pages
750 // in preparation for next recovery buffer.
752 BTERR bt_dumpredo (BtMgr *mgr)
755 fprintf(stderr, "Flush pages ");
757 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
758 memset (eof, 0, sizeof(BtLogHdr));
760 // flush pages written at beginning of this redo buffer
761 // then write the redo buffer out to disk
763 fdatasync (mgr->idx);
765 fprintf(stderr, "Dump ReDo: %d bytes\n", mgr->redoend);
766 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
768 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
770 mgr->flushlsn = mgr->lsn;
773 eof = (BtLogHdr *)(mgr->redobuff);
774 memset (eof, 0, sizeof(BtLogHdr));
779 // recovery manager -- append new entry to recovery log
780 // flush to disk when it overflows.
782 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
784 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
785 uint amt = sizeof(BtLogHdr);
789 bt_mutexlock (mgr->redo);
792 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
794 // see if new entry fits in buffer
795 // flush and reset if it doesn't
797 if( flush = amt > size - mgr->redoend ) {
798 bt_mutexlock (mgr->dump);
800 if( bt_dumpredo (mgr) )
804 // fill in new entry & either eof or end block
806 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
811 hdr->lsn = ++mgr->lsn;
815 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
816 memset (eof, 0, sizeof(BtLogHdr));
818 // fill in key and value
821 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
822 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
825 bt_releasemutex(mgr->redo);
828 bt_flushlsn (mgr, thread_no);
829 bt_releasemutex(mgr->dump);
835 // recovery manager -- append transaction to recovery log
836 // flush to disk when it overflows.
838 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
840 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
841 uint amt = 0, src, type;
848 // determine amount of redo recovery log space required
850 for( src = 0; src++ < source->cnt; ) {
851 key = keyptr(source,src);
852 val = valptr(source,src);
853 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
854 amt += sizeof(BtLogHdr);
857 bt_mutexlock (mgr->redo);
859 // see if new entry fits in buffer
860 // flush and reset if it doesn't
862 if( flush = amt > size - mgr->redoend ) {
863 bt_mutexlock (mgr->dump);
865 if( bt_dumpredo (mgr) )
869 // assign new lsn to transaction
873 // fill in new entries
875 for( src = 0; src++ < source->cnt; ) {
876 key = keyptr(source, src);
877 val = valptr(source, src);
879 switch( slotptr(source, src)->type ) {
894 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
895 amt += sizeof(BtLogHdr);
897 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
903 // fill in key and value
905 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
906 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
911 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
912 memset (eof, 0, sizeof(BtLogHdr));
914 bt_releasemutex(mgr->redo);
917 bt_flushlsn (mgr, thread_no);
918 bt_releasemutex(mgr->dump);
924 // read page into buffer pool from permanent location in Btree file
926 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
928 off64_t off = page_no << mgr->page_bits;
931 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
932 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
933 return mgr->err = BTERR_read;
939 memset (ovl, 0, sizeof(OVERLAPPED));
941 ovl->OffsetHigh = off >> 32;
943 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
944 fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
947 if( *amt < mgr->page_size ) {
948 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
956 // write page to permanent location in Btree file
957 // clear the dirty bit
959 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
961 off64_t off = page_no << mgr->page_bits;
963 page->page_no = page_no;
966 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
972 memset (ovl, 0, sizeof(OVERLAPPED));
974 ovl->OffsetHigh = off >> 32;
976 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
979 if( *amt < mgr->page_size )
986 // set CLOCK bit in latch
987 // decrement pin count
989 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
991 bt_mutexlock(latch->modify);
992 latch->pin |= CLOCK_bit;
995 // if not doing redo recovery
996 // set latch available
999 if( !(latch->pin & ~CLOCK_bit) ) {
1002 __sync_fetch_and_add (&mgr->available, 1);
1004 _InterlockedIncrement(&mgr->available);
1008 bt_releasemutex(latch->modify);
1011 // return the btree cached page address
1012 // if page is dirty and has not yet been
1013 // flushed to disk for the current redo
1014 // recovery buffer, write it out.
1016 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
1018 BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
1023 // return next available latch entry
1024 // and with latch entry locked
1026 uint bt_availnext (BtMgr *mgr, ushort thread_id)
1033 // flush dirty pages if none are available
1034 // and we aren't doing redo recovery
1036 if( !mgr->redopages )
1037 if( !mgr->available )
1038 bt_flushlsn (mgr, thread_id);
1041 entry = __sync_fetch_and_add (&mgr->latchavail, 1) + 1;
1043 entry = _InterlockedIncrement (&mgr->latchavail);
1045 entry %= mgr->latchtotal;
1050 latch = mgr->latchsets + entry;
1055 bt_mutexlock(latch->modify);
1057 if( !latch->avail ) {
1058 bt_releasemutex(latch->modify);
1066 // find available latchset
1067 // return with latchset pinned
1069 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage loadit, ushort thread_id)
1071 uint hashidx = page_no % mgr->latchhash;
1076 // try to find our entry
1078 bt_mutexlock(mgr->hashtable[hashidx].latch);
1080 if( entry = mgr->hashtable[hashidx].entry ) do
1082 latch = mgr->latchsets + entry;
1083 if( page_no == latch->page_no )
1085 } while( entry = latch->next );
1087 // found our entry: increment pin
1088 // remove from available status
1091 latch = mgr->latchsets + entry;
1092 bt_mutexlock(latch->modify);
1095 __sync_fetch_and_add (&mgr->available, -1);
1097 _InterlockedDecrement(&mgr->available);
1100 latch->pin |= CLOCK_bit;
1103 bt_releasemutex(latch->modify);
1104 bt_releasemutex(mgr->hashtable[hashidx].latch);
1108 // find and reuse entry from available set
1112 if( entry = bt_availnext (mgr, thread_id) )
1113 latch = mgr->latchsets + entry;
1115 return mgr->line = __LINE__, mgr->err = BTERR_avail, NULL;
1117 idx = latch->page_no % mgr->latchhash;
1119 // if latch is on a different hash chain
1120 // unlink from the old page_no chain
1122 if( latch->page_no )
1123 if( idx != hashidx ) {
1125 // skip over this entry if latch not available
1127 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1128 bt_releasemutex(latch->modify);
1133 mgr->latchsets[latch->prev].next = latch->next;
1135 mgr->hashtable[idx].entry = latch->next;
1138 mgr->latchsets[latch->next].prev = latch->prev;
1140 bt_releasemutex (mgr->hashtable[idx].latch);
1143 // remove available status
1147 __sync_fetch_and_add (&mgr->available, -1);
1149 _InterlockedDecrement(&mgr->available);
1151 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1154 if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1155 return mgr->line = __LINE__, NULL;
1160 memcpy (page, loadit, mgr->page_size);
1163 if( bt_readpage (mgr, page, page_no) )
1164 return mgr->line = __LINE__, NULL;
1166 // link page as head of hash table chain
1167 // if this is a never before used entry,
1168 // or it was previously on a different
1169 // hash table chain. Otherwise, just
1170 // leave it in its current hash table
1173 if( !latch->page_no || hashidx != idx ) {
1174 if( latch->next = mgr->hashtable[hashidx].entry )
1175 mgr->latchsets[latch->next].prev = entry;
1177 mgr->hashtable[hashidx].entry = entry;
1181 // fill in latch structure
1183 latch->pin = CLOCK_bit | 1;
1184 latch->page_no = page_no;
1185 latch->entry = entry;
1188 bt_releasemutex (latch->modify);
1189 bt_releasemutex (mgr->hashtable[hashidx].latch);
1193 void bt_mgrclose (BtMgr *mgr)
1201 // flush previously written dirty pages
1202 // and write recovery buffer to disk
1204 fdatasync (mgr->idx);
1206 if( mgr->redoend ) {
1207 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1208 memset (eof, 0, sizeof(BtLogHdr));
1210 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1213 // write remaining dirty pool pages to the btree
1215 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1216 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1217 latch = mgr->latchsets + slot;
1219 if( latch->dirty ) {
1220 bt_writepage(mgr, page, latch->page_no);
1221 latch->dirty = 0, num++;
1225 // flush last batch to disk and clear
1226 // redo recovery buffer on disk.
1228 fdatasync (mgr->idx);
1230 if( mgr->redopages ) {
1231 eof = (BtLogHdr *)mgr->redobuff;
1232 memset (eof, 0, sizeof(BtLogHdr));
1233 eof->lsn = mgr->lsn;
1235 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1237 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1240 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1243 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1244 munmap (mgr->pagezero, mgr->page_size);
1246 FlushViewOfFile(mgr->pagezero, 0);
1247 UnmapViewOfFile(mgr->pagezero);
1248 UnmapViewOfFile(mgr->pagepool);
1249 CloseHandle(mgr->halloc);
1250 CloseHandle(mgr->hpool);
1253 free (mgr->redobuff);
1257 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1258 FlushFileBuffers(mgr->idx);
1259 CloseHandle(mgr->idx);
1264 // close and release memory
1266 void bt_close (BtDb *bt)
1273 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1278 // open/create new btree buffer manager
1280 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1281 // size of page pool (e.g. 262144)
1283 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1285 uint lvl, attr, last, slot, idx;
1286 uint nlatchpage, latchhash;
1287 unsigned char value[BtId];
1288 int flag, initit = 0;
1289 BtPageZero *pagezero;
1297 // determine sanity of page size and buffer pool
1299 if( bits > BT_maxbits )
1301 else if( bits < BT_minbits )
1304 mgr = calloc (1, sizeof(BtMgr));
1306 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1308 if( mgr->idx == -1 ) {
1309 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1310 return free(mgr), NULL;
1313 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1314 attr = FILE_ATTRIBUTE_NORMAL;
1315 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1317 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1318 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1319 return GlobalFree(mgr), NULL;
1324 pagezero = valloc (BT_maxpage);
1327 // read minimum page size to get root info
1328 // to support raw disk partition files
1329 // check if bits == 0 on the disk.
1331 if( size = lseek (mgr->idx, 0L, 2) )
1332 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1333 if( pagezero->alloc->bits )
1334 bits = pagezero->alloc->bits;
1338 return free(mgr), free(pagezero), NULL;
1342 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1343 size = GetFileSize(mgr->idx, amt);
1345 if( size || *amt ) {
1346 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1347 return bt_mgrclose (mgr), NULL;
1348 bits = pagezero->alloc->bits;
1353 mgr->page_size = 1 << bits;
1354 mgr->page_bits = bits;
1356 // calculate number of latch hash table entries
1358 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1360 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1361 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1362 mgr->latchtotal = nodemax;
1367 // initialize an empty b-tree with latch page, root page, page of leaves
1368 // and page(s) of latches and page pool cache
1370 memset (pagezero, 0, 1 << bits);
1371 pagezero->alloc->bits = mgr->page_bits;
1372 bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1374 // initialize left-most LEAF page in
1377 bt_putid (pagezero->alloc->left, LEAF_page);
1379 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1380 fprintf (stderr, "Unable to create btree page zero\n");
1381 return bt_mgrclose (mgr), NULL;
1384 memset (pagezero, 0, 1 << bits);
1385 pagezero->alloc->bits = mgr->page_bits;
1387 for( lvl=MIN_lvl; lvl--; ) {
1388 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1389 key = keyptr(pagezero->alloc, 1);
1390 key->len = 2; // create stopper key
1394 bt_putid(value, MIN_lvl - lvl + 1);
1395 val = valptr(pagezero->alloc, 1);
1396 val->len = lvl ? BtId : 0;
1397 memcpy (val->value, value, val->len);
1399 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1400 pagezero->alloc->lvl = lvl;
1401 pagezero->alloc->cnt = 1;
1402 pagezero->alloc->act = 1;
1404 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1405 fprintf (stderr, "Unable to create btree page zero\n");
1406 return bt_mgrclose (mgr), NULL;
1414 VirtualFree (pagezero, 0, MEM_RELEASE);
1417 // mlock the pagezero page
1419 flag = PROT_READ | PROT_WRITE;
1420 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1421 if( mgr->pagezero == MAP_FAILED ) {
1422 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1423 return bt_mgrclose (mgr), NULL;
1425 mlock (mgr->pagezero, mgr->page_size);
1427 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1428 if( mgr->pagepool == MAP_FAILED ) {
1429 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1430 return bt_mgrclose (mgr), NULL;
1432 if( mgr->redopages = redopages )
1433 mgr->redobuff = valloc (redopages * mgr->page_size);
1435 flag = PAGE_READWRITE;
1436 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1437 if( !mgr->halloc ) {
1438 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1439 return bt_mgrclose (mgr), NULL;
1442 flag = FILE_MAP_WRITE;
1443 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1444 if( !mgr->pagezero ) {
1445 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1446 return bt_mgrclose (mgr), NULL;
1449 flag = PAGE_READWRITE;
1450 size = (uid)mgr->nlatchpage << mgr->page_bits;
1451 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1453 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1454 return bt_mgrclose (mgr), NULL;
1457 flag = FILE_MAP_WRITE;
1458 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1459 if( !mgr->pagepool ) {
1460 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1461 return bt_mgrclose (mgr), NULL;
1463 if( mgr->redopages = redopages )
1464 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1467 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1468 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1469 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1471 // mark all pool entries as available
1473 for( idx = 1; idx < mgr->latchtotal; idx++ ) {
1474 latch = mgr->latchsets + idx;
1482 // open BTree access method
1483 // based on buffer manager
1485 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1487 BtDb *bt = malloc (sizeof(*bt));
1489 memset (bt, 0, sizeof(*bt));
1493 bt->cursor = valloc (mgr->page_size);
1495 bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1498 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1500 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1505 // compare two keys, return > 0, = 0, or < 0
1506 // =0: keys are same
1509 // as the comparison value
1511 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1513 uint len1 = key1->len;
1516 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1527 // place write, read, or parent lock on requested page_no.
1529 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1533 ReadLock (latch->readwr, thread_no);
1536 WriteLock (latch->readwr, thread_no);
1539 ReadLock (latch->access, thread_no);
1542 WriteLock (latch->access, thread_no);
1545 WriteOLock (latch->parent, thread_no);
1548 WriteOLock (latch->atomic, thread_no);
1550 case BtLockAtomic | BtLockRead:
1551 WriteOLock (latch->atomic, thread_no);
1552 ReadLock (latch->readwr, thread_no);
1557 // remove write, read, or parent lock on requested page
1559 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1563 ReadRelease (latch->readwr);
1566 WriteRelease (latch->readwr);
1569 ReadRelease (latch->access);
1572 WriteRelease (latch->access);
1575 WriteORelease (latch->parent);
1578 WriteORelease (latch->atomic);
1580 case BtLockAtomic | BtLockRead:
1581 WriteORelease (latch->atomic);
1582 ReadRelease (latch->readwr);
1587 // allocate a new page
1588 // return with page latched, but unlocked.
1590 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1595 // lock allocation page
1597 bt_mutexlock(mgr->lock);
1599 // use empty chain first
1600 // else allocate empty page
1602 if( page_no = bt_getid(mgr->pagezero->chain) ) {
1603 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1604 set->page = bt_mappage (mgr, set->latch);
1606 return mgr->err = BTERR_struct, mgr->line = __LINE__, -1;
1608 bt_putid(mgr->pagezero->chain, bt_getid(set->page->right));
1609 bt_releasemutex(mgr->lock);
1611 memcpy (set->page, contents, mgr->page_size);
1612 set->latch->dirty = 1;
1616 page_no = bt_getid(mgr->pagezero->alloc->right);
1617 bt_putid(mgr->pagezero->alloc->right, page_no+1);
1619 // unlock allocation latch and
1620 // extend file into new page.
1622 bt_releasemutex(mgr->lock);
1624 // don't load cache from btree page
1626 if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1627 set->page = bt_mappage (mgr, set->latch);
1631 set->latch->dirty = 1;
1635 // find slot in page for given key at a given level
1637 int bt_findslot (BtPage page, unsigned char *key, uint len)
1639 uint diff, higher = page->cnt, low = 1, slot;
1642 // make stopper key an infinite fence value
1644 if( bt_getid (page->right) )
1649 // low is the lowest candidate.
1650 // loop ends when they meet
1652 // higher is already
1653 // tested as .ge. the passed key.
1655 while( diff = higher - low ) {
1656 slot = low + ( diff >> 1 );
1657 if( keycmp (keyptr(page, slot), key, len) < 0 )
1660 higher = slot, good++;
1663 // return zero if key is on right link page
1665 return good ? higher : 0;
1668 // find and load page at given level for given key
1669 // leave page rd or wr locked as requested
1671 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1673 uid page_no = ROOT_page, prevpage = 0;
1674 uint drill = 0xff, slot;
1675 BtLatchSet *prevlatch;
1676 uint mode, prevmode;
1679 // start at root of btree and drill down
1682 // determine lock mode of drill level
1683 mode = (drill == lvl) ? lock : BtLockRead;
1685 if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1688 // obtain access lock using lock chaining with Access mode
1690 if( page_no > ROOT_page )
1691 bt_lockpage(BtLockAccess, set->latch, thread_no);
1693 set->page = bt_mappage (mgr, set->latch);
1695 // release & unpin parent or left sibling page
1698 bt_unlockpage(prevmode, prevlatch);
1699 bt_unpinlatch (mgr, prevlatch);
1703 // obtain mode lock using lock chaining through AccessLock
1705 bt_lockpage(mode, set->latch, thread_no);
1707 if( set->page->free )
1708 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1710 if( page_no > ROOT_page )
1711 bt_unlockpage(BtLockAccess, set->latch);
1713 // re-read and re-lock root after determining actual level of root
1715 if( set->page->lvl != drill) {
1716 if( set->latch->page_no != ROOT_page )
1717 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1719 drill = set->page->lvl;
1721 if( lock != BtLockRead && drill == lvl ) {
1722 bt_unlockpage(mode, set->latch);
1723 bt_unpinlatch (mgr, set->latch);
1728 prevpage = set->latch->page_no;
1729 prevlatch = set->latch;
1732 // find key on page at this level
1733 // and descend to requested level
1735 if( !set->page->kill )
1736 if( slot = bt_findslot (set->page, key, len) ) {
1740 // find next non-dead slot -- the fence key if nothing else
1742 while( slotptr(set->page, slot)->dead )
1743 if( slot++ < set->page->cnt )
1746 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1748 val = valptr(set->page, slot);
1750 if( val->len == BtId )
1751 page_no = bt_getid(valptr(set->page, slot)->value);
1753 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1759 // slide right into next page
1761 page_no = bt_getid(set->page->right);
1764 // return error on end of right chain
1766 mgr->line = __LINE__, mgr->err = BTERR_struct;
1767 return 0; // return error
1770 // return page to free list
1771 // page must be delete & write locked
1773 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1775 // lock allocation page
1777 bt_mutexlock (mgr->lock);
1781 memcpy(set->page->right, mgr->pagezero->chain, BtId);
1782 bt_putid(mgr->pagezero->chain, set->latch->page_no);
1783 set->latch->dirty = 1;
1784 set->page->free = 1;
1786 // unlock released page
1788 bt_unlockpage (BtLockDelete, set->latch);
1789 bt_unlockpage (BtLockWrite, set->latch);
1790 bt_unpinlatch (mgr, set->latch);
1792 // unlock allocation page
1794 bt_releasemutex (mgr->lock);
1797 // a fence key was deleted from a page
1798 // push new fence value upwards
1800 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1802 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1803 unsigned char value[BtId];
1807 // remove the old fence value
1809 ptr = keyptr(set->page, set->page->cnt);
1810 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1811 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1812 set->latch->dirty = 1;
1814 // cache new fence value
1816 ptr = keyptr(set->page, set->page->cnt);
1817 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1819 bt_lockpage (BtLockParent, set->latch, thread_no);
1820 bt_unlockpage (BtLockWrite, set->latch);
1822 // insert new (now smaller) fence key
1824 bt_putid (value, set->latch->page_no);
1825 ptr = (BtKey*)leftkey;
1827 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
1830 // now delete old fence key
1832 ptr = (BtKey*)rightkey;
1834 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1837 bt_unlockpage (BtLockParent, set->latch);
1838 bt_unpinlatch(mgr, set->latch);
1842 // root has a single child
1843 // collapse a level from the tree
1845 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1852 // find the child entry and promote as new root contents
1855 for( idx = 0; idx++ < root->page->cnt; )
1856 if( !slotptr(root->page, idx)->dead )
1859 val = valptr(root->page, idx);
1861 if( val->len == BtId )
1862 page_no = bt_getid (valptr(root->page, idx)->value);
1864 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1866 if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1867 child->page = bt_mappage (mgr, child->latch);
1871 bt_lockpage (BtLockDelete, child->latch, thread_no);
1872 bt_lockpage (BtLockWrite, child->latch, thread_no);
1874 memcpy (root->page, child->page, mgr->page_size);
1875 root->latch->dirty = 1;
1877 bt_freepage (mgr, child);
1879 } while( root->page->lvl > 1 && root->page->act == 1 );
1881 bt_unlockpage (BtLockWrite, root->latch);
1882 bt_unpinlatch (mgr, root->latch);
1886 // delete a page and manage keys
1887 // call with page writelocked
1888 // returns with page unpinned
1890 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1892 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1893 unsigned char value[BtId];
1894 uint lvl = set->page->lvl;
1899 // cache copy of fence key
1900 // to post in parent
1902 ptr = keyptr(set->page, set->page->cnt);
1903 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1905 // obtain lock on right page
1907 page_no = bt_getid(set->page->right);
1909 if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1910 right->page = bt_mappage (mgr, right->latch);
1914 bt_lockpage (BtLockWrite, right->latch, thread_no);
1916 // cache copy of key to update
1918 ptr = keyptr(right->page, right->page->cnt);
1919 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1921 if( right->page->kill )
1922 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1924 // pull contents of right peer into our empty page
1926 memcpy (set->page, right->page, mgr->page_size);
1927 set->latch->dirty = 1;
1929 // mark right page deleted and point it to left page
1930 // until we can post parent updates that remove access
1931 // to the deleted page.
1933 bt_putid (right->page->right, set->latch->page_no);
1934 right->latch->dirty = 1;
1935 right->page->kill = 1;
1937 bt_lockpage (BtLockParent, right->latch, thread_no);
1938 bt_unlockpage (BtLockWrite, right->latch);
1940 bt_lockpage (BtLockParent, set->latch, thread_no);
1941 bt_unlockpage (BtLockWrite, set->latch);
1943 // redirect higher key directly to our new node contents
1945 bt_putid (value, set->latch->page_no);
1946 ptr = (BtKey*)higherfence;
1948 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
1951 // delete old lower key to our node
1953 ptr = (BtKey*)lowerfence;
1955 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1958 // obtain delete and write locks to right node
1960 bt_unlockpage (BtLockParent, right->latch);
1961 bt_lockpage (BtLockDelete, right->latch, thread_no);
1962 bt_lockpage (BtLockWrite, right->latch, thread_no);
1963 bt_freepage (mgr, right);
1965 bt_unlockpage (BtLockParent, set->latch);
1966 bt_unpinlatch (mgr, set->latch);
1970 // find and delete key on page by marking delete flag bit
1971 // if page becomes empty, delete it from the btree
1973 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1975 uint slot, idx, found, fence;
1980 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) )
1981 ptr = keyptr(set->page, slot);
1985 // if librarian slot, advance to real slot
1987 if( slotptr(set->page, slot)->type == Librarian )
1988 ptr = keyptr(set->page, ++slot);
1990 fence = slot == set->page->cnt;
1992 // if key is found delete it, otherwise ignore request
1994 if( found = !keycmp (ptr, key, len) )
1995 if( found = slotptr(set->page, slot)->dead == 0 ) {
1996 val = valptr(set->page,slot);
1997 slotptr(set->page, slot)->dead = 1;
1998 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2001 // collapse empty slots beneath the fence
2003 while( idx = set->page->cnt - 1 )
2004 if( slotptr(set->page, idx)->dead ) {
2005 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2006 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2011 // did we delete a fence key in an upper level?
2013 if( found && lvl && set->page->act && fence )
2014 if( bt_fixfence (mgr, set, lvl, thread_no) )
2019 // do we need to collapse root?
2021 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2022 if( bt_collapseroot (mgr, set, thread_no) )
2027 // delete empty page
2029 if( !set->page->act )
2030 return bt_deletepage (mgr, set, thread_no);
2032 set->latch->dirty = 1;
2033 bt_unlockpage(BtLockWrite, set->latch);
2034 bt_unpinlatch (mgr, set->latch);
2038 // advance to next slot
2040 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2042 BtLatchSet *prevlatch;
2045 if( slot < set->page->cnt )
2048 prevlatch = set->latch;
2050 if( page_no = bt_getid(set->page->right) )
2051 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2052 set->page = bt_mappage (bt->mgr, set->latch);
2056 return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2058 // obtain access lock using lock chaining with Access mode
2060 bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2062 bt_unlockpage(BtLockRead, prevlatch);
2063 bt_unpinlatch (bt->mgr, prevlatch);
2065 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2066 bt_unlockpage(BtLockAccess, set->latch);
2070 // find unique key == given key, or first duplicate key in
2071 // leaf level and return number of value bytes
2072 // or (-1) if not found.
2074 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2082 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2084 ptr = keyptr(set->page, slot);
2086 // skip librarian slot place holder
2088 if( slotptr(set->page, slot)->type == Librarian )
2089 ptr = keyptr(set->page, ++slot);
2091 // return actual key found
2093 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2096 if( slotptr(set->page, slot)->type == Duplicate )
2099 // not there if we reach the stopper key
2101 if( slot == set->page->cnt )
2102 if( !bt_getid (set->page->right) )
2105 // if key exists, return >= 0 value bytes copied
2106 // otherwise return (-1)
2108 if( slotptr(set->page, slot)->dead )
2112 if( !memcmp (ptr->key, key, len) ) {
2113 val = valptr (set->page,slot);
2114 if( valmax > val->len )
2116 memcpy (value, val->value, valmax);
2122 } while( slot = bt_findnext (bt, set, slot) );
2124 bt_unlockpage (BtLockRead, set->latch);
2125 bt_unpinlatch (bt->mgr, set->latch);
2129 // check page for space available,
2130 // clean if necessary and return
2131 // 0 - page needs splitting
2132 // >0 new slot value
2134 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2136 BtPage page = set->page, frame;
2137 uint nxt = mgr->page_size;
2138 uint cnt = 0, idx = 0;
2139 uint max = page->cnt;
2144 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2147 // skip cleanup and proceed to split
2148 // if there's not enough garbage
2151 if( page->garbage < nxt / 5 )
2154 frame = malloc (mgr->page_size);
2155 memcpy (frame, page, mgr->page_size);
2157 // skip page info and set rest of page to zero
2159 memset (page+1, 0, mgr->page_size - sizeof(*page));
2160 set->latch->dirty = 1;
2165 // clean up page first by
2166 // removing deleted keys
2168 while( cnt++ < max ) {
2172 if( cnt < max || frame->lvl )
2173 if( slotptr(frame,cnt)->dead )
2176 // copy the value across
2178 val = valptr(frame, cnt);
2179 nxt -= val->len + sizeof(BtVal);
2180 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2182 // copy the key across
2184 key = keyptr(frame, cnt);
2185 nxt -= key->len + sizeof(BtKey);
2186 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2188 // make a librarian slot
2190 slotptr(page, ++idx)->off = nxt;
2191 slotptr(page, idx)->type = Librarian;
2192 slotptr(page, idx)->dead = 1;
2196 slotptr(page, ++idx)->off = nxt;
2197 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2199 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2207 // see if page has enough space now, or does it need splitting?
2209 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2215 // split the root and raise the height of the btree
2217 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2219 unsigned char leftkey[BT_keyarray];
2220 uint nxt = mgr->page_size;
2221 unsigned char value[BtId];
2227 // save left page fence key for new root
2229 ptr = keyptr(root->page, root->page->cnt);
2230 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2232 // Obtain an empty page to use, and copy the current
2233 // root contents into it, e.g. lower keys
2235 if( bt_newpage(mgr, left, root->page, page_no) )
2238 left_page_no = left->latch->page_no;
2239 bt_unpinlatch (mgr, left->latch);
2241 // preserve the page info at the bottom
2242 // of higher keys and set rest to zero
2244 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2246 // insert stopper key at top of newroot page
2247 // and increase the root height
2249 nxt -= BtId + sizeof(BtVal);
2250 bt_putid (value, right->page_no);
2251 val = (BtVal *)((unsigned char *)root->page + nxt);
2252 memcpy (val->value, value, BtId);
2255 nxt -= 2 + sizeof(BtKey);
2256 slotptr(root->page, 2)->off = nxt;
2257 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2262 // insert lower keys page fence key on newroot page as first key
2264 nxt -= BtId + sizeof(BtVal);
2265 bt_putid (value, left_page_no);
2266 val = (BtVal *)((unsigned char *)root->page + nxt);
2267 memcpy (val->value, value, BtId);
2270 ptr = (BtKey *)leftkey;
2271 nxt -= ptr->len + sizeof(BtKey);
2272 slotptr(root->page, 1)->off = nxt;
2273 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2275 bt_putid(root->page->right, 0);
2276 root->page->min = nxt; // reset lowest used offset and key count
2277 root->page->cnt = 2;
2278 root->page->act = 2;
2281 // release and unpin root pages
2283 bt_unlockpage(BtLockWrite, root->latch);
2284 bt_unpinlatch (mgr, root->latch);
2286 bt_unpinlatch (mgr, right);
2290 // split already locked full node
2292 // return pool entry for new right
2295 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2297 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2298 BtPage frame = malloc (mgr->page_size);
2299 uint lvl = set->page->lvl;
2306 // split higher half of keys to frame
2308 memset (frame, 0, mgr->page_size);
2309 max = set->page->cnt;
2313 while( cnt++ < max ) {
2314 if( cnt < max || set->page->lvl )
2315 if( slotptr(set->page, cnt)->dead )
2318 src = valptr(set->page, cnt);
2319 nxt -= src->len + sizeof(BtVal);
2320 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2322 key = keyptr(set->page, cnt);
2323 nxt -= key->len + sizeof(BtKey);
2324 ptr = (BtKey*)((unsigned char *)frame + nxt);
2325 memcpy (ptr, key, key->len + sizeof(BtKey));
2327 // add librarian slot
2329 slotptr(frame, ++idx)->off = nxt;
2330 slotptr(frame, idx)->type = Librarian;
2331 slotptr(frame, idx)->dead = 1;
2335 slotptr(frame, ++idx)->off = nxt;
2336 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2338 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2342 frame->bits = mgr->page_bits;
2349 if( set->latch->page_no > ROOT_page )
2350 bt_putid (frame->right, bt_getid (set->page->right));
2352 // get new free page and write higher keys to it.
2354 if( bt_newpage(mgr, right, frame, thread_no) )
2357 // process lower keys
2359 memcpy (frame, set->page, mgr->page_size);
2360 memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2361 set->latch->dirty = 1;
2363 nxt = mgr->page_size;
2364 set->page->garbage = 0;
2370 if( slotptr(frame, max)->type == Librarian )
2373 // assemble page of smaller keys
2375 while( cnt++ < max ) {
2376 if( slotptr(frame, cnt)->dead )
2378 val = valptr(frame, cnt);
2379 nxt -= val->len + sizeof(BtVal);
2380 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2382 key = keyptr(frame, cnt);
2383 nxt -= key->len + sizeof(BtKey);
2384 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2386 // add librarian slot
2388 slotptr(set->page, ++idx)->off = nxt;
2389 slotptr(set->page, idx)->type = Librarian;
2390 slotptr(set->page, idx)->dead = 1;
2394 slotptr(set->page, ++idx)->off = nxt;
2395 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2399 bt_putid(set->page->right, right->latch->page_no);
2400 set->page->min = nxt;
2401 set->page->cnt = idx;
2403 return right->latch->entry;
2406 // fix keys for newly split page
2407 // call with page locked, return
2410 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2412 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2413 unsigned char value[BtId];
2414 uint lvl = set->page->lvl;
2418 // if current page is the root page, split it
2420 if( set->latch->page_no == ROOT_page )
2421 return bt_splitroot (mgr, set, right, thread_no);
2423 ptr = keyptr(set->page, set->page->cnt);
2424 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2426 page = bt_mappage (mgr, right);
2428 ptr = keyptr(page, page->cnt);
2429 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2431 // insert new fences in their parent pages
2433 bt_lockpage (BtLockParent, right, thread_no);
2435 bt_lockpage (BtLockParent, set->latch, thread_no);
2436 bt_unlockpage (BtLockWrite, set->latch);
2438 // insert new fence for reformulated left block of smaller keys
2440 bt_putid (value, set->latch->page_no);
2441 ptr = (BtKey *)leftkey;
2443 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
2446 // switch fence for right block of larger keys to new right page
2448 bt_putid (value, right->page_no);
2449 ptr = (BtKey *)rightkey;
2451 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
2454 bt_unlockpage (BtLockParent, set->latch);
2455 bt_unpinlatch (mgr, set->latch);
2457 bt_unlockpage (BtLockParent, right);
2458 bt_unpinlatch (mgr, right);
2462 // install new key and value onto page
2463 // page must already be checked for
2466 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2468 uint idx, librarian;
2473 // if found slot > desired slot and previous slot
2474 // is a librarian slot, use it
2477 if( slotptr(set->page, slot-1)->type == Librarian )
2480 // copy value onto page
2482 set->page->min -= vallen + sizeof(BtVal);
2483 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2484 memcpy (val->value, value, vallen);
2487 // copy key onto page
2489 set->page->min -= keylen + sizeof(BtKey);
2490 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2491 memcpy (ptr->key, key, keylen);
2494 // find first empty slot
2496 for( idx = slot; idx < set->page->cnt; idx++ )
2497 if( slotptr(set->page, idx)->dead )
2500 // now insert key into array before slot
2502 if( idx == set->page->cnt )
2503 idx += 2, set->page->cnt += 2, librarian = 2;
2507 set->latch->dirty = 1;
2510 while( idx > slot + librarian - 1 )
2511 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2513 // add librarian slot
2515 if( librarian > 1 ) {
2516 node = slotptr(set->page, slot++);
2517 node->off = set->page->min;
2518 node->type = Librarian;
2524 node = slotptr(set->page, slot);
2525 node->off = set->page->min;
2530 bt_unlockpage (BtLockWrite, set->latch);
2531 bt_unpinlatch (mgr, set->latch);
2537 // Insert new key into the btree at given level.
2538 // either add a new key or update/add an existing one
2540 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique, ushort thread_no)
2542 unsigned char newkey[BT_keyarray];
2543 uint slot, idx, len, entry;
2550 // set up the key we're working on
2552 ins = (BtKey*)newkey;
2553 memcpy (ins->key, key, keylen);
2556 // is this a non-unique index value?
2562 sequence = bt_newdup (mgr);
2563 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2567 while( 1 ) { // find the page and slot for the current key
2568 if( slot = bt_loadpage (mgr, set, ins->key, ins->len, lvl, BtLockWrite, thread_no) )
2569 ptr = keyptr(set->page, slot);
2572 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2576 // if librarian slot == found slot, advance to real slot
2578 if( slotptr(set->page, slot)->type == Librarian )
2579 if( !keycmp (ptr, key, keylen) )
2580 ptr = keyptr(set->page, ++slot);
2584 if( slotptr(set->page, slot)->type == Duplicate )
2587 // if inserting a duplicate key or unique key
2588 // check for adequate space on the page
2589 // and insert the new key before slot.
2591 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2592 if( !(slot = bt_cleanpage (mgr, set, ins->len, slot, vallen)) )
2593 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2595 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2600 return bt_insertslot (mgr, set, slot, ins->key, ins->len, value, vallen, type, 1);
2603 // if key already exists, update value and return
2605 val = valptr(set->page, slot);
2607 if( val->len >= vallen ) {
2608 if( slotptr(set->page, slot)->dead )
2610 set->page->garbage += val->len - vallen;
2611 set->latch->dirty = 1;
2612 slotptr(set->page, slot)->dead = 0;
2614 memcpy (val->value, value, vallen);
2615 bt_unlockpage(BtLockWrite, set->latch);
2616 bt_unpinlatch (mgr, set->latch);
2620 // new update value doesn't fit in existing value area
2622 if( !slotptr(set->page, slot)->dead )
2623 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2625 slotptr(set->page, slot)->dead = 0;
2629 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2630 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2632 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2637 set->page->min -= vallen + sizeof(BtVal);
2638 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2639 memcpy (val->value, value, vallen);
2642 set->latch->dirty = 1;
2643 set->page->min -= keylen + sizeof(BtKey);
2644 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2645 memcpy (ptr->key, key, keylen);
2648 slotptr(set->page, slot)->off = set->page->min;
2649 bt_unlockpage(BtLockWrite, set->latch);
2650 bt_unpinlatch (mgr, set->latch);
2657 logseqno reqlsn; // redo log seq no required
2658 logseqno lsn; // redo log sequence number
2659 uint entry; // latch table entry number
2660 uint slot:31; // page slot number
2661 uint reuse:1; // reused previous page
2665 uid page_no; // page number for split leaf
2666 void *next; // next key to insert
2667 uint entry:29; // latch table entry number
2668 uint type:2; // 0 == insert, 1 == delete, 2 == free
2669 uint nounlock:1; // don't unlock ParentModification
2670 unsigned char leafkey[BT_keyarray];
2673 // determine actual page where key is located
2674 // return slot number
2676 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2678 BtKey *key = keyptr(source,src);
2679 uint slot = locks[src].slot;
2682 if( src > 1 && locks[src].reuse )
2683 entry = locks[src-1].entry, slot = 0;
2685 entry = locks[src].entry;
2688 set->latch = mgr->latchsets + entry;
2689 set->page = bt_mappage (mgr, set->latch);
2693 // is locks->reuse set? or was slot zeroed?
2694 // if so, find where our key is located
2695 // on current page or pages split on
2696 // same page txn operations.
2699 set->latch = mgr->latchsets + entry;
2700 set->page = bt_mappage (mgr, set->latch);
2702 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2703 if( slotptr(set->page, slot)->type == Librarian )
2705 if( locks[src].reuse )
2706 locks[src].entry = entry;
2709 } while( entry = set->latch->split );
2711 mgr->line = __LINE__, mgr->err = BTERR_atomic;
2715 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2717 BtKey *key = keyptr(source, src);
2718 BtVal *val = valptr(source, src);
2723 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2724 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2725 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2727 set->page->lsn = locks[src].lsn;
2731 if( entry = bt_splitpage (mgr, set, thread_no) )
2732 latch = mgr->latchsets + entry;
2736 // splice right page into split chain
2737 // and WriteLock it.
2739 bt_lockpage(BtLockWrite, latch, thread_no);
2740 latch->split = set->latch->split;
2741 set->latch->split = entry;
2742 locks[src].slot = 0;
2745 return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2748 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2750 BtKey *key = keyptr(source, src);
2756 if( slot = bt_atomicpage (mgr, source, locks, src, set) )
2757 ptr = keyptr(set->page, slot);
2759 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2761 if( !keycmp (ptr, key->key, key->len) )
2762 if( !slotptr(set->page, slot)->dead )
2763 slotptr(set->page, slot)->dead = 1;
2769 val = valptr(set->page, slot);
2770 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2771 set->latch->dirty = 1;
2772 set->page->lsn = locks[src].lsn;
2778 // delete an empty master page for a transaction
2780 // note that the far right page never empties because
2781 // it always contains (at least) the infinite stopper key
2782 // and that all pages that don't contain any keys are
2783 // deleted, or are being held under Atomic lock.
2785 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2787 BtPageSet right[1], temp[1];
2788 unsigned char value[BtId];
2792 bt_lockpage(BtLockWrite, prev->latch, thread_no);
2794 // grab the right sibling
2796 if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2797 right->page = bt_mappage (mgr, right->latch);
2801 bt_lockpage(BtLockAtomic, right->latch, thread_no);
2802 bt_lockpage(BtLockWrite, right->latch, thread_no);
2804 // and pull contents over empty page
2805 // while preserving master's left link
2807 memcpy (right->page->left, prev->page->left, BtId);
2808 memcpy (prev->page, right->page, mgr->page_size);
2810 // forward seekers to old right sibling
2811 // to new page location in set
2813 bt_putid (right->page->right, prev->latch->page_no);
2814 right->latch->dirty = 1;
2815 right->page->kill = 1;
2817 // remove pointer to right page for searchers by
2818 // changing right fence key to point to the master page
2820 ptr = keyptr(right->page,right->page->cnt);
2821 bt_putid (value, prev->latch->page_no);
2823 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, 1, thread_no) )
2826 // now that master page is in good shape we can
2827 // remove its locks.
2829 bt_unlockpage (BtLockAtomic, prev->latch);
2830 bt_unlockpage (BtLockWrite, prev->latch);
2832 // fix master's right sibling's left pointer
2833 // to remove scanner's poiner to the right page
2835 if( right_page_no = bt_getid (prev->page->right) ) {
2836 if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2837 temp->page = bt_mappage (mgr, temp->latch);
2839 bt_lockpage (BtLockWrite, temp->latch, thread_no);
2840 bt_putid (temp->page->left, prev->latch->page_no);
2841 temp->latch->dirty = 1;
2843 bt_unlockpage (BtLockWrite, temp->latch);
2844 bt_unpinlatch (mgr, temp->latch);
2845 } else { // master is now the far right page
2846 bt_mutexlock (mgr->lock);
2847 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2848 bt_releasemutex(mgr->lock);
2851 // now that there are no pointers to the right page
2852 // we can delete it after the last read access occurs
2854 bt_unlockpage (BtLockWrite, right->latch);
2855 bt_unlockpage (BtLockAtomic, right->latch);
2856 bt_lockpage (BtLockDelete, right->latch, thread_no);
2857 bt_lockpage (BtLockWrite, right->latch, thread_no);
2858 bt_freepage (mgr, right);
2862 // find and add the next available latch entry
2865 BTERR bt_txnavaillatch (BtDb *bt)
2873 // find and reuse previous entry on victim
2875 startattempt = bt->mgr->latchvictim;
2879 entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
2881 entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
2883 // skip entry if it has outstanding pins
2885 entry %= bt->mgr->latchtotal;
2890 // only go around one time before
2891 // flushing redo recovery buffer,
2892 // and the buffer pool to free up entries.
2894 if( bt->mgr->redopages )
2895 if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
2896 if( bt_mutextry (bt->mgr->dump) ) {
2897 if( bt_dumpredo (bt->mgr) )
2898 return bt->mgr->err;
2899 bt_flushlsn (bt->mgr, bt->thread_no);
2900 // synchronize the various threads running into this condition
2901 // so that only one thread does the dump and flush
2903 bt_mutexlock(bt->mgr->dump);
2905 startattempt = bt->mgr->latchvictim;
2906 bt_releasemutex(bt->mgr->dump);
2909 latch = bt->mgr->latchsets + entry;
2914 bt_mutexlock(latch->modify);
2916 // skip if already an available entry
2918 if( latch->avail ) {
2919 bt_releasemutex(latch->modify);
2923 // skip this entry if it is pinned
2924 // if the CLOCK bit is set
2925 // reset it to zero.
2928 latch->pin &= ~CLOCK_bit;
2929 bt_releasemutex(latch->modify);
2933 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
2935 // if dirty page has lsn >= last redo recovery buffer
2936 // then hold page in the buffer pool until next redo
2937 // recovery buffer is being written to disk.
2940 if( page->lsn >= bt->mgr->flushlsn ) {
2941 bt_releasemutex(latch->modify);
2945 // entry is now available
2947 __sync_fetch_and_add (&bt->mgr->available, 1);
2949 _InterlockedIncrement(&bt->mgr->available);
2952 bt_releasemutex(latch->modify);
2957 // release available latch requests
2959 void bt_txnavailrelease (BtDb *bt, uint count)
2962 __sync_fetch_and_add(bt->mgr->availlock, -count);
2964 _InterlockedAdd(bt->mgr->availlock, -count);
2968 // promote page of keys from first btree
2971 BTERR bt_txnavailmain (BtDb *bt)
2978 entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
2980 entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
2982 // skip entry if it has outstanding pins
2984 entry %= bt->mgr->latchtotal;
2989 latch = bt->mgr->latchsets + entry;
2994 bt_mutexlock(latch->modify);
2996 // skip if already an available entry
2998 if( latch->avail ) {
2999 bt_releasemutex(latch->modify);
3003 // skip this entry if it is pinned
3004 // if the CLOCK bit is set
3005 // reset it to zero.
3008 latch->pin &= ~CLOCK_bit;
3009 bt_releasemutex(latch->modify);
3016 // commit available pool entries
3017 // find available entries as required
3019 BTERR bt_txnavailrequest (BtDb *bt, uint count)
3022 __sync_fetch_and_add(bt->mgr->availlock, count);
3024 _InterlockedAdd(bt->mgr->availlock, count);
3027 // find another available pool entry
3029 while( *bt->mgr->availlock > bt->mgr->available )
3030 if( bt->mgr->redopages )
3031 bt_txnavaillatch (bt);
3033 if( bt_txnavailmain (bt) )
3034 return bt->mgr->err;
3037 // atomic modification of a batch of keys.
3039 // return -1 if BTERR is set
3040 // otherwise return slot number
3041 // causing the key constraint violation
3042 // or zero on successful completion.
3044 int bt_atomictxn (BtDb *bt, BtPage source)
3046 uint src, idx, slot, samepage, entry, avail, que = 0;
3047 AtomicKey *head, *tail, *leaf;
3048 BtPageSet set[1], prev[1];
3049 unsigned char value[BtId];
3050 BtKey *key, *ptr, *key2;
3061 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
3065 // stable sort the list of keys into order to
3066 // prevent deadlocks between threads.
3068 for( src = 1; src++ < source->cnt; ) {
3069 *temp = *slotptr(source,src);
3070 key = keyptr (source,src);
3072 for( idx = src; --idx; ) {
3073 key2 = keyptr (source,idx);
3074 if( keycmp (key, key2->key, key2->len) < 0 ) {
3075 *slotptr(source,idx+1) = *slotptr(source,idx);
3076 *slotptr(source,idx) = *temp;
3082 // reserve enough buffer pool entries
3084 avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
3086 if( bt_txnavailrequest (bt, avail) )
3087 return bt->mgr->err;
3089 // Load the leaf page for each key
3090 // group same page references with reuse bit
3091 // and determine any constraint violations
3093 for( src = 0; src++ < source->cnt; ) {
3094 key = keyptr(source, src);
3097 // first determine if this modification falls
3098 // on the same page as the previous modification
3099 // note that the far right leaf page is a special case
3101 if( samepage = src > 1 )
3102 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3103 slot = bt_findslot(set->page, key->key, key->len);
3105 bt_unlockpage(BtLockRead, set->latch);
3108 if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
3109 set->latch->split = 0;
3113 if( slotptr(set->page, slot)->type == Librarian )
3114 ptr = keyptr(set->page, ++slot);
3116 ptr = keyptr(set->page, slot);
3119 locks[src].entry = set->latch->entry;
3120 locks[src].slot = slot;
3121 locks[src].reuse = 0;
3123 locks[src].entry = 0;
3124 locks[src].slot = 0;
3125 locks[src].reuse = 1;
3128 // capture current lsn for master page
3130 locks[src].reqlsn = set->page->lsn;
3132 // perform constraint checks
3134 switch( slotptr(source, src)->type ) {
3137 if( !slotptr(set->page, slot)->dead )
3138 if( slot < set->page->cnt || bt_getid (set->page->right) )
3139 if( !keycmp (ptr, key->key, key->len) ) {
3141 // return constraint violation if key already exists
3143 bt_unlockpage(BtLockRead, set->latch);
3147 if( locks[src].entry ) {
3148 set->latch = bt->mgr->latchsets + locks[src].entry;
3149 bt_unlockpage(BtLockAtomic, set->latch);
3150 bt_unpinlatch (bt->mgr, set->latch);
3161 // unlock last loadpage lock
3164 bt_unlockpage(BtLockRead, set->latch);
3166 // and add entries to redo log
3168 if( bt->mgr->redopages )
3169 if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
3170 for( src = 0; src++ < source->cnt; )
3171 locks[src].lsn = lsn;
3175 // obtain write lock for each master page
3177 for( src = 0; src++ < source->cnt; ) {
3178 if( locks[src].reuse )
3181 bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry, bt->thread_no);
3184 // insert or delete each key
3185 // process any splits or merges
3186 // release Write & Atomic latches
3187 // set ParentModifications and build
3188 // queue of keys to insert for split pages
3189 // or delete for deleted pages.
3191 // run through txn list backwards
3193 samepage = source->cnt + 1;
3195 for( src = source->cnt; src; src-- ) {
3196 if( locks[src].reuse )
3199 // perform the txn operations
3200 // from smaller to larger on
3203 for( idx = src; idx < samepage; idx++ )
3204 switch( slotptr(source,idx)->type ) {
3206 if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
3212 if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
3217 // after the same page operations have finished,
3218 // process master page for splits or deletion.
3220 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3221 prev->page = bt_mappage (bt->mgr, prev->latch);
3224 // pick-up all splits from master page
3225 // each one is already WriteLocked.
3227 entry = prev->latch->split;
3230 set->latch = bt->mgr->latchsets + entry;
3231 set->page = bt_mappage (bt->mgr, set->latch);
3232 entry = set->latch->split;
3234 // delete empty master page by undoing its split
3235 // (this is potentially another empty page)
3236 // note that there are no new left pointers yet
3238 if( !prev->page->act ) {
3239 memcpy (set->page->left, prev->page->left, BtId);
3240 memcpy (prev->page, set->page, bt->mgr->page_size);
3241 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3242 bt_freepage (bt->mgr, set);
3244 prev->latch->split = set->latch->split;
3245 prev->latch->dirty = 1;
3249 // remove empty page from the split chain
3250 // and return it to the free list.
3252 if( !set->page->act ) {
3253 memcpy (prev->page->right, set->page->right, BtId);
3254 prev->latch->split = set->latch->split;
3255 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3256 bt_freepage (bt->mgr, set);
3260 // schedule prev fence key update
3262 ptr = keyptr(prev->page,prev->page->cnt);
3263 leaf = calloc (sizeof(AtomicKey), 1), que++;
3265 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3266 leaf->page_no = prev->latch->page_no;
3267 leaf->entry = prev->latch->entry;
3277 // splice in the left link into the split page
3279 bt_putid (set->page->left, prev->latch->page_no);
3280 bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3281 bt_unlockpage(BtLockWrite, prev->latch);
3285 // update left pointer in next right page from last split page
3286 // (if all splits were reversed, latch->split == 0)
3288 if( latch->split ) {
3289 // fix left pointer in master's original (now split)
3290 // far right sibling or set rightmost page in page zero
3292 if( right = bt_getid (prev->page->right) ) {
3293 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3294 set->page = bt_mappage (bt->mgr, set->latch);
3298 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3299 bt_putid (set->page->left, prev->latch->page_no);
3300 set->latch->dirty = 1;
3301 bt_unlockpage (BtLockWrite, set->latch);
3302 bt_unpinlatch (bt->mgr, set->latch);
3303 } else { // prev is rightmost page
3304 bt_mutexlock (bt->mgr->lock);
3305 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3306 bt_releasemutex(bt->mgr->lock);
3309 // Process last page split in chain
3311 ptr = keyptr(prev->page,prev->page->cnt);
3312 leaf = calloc (sizeof(AtomicKey), 1), que++;
3314 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3315 leaf->page_no = prev->latch->page_no;
3316 leaf->entry = prev->latch->entry;
3326 bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3327 bt_unlockpage(BtLockWrite, prev->latch);
3329 // remove atomic lock on master page
3331 bt_unlockpage(BtLockAtomic, latch);
3335 // finished if prev page occupied (either master or final split)
3337 if( prev->page->act ) {
3338 bt_unlockpage(BtLockWrite, latch);
3339 bt_unlockpage(BtLockAtomic, latch);
3340 bt_unpinlatch(bt->mgr, latch);
3344 // any and all splits were reversed, and the
3345 // master page located in prev is empty, delete it
3346 // by pulling over master's right sibling.
3348 // Remove empty master's fence key
3350 ptr = keyptr(prev->page,prev->page->cnt);
3352 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3355 // perform the remainder of the delete
3356 // from the FIFO queue
3358 leaf = calloc (sizeof(AtomicKey), 1), que++;
3360 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3361 leaf->page_no = prev->latch->page_no;
3362 leaf->entry = prev->latch->entry;
3373 // leave atomic lock in place until
3374 // deletion completes in next phase.
3376 bt_unlockpage(BtLockWrite, prev->latch);
3379 bt_txnavailrelease (bt, avail);
3381 que *= bt->mgr->pagezero->alloc->lvl;
3383 if( bt_txnavailrequest (bt, que) )
3384 return bt->mgr->err;
3386 // add & delete keys for any pages split or merged during transaction
3390 set->latch = bt->mgr->latchsets + leaf->entry;
3391 set->page = bt_mappage (bt->mgr, set->latch);
3393 bt_putid (value, leaf->page_no);
3394 ptr = (BtKey *)leaf->leafkey;
3396 switch( leaf->type ) {
3397 case 0: // insert key
3398 if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, 1, bt->thread_no) )
3403 case 1: // delete key
3404 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3409 case 2: // free page
3410 if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
3416 if( !leaf->nounlock )
3417 bt_unlockpage (BtLockParent, set->latch);
3419 bt_unpinlatch (bt->mgr, set->latch);
3422 } while( leaf = tail );
3424 bt_txnavailrelease (bt, que);
3434 // set cursor to highest slot on highest page
3436 uint bt_lastkey (BtDb *bt)
3438 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3441 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3442 set->page = bt_mappage (bt->mgr, set->latch);
3446 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3447 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3448 bt_unlockpage(BtLockRead, set->latch);
3449 bt_unpinlatch (bt->mgr, set->latch);
3450 return bt->cursor->cnt;
3453 // return previous slot on cursor page
3455 uint bt_prevkey (BtDb *bt, uint slot)
3457 uid cursor_page = bt->cursor->page_no;
3458 uid ourright, next, us = cursor_page;
3464 ourright = bt_getid(bt->cursor->right);
3467 if( !(next = bt_getid(bt->cursor->left)) )
3473 if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3474 set->page = bt_mappage (bt->mgr, set->latch);
3478 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3479 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3480 bt_unlockpage(BtLockRead, set->latch);
3481 bt_unpinlatch (bt->mgr, set->latch);
3483 next = bt_getid (bt->cursor->right);
3485 if( bt->cursor->kill )
3489 if( next == ourright )
3494 return bt->cursor->cnt;
3497 // return next slot on cursor page
3498 // or slide cursor right into next page
3500 uint bt_nextkey (BtDb *bt, uint slot)
3506 right = bt_getid(bt->cursor->right);
3508 while( slot++ < bt->cursor->cnt )
3509 if( slotptr(bt->cursor,slot)->dead )
3511 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3519 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3520 set->page = bt_mappage (bt->mgr, set->latch);
3524 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3525 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3526 bt_unlockpage(BtLockRead, set->latch);
3527 bt_unpinlatch (bt->mgr, set->latch);
3532 return bt->mgr->err = 0;
3535 // cache page of keys into cursor and return starting slot for given key
3537 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3542 // cache page for retrieval
3544 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3545 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3549 bt_unlockpage(BtLockRead, set->latch);
3550 bt_unpinlatch (bt->mgr, set->latch);
3557 double getCpuTime(int type)
3560 FILETIME xittime[1];
3561 FILETIME systime[1];
3562 FILETIME usrtime[1];
3563 SYSTEMTIME timeconv[1];
3566 memset (timeconv, 0, sizeof(SYSTEMTIME));
3570 GetSystemTimeAsFileTime (xittime);
3571 FileTimeToSystemTime (xittime, timeconv);
3572 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3575 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3576 FileTimeToSystemTime (usrtime, timeconv);
3579 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3580 FileTimeToSystemTime (systime, timeconv);
3584 ans += (double)timeconv->wHour * 3600;
3585 ans += (double)timeconv->wMinute * 60;
3586 ans += (double)timeconv->wSecond;
3587 ans += (double)timeconv->wMilliseconds / 1000;
3592 #include <sys/resource.h>
3594 double getCpuTime(int type)
3596 struct rusage used[1];
3597 struct timeval tv[1];
3601 gettimeofday(tv, NULL);
3602 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3605 getrusage(RUSAGE_SELF, used);
3606 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3609 getrusage(RUSAGE_SELF, used);
3610 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3617 void bt_poolaudit (BtMgr *mgr)
3622 while( ++entry < mgr->latchtotal ) {
3623 latch = mgr->latchsets + entry;
3625 if( *latch->readwr->rin & MASK )
3626 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3628 if( *latch->access->rin & MASK )
3629 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3631 if( *latch->parent->exclusive )
3632 fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3634 if( *latch->atomic->exclusive )
3635 fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3637 if( *latch->modify->exclusive )
3638 fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3640 if( latch->pin & ~CLOCK_bit )
3641 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3654 // standalone program to index file of keys
3655 // then list them onto std-out
3658 void *index_file (void *arg)
3660 uint __stdcall index_file (void *arg)
3663 int line = 0, found = 0, cnt = 0, idx;
3664 uid next, page_no = LEAF_page; // start on first page of leaves
3665 int ch, len = 0, slot, type = 0;
3666 unsigned char key[BT_maxkey];
3667 unsigned char txn[65536];
3668 ThreadArg *args = arg;
3677 bt = bt_open (args->mgr, args->main);
3680 if( args->idx < strlen (args->type) )
3681 ch = args->type[args->idx];
3683 ch = args->type[strlen(args->type) - 1];
3695 if( type == Delete )
3696 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3698 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3700 if( type == Delete )
3701 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3703 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3705 if( in = fopen (args->infile, "rb") )
3706 while( ch = getc(in), ch != EOF )
3712 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, 1, bt->thread_no) )
3713 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3719 memcpy (txn + nxt, key + 10, len - 10);
3721 txn[nxt] = len - 10;
3723 memcpy (txn + nxt, key, 10);
3726 slotptr(page,++cnt)->off = nxt;
3727 slotptr(page,cnt)->type = type;
3730 if( cnt < args->num )
3737 if( bt_atomictxn (bt, page) )
3738 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3743 else if( len < BT_maxkey )
3745 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3749 fprintf(stderr, "started indexing for %s\n", args->infile);
3750 if( in = fopen (args->infile, "r") )
3751 while( ch = getc(in), ch != EOF )
3756 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, 1, bt->thread_no) )
3757 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3760 else if( len < BT_maxkey )
3762 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3766 fprintf(stderr, "started finding keys for %s\n", args->infile);
3767 if( in = fopen (args->infile, "rb") )
3768 while( ch = getc(in), ch != EOF )
3772 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3774 else if( bt->mgr->err )
3775 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3778 else if( len < BT_maxkey )
3780 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3784 fprintf(stderr, "started scanning\n");
3787 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3788 set->page = bt_mappage (bt->mgr, set->latch);
3790 fprintf(stderr, "unable to obtain latch"), exit(1);
3792 bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3793 next = bt_getid (set->page->right);
3795 for( slot = 0; slot++ < set->page->cnt; )
3796 if( next || slot < set->page->cnt )
3797 if( !slotptr(set->page, slot)->dead ) {
3798 ptr = keyptr(set->page, slot);
3801 if( slotptr(set->page, slot)->type == Duplicate )
3804 fwrite (ptr->key, len, 1, stdout);
3805 val = valptr(set->page, slot);
3806 fwrite (val->value, val->len, 1, stdout);
3807 fputc ('\n', stdout);
3811 set->latch->avail = 1;
3812 bt_unlockpage (BtLockRead, set->latch);
3813 bt_unpinlatch (bt->mgr, set->latch);
3814 } while( page_no = next );
3816 fprintf(stderr, " Total keys read %d\n", cnt);
3820 fprintf(stderr, "started reverse scan\n");
3821 if( slot = bt_lastkey (bt) )
3822 while( slot = bt_prevkey (bt, slot) ) {
3823 if( slotptr(bt->cursor, slot)->dead )
3826 ptr = keyptr(bt->cursor, slot);
3829 if( slotptr(bt->cursor, slot)->type == Duplicate )
3832 fwrite (ptr->key, len, 1, stdout);
3833 val = valptr(bt->cursor, slot);
3834 fwrite (val->value, val->len, 1, stdout);
3835 fputc ('\n', stdout);
3839 fprintf(stderr, " Total keys read %d\n", cnt);
3844 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3846 fprintf(stderr, "started counting\n");
3847 page_no = LEAF_page;
3849 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3850 if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3853 if( !bt->cursor->free && !bt->cursor->lvl )
3854 cnt += bt->cursor->act;
3859 cnt--; // remove stopper key
3860 fprintf(stderr, " Total keys counted %d\n", cnt);
3864 fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
3873 typedef struct timeval timer;
3875 int main (int argc, char **argv)
3877 int idx, cnt, len, slot, err;
3878 int segsize, bits = 16;
3898 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3899 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3900 fprintf (stderr, " where main_file is the name of the main btree file\n");
3901 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3902 fprintf (stderr, " page_bits is the page size in bits for the cache btree\n");
3903 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3904 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3905 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3906 fprintf (stderr, " main_bits is the page size of the main btree in bits\n");
3907 fprintf (stderr, " main_pool is the number of main pages in the main buffer pool\n");
3908 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3912 start = getCpuTime(0);
3915 bits = atoi(argv[4]);
3918 poolsize = atoi(argv[5]);
3921 fprintf (stderr, "Warning: no mapped_pool\n");
3924 num = atoi(argv[6]);
3927 recovery = atoi(argv[7]);
3930 mainbits = atoi(argv[8]);
3933 mainpool = atoi(argv[9]);
3937 threads = malloc (cnt * sizeof(pthread_t));
3939 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3941 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3943 mgr = bt_mgr (argv[1], bits, poolsize, recovery);
3946 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3950 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3953 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3960 for( idx = 0; idx < cnt; idx++ ) {
3961 args[idx].infile = argv[idx + 10];
3962 args[idx].type = argv[3];
3963 args[idx].main = main;
3964 args[idx].mgr = mgr;
3965 args[idx].num = num;
3966 args[idx].idx = idx;
3968 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3969 fprintf(stderr, "Error creating thread %d\n", err);
3971 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3975 args[0].infile = argv[idx + 10];
3976 args[0].type = argv[3];
3977 args[0].main = main;
3984 // wait for termination
3987 for( idx = 0; idx < cnt; idx++ )
3988 pthread_join (threads[idx], NULL);
3990 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3992 for( idx = 0; idx < cnt; idx++ )
3993 CloseHandle(threads[idx]);
4001 elapsed = getCpuTime(0) - start;
4002 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4003 elapsed = getCpuTime(1);
4004 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4005 elapsed = getCpuTime(2);
4006 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);