1 // btree version threadskv10 futex version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // and LSM B-trees for write optimization
14 // author: karl malbrain, malbrain@cal.berkeley.edu
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
20 The orginal author is Karl Malbrain.
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
38 #include <linux/futex.h>
53 #define WIN32_LEAN_AND_MEAN
67 typedef unsigned long long uid;
68 typedef unsigned long long logseqno;
71 typedef unsigned long long off64_t;
72 typedef unsigned short ushort;
73 typedef unsigned int uint;
76 #define BT_ro 0x6f72 // ro
77 #define BT_rw 0x7772 // rw
79 #define BT_maxbits 26 // maximum page size in bits
80 #define BT_minbits 9 // minimum page size in bits
81 #define BT_minpage (1 << BT_minbits) // minimum page size
82 #define BT_maxpage (1 << BT_maxbits) // maximum page size
84 // BTree page number constants
85 #define ALLOC_page 0 // allocation page
86 #define ROOT_page 1 // root of the btree
87 #define LEAF_page 2 // first page of leaves
88 #define REDO_page 3 // first page of redo buffer
90 // Number of levels to create in a new BTree
95 There are six lock types for each node in four independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
113 // lite weight spin latch
115 volatile typedef struct {
127 // exclusive is set for write access
129 volatile typedef struct {
130 unsigned char exclusive[1];
131 unsigned char filler;
138 volatile uint xlock:1; // one writer has exclusive lock
139 volatile uint wrt:31; // count of other writers waiting
148 // definition for phase-fair reader/writer lock implementation
151 volatile ushort rin[1];
152 volatile ushort rout[1];
153 volatile ushort ticket[1];
154 volatile ushort serving[1];
155 volatile ushort tid[1];
156 volatile ushort dup[1];
163 volatile ushort tid[1];
164 volatile ushort dup[1];
172 // mode & definition for lite latch implementation
175 QueRd = 1, // reader queue
176 QueWr = 2 // writer queue
179 // hash table entries
182 uint entry; // Latch table entry at head of chain
183 BtMutexLatch latch[1];
186 // latch manager table structure
189 uid page_no; // latch set page number
190 RWLock readwr[1]; // read/write page lock
191 RWLock access[1]; // Access Intent/Page delete
192 WOLock parent[1]; // Posting of fence key in parent
193 WOLock atomic[1]; // Atomic update in progress
194 uint split; // right split page atomic insert
195 uint entry; // entry slot in latch table
196 uint next; // next entry in hash table chain
197 uint prev; // prev entry in hash table chain
198 ushort pin; // number of accessing threads
199 unsigned char dirty; // page in cache is dirty (atomic set)
200 BtMutexLatch modify[1]; // modify entry lite latch
203 // Define the length of the page record numbers
207 // Page key slot definition.
209 // Keys are marked dead, but remain on the page until
210 // it cleanup is called. The fence key (highest key) for
211 // a leaf page is always present, even after cleanup.
215 // In addition to the Unique keys that occupy slots
216 // there are Librarian and Duplicate key
217 // slots occupying the key slot array.
219 // The Librarian slots are dead keys that
220 // serve as filler, available to add new Unique
221 // or Dup slots that are inserted into the B-tree.
223 // The Duplicate slots have had their key bytes extended
224 // by 6 bytes to contain a binary duplicate key uniqueifier.
234 uint off:BT_maxbits; // page offset for key start
235 uint type:3; // type of slot
236 uint dead:1; // set for deleted slot
239 // The key structure occupies space at the upper end of
240 // each page. It's a length byte followed by the key
244 unsigned char len; // this can be changed to a ushort or uint
245 unsigned char key[0];
248 // the value structure also occupies space at the upper
249 // end of the page. Each key is immediately followed by a value.
252 unsigned char len; // this can be changed to a ushort or uint
253 unsigned char value[0];
256 #define BT_maxkey 255 // maximum number of bytes in a key
257 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
259 // The first part of an index page.
260 // It is immediately followed
261 // by the BtSlot array of keys.
263 // note that this structure size
264 // must be a multiple of 8 bytes
265 // in order to place dups correctly.
267 typedef struct BtPage_ {
268 uint cnt; // count of keys in page
269 uint act; // count of active keys
270 uint min; // next key offset
271 uint garbage; // page garbage in bytes
272 unsigned char bits:7; // page size in bits
273 unsigned char free:1; // page is on free chain
274 unsigned char lvl:7; // level of page
275 unsigned char kill:1; // page is being deleted
276 unsigned char right[BtId]; // page number to right
277 unsigned char left[BtId]; // page number to left
278 unsigned char filler[2]; // padding to multiple of 8
279 logseqno lsn; // log sequence number applied
280 uid page_no; // this page number
283 // The loadpage interface object
286 BtPage page; // current page pointer
287 BtLatchSet *latch; // current page latch set
290 // structure for latch manager on ALLOC_page
293 struct BtPage_ alloc[1]; // next page_no in right ptr
294 unsigned long long dups[1]; // global duplicate key uniqueifier
295 unsigned char freechain[BtId]; // head of free page_nos chain
296 unsigned long long activepages; // number of active pages pages
299 // The object structure for Btree access
302 uint page_size; // page size
303 uint page_bits; // page size in bits
309 BtPageZero *pagezero; // mapped allocation page
310 BtHashEntry *hashtable; // the buffer pool hash table entries
311 BtLatchSet *latchsets; // mapped latch set from buffer pool
312 unsigned char *pagepool; // mapped to the buffer pool pages
313 unsigned char *redobuff; // mapped recovery buffer pointer
314 logseqno lsn, flushlsn; // current & first lsn flushed
315 BtMutexLatch redo[1]; // redo area lite latch
316 BtMutexLatch lock[1]; // allocation area lite latch
317 BtMutexLatch maps[1]; // mapping segments lite latch
318 ushort thread_no[1]; // next thread number
319 uint nlatchpage; // number of latch pages at BT_latch
320 uint latchtotal; // number of page latch entries
321 uint latchhash; // number of latch hash table slots
322 uint latchvictim; // next latch entry to examine
323 uint latchpromote; // next latch entry to promote
324 uint redopages; // size of recovery buff in pages
325 uint redolast; // last msync size of recovery buff
326 uint redoend; // eof/end element in recovery buff
327 int err; // last error
328 int line; // last error line no
329 int found; // number of keys found by delete
330 int reads, writes; // number of reads and writes
332 HANDLE halloc; // allocation handle
333 HANDLE hpool; // buffer pool handle
335 uint segments; // number of memory mapped segments
336 unsigned char *pages[64000];// memory mapped segments of b-tree
340 BtMgr *mgr; // buffer manager for entire process
341 BtMgr *main; // buffer manager for main btree
342 BtPage cursor; // cached page frame for start/next
343 ushort thread_no; // thread number
344 unsigned char key[BT_keyarray]; // last found complete key
347 // Catastrophic errors
361 #define CLOCK_bit 0x8000
363 // recovery manager entry types
366 BTRM_eof = 0, // rest of buffer is emtpy
367 BTRM_add, // add a unique key-value to btree
368 BTRM_dup, // add a duplicate key-value to btree
369 BTRM_del, // delete a key-value from btree
370 BTRM_upd, // update a key with a new value
371 BTRM_new, // allocate a new empty page
372 BTRM_old // reuse an old empty page
375 // recovery manager entry
376 // structure followed by BtKey & BtVal
379 logseqno reqlsn; // log sequence number required
380 logseqno lsn; // log sequence number for entry
381 uint len; // length of entry
382 unsigned char type; // type of entry
383 unsigned char lvl; // level of btree entry pertains to
388 extern void bt_close (BtDb *bt);
389 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
390 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
391 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
392 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
393 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
394 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
395 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
397 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
399 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
400 extern uint bt_nextkey (BtDb *bt, uint slot);
401 extern uint bt_prevkey (BtDb *db, uint slot);
402 extern uint bt_lastkey (BtDb *db);
405 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
406 extern void bt_mgrclose (BtMgr *mgr);
407 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
408 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
410 // transaction functions
411 BTERR bt_txnpromote (BtDb *bt);
413 // The page is allocated from low and hi ends.
414 // The key slots are allocated from the bottom,
415 // while the text and value of the key
416 // are allocated from the top. When the two
417 // areas meet, the page is split into two.
419 // A key consists of a length byte, two bytes of
420 // index number (0 - 65535), and up to 253 bytes
423 // Associated with each key is a value byte string
424 // containing any value desired.
426 // The b-tree root is always located at page 1.
427 // The first leaf page of level zero is always
428 // located on page 2.
430 // The b-tree pages are linked with next
431 // pointers to facilitate enumerators,
432 // and provide for concurrency.
434 // When to root page fills, it is split in two and
435 // the tree height is raised by a new root at page
436 // one with two keys.
438 // Deleted keys are marked with a dead bit until
439 // page cleanup. The fence key for a leaf node is
442 // To achieve maximum concurrency one page is locked at a time
443 // as the tree is traversed to find leaf key in question. The right
444 // page numbers are used in cases where the page is being split,
447 // Page 0 is dedicated to lock for new page extensions,
448 // and chains empty pages together for reuse. It also
449 // contains the latch manager hash table.
451 // The ParentModification lock on a node is obtained to serialize posting
452 // or changing the fence key for a node.
454 // Empty pages are chained together through the ALLOC page and reused.
456 // Access macros to address slot and key values from the page
457 // Page slots use 1 based indexing.
459 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
460 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
461 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
463 void bt_putid(unsigned char *dest, uid id)
468 dest[i] = (unsigned char)id, id >>= 8;
471 uid bt_getid(unsigned char *src)
476 for( i = 0; i < BtId; i++ )
477 id <<= 8, id |= *src++;
482 uid bt_newdup (BtMgr *mgr)
485 return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
487 return _InterlockedIncrement64(mgr->pagezero->dups, 1);
491 // lite weight spin lock Latch Manager
493 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
495 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
498 void bt_mutexlock(BtMutexLatch *latch)
504 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
506 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
509 if( !(prev & ~BOTH) )
513 __sync_fetch_and_and ((ushort *)latch, ~XCL);
515 _InterlockedAnd16((ushort *)latch, ~XCL);
518 } while( sched_yield(), 1 );
520 } while( SwitchToThread(), 1 );
527 prev = __sync_fetch_and_or(latch->exclusive, XCL);
529 prev = _InterlockedOr8(latch->exclusive, XCL);
534 } while( sched_yield(), 1 );
536 } while( SwitchToThread(), 1 );
540 BtMutexLatch prev[1];
544 *prev->value = __sync_fetch_and_or(latch->value, XCL);
546 if( !prev->bits->xlock ) { // did we set XCL bit?
548 __sync_fetch_and_sub(latch->value, WRT);
554 __sync_fetch_and_add(latch->value, WRT);
557 sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
562 // try to obtain write lock
564 // return 1 if obtained,
567 int bt_mutextry(BtMutexLatch *latch)
572 prev = __sync_fetch_and_or((ushort *)latch, XCL);
574 prev = _InterlockedOr16((ushort *)latch, XCL);
576 // take write access if all bits are clear
579 if( !(prev & ~BOTH) )
583 __sync_fetch_and_and ((ushort *)latch, ~XCL);
585 _InterlockedAnd16((ushort *)latch, ~XCL);
592 prev = __sync_fetch_and_or(latch->exclusive, XCL);
594 prev = _InterlockedOr8(latch->exclusive, XCL);
596 // take write access if all bits are clear
598 return !(prev & XCL);
601 BtMutexLatch prev[1];
603 *prev->value = __sync_fetch_and_or(latch->value, XCL);
605 // take write access if exclusive bit is clear
607 return !prev->bits->xlock;
613 void bt_releasemutex(BtMutexLatch *latch)
616 __sync_fetch_and_and((ushort *)latch, ~BOTH);
618 _InterlockedAnd16((ushort *)latch, ~BOTH);
621 *latch->exclusive = 0;
624 BtMutexLatch prev[1];
626 *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
628 if( prev->bits->wrt )
629 sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
633 // Write-Only Queue Lock
635 void WriteOLock (WOLock *lock, ushort tid)
637 if( *lock->tid == tid ) {
642 bt_mutexlock(lock->xcl);
646 void WriteORelease (WOLock *lock)
654 bt_releasemutex(lock->xcl);
657 // Phase-Fair reader/writer lock implementation
659 void WriteLock (RWLock *lock, ushort tid)
663 if( *lock->tid == tid ) {
668 tix = __sync_fetch_and_add (lock->ticket, 1);
670 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
672 // wait for our ticket to come up
674 while( tix != lock->serving[0] )
681 w = PRES | (tix & PHID);
683 r = __sync_fetch_and_add (lock->rin, w);
685 r = _InterlockedExchangeAdd16 (lock->rin, w);
687 while( r != *lock->rout )
696 void WriteRelease (RWLock *lock)
705 __sync_fetch_and_and (lock->rin, ~MASK);
707 _InterlockedAnd16 (lock->rin, ~MASK);
712 // try to obtain read lock
713 // return 1 if successful
715 int ReadTry (RWLock *lock, ushort tid)
719 // OK if write lock already held by same thread
721 if( *lock->tid == tid ) {
726 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
728 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
731 if( w == (*lock->rin & MASK) ) {
733 __sync_fetch_and_add (lock->rin, -RINC);
735 _InterlockedExchangeAdd16 (lock->rin, -RINC);
743 void ReadLock (RWLock *lock, ushort tid)
747 if( *lock->tid == tid ) {
752 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
754 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
757 while( w == (*lock->rin & MASK) )
765 void ReadRelease (RWLock *lock)
773 __sync_fetch_and_add (lock->rout, RINC);
775 _InterlockedExchangeAdd16 (lock->rout, RINC);
779 // recovery manager -- flush dirty pages
781 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
783 uint cnt3 = 0, cnt2 = 0, cnt = 0;
788 // flush dirty pool pages to the btree
790 fprintf(stderr, "Start flushlsn ");
791 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
792 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
793 latch = mgr->latchsets + entry;
794 bt_mutexlock (latch->modify);
795 bt_lockpage(BtLockRead, latch, thread_no);
798 bt_writepage(mgr, page, latch->page_no, 0);
799 latch->dirty = 0, cnt++;
801 if( latch->pin & ~CLOCK_bit )
803 bt_unlockpage(BtLockRead, latch);
804 bt_releasemutex (latch->modify);
806 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
807 fprintf(stderr, "begin sync");
808 sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
809 fprintf(stderr, " end sync\n");
812 // recovery manager -- process current recovery buff on startup
813 // this won't do much if previous session was properly closed.
815 BTERR bt_recoveryredo (BtMgr *mgr)
822 pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
824 hdr = (BtLogHdr *)mgr->redobuff;
825 mgr->flushlsn = hdr->lsn;
828 hdr = (BtLogHdr *)(mgr->redobuff + offset);
829 switch( hdr->type ) {
833 case BTRM_add: // add a unique key-value to btree
835 case BTRM_dup: // add a duplicate key-value to btree
836 case BTRM_del: // delete a key-value from btree
837 case BTRM_upd: // update a key with a new value
838 case BTRM_new: // allocate a new empty page
839 case BTRM_old: // reuse an old empty page
845 // recovery manager -- append new entry to recovery log
846 // flush to disk when it overflows.
848 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
850 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
851 uint amt = sizeof(BtLogHdr);
855 bt_mutexlock (mgr->redo);
858 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
860 // see if new entry fits in buffer
861 // flush and reset if it doesn't
863 if( amt > size - mgr->redoend ) {
864 mgr->flushlsn = mgr->lsn;
865 msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
868 bt_flushlsn(mgr, thread_no);
871 // fill in new entry & either eof or end block
873 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
878 hdr->lsn = ++mgr->lsn;
882 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
883 memset (eof, 0, sizeof(BtLogHdr));
885 // fill in key and value
888 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
889 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
892 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
893 memset (eof, 0, sizeof(BtLogHdr));
896 last = mgr->redolast & 0xfff;
900 bt_releasemutex(mgr->redo);
902 msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
906 // recovery manager -- append transaction to recovery log
907 // flush to disk when it overflows.
909 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
911 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
912 uint amt = 0, src, type;
919 // determine amount of redo recovery log space required
921 for( src = 0; src++ < source->cnt; ) {
922 key = keyptr(source,src);
923 val = valptr(source,src);
924 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
925 amt += sizeof(BtLogHdr);
928 bt_mutexlock (mgr->redo);
930 // see if new entry fits in buffer
931 // flush and reset if it doesn't
933 if( amt > size - mgr->redoend ) {
934 mgr->flushlsn = mgr->lsn;
935 msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
938 bt_flushlsn (mgr, thread_no);
941 // assign new lsn to transaction
945 // fill in new entries
947 for( src = 0; src++ < source->cnt; ) {
948 key = keyptr(source, src);
949 val = valptr(source, src);
951 switch( slotptr(source, src)->type ) {
963 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
964 amt += sizeof(BtLogHdr);
966 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
972 // fill in key and value
974 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
975 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
980 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
981 memset (eof, 0, sizeof(BtLogHdr));
984 last = mgr->redolast & 0xfff;
987 bt_releasemutex(mgr->redo);
989 msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
990 bt_releasemutex(mgr->redo);
994 // read page into buffer pool from permanent location in Btree file
996 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
998 off64_t off = page_no << mgr->page_bits;
1000 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
1001 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
1004 if( page->page_no != page_no )
1009 int flag = PROT_READ | PROT_WRITE;
1010 uint segment = page_no >> 32;
1011 unsigned char *perm;
1014 if( segment < mgr->segments ) {
1015 perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
1016 memcpy (page, perm, mgr->page_size);
1017 if( page->page_no != page_no )
1023 bt_mutexlock (mgr->maps);
1025 if( segment < mgr->segments ) {
1026 bt_releasemutex (mgr->maps);
1030 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
1033 bt_releasemutex (mgr->maps);
1037 // write page to permanent location in Btree file
1038 // clear the dirty bit
1040 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
1042 off64_t off = page_no << mgr->page_bits;
1044 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size ) {
1045 fprintf (stderr, "Unable to write page %d errno = %d\n", page_no, errno);
1051 int flag = PROT_READ | PROT_WRITE;
1052 uint segment = page_no >> 32;
1053 unsigned char *perm;
1056 if( segment < mgr->segments ) {
1057 perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
1058 memcpy (perm, page, mgr->page_size);
1060 msync (perm, mgr->page_size, MS_SYNC);
1065 bt_mutexlock (mgr->maps);
1067 if( segment < mgr->segments ) {
1068 bt_releasemutex (mgr->maps);
1072 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
1073 bt_releasemutex (mgr->maps);
1078 // set CLOCK bit in latch
1079 // decrement pin count
1081 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
1083 bt_mutexlock(latch->modify);
1084 latch->pin |= CLOCK_bit;
1087 bt_releasemutex(latch->modify);
1090 // return the btree cached page address
1092 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
1094 BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
1099 // return next available latch entry
1100 // and with latch entry locked
1102 uint bt_availnext (BtMgr *mgr)
1109 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
1111 entry = _InterlockedIncrement (&mgr->latchvictim);
1113 entry %= mgr->latchtotal;
1118 latch = mgr->latchsets + entry;
1120 if( !bt_mutextry(latch->modify) )
1123 // return this entry if it is not pinned
1128 // if the CLOCK bit is set
1129 // reset it to zero.
1131 latch->pin &= ~CLOCK_bit;
1132 bt_releasemutex(latch->modify);
1136 // pin page in buffer pool
1137 // return with latchset pinned
1139 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1141 uint hashidx = page_no % mgr->latchhash;
1146 // try to find our entry
1148 bt_mutexlock(mgr->hashtable[hashidx].latch);
1150 if( entry = mgr->hashtable[hashidx].entry ) do
1152 latch = mgr->latchsets + entry;
1153 if( page_no == latch->page_no )
1155 } while( entry = latch->next );
1157 // found our entry: increment pin
1160 latch = mgr->latchsets + entry;
1161 bt_mutexlock(latch->modify);
1162 latch->pin |= CLOCK_bit;
1166 bt_releasemutex(latch->modify);
1167 bt_releasemutex(mgr->hashtable[hashidx].latch);
1171 // find and reuse unpinned entry
1175 entry = bt_availnext (mgr);
1176 latch = mgr->latchsets + entry;
1178 idx = latch->page_no % mgr->latchhash;
1180 // if latch is on a different hash chain
1181 // unlink from the old page_no chain
1183 if( latch->page_no )
1184 if( idx != hashidx ) {
1186 // skip over this entry if latch not available
1188 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1189 bt_releasemutex(latch->modify);
1194 mgr->latchsets[latch->prev].next = latch->next;
1196 mgr->hashtable[idx].entry = latch->next;
1199 mgr->latchsets[latch->next].prev = latch->prev;
1201 bt_releasemutex (mgr->hashtable[idx].latch);
1204 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1206 // update permanent page area in btree from buffer pool
1207 // no read-lock is required since page is not pinned.
1210 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1211 return mgr->line = __LINE__, NULL;
1216 memcpy (page, contents, mgr->page_size);
1218 } else if( bt_readpage (mgr, page, page_no) )
1219 return mgr->line = __LINE__, NULL;
1221 // link page as head of hash table chain
1222 // if this is a never before used entry,
1223 // or it was previously on a different
1224 // hash table chain. Otherwise, just
1225 // leave it in its current hash table
1228 if( !latch->page_no || hashidx != idx ) {
1229 if( latch->next = mgr->hashtable[hashidx].entry )
1230 mgr->latchsets[latch->next].prev = entry;
1232 mgr->hashtable[hashidx].entry = entry;
1236 // fill in latch structure
1238 latch->pin = CLOCK_bit | 1;
1239 latch->page_no = page_no;
1240 latch->entry = entry;
1243 bt_releasemutex (latch->modify);
1244 bt_releasemutex (mgr->hashtable[hashidx].latch);
1248 void bt_mgrclose (BtMgr *mgr)
1256 // flush previously written dirty pages
1257 // and write recovery buffer to disk
1259 fdatasync (mgr->idx);
1261 if( mgr->redoend ) {
1262 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1263 memset (eof, 0, sizeof(BtLogHdr));
1265 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1268 // write remaining dirty pool pages to the btree
1270 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1271 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1272 latch = mgr->latchsets + slot;
1274 if( latch->dirty ) {
1275 bt_writepage(mgr, page, latch->page_no, 0);
1276 latch->dirty = 0, num++;
1280 // flush last batch to disk and clear
1281 // redo recovery buffer on disk.
1283 fdatasync (mgr->idx);
1285 if( mgr->redopages ) {
1286 eof = (BtLogHdr *)mgr->redobuff;
1287 memset (eof, 0, sizeof(BtLogHdr));
1288 eof->lsn = mgr->lsn;
1290 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1292 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1295 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1298 while( mgr->segments )
1299 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1301 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1302 munmap (mgr->pagezero, mgr->page_size);
1304 FlushViewOfFile(mgr->pagezero, 0);
1305 UnmapViewOfFile(mgr->pagezero);
1306 UnmapViewOfFile(mgr->pagepool);
1307 CloseHandle(mgr->halloc);
1308 CloseHandle(mgr->hpool);
1311 free (mgr->redobuff);
1315 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1316 FlushFileBuffers(mgr->idx);
1317 CloseHandle(mgr->idx);
1322 // close and release memory
1324 void bt_close (BtDb *bt)
1331 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1336 // open/create new btree buffer manager
1338 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1339 // size of page pool (e.g. 262144)
1341 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1343 uint lvl, attr, last, slot, idx;
1344 uint nlatchpage, latchhash;
1345 unsigned char value[BtId];
1346 int flag, initit = 0;
1347 BtPageZero *pagezero;
1355 // determine sanity of page size and buffer pool
1357 if( bits > BT_maxbits )
1359 else if( bits < BT_minbits )
1362 mgr = calloc (1, sizeof(BtMgr));
1364 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1366 if( mgr->idx == -1 ) {
1367 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1368 return free(mgr), NULL;
1371 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1372 attr = FILE_ATTRIBUTE_NORMAL;
1373 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1375 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1376 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1377 return GlobalFree(mgr), NULL;
1382 pagezero = valloc (BT_maxpage);
1385 // read minimum page size to get root info
1386 // to support raw disk partition files
1387 // check if bits == 0 on the disk.
1389 if( size = lseek (mgr->idx, 0L, 2) )
1390 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1391 if( pagezero->alloc->bits )
1392 bits = pagezero->alloc->bits;
1396 return free(mgr), free(pagezero), NULL;
1400 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1401 size = GetFileSize(mgr->idx, amt);
1403 if( size || *amt ) {
1404 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1405 return bt_mgrclose (mgr), NULL;
1406 bits = pagezero->alloc->bits;
1411 mgr->page_size = 1 << bits;
1412 mgr->page_bits = bits;
1414 // calculate number of latch hash table entries
1416 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1418 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1419 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1420 mgr->latchtotal = nodemax;
1425 // initialize an empty b-tree with latch page, root page, page of leaves
1426 // and page(s) of latches and page pool cache
1428 memset (pagezero, 0, 1 << bits);
1429 pagezero->alloc->lvl = MIN_lvl - 1;
1430 pagezero->alloc->bits = mgr->page_bits;
1431 bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1432 pagezero->activepages = 2;
1434 // initialize left-most LEAF page in
1435 // alloc->left and count of active leaf pages.
1437 bt_putid (pagezero->alloc->left, LEAF_page);
1439 ftruncate (mgr->idx, REDO_page << mgr->page_bits);
1441 if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
1442 fprintf (stderr, "Unable to create btree page zero\n");
1443 return bt_mgrclose (mgr), NULL;
1446 memset (pagezero, 0, 1 << bits);
1447 pagezero->alloc->bits = mgr->page_bits;
1449 for( lvl=MIN_lvl; lvl--; ) {
1450 BtSlot *node = slotptr(pagezero->alloc, 1);
1451 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1452 key = keyptr(pagezero->alloc, 1);
1453 key->len = 2; // create stopper key
1457 bt_putid(value, MIN_lvl - lvl + 1);
1458 val = valptr(pagezero->alloc, 1);
1459 val->len = lvl ? BtId : 0;
1460 memcpy (val->value, value, val->len);
1462 pagezero->alloc->min = node->off;
1463 pagezero->alloc->lvl = lvl;
1464 pagezero->alloc->cnt = 1;
1465 pagezero->alloc->act = 1;
1466 pagezero->alloc->page_no = MIN_lvl - lvl;
1468 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
1469 fprintf (stderr, "Unable to create btree page\n");
1470 return bt_mgrclose (mgr), NULL;
1478 VirtualFree (pagezero, 0, MEM_RELEASE);
1481 // mlock the pagezero page
1483 flag = PROT_READ | PROT_WRITE;
1484 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1485 if( mgr->pagezero == MAP_FAILED ) {
1486 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1487 return bt_mgrclose (mgr), NULL;
1489 mlock (mgr->pagezero, mgr->page_size);
1491 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1492 if( mgr->pagepool == MAP_FAILED ) {
1493 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1494 return bt_mgrclose (mgr), NULL;
1496 if( mgr->redopages = redopages ) {
1497 ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
1498 mgr->redobuff = valloc (redopages * mgr->page_size);
1501 flag = PAGE_READWRITE;
1502 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1503 if( !mgr->halloc ) {
1504 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1505 return bt_mgrclose (mgr), NULL;
1508 flag = FILE_MAP_WRITE;
1509 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1510 if( !mgr->pagezero ) {
1511 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1512 return bt_mgrclose (mgr), NULL;
1515 flag = PAGE_READWRITE;
1516 size = (uid)mgr->nlatchpage << mgr->page_bits;
1517 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1519 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1520 return bt_mgrclose (mgr), NULL;
1523 flag = FILE_MAP_WRITE;
1524 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1525 if( !mgr->pagepool ) {
1526 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1527 return bt_mgrclose (mgr), NULL;
1529 if( mgr->redopages = redopages )
1530 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1533 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1534 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1535 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1540 // open BTree access method
1541 // based on buffer manager
1543 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1545 BtDb *bt = malloc (sizeof(*bt));
1547 memset (bt, 0, sizeof(*bt));
1551 bt->cursor = valloc (mgr->page_size);
1553 bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1556 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1558 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1563 // compare two keys, return > 0, = 0, or < 0
1564 // =0: keys are same
1567 // as the comparison value
1569 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1571 uint len1 = key1->len;
1574 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1585 // place write, read, or parent lock on requested page_no.
1587 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1591 ReadLock (latch->readwr, thread_no);
1594 WriteLock (latch->readwr, thread_no);
1597 ReadLock (latch->access, thread_no);
1600 WriteLock (latch->access, thread_no);
1603 WriteOLock (latch->parent, thread_no);
1606 WriteOLock (latch->atomic, thread_no);
1608 case BtLockAtomic | BtLockRead:
1609 WriteOLock (latch->atomic, thread_no);
1610 ReadLock (latch->readwr, thread_no);
1615 // remove write, read, or parent lock on requested page
1617 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1621 ReadRelease (latch->readwr);
1624 WriteRelease (latch->readwr);
1627 ReadRelease (latch->access);
1630 WriteRelease (latch->access);
1633 WriteORelease (latch->parent);
1636 WriteORelease (latch->atomic);
1638 case BtLockAtomic | BtLockRead:
1639 WriteORelease (latch->atomic);
1640 ReadRelease (latch->readwr);
1645 // allocate a new page
1646 // return with page latched, but unlocked.
1648 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1653 // lock allocation page
1655 bt_mutexlock(mgr->lock);
1657 // use empty chain first
1658 // else allocate empty page
1660 if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1661 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1662 set->page = bt_mappage (mgr, set->latch);
1664 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1666 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1667 mgr->pagezero->activepages++;
1669 bt_releasemutex(mgr->lock);
1671 memcpy (set->page, contents, mgr->page_size);
1672 set->latch->dirty = 1;
1676 page_no = bt_getid(mgr->pagezero->alloc->right);
1677 bt_putid(mgr->pagezero->alloc->right, page_no+1);
1679 // unlock allocation latch and
1680 // extend file into new page.
1682 mgr->pagezero->activepages++;
1684 ftruncate (mgr->idx, (uid)(page_no + 1) << mgr->page_bits);
1685 bt_releasemutex(mgr->lock);
1687 // don't load cache from btree page, load it from contents
1689 contents->page_no = page_no;
1691 if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1692 set->page = bt_mappage (mgr, set->latch);
1699 // find slot in page for given key at a given level
1701 int bt_findslot (BtPage page, unsigned char *key, uint len)
1703 uint diff, higher = page->cnt, low = 1, slot;
1706 // make stopper key an infinite fence value
1708 if( bt_getid (page->right) )
1713 // low is the lowest candidate.
1714 // loop ends when they meet
1716 // higher is already
1717 // tested as .ge. the passed key.
1719 while( diff = higher - low ) {
1720 slot = low + ( diff >> 1 );
1721 if( keycmp (keyptr(page, slot), key, len) < 0 )
1724 higher = slot, good++;
1727 // return zero if key is on right link page
1729 return good ? higher : 0;
1732 // find and load page at given level for given key
1733 // leave page rd or wr locked as requested
1735 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1737 uid page_no = ROOT_page, prevpage = 0;
1738 uint drill = 0xff, slot;
1739 BtLatchSet *prevlatch;
1740 uint mode, prevmode;
1743 // start at root of btree and drill down
1746 // determine lock mode of drill level
1747 mode = (drill == lvl) ? lock : BtLockRead;
1749 if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1752 // obtain access lock using lock chaining with Access mode
1754 if( page_no > ROOT_page )
1755 bt_lockpage(BtLockAccess, set->latch, thread_no);
1757 set->page = bt_mappage (mgr, set->latch);
1759 // release & unpin parent or left sibling page
1762 bt_unlockpage(prevmode, prevlatch);
1763 bt_unpinlatch (mgr, prevlatch);
1767 // obtain mode lock using lock chaining through AccessLock
1769 bt_lockpage(mode, set->latch, thread_no);
1771 if( set->page->free )
1772 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1774 if( page_no > ROOT_page )
1775 bt_unlockpage(BtLockAccess, set->latch);
1777 // re-read and re-lock root after determining actual level of root
1779 if( set->page->lvl != drill) {
1780 if( set->latch->page_no != ROOT_page )
1781 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1783 drill = set->page->lvl;
1785 if( lock != BtLockRead && drill == lvl ) {
1786 bt_unlockpage(mode, set->latch);
1787 bt_unpinlatch (mgr, set->latch);
1792 prevpage = set->latch->page_no;
1793 prevlatch = set->latch;
1796 // find key on page at this level
1797 // and descend to requested level
1799 if( !set->page->kill )
1800 if( slot = bt_findslot (set->page, key, len) ) {
1804 // find next non-dead slot -- the fence key if nothing else
1806 while( slotptr(set->page, slot)->dead )
1807 if( slot++ < set->page->cnt )
1810 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1812 val = valptr(set->page, slot);
1814 if( val->len == BtId )
1815 page_no = bt_getid(valptr(set->page, slot)->value);
1817 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1823 // slide right into next page
1825 page_no = bt_getid(set->page->right);
1828 // return error on end of right chain
1830 mgr->line = __LINE__, mgr->err = BTERR_struct;
1831 return 0; // return error
1834 // return page to free list
1835 // page must be delete & write locked
1837 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1839 // lock allocation page
1841 bt_mutexlock (mgr->lock);
1845 memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1846 bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1847 set->latch->dirty = 1;
1848 set->page->free = 1;
1850 // decrement active page count
1851 // and unlock allocation page
1853 mgr->pagezero->activepages--;
1854 bt_releasemutex (mgr->lock);
1856 // unlock released page
1858 bt_unlockpage (BtLockDelete, set->latch);
1859 bt_unlockpage (BtLockWrite, set->latch);
1860 bt_unpinlatch (mgr, set->latch);
1863 // a fence key was deleted from a page
1864 // push new fence value upwards
1866 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1868 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1869 unsigned char value[BtId];
1873 // remove the old fence value
1875 ptr = keyptr(set->page, set->page->cnt);
1876 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1877 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1878 set->latch->dirty = 1;
1880 // cache new fence value
1882 ptr = keyptr(set->page, set->page->cnt);
1883 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1885 bt_lockpage (BtLockParent, set->latch, thread_no);
1886 bt_unlockpage (BtLockWrite, set->latch);
1888 // insert new (now smaller) fence key
1890 bt_putid (value, set->latch->page_no);
1891 ptr = (BtKey*)leftkey;
1893 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1896 // now delete old fence key
1898 ptr = (BtKey*)rightkey;
1900 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1903 bt_unlockpage (BtLockParent, set->latch);
1904 bt_unpinlatch(mgr, set->latch);
1908 // root has a single child
1909 // collapse a level from the tree
1911 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1918 // find the child entry and promote as new root contents
1921 for( idx = 0; idx++ < root->page->cnt; )
1922 if( !slotptr(root->page, idx)->dead )
1925 val = valptr(root->page, idx);
1927 if( val->len == BtId )
1928 page_no = bt_getid (valptr(root->page, idx)->value);
1930 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1932 if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1933 child->page = bt_mappage (mgr, child->latch);
1937 bt_lockpage (BtLockDelete, child->latch, thread_no);
1938 bt_lockpage (BtLockWrite, child->latch, thread_no);
1940 memcpy (root->page, child->page, mgr->page_size);
1941 root->latch->dirty = 1;
1943 bt_freepage (mgr, child);
1945 } while( root->page->lvl > 1 && root->page->act == 1 );
1947 bt_unlockpage (BtLockWrite, root->latch);
1948 bt_unpinlatch (mgr, root->latch);
1952 // delete a page and manage keys
1953 // call with page writelocked
1955 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1957 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1958 unsigned char value[BtId];
1959 uint lvl = set->page->lvl;
1964 // cache copy of fence key
1965 // to remove in parent
1967 ptr = keyptr(set->page, set->page->cnt);
1968 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1970 // obtain lock on right page
1972 page_no = bt_getid(set->page->right);
1974 if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1975 right->page = bt_mappage (mgr, right->latch);
1979 bt_lockpage (BtLockWrite, right->latch, thread_no);
1981 // cache copy of key to update
1983 ptr = keyptr(right->page, right->page->cnt);
1984 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1986 if( right->page->kill )
1987 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1989 // pull contents of right peer into our empty page
1991 memcpy (set->page, right->page, mgr->page_size);
1992 set->latch->dirty = 1;
1994 // mark right page deleted and point it to left page
1995 // until we can post parent updates that remove access
1996 // to the deleted page.
1998 bt_putid (right->page->right, set->latch->page_no);
1999 right->latch->dirty = 1;
2000 right->page->kill = 1;
2002 bt_lockpage (BtLockParent, right->latch, thread_no);
2003 bt_unlockpage (BtLockWrite, right->latch);
2005 bt_lockpage (BtLockParent, set->latch, thread_no);
2006 bt_unlockpage (BtLockWrite, set->latch);
2008 // redirect higher key directly to our new node contents
2010 bt_putid (value, set->latch->page_no);
2011 ptr = (BtKey*)higherfence;
2013 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2016 // delete old lower key to our node
2018 ptr = (BtKey*)lowerfence;
2020 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2023 // obtain delete and write locks to right node
2025 bt_unlockpage (BtLockParent, right->latch);
2026 bt_lockpage (BtLockDelete, right->latch, thread_no);
2027 bt_lockpage (BtLockWrite, right->latch, thread_no);
2028 bt_freepage (mgr, right);
2030 bt_unlockpage (BtLockParent, set->latch);
2034 // find and delete key on page by marking delete flag bit
2035 // if page becomes empty, delete it from the btree
2037 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2039 uint slot, idx, found, fence;
2045 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2046 node = slotptr(set->page, slot);
2047 ptr = keyptr(set->page, slot);
2051 // if librarian slot, advance to real slot
2053 if( node->type == Librarian ) {
2054 ptr = keyptr(set->page, ++slot);
2055 node = slotptr(set->page, slot);
2058 fence = slot == set->page->cnt;
2060 // delete the key, ignore request if already dead
2062 if( found = !keycmp (ptr, key, len) )
2063 if( found = node->dead == 0 ) {
2064 val = valptr(set->page,slot);
2065 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2068 // mark node type as delete
2070 node->type = Delete;
2073 // collapse empty slots beneath the fence
2074 // on interiour nodes
2077 while( idx = set->page->cnt - 1 )
2078 if( slotptr(set->page, idx)->dead ) {
2079 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2080 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2085 // did we delete a fence key in an upper level?
2087 if( found && lvl && set->page->act && fence )
2088 if( bt_fixfence (mgr, set, lvl, thread_no) )
2093 // do we need to collapse root?
2095 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2096 if( bt_collapseroot (mgr, set, thread_no) )
2101 // delete empty page
2103 if( !set->page->act ) {
2104 if( bt_deletepage (mgr, set, thread_no) )
2107 set->latch->dirty = 1;
2108 bt_unlockpage(BtLockWrite, set->latch);
2111 bt_unpinlatch (mgr, set->latch);
2115 // advance to next slot
2117 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2119 BtLatchSet *prevlatch;
2122 if( slot < set->page->cnt )
2125 prevlatch = set->latch;
2127 if( page_no = bt_getid(set->page->right) )
2128 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2129 set->page = bt_mappage (bt->mgr, set->latch);
2133 return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2135 // obtain access lock using lock chaining with Access mode
2137 bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2139 bt_unlockpage(BtLockRead, prevlatch);
2140 bt_unpinlatch (bt->mgr, prevlatch);
2142 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2143 bt_unlockpage(BtLockAccess, set->latch);
2147 // find unique key == given key, or first duplicate key in
2148 // leaf level and return number of value bytes
2149 // or (-1) if not found.
2151 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2159 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2161 ptr = keyptr(set->page, slot);
2163 // skip librarian slot place holder
2165 if( slotptr(set->page, slot)->type == Librarian )
2166 ptr = keyptr(set->page, ++slot);
2168 // return actual key found
2170 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2173 if( slotptr(set->page, slot)->type == Duplicate )
2176 // not there if we reach the stopper key
2178 if( slot == set->page->cnt )
2179 if( !bt_getid (set->page->right) )
2182 // if key exists, return >= 0 value bytes copied
2183 // otherwise return (-1)
2185 if( slotptr(set->page, slot)->dead )
2189 if( !memcmp (ptr->key, key, len) ) {
2190 val = valptr (set->page,slot);
2191 if( valmax > val->len )
2193 memcpy (value, val->value, valmax);
2199 } while( slot = bt_findnext (bt, set, slot) );
2201 bt_unlockpage (BtLockRead, set->latch);
2202 bt_unpinlatch (bt->mgr, set->latch);
2206 // check page for space available,
2207 // clean if necessary and return
2208 // 0 - page needs splitting
2209 // >0 new slot value
2211 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2213 BtPage page = set->page, frame;
2214 uint nxt = mgr->page_size;
2215 uint cnt = 0, idx = 0;
2216 uint max = page->cnt;
2221 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2224 // skip cleanup and proceed to split
2225 // if there's not enough garbage
2228 if( page->garbage < nxt / 5 )
2231 frame = malloc (mgr->page_size);
2232 memcpy (frame, page, mgr->page_size);
2234 // skip page info and set rest of page to zero
2236 memset (page+1, 0, mgr->page_size - sizeof(*page));
2237 set->latch->dirty = 1;
2242 // clean up page first by
2243 // removing deleted keys
2245 while( cnt++ < max ) {
2249 if( cnt < max || frame->lvl )
2250 if( slotptr(frame,cnt)->dead )
2253 // copy the value across
2255 val = valptr(frame, cnt);
2256 nxt -= val->len + sizeof(BtVal);
2257 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2259 // copy the key across
2261 key = keyptr(frame, cnt);
2262 nxt -= key->len + sizeof(BtKey);
2263 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2265 // make a librarian slot
2267 slotptr(page, ++idx)->off = nxt;
2268 slotptr(page, idx)->type = Librarian;
2269 slotptr(page, idx)->dead = 1;
2273 slotptr(page, ++idx)->off = nxt;
2274 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2276 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2284 // see if page has enough space now, or does it need splitting?
2286 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2292 // split the root and raise the height of the btree
2294 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2296 unsigned char leftkey[BT_keyarray];
2297 uint nxt = mgr->page_size;
2298 unsigned char value[BtId];
2304 // save left page fence key for new root
2306 ptr = keyptr(root->page, root->page->cnt);
2307 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2309 // Obtain an empty page to use, and copy the current
2310 // root contents into it, e.g. lower keys
2312 if( bt_newpage(mgr, left, root->page, page_no) )
2315 left_page_no = left->latch->page_no;
2316 bt_unpinlatch (mgr, left->latch);
2318 // preserve the page info at the bottom
2319 // of higher keys and set rest to zero
2321 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2323 // insert stopper key at top of newroot page
2324 // and increase the root height
2326 nxt -= BtId + sizeof(BtVal);
2327 bt_putid (value, right->page_no);
2328 val = (BtVal *)((unsigned char *)root->page + nxt);
2329 memcpy (val->value, value, BtId);
2332 nxt -= 2 + sizeof(BtKey);
2333 slotptr(root->page, 2)->off = nxt;
2334 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2339 // insert lower keys page fence key on newroot page as first key
2341 nxt -= BtId + sizeof(BtVal);
2342 bt_putid (value, left_page_no);
2343 val = (BtVal *)((unsigned char *)root->page + nxt);
2344 memcpy (val->value, value, BtId);
2347 ptr = (BtKey *)leftkey;
2348 nxt -= ptr->len + sizeof(BtKey);
2349 slotptr(root->page, 1)->off = nxt;
2350 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2352 bt_putid(root->page->right, 0);
2353 root->page->min = nxt; // reset lowest used offset and key count
2354 root->page->cnt = 2;
2355 root->page->act = 2;
2358 mgr->pagezero->alloc->lvl = root->page->lvl;
2360 // release and unpin root pages
2362 bt_unlockpage(BtLockWrite, root->latch);
2363 bt_unpinlatch (mgr, root->latch);
2365 bt_unpinlatch (mgr, right);
2369 // split already locked full node
2371 // return pool entry for new right
2374 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2376 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2377 BtPage frame = malloc (mgr->page_size);
2378 uint lvl = set->page->lvl;
2385 // split higher half of keys to frame
2387 memset (frame, 0, mgr->page_size);
2388 max = set->page->cnt;
2392 while( cnt++ < max ) {
2393 if( cnt < max || set->page->lvl )
2394 if( slotptr(set->page, cnt)->dead )
2397 src = valptr(set->page, cnt);
2398 nxt -= src->len + sizeof(BtVal);
2399 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2401 key = keyptr(set->page, cnt);
2402 nxt -= key->len + sizeof(BtKey);
2403 ptr = (BtKey*)((unsigned char *)frame + nxt);
2404 memcpy (ptr, key, key->len + sizeof(BtKey));
2406 // add librarian slot
2408 slotptr(frame, ++idx)->off = nxt;
2409 slotptr(frame, idx)->type = Librarian;
2410 slotptr(frame, idx)->dead = 1;
2414 slotptr(frame, ++idx)->off = nxt;
2415 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2417 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2421 frame->bits = mgr->page_bits;
2428 if( set->latch->page_no > ROOT_page )
2429 bt_putid (frame->right, bt_getid (set->page->right));
2431 // get new free page and write higher keys to it.
2433 if( bt_newpage(mgr, right, frame, thread_no) )
2436 // process lower keys
2438 memcpy (frame, set->page, mgr->page_size);
2439 memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2440 set->latch->dirty = 1;
2442 nxt = mgr->page_size;
2443 set->page->garbage = 0;
2449 if( slotptr(frame, max)->type == Librarian )
2452 // assemble page of smaller keys
2454 while( cnt++ < max ) {
2455 if( slotptr(frame, cnt)->dead )
2457 val = valptr(frame, cnt);
2458 nxt -= val->len + sizeof(BtVal);
2459 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2461 key = keyptr(frame, cnt);
2462 nxt -= key->len + sizeof(BtKey);
2463 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2465 // add librarian slot
2467 slotptr(set->page, ++idx)->off = nxt;
2468 slotptr(set->page, idx)->type = Librarian;
2469 slotptr(set->page, idx)->dead = 1;
2473 slotptr(set->page, ++idx)->off = nxt;
2474 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2478 bt_putid(set->page->right, right->latch->page_no);
2479 set->page->min = nxt;
2480 set->page->cnt = idx;
2482 return right->latch->entry;
2485 // fix keys for newly split page
2486 // call with page locked, return
2489 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2491 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2492 unsigned char value[BtId];
2493 uint lvl = set->page->lvl;
2497 // if current page is the root page, split it
2499 if( set->latch->page_no == ROOT_page )
2500 return bt_splitroot (mgr, set, right, thread_no);
2502 ptr = keyptr(set->page, set->page->cnt);
2503 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2505 page = bt_mappage (mgr, right);
2507 ptr = keyptr(page, page->cnt);
2508 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2510 // insert new fences in their parent pages
2512 bt_lockpage (BtLockParent, right, thread_no);
2514 bt_lockpage (BtLockParent, set->latch, thread_no);
2515 bt_unlockpage (BtLockWrite, set->latch);
2517 // insert new fence for reformulated left block of smaller keys
2519 bt_putid (value, set->latch->page_no);
2520 ptr = (BtKey *)leftkey;
2522 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2525 // switch fence for right block of larger keys to new right page
2527 bt_putid (value, right->page_no);
2528 ptr = (BtKey *)rightkey;
2530 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2533 bt_unlockpage (BtLockParent, set->latch);
2534 bt_unpinlatch (mgr, set->latch);
2536 bt_unlockpage (BtLockParent, right);
2537 bt_unpinlatch (mgr, right);
2541 // install new key and value onto page
2542 // page must already be checked for
2545 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2547 uint idx, librarian;
2552 // if found slot > desired slot and previous slot
2553 // is a librarian slot, use it
2556 if( slotptr(set->page, slot-1)->type == Librarian )
2559 // copy value onto page
2561 set->page->min -= vallen + sizeof(BtVal);
2562 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2563 memcpy (val->value, value, vallen);
2566 // copy key onto page
2568 set->page->min -= keylen + sizeof(BtKey);
2569 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2570 memcpy (ptr->key, key, keylen);
2573 // find first empty slot
2575 for( idx = slot; idx < set->page->cnt; idx++ )
2576 if( slotptr(set->page, idx)->dead )
2579 // now insert key into array before slot
2581 if( idx == set->page->cnt )
2582 idx += 2, set->page->cnt += 2, librarian = 2;
2586 set->latch->dirty = 1;
2589 while( idx > slot + librarian - 1 )
2590 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2592 // add librarian slot
2594 if( librarian > 1 ) {
2595 node = slotptr(set->page, slot++);
2596 node->off = set->page->min;
2597 node->type = Librarian;
2603 node = slotptr(set->page, slot);
2604 node->off = set->page->min;
2609 bt_unlockpage (BtLockWrite, set->latch);
2610 bt_unpinlatch (mgr, set->latch);
2616 // Insert new key into the btree at given level.
2617 // either add a new key or update/add an existing one
2619 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2621 uint slot, idx, len, entry;
2627 while( 1 ) { // find the page and slot for the current key
2628 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2629 node = slotptr(set->page, slot);
2630 ptr = keyptr(set->page, slot);
2633 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2637 // if librarian slot == found slot, advance to real slot
2639 if( node->type == Librarian )
2640 if( !keycmp (ptr, key, keylen) ) {
2641 ptr = keyptr(set->page, ++slot);
2642 node = slotptr(set->page, slot);
2645 // if inserting a duplicate key or unique
2646 // key that doesn't exist on the page,
2647 // check for adequate space on the page
2648 // and insert the new key before slot.
2653 if( keycmp (ptr, key, keylen) )
2654 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2655 return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2656 else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2658 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2663 // if key already exists, update value and return
2665 val = valptr(set->page, slot);
2667 if( val->len >= vallen ) {
2668 if( slotptr(set->page, slot)->dead )
2673 set->page->garbage += val->len - vallen;
2674 set->latch->dirty = 1;
2676 memcpy (val->value, value, vallen);
2677 bt_unlockpage(BtLockWrite, set->latch);
2678 bt_unpinlatch (mgr, set->latch);
2682 // new update value doesn't fit in existing value area
2683 // make sure page has room
2686 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2693 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2694 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2696 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2701 // copy key and value onto page and update slot
2703 set->page->min -= vallen + sizeof(BtVal);
2704 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2705 memcpy (val->value, value, vallen);
2708 set->latch->dirty = 1;
2709 set->page->min -= keylen + sizeof(BtKey);
2710 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2711 memcpy (ptr->key, key, keylen);
2714 node->off = set->page->min;
2715 bt_unlockpage(BtLockWrite, set->latch);
2716 bt_unpinlatch (mgr, set->latch);
2724 logseqno reqlsn; // redo log seq no required
2725 logseqno lsn; // redo log sequence number
2726 uint entry; // latch table entry number
2727 uint slot:31; // page slot number
2728 uint reuse:1; // reused previous page
2732 uid page_no; // page number for split leaf
2733 void *next; // next key to insert
2734 uint entry:29; // latch table entry number
2735 uint type:2; // 0 == insert, 1 == delete, 2 == free
2736 uint nounlock:1; // don't unlock ParentModification
2737 unsigned char leafkey[BT_keyarray];
2740 // determine actual page where key is located
2741 // return slot number
2743 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2745 BtKey *key = keyptr(source,src);
2746 uint slot = locks[src].slot;
2749 if( src > 1 && locks[src].reuse )
2750 entry = locks[src-1].entry, slot = 0;
2752 entry = locks[src].entry;
2755 set->latch = mgr->latchsets + entry;
2756 set->page = bt_mappage (mgr, set->latch);
2760 // is locks->reuse set? or was slot zeroed?
2761 // if so, find where our key is located
2762 // on current page or pages split on
2763 // same page txn operations.
2766 set->latch = mgr->latchsets + entry;
2767 set->page = bt_mappage (mgr, set->latch);
2769 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2770 if( slotptr(set->page, slot)->type == Librarian )
2772 if( locks[src].reuse )
2773 locks[src].entry = entry;
2776 } while( entry = set->latch->split );
2778 mgr->line = __LINE__, mgr->err = BTERR_atomic;
2782 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2784 BtKey *key = keyptr(source, src);
2785 BtVal *val = valptr(source, src);
2790 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2791 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2792 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2794 set->page->lsn = locks[src].lsn;
2798 if( entry = bt_splitpage (mgr, set, thread_no) )
2799 latch = mgr->latchsets + entry;
2803 // splice right page into split chain
2804 // and WriteLock it.
2806 bt_lockpage(BtLockWrite, latch, thread_no);
2807 latch->split = set->latch->split;
2808 set->latch->split = entry;
2809 locks[src].slot = 0;
2812 return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2815 // perform delete from smaller btree
2816 // insert a delete slot if not found there
2818 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2820 BtKey *key = keyptr(source, src);
2827 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2828 node = slotptr(set->page, slot);
2829 ptr = keyptr(set->page, slot);
2830 val = valptr(set->page, slot);
2832 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2834 // if slot is not found, insert a delete slot
2836 if( keycmp (ptr, key->key, key->len) )
2837 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2839 // if node is already dead,
2840 // ignore the request.
2845 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2846 set->latch->dirty = 1;
2847 set->page->lsn = locks[src].lsn;
2855 // delete an empty master page for a transaction
2857 // note that the far right page never empties because
2858 // it always contains (at least) the infinite stopper key
2859 // and that all pages that don't contain any keys are
2860 // deleted, or are being held under Atomic lock.
2862 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2864 BtPageSet right[1], temp[1];
2865 unsigned char value[BtId];
2869 bt_lockpage(BtLockWrite, prev->latch, thread_no);
2871 // grab the right sibling
2873 if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2874 right->page = bt_mappage (mgr, right->latch);
2878 bt_lockpage(BtLockAtomic, right->latch, thread_no);
2879 bt_lockpage(BtLockWrite, right->latch, thread_no);
2881 // and pull contents over empty page
2882 // while preserving master's left link
2884 memcpy (right->page->left, prev->page->left, BtId);
2885 memcpy (prev->page, right->page, mgr->page_size);
2887 // forward seekers to old right sibling
2888 // to new page location in set
2890 bt_putid (right->page->right, prev->latch->page_no);
2891 right->latch->dirty = 1;
2892 right->page->kill = 1;
2894 // remove pointer to right page for searchers by
2895 // changing right fence key to point to the master page
2897 ptr = keyptr(right->page,right->page->cnt);
2898 bt_putid (value, prev->latch->page_no);
2900 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
2903 // now that master page is in good shape we can
2904 // remove its locks.
2906 bt_unlockpage (BtLockAtomic, prev->latch);
2907 bt_unlockpage (BtLockWrite, prev->latch);
2909 // fix master's right sibling's left pointer
2910 // to remove scanner's poiner to the right page
2912 if( right_page_no = bt_getid (prev->page->right) ) {
2913 if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2914 temp->page = bt_mappage (mgr, temp->latch);
2918 bt_lockpage (BtLockWrite, temp->latch, thread_no);
2919 bt_putid (temp->page->left, prev->latch->page_no);
2920 temp->latch->dirty = 1;
2922 bt_unlockpage (BtLockWrite, temp->latch);
2923 bt_unpinlatch (mgr, temp->latch);
2924 } else { // master is now the far right page
2925 bt_mutexlock (mgr->lock);
2926 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2927 bt_releasemutex(mgr->lock);
2930 // now that there are no pointers to the right page
2931 // we can delete it after the last read access occurs
2933 bt_unlockpage (BtLockWrite, right->latch);
2934 bt_unlockpage (BtLockAtomic, right->latch);
2935 bt_lockpage (BtLockDelete, right->latch, thread_no);
2936 bt_lockpage (BtLockWrite, right->latch, thread_no);
2937 bt_freepage (mgr, right);
2941 // atomic modification of a batch of keys.
2943 // return -1 if BTERR is set
2944 // otherwise return slot number
2945 // causing the key constraint violation
2946 // or zero on successful completion.
2948 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2950 uint src, idx, slot, samepage, entry, que = 0;
2951 AtomicKey *head, *tail, *leaf;
2952 BtPageSet set[1], prev[1];
2953 unsigned char value[BtId];
2954 BtKey *key, *ptr, *key2;
2965 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2969 // stable sort the list of keys into order to
2970 // prevent deadlocks between threads.
2972 for( src = 1; src++ < source->cnt; ) {
2973 *temp = *slotptr(source,src);
2974 key = keyptr (source,src);
2976 for( idx = src; --idx; ) {
2977 key2 = keyptr (source,idx);
2978 if( keycmp (key, key2->key, key2->len) < 0 ) {
2979 *slotptr(source,idx+1) = *slotptr(source,idx);
2980 *slotptr(source,idx) = *temp;
2986 // add entries to redo log
2988 if( bt->mgr->redopages )
2989 if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
2990 for( src = 0; src++ < source->cnt; )
2991 locks[src].lsn = lsn;
2993 return bt->mgr->err;
2995 // Load the leaf page for each key
2996 // group same page references with reuse bit
2997 // and determine any constraint violations
2999 for( src = 0; src++ < source->cnt; ) {
3000 key = keyptr(source, src);
3003 // first determine if this modification falls
3004 // on the same page as the previous modification
3005 // note that the far right leaf page is a special case
3007 if( samepage = src > 1 )
3008 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3009 slot = bt_findslot(set->page, key->key, key->len);
3011 bt_unlockpage(BtLockRead, set->latch);
3014 if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
3015 set->latch->split = 0;
3017 return bt->mgr->err;
3019 if( slotptr(set->page, slot)->type == Librarian )
3020 ptr = keyptr(set->page, ++slot);
3022 ptr = keyptr(set->page, slot);
3025 locks[src].entry = set->latch->entry;
3026 locks[src].slot = slot;
3027 locks[src].reuse = 0;
3029 locks[src].entry = 0;
3030 locks[src].slot = 0;
3031 locks[src].reuse = 1;
3034 // capture current lsn for master page
3036 locks[src].reqlsn = set->page->lsn;
3039 // unlock last loadpage lock
3042 bt_unlockpage(BtLockRead, set->latch);
3044 // obtain write lock for each master page
3045 // sync flushed pages to disk
3047 for( src = 0; src++ < source->cnt; ) {
3048 if( locks[src].reuse )
3051 set->latch = bt->mgr->latchsets + locks[src].entry;
3052 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3055 // insert or delete each key
3056 // process any splits or merges
3057 // release Write & Atomic latches
3058 // set ParentModifications and build
3059 // queue of keys to insert for split pages
3060 // or delete for deleted pages.
3062 // run through txn list backwards
3064 samepage = source->cnt + 1;
3066 for( src = source->cnt; src; src-- ) {
3067 if( locks[src].reuse )
3070 // perform the txn operations
3071 // from smaller to larger on
3074 for( idx = src; idx < samepage; idx++ )
3075 switch( slotptr(source,idx)->type ) {
3077 if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
3078 return bt->mgr->err;
3083 if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
3084 return bt->mgr->err;
3088 // after the same page operations have finished,
3089 // process master page for splits or deletion.
3091 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3092 prev->page = bt_mappage (bt->mgr, prev->latch);
3095 // pick-up all splits from master page
3096 // each one is already WriteLocked.
3098 entry = prev->latch->split;
3101 set->latch = bt->mgr->latchsets + entry;
3102 set->page = bt_mappage (bt->mgr, set->latch);
3103 entry = set->latch->split;
3105 // delete empty master page by undoing its split
3106 // (this is potentially another empty page)
3107 // note that there are no new left pointers yet
3109 if( !prev->page->act ) {
3110 memcpy (set->page->left, prev->page->left, BtId);
3111 memcpy (prev->page, set->page, bt->mgr->page_size);
3112 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3113 bt_freepage (bt->mgr, set);
3115 prev->latch->split = set->latch->split;
3116 prev->latch->dirty = 1;
3120 // remove empty page from the split chain
3121 // and return it to the free list.
3123 if( !set->page->act ) {
3124 memcpy (prev->page->right, set->page->right, BtId);
3125 prev->latch->split = set->latch->split;
3126 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3127 bt_freepage (bt->mgr, set);
3131 // schedule prev fence key update
3133 ptr = keyptr(prev->page,prev->page->cnt);
3134 leaf = calloc (sizeof(AtomicKey), 1), que++;
3136 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3137 leaf->page_no = prev->latch->page_no;
3138 leaf->entry = prev->latch->entry;
3148 // splice in the left link into the split page
3150 bt_putid (set->page->left, prev->latch->page_no);
3151 bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3152 bt_unlockpage(BtLockWrite, prev->latch);
3156 // update left pointer in next right page from last split page
3157 // (if all splits were reversed, latch->split == 0)
3159 if( latch->split ) {
3160 // fix left pointer in master's original (now split)
3161 // far right sibling or set rightmost page in page zero
3163 if( right = bt_getid (prev->page->right) ) {
3164 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3165 set->page = bt_mappage (bt->mgr, set->latch);
3167 return bt->mgr->err;
3169 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3170 bt_putid (set->page->left, prev->latch->page_no);
3171 set->latch->dirty = 1;
3172 bt_unlockpage (BtLockWrite, set->latch);
3173 bt_unpinlatch (bt->mgr, set->latch);
3174 } else { // prev is rightmost page
3175 bt_mutexlock (bt->mgr->lock);
3176 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3177 bt_releasemutex(bt->mgr->lock);
3180 // Process last page split in chain
3182 ptr = keyptr(prev->page,prev->page->cnt);
3183 leaf = calloc (sizeof(AtomicKey), 1), que++;
3185 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3186 leaf->page_no = prev->latch->page_no;
3187 leaf->entry = prev->latch->entry;
3197 bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3198 bt_unlockpage(BtLockWrite, prev->latch);
3200 // remove atomic lock on master page
3202 bt_unlockpage(BtLockAtomic, latch);
3206 // finished if prev page occupied (either master or final split)
3208 if( prev->page->act ) {
3209 bt_unlockpage(BtLockWrite, latch);
3210 bt_unlockpage(BtLockAtomic, latch);
3211 bt_unpinlatch(bt->mgr, latch);
3215 // any and all splits were reversed, and the
3216 // master page located in prev is empty, delete it
3217 // by pulling over master's right sibling.
3219 // Remove empty master's fence key
3221 ptr = keyptr(prev->page,prev->page->cnt);
3223 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3224 return bt->mgr->err;
3226 // perform the remainder of the delete
3227 // from the FIFO queue
3229 leaf = calloc (sizeof(AtomicKey), 1), que++;
3231 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3232 leaf->page_no = prev->latch->page_no;
3233 leaf->entry = prev->latch->entry;
3244 // leave atomic lock in place until
3245 // deletion completes in next phase.
3247 bt_unlockpage(BtLockWrite, prev->latch);
3250 // add & delete keys for any pages split or merged during transaction
3254 set->latch = bt->mgr->latchsets + leaf->entry;
3255 set->page = bt_mappage (bt->mgr, set->latch);
3257 bt_putid (value, leaf->page_no);
3258 ptr = (BtKey *)leaf->leafkey;
3260 switch( leaf->type ) {
3261 case 0: // insert key
3262 if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
3263 return bt->mgr->err;
3267 case 1: // delete key
3268 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3269 return bt->mgr->err;
3273 case 2: // free page
3274 if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
3275 return bt->mgr->err;
3280 if( !leaf->nounlock )
3281 bt_unlockpage (BtLockParent, set->latch);
3283 bt_unpinlatch (bt->mgr, set->latch);
3286 } while( leaf = tail );
3288 // if number of active pages
3289 // is greater than the buffer pool
3290 // promote page into larger btree
3292 while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
3293 if( bt_txnpromote (bt) )
3294 return bt->mgr->err;
3302 // promote a page into the larger btree
3304 BTERR bt_txnpromote (BtDb *bt)
3306 uint entry, slot, idx;
3314 entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3316 entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3318 entry %= bt->mgr->latchtotal;
3323 set->latch = bt->mgr->latchsets + entry;
3324 idx = set->latch->page_no % bt->mgr->latchhash;
3326 if( !bt_mutextry(set->latch->modify) )
3329 // if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
3330 // bt_releasemutex(set->latch->modify);
3334 // skip this entry if it is pinned
3336 if( set->latch->pin & ~CLOCK_bit ) {
3337 bt_releasemutex(set->latch->modify);
3338 // bt_releasemutex(bt->mgr->hashtable[idx].latch);
3342 set->page = bt_mappage (bt->mgr, set->latch);
3344 // entry never used or has no right sibling
3346 if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3347 bt_releasemutex(set->latch->modify);
3348 // bt_releasemutex(bt->mgr->hashtable[idx].latch);
3352 bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
3353 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3354 bt_unlockpage(BtLockAccess, set->latch);
3356 // entry interiour node or being or was killed
3358 if( set->page->lvl || set->page->free || set->page->kill ) {
3359 bt_releasemutex(set->latch->modify);
3360 // bt_releasemutex(bt->mgr->hashtable[idx].latch);
3361 bt_unlockpage(BtLockWrite, set->latch);
3365 // pin the page for our useage
3368 bt_releasemutex(set->latch->modify);
3369 // bt_releasemutex(bt->mgr->hashtable[idx].latch);
3371 // if page is dirty, then
3372 // sync it to the disk first.
3374 if( set->latch->dirty )
3375 if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
3376 return bt->mgr->line = __LINE__, bt->mgr->err;
3378 set->latch->dirty = 0;
3380 // transfer slots in our selected page to larger btree
3381 if( !(set->latch->page_no % 100) )
3382 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
3384 for( slot = 0; slot++ < set->page->cnt; ) {
3385 ptr = keyptr (set->page, slot);
3386 val = valptr (set->page, slot);
3387 node = slotptr(set->page, slot);
3389 switch( node->type ) {
3392 if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
3393 return bt->main->err;
3398 if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
3399 return bt->main->err;
3405 // now delete the page
3407 if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3408 return bt->mgr->err;
3410 bt_unpinlatch (bt->mgr, set->latch);
3415 // set cursor to highest slot on highest page
3417 uint bt_lastkey (BtDb *bt)
3419 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3422 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3423 set->page = bt_mappage (bt->mgr, set->latch);
3427 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3428 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3429 bt_unlockpage(BtLockRead, set->latch);
3430 bt_unpinlatch (bt->mgr, set->latch);
3431 return bt->cursor->cnt;
3434 // return previous slot on cursor page
3436 uint bt_prevkey (BtDb *bt, uint slot)
3438 uid cursor_page = bt->cursor->page_no;
3439 uid ourright, next, us = cursor_page;
3445 ourright = bt_getid(bt->cursor->right);
3448 if( !(next = bt_getid(bt->cursor->left)) )
3454 if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3455 set->page = bt_mappage (bt->mgr, set->latch);
3459 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3460 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3461 bt_unlockpage(BtLockRead, set->latch);
3462 bt_unpinlatch (bt->mgr, set->latch);
3464 next = bt_getid (bt->cursor->right);
3466 if( bt->cursor->kill )
3470 if( next == ourright )
3475 return bt->cursor->cnt;
3478 // return next slot on cursor page
3479 // or slide cursor right into next page
3481 uint bt_nextkey (BtDb *bt, uint slot)
3487 right = bt_getid(bt->cursor->right);
3489 while( slot++ < bt->cursor->cnt )
3490 if( slotptr(bt->cursor,slot)->dead )
3492 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3500 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3501 set->page = bt_mappage (bt->mgr, set->latch);
3505 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3506 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3507 bt_unlockpage(BtLockRead, set->latch);
3508 bt_unpinlatch (bt->mgr, set->latch);
3513 return bt->mgr->err = 0;
3516 // cache page of keys into cursor and return starting slot for given key
3518 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3523 // cache page for retrieval
3525 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3526 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3530 bt_unlockpage(BtLockRead, set->latch);
3531 bt_unpinlatch (bt->mgr, set->latch);
3538 double getCpuTime(int type)
3541 FILETIME xittime[1];
3542 FILETIME systime[1];
3543 FILETIME usrtime[1];
3544 SYSTEMTIME timeconv[1];
3547 memset (timeconv, 0, sizeof(SYSTEMTIME));
3551 GetSystemTimeAsFileTime (xittime);
3552 FileTimeToSystemTime (xittime, timeconv);
3553 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3556 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3557 FileTimeToSystemTime (usrtime, timeconv);
3560 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3561 FileTimeToSystemTime (systime, timeconv);
3565 ans += (double)timeconv->wHour * 3600;
3566 ans += (double)timeconv->wMinute * 60;
3567 ans += (double)timeconv->wSecond;
3568 ans += (double)timeconv->wMilliseconds / 1000;
3573 #include <sys/resource.h>
3575 double getCpuTime(int type)
3577 struct rusage used[1];
3578 struct timeval tv[1];
3582 gettimeofday(tv, NULL);
3583 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3586 getrusage(RUSAGE_SELF, used);
3587 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3590 getrusage(RUSAGE_SELF, used);
3591 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3598 void bt_poolaudit (BtMgr *mgr)
3603 while( ++entry < mgr->latchtotal ) {
3604 latch = mgr->latchsets + entry;
3606 if( *latch->readwr->rin & MASK )
3607 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3609 if( *latch->access->rin & MASK )
3610 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3612 // if( *latch->parent->xcl->value )
3613 // fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3615 // if( *latch->atomic->xcl->value )
3616 // fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3618 // if( *latch->modify->value )
3619 // fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3621 if( latch->pin & ~CLOCK_bit )
3622 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3635 // standalone program to index file of keys
3636 // then list them onto std-out
3639 void *index_file (void *arg)
3641 uint __stdcall index_file (void *arg)
3644 int line = 0, found = 0, cnt = 0, idx;
3645 uid next, page_no = LEAF_page; // start on first page of leaves
3646 int ch, len = 0, slot, type = 0;
3647 unsigned char key[BT_maxkey];
3648 unsigned char txn[65536];
3649 ThreadArg *args = arg;
3658 bt = bt_open (args->mgr, args->main);
3661 if( args->idx < strlen (args->type) )
3662 ch = args->type[args->idx];
3664 ch = args->type[strlen(args->type) - 1];
3676 if( type == Delete )
3677 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3679 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3681 if( type == Delete )
3682 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3684 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3686 if( in = fopen (args->infile, "rb") )
3687 while( ch = getc(in), ch != EOF )
3693 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3694 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3700 memcpy (txn + nxt, key + 10, len - 10);
3702 txn[nxt] = len - 10;
3704 memcpy (txn + nxt, key, 10);
3707 slotptr(page,++cnt)->off = nxt;
3708 slotptr(page,cnt)->type = type;
3711 if( cnt < args->num )
3718 if( bt_atomictxn (bt, page) )
3719 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3724 else if( len < BT_maxkey )
3726 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3730 fprintf(stderr, "started indexing for %s\n", args->infile);
3731 if( in = fopen (args->infile, "r") )
3732 while( ch = getc(in), ch != EOF )
3737 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3738 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3741 else if( len < BT_maxkey )
3743 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3747 fprintf(stderr, "started finding keys for %s\n", args->infile);
3748 if( in = fopen (args->infile, "rb") )
3749 while( ch = getc(in), ch != EOF )
3753 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3755 else if( bt->mgr->err )
3756 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3759 else if( len < BT_maxkey )
3761 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3765 fprintf(stderr, "started scanning\n");
3768 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3769 set->page = bt_mappage (bt->mgr, set->latch);
3771 fprintf(stderr, "unable to obtain latch"), exit(1);
3773 bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3774 next = bt_getid (set->page->right);
3776 for( slot = 0; slot++ < set->page->cnt; )
3777 if( next || slot < set->page->cnt )
3778 if( !slotptr(set->page, slot)->dead ) {
3779 ptr = keyptr(set->page, slot);
3782 if( slotptr(set->page, slot)->type == Duplicate )
3785 fwrite (ptr->key, len, 1, stdout);
3786 val = valptr(set->page, slot);
3787 fwrite (val->value, val->len, 1, stdout);
3788 fputc ('\n', stdout);
3792 bt_unlockpage (BtLockRead, set->latch);
3793 bt_unpinlatch (bt->mgr, set->latch);
3794 } while( page_no = next );
3796 fprintf(stderr, " Total keys read %d\n", cnt);
3800 fprintf(stderr, "started reverse scan\n");
3801 if( slot = bt_lastkey (bt) )
3802 while( slot = bt_prevkey (bt, slot) ) {
3803 if( slotptr(bt->cursor, slot)->dead )
3806 ptr = keyptr(bt->cursor, slot);
3809 if( slotptr(bt->cursor, slot)->type == Duplicate )
3812 fwrite (ptr->key, len, 1, stdout);
3813 val = valptr(bt->cursor, slot);
3814 fwrite (val->value, val->len, 1, stdout);
3815 fputc ('\n', stdout);
3819 fprintf(stderr, " Total keys read %d\n", cnt);
3824 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3826 fprintf(stderr, "started counting\n");
3827 next = LEAF_page + bt->mgr->redopages + 1;
3829 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3830 if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3833 if( !bt->cursor->free && !bt->cursor->lvl )
3834 cnt += bt->cursor->act;
3840 cnt--; // remove stopper key
3841 fprintf(stderr, " Total keys counted %d\n", cnt);
3845 fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
3854 typedef struct timeval timer;
3856 int main (int argc, char **argv)
3858 int idx, cnt, len, slot, err;
3859 int segsize, bits = 16;
3879 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3880 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3881 fprintf (stderr, " where main_file is the name of the main btree file\n");
3882 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3883 fprintf (stderr, " page_bits is the page size in bits for the cache btree\n");
3884 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3885 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3886 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3887 fprintf (stderr, " main_bits is the page size of the main btree in bits\n");
3888 fprintf (stderr, " main_pool is the number of main pages in the main buffer pool\n");
3889 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3893 start = getCpuTime(0);
3896 bits = atoi(argv[4]);
3899 poolsize = atoi(argv[5]);
3902 fprintf (stderr, "Warning: no mapped_pool\n");
3905 num = atoi(argv[6]);
3908 recovery = atoi(argv[7]);
3911 mainbits = atoi(argv[8]);
3914 mainpool = atoi(argv[9]);
3918 threads = malloc (cnt * sizeof(pthread_t));
3920 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3922 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3924 mgr = bt_mgr (argv[1], bits, poolsize, recovery);
3927 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3931 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3934 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3941 for( idx = 0; idx < cnt; idx++ ) {
3942 args[idx].infile = argv[idx + 10];
3943 args[idx].type = argv[3];
3944 args[idx].main = main;
3945 args[idx].mgr = mgr;
3946 args[idx].num = num;
3947 args[idx].idx = idx;
3949 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3950 fprintf(stderr, "Error creating thread %d\n", err);
3952 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3956 args[0].infile = argv[idx + 10];
3957 args[0].type = argv[3];
3958 args[0].main = main;
3965 // wait for termination
3968 for( idx = 0; idx < cnt; idx++ )
3969 pthread_join (threads[idx], NULL);
3971 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3973 for( idx = 0; idx < cnt; idx++ )
3974 CloseHandle(threads[idx]);
3982 elapsed = getCpuTime(0) - start;
3983 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3984 elapsed = getCpuTime(1);
3985 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3986 elapsed = getCpuTime(2);
3987 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);