1 // btree version threadskv10 futex version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // and LSM B-trees for write optimization
14 // author: karl malbrain, malbrain@cal.berkeley.edu
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
20 The orginal author is Karl Malbrain.
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
38 #include <linux/futex.h>
53 #define WIN32_LEAN_AND_MEAN
67 typedef unsigned long long uid;
68 typedef unsigned long long logseqno;
71 typedef unsigned long long off64_t;
72 typedef unsigned short ushort;
73 typedef unsigned int uint;
76 #define BT_ro 0x6f72 // ro
77 #define BT_rw 0x7772 // rw
79 #define BT_maxbits 26 // maximum page size in bits
80 #define BT_minbits 9 // minimum page size in bits
81 #define BT_minpage (1 << BT_minbits) // minimum page size
82 #define BT_maxpage (1 << BT_maxbits) // maximum page size
84 // BTree page number constants
85 #define ALLOC_page 0 // allocation page
86 #define ROOT_page 1 // root of the btree
87 #define LEAF_page 2 // first page of leaves
88 #define REDO_page 3 // first page of redo buffer
90 // Number of levels to create in a new BTree
95 There are six lock types for each node in four independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
116 volatile uint xlock:1; // one writer has exclusive lock
117 volatile uint wrt:31; // count of other writers waiting
126 // definition for phase-fair reader/writer lock implementation
129 volatile ushort rin[1];
130 volatile ushort rout[1];
131 volatile ushort ticket[1];
132 volatile ushort serving[1];
133 volatile ushort tid[1];
134 volatile ushort dup[1];
141 volatile ushort tid[1];
142 volatile ushort dup[1];
150 // mode & definition for lite latch implementation
153 QueRd = 1, // reader queue
154 QueWr = 2 // writer queue
157 // hash table entries
160 uint entry; // Latch table entry at head of chain
161 BtMutexLatch latch[1];
164 // latch manager table structure
167 uid page_no; // latch set page number
168 RWLock readwr[1]; // read/write page lock
169 RWLock access[1]; // Access Intent/Page delete
170 WOLock parent[1]; // Posting of fence key in parent
171 WOLock atomic[1]; // Atomic update in progress
172 uint split; // right split page atomic insert
173 uint entry; // entry slot in latch table
174 uint next; // next entry in hash table chain
175 uint prev; // prev entry in hash table chain
176 ushort pin; // number of accessing threads
177 unsigned char dirty; // page in cache is dirty (atomic set)
178 BtMutexLatch modify[1]; // modify entry lite latch
181 // Define the length of the page record numbers
185 // Page key slot definition.
187 // Keys are marked dead, but remain on the page until
188 // it cleanup is called. The fence key (highest key) for
189 // a leaf page is always present, even after cleanup.
193 // In addition to the Unique keys that occupy slots
194 // there are Librarian and Duplicate key
195 // slots occupying the key slot array.
197 // The Librarian slots are dead keys that
198 // serve as filler, available to add new Unique
199 // or Dup slots that are inserted into the B-tree.
201 // The Duplicate slots have had their key bytes extended
202 // by 6 bytes to contain a binary duplicate key uniqueifier.
212 uint off:BT_maxbits; // page offset for key start
213 uint type:3; // type of slot
214 uint dead:1; // set for deleted slot
217 // The key structure occupies space at the upper end of
218 // each page. It's a length byte followed by the key
222 unsigned char len; // this can be changed to a ushort or uint
223 unsigned char key[0];
226 // the value structure also occupies space at the upper
227 // end of the page. Each key is immediately followed by a value.
230 unsigned char len; // this can be changed to a ushort or uint
231 unsigned char value[0];
234 #define BT_maxkey 255 // maximum number of bytes in a key
235 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
237 // The first part of an index page.
238 // It is immediately followed
239 // by the BtSlot array of keys.
241 // note that this structure size
242 // must be a multiple of 8 bytes
243 // in order to place dups correctly.
245 typedef struct BtPage_ {
246 uint cnt; // count of keys in page
247 uint act; // count of active keys
248 uint min; // next key offset
249 uint garbage; // page garbage in bytes
250 unsigned char bits:7; // page size in bits
251 unsigned char free:1; // page is on free chain
252 unsigned char lvl:7; // level of page
253 unsigned char kill:1; // page is being deleted
254 unsigned char right[BtId]; // page number to right
255 unsigned char left[BtId]; // page number to left
256 unsigned char filler[2]; // padding to multiple of 8
257 logseqno lsn; // log sequence number applied
258 uid page_no; // this page number
261 // The loadpage interface object
264 BtPage page; // current page pointer
265 BtLatchSet *latch; // current page latch set
268 // structure for latch manager on ALLOC_page
271 struct BtPage_ alloc[1]; // next page_no in right ptr
272 unsigned long long dups[1]; // global duplicate key uniqueifier
273 unsigned char freechain[BtId]; // head of free page_nos chain
274 unsigned long long activepages; // number of active pages pages
277 // The object structure for Btree access
280 uint page_size; // page size
281 uint page_bits; // page size in bits
287 BtPageZero *pagezero; // mapped allocation page
288 BtHashEntry *hashtable; // the buffer pool hash table entries
289 BtLatchSet *latchsets; // mapped latch set from buffer pool
290 unsigned char *pagepool; // mapped to the buffer pool pages
291 unsigned char *redobuff; // mapped recovery buffer pointer
292 logseqno lsn, flushlsn; // current & first lsn flushed
293 BtMutexLatch redo[1]; // redo area lite latch
294 BtMutexLatch lock[1]; // allocation area lite latch
295 BtMutexLatch maps[1]; // mapping segments lite latch
296 ushort thread_no[1]; // next thread number
297 uint nlatchpage; // number of latch pages at BT_latch
298 uint latchtotal; // number of page latch entries
299 uint latchhash; // number of latch hash table slots
300 uint latchvictim; // next latch entry to examine
301 uint latchpromote; // next latch entry to promote
302 uint redopages; // size of recovery buff in pages
303 uint redolast; // last msync size of recovery buff
304 uint redoend; // eof/end element in recovery buff
305 int err; // last error
306 int line; // last error line no
307 int found; // number of keys found by delete
308 int reads, writes; // number of reads and writes
310 HANDLE halloc; // allocation handle
311 HANDLE hpool; // buffer pool handle
313 uint segments; // number of memory mapped segments
314 unsigned char *pages[64000];// memory mapped segments of b-tree
318 BtMgr *mgr; // buffer manager for entire process
319 BtMgr *main; // buffer manager for main btree
320 BtPage cursor; // cached page frame for start/next
321 ushort thread_no; // thread number
322 unsigned char key[BT_keyarray]; // last found complete key
325 // Catastrophic errors
339 #define CLOCK_bit 0x8000
341 // recovery manager entry types
344 BTRM_eof = 0, // rest of buffer is emtpy
345 BTRM_add, // add a unique key-value to btree
346 BTRM_dup, // add a duplicate key-value to btree
347 BTRM_del, // delete a key-value from btree
348 BTRM_upd, // update a key with a new value
349 BTRM_new, // allocate a new empty page
350 BTRM_old // reuse an old empty page
353 // recovery manager entry
354 // structure followed by BtKey & BtVal
357 logseqno reqlsn; // log sequence number required
358 logseqno lsn; // log sequence number for entry
359 uint len; // length of entry
360 unsigned char type; // type of entry
361 unsigned char lvl; // level of btree entry pertains to
366 extern void bt_close (BtDb *bt);
367 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
368 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
369 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
370 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
371 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
372 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
373 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
375 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
377 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
378 extern uint bt_nextkey (BtDb *bt, uint slot);
379 extern uint bt_prevkey (BtDb *db, uint slot);
380 extern uint bt_lastkey (BtDb *db);
383 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
384 extern void bt_mgrclose (BtMgr *mgr);
385 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
386 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
388 // transaction functions
389 BTERR bt_txnpromote (BtDb *bt);
391 // The page is allocated from low and hi ends.
392 // The key slots are allocated from the bottom,
393 // while the text and value of the key
394 // are allocated from the top. When the two
395 // areas meet, the page is split into two.
397 // A key consists of a length byte, two bytes of
398 // index number (0 - 65535), and up to 253 bytes
401 // Associated with each key is a value byte string
402 // containing any value desired.
404 // The b-tree root is always located at page 1.
405 // The first leaf page of level zero is always
406 // located on page 2.
408 // The b-tree pages are linked with next
409 // pointers to facilitate enumerators,
410 // and provide for concurrency.
412 // When to root page fills, it is split in two and
413 // the tree height is raised by a new root at page
414 // one with two keys.
416 // Deleted keys are marked with a dead bit until
417 // page cleanup. The fence key for a leaf node is
420 // To achieve maximum concurrency one page is locked at a time
421 // as the tree is traversed to find leaf key in question. The right
422 // page numbers are used in cases where the page is being split,
425 // Page 0 is dedicated to lock for new page extensions,
426 // and chains empty pages together for reuse. It also
427 // contains the latch manager hash table.
429 // The ParentModification lock on a node is obtained to serialize posting
430 // or changing the fence key for a node.
432 // Empty pages are chained together through the ALLOC page and reused.
434 // Access macros to address slot and key values from the page
435 // Page slots use 1 based indexing.
437 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
438 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
439 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
441 void bt_putid(unsigned char *dest, uid id)
446 dest[i] = (unsigned char)id, id >>= 8;
449 uid bt_getid(unsigned char *src)
454 for( i = 0; i < BtId; i++ )
455 id <<= 8, id |= *src++;
460 uid bt_newdup (BtMgr *mgr)
463 return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
465 return _InterlockedIncrement64(mgr->pagezero->dups, 1);
469 // lite weight spin lock Latch Manager
471 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
473 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
476 void bt_mutexlock(BtMutexLatch *latch)
478 BtMutexLatch prev[1];
482 *prev->value = __sync_fetch_and_or(latch->value, XCL);
484 if( !prev->bits->xlock ) { // did we set XCL bit?
486 __sync_fetch_and_sub(latch->value, WRT);
492 __sync_fetch_and_add(latch->value, WRT);
495 sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
500 // try to obtain write lock
502 // return 1 if obtained,
505 int bt_mutextry(BtMutexLatch *latch)
507 BtMutexLatch prev[1];
509 *prev->value = __sync_fetch_and_or(latch->value, XCL);
511 // take write access if exclusive bit is clear
513 return !prev->bits->xlock;
518 void bt_releasemutex(BtMutexLatch *latch)
520 BtMutexLatch prev[1];
522 *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
524 if( prev->bits->wrt )
525 sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
528 // Write-Only Queue Lock
530 void WriteOLock (WOLock *lock, ushort tid)
532 if( *lock->tid == tid ) {
537 bt_mutexlock(lock->xcl);
541 void WriteORelease (WOLock *lock)
549 bt_releasemutex(lock->xcl);
552 // Phase-Fair reader/writer lock implementation
554 void WriteLock (RWLock *lock, ushort tid)
558 if( *lock->tid == tid ) {
563 tix = __sync_fetch_and_add (lock->ticket, 1);
565 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
567 // wait for our ticket to come up
569 while( tix != lock->serving[0] )
576 w = PRES | (tix & PHID);
578 r = __sync_fetch_and_add (lock->rin, w);
580 r = _InterlockedExchangeAdd16 (lock->rin, w);
582 while( r != *lock->rout )
591 void WriteRelease (RWLock *lock)
600 __sync_fetch_and_and (lock->rin, ~MASK);
602 _InterlockedAnd16 (lock->rin, ~MASK);
607 // try to obtain read lock
608 // return 1 if successful
610 int ReadTry (RWLock *lock, ushort tid)
614 // OK if write lock already held by same thread
616 if( *lock->tid == tid ) {
621 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
623 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
626 if( w == (*lock->rin & MASK) ) {
628 __sync_fetch_and_add (lock->rin, -RINC);
630 _InterlockedExchangeAdd16 (lock->rin, -RINC);
638 void ReadLock (RWLock *lock, ushort tid)
642 if( *lock->tid == tid ) {
647 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
649 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
652 while( w == (*lock->rin & MASK) )
660 void ReadRelease (RWLock *lock)
668 __sync_fetch_and_add (lock->rout, RINC);
670 _InterlockedExchangeAdd16 (lock->rout, RINC);
674 // recovery manager -- flush dirty pages
676 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
678 uint cnt3 = 0, cnt2 = 0, cnt = 0;
683 // flush dirty pool pages to the btree
685 fprintf(stderr, "Start flushlsn ");
686 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
687 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
688 latch = mgr->latchsets + entry;
689 bt_mutexlock (latch->modify);
690 bt_lockpage(BtLockRead, latch, thread_no);
693 bt_writepage(mgr, page, latch->page_no, 0);
694 latch->dirty = 0, cnt++;
696 if( latch->pin & ~CLOCK_bit )
698 bt_unlockpage(BtLockRead, latch);
699 bt_releasemutex (latch->modify);
701 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
702 fprintf(stderr, "begin sync");
703 sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
704 fprintf(stderr, " end sync\n");
707 // recovery manager -- process current recovery buff on startup
708 // this won't do much if previous session was properly closed.
710 BTERR bt_recoveryredo (BtMgr *mgr)
717 pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
719 hdr = (BtLogHdr *)mgr->redobuff;
720 mgr->flushlsn = hdr->lsn;
723 hdr = (BtLogHdr *)(mgr->redobuff + offset);
724 switch( hdr->type ) {
728 case BTRM_add: // add a unique key-value to btree
730 case BTRM_dup: // add a duplicate key-value to btree
731 case BTRM_del: // delete a key-value from btree
732 case BTRM_upd: // update a key with a new value
733 case BTRM_new: // allocate a new empty page
734 case BTRM_old: // reuse an old empty page
740 // recovery manager -- append new entry to recovery log
741 // flush to disk when it overflows.
743 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
745 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
746 uint amt = sizeof(BtLogHdr);
750 bt_mutexlock (mgr->redo);
753 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
755 // see if new entry fits in buffer
756 // flush and reset if it doesn't
758 if( amt > size - mgr->redoend ) {
759 mgr->flushlsn = mgr->lsn;
760 msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
763 bt_flushlsn(mgr, thread_no);
766 // fill in new entry & either eof or end block
768 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
773 hdr->lsn = ++mgr->lsn;
777 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
778 memset (eof, 0, sizeof(BtLogHdr));
780 // fill in key and value
783 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
784 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
787 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
788 memset (eof, 0, sizeof(BtLogHdr));
791 last = mgr->redolast & 0xfff;
795 bt_releasemutex(mgr->redo);
797 msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
801 // recovery manager -- append transaction to recovery log
802 // flush to disk when it overflows.
804 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
806 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
807 uint amt = 0, src, type;
814 // determine amount of redo recovery log space required
816 for( src = 0; src++ < source->cnt; ) {
817 key = keyptr(source,src);
818 val = valptr(source,src);
819 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
820 amt += sizeof(BtLogHdr);
823 bt_mutexlock (mgr->redo);
825 // see if new entry fits in buffer
826 // flush and reset if it doesn't
828 if( amt > size - mgr->redoend ) {
829 mgr->flushlsn = mgr->lsn;
830 msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
833 bt_flushlsn (mgr, thread_no);
836 // assign new lsn to transaction
840 // fill in new entries
842 for( src = 0; src++ < source->cnt; ) {
843 key = keyptr(source, src);
844 val = valptr(source, src);
846 switch( slotptr(source, src)->type ) {
858 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
859 amt += sizeof(BtLogHdr);
861 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
867 // fill in key and value
869 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
870 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
875 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
876 memset (eof, 0, sizeof(BtLogHdr));
879 last = mgr->redolast & 0xfff;
882 bt_releasemutex(mgr->redo);
884 msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
885 bt_releasemutex(mgr->redo);
889 // read page into buffer pool from permanent location in Btree file
891 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
893 int flag = PROT_READ | PROT_WRITE;
894 uint segment = page_no >> 32;
898 if( segment < mgr->segments ) {
899 perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
900 memcpy (page, perm, mgr->page_size);
901 if( page->page_no != page_no )
907 bt_mutexlock (mgr->maps);
909 if( segment < mgr->segments ) {
910 bt_releasemutex (mgr->maps);
914 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
917 bt_releasemutex (mgr->maps);
921 // write page to permanent location in Btree file
922 // clear the dirty bit
924 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
926 int flag = PROT_READ | PROT_WRITE;
927 uint segment = page_no >> 32;
931 if( segment < mgr->segments ) {
932 perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
933 memcpy (perm, page, mgr->page_size);
935 msync (perm, mgr->page_size, MS_SYNC);
940 bt_mutexlock (mgr->maps);
942 if( segment < mgr->segments ) {
943 bt_releasemutex (mgr->maps);
947 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
948 bt_releasemutex (mgr->maps);
953 // set CLOCK bit in latch
954 // decrement pin count
956 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
958 bt_mutexlock(latch->modify);
959 latch->pin |= CLOCK_bit;
962 bt_releasemutex(latch->modify);
965 // return the btree cached page address
967 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
969 BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
974 // return next available latch entry
975 // and with latch entry locked
977 uint bt_availnext (BtMgr *mgr)
984 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
986 entry = _InterlockedIncrement (&mgr->latchvictim);
988 entry %= mgr->latchtotal;
993 latch = mgr->latchsets + entry;
995 if( !bt_mutextry(latch->modify) )
998 // return this entry if it is not pinned
1003 // if the CLOCK bit is set
1004 // reset it to zero.
1006 latch->pin &= ~CLOCK_bit;
1007 bt_releasemutex(latch->modify);
1011 // pin page in buffer pool
1012 // return with latchset pinned
1014 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1016 uint hashidx = page_no % mgr->latchhash;
1021 // try to find our entry
1023 bt_mutexlock(mgr->hashtable[hashidx].latch);
1025 if( entry = mgr->hashtable[hashidx].entry ) do
1027 latch = mgr->latchsets + entry;
1028 if( page_no == latch->page_no )
1030 } while( entry = latch->next );
1032 // found our entry: increment pin
1035 latch = mgr->latchsets + entry;
1036 bt_mutexlock(latch->modify);
1037 latch->pin |= CLOCK_bit;
1041 bt_releasemutex(latch->modify);
1042 bt_releasemutex(mgr->hashtable[hashidx].latch);
1046 // find and reuse unpinned entry
1050 entry = bt_availnext (mgr);
1051 latch = mgr->latchsets + entry;
1053 idx = latch->page_no % mgr->latchhash;
1055 // if latch is on a different hash chain
1056 // unlink from the old page_no chain
1058 if( latch->page_no )
1059 if( idx != hashidx ) {
1061 // skip over this entry if latch not available
1063 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1064 bt_releasemutex(latch->modify);
1069 mgr->latchsets[latch->prev].next = latch->next;
1071 mgr->hashtable[idx].entry = latch->next;
1074 mgr->latchsets[latch->next].prev = latch->prev;
1076 bt_releasemutex (mgr->hashtable[idx].latch);
1079 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1081 // update permanent page area in btree from buffer pool
1082 // no read-lock is required since page is not pinned.
1085 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1086 return mgr->line = __LINE__, NULL;
1091 memcpy (page, contents, mgr->page_size);
1093 } else if( bt_readpage (mgr, page, page_no) )
1094 return mgr->line = __LINE__, NULL;
1096 // link page as head of hash table chain
1097 // if this is a never before used entry,
1098 // or it was previously on a different
1099 // hash table chain. Otherwise, just
1100 // leave it in its current hash table
1103 if( !latch->page_no || hashidx != idx ) {
1104 if( latch->next = mgr->hashtable[hashidx].entry )
1105 mgr->latchsets[latch->next].prev = entry;
1107 mgr->hashtable[hashidx].entry = entry;
1111 // fill in latch structure
1113 latch->pin = CLOCK_bit | 1;
1114 latch->page_no = page_no;
1115 latch->entry = entry;
1118 bt_releasemutex (latch->modify);
1119 bt_releasemutex (mgr->hashtable[hashidx].latch);
1123 void bt_mgrclose (BtMgr *mgr)
1131 // flush previously written dirty pages
1132 // and write recovery buffer to disk
1134 fdatasync (mgr->idx);
1136 if( mgr->redoend ) {
1137 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1138 memset (eof, 0, sizeof(BtLogHdr));
1140 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1143 // write remaining dirty pool pages to the btree
1145 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1146 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1147 latch = mgr->latchsets + slot;
1149 if( latch->dirty ) {
1150 bt_writepage(mgr, page, latch->page_no, 0);
1151 latch->dirty = 0, num++;
1155 // flush last batch to disk and clear
1156 // redo recovery buffer on disk.
1158 fdatasync (mgr->idx);
1160 if( mgr->redopages ) {
1161 eof = (BtLogHdr *)mgr->redobuff;
1162 memset (eof, 0, sizeof(BtLogHdr));
1163 eof->lsn = mgr->lsn;
1165 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1167 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1170 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1173 while( mgr->segments )
1174 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1176 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1177 munmap (mgr->pagezero, mgr->page_size);
1179 FlushViewOfFile(mgr->pagezero, 0);
1180 UnmapViewOfFile(mgr->pagezero);
1181 UnmapViewOfFile(mgr->pagepool);
1182 CloseHandle(mgr->halloc);
1183 CloseHandle(mgr->hpool);
1186 free (mgr->redobuff);
1190 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1191 FlushFileBuffers(mgr->idx);
1192 CloseHandle(mgr->idx);
1197 // close and release memory
1199 void bt_close (BtDb *bt)
1206 VirtualFree (bt->cursor, 0, MEM_RELEASE);
1211 // open/create new btree buffer manager
1213 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1214 // size of page pool (e.g. 262144)
1216 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1218 uint lvl, attr, last, slot, idx;
1219 uint nlatchpage, latchhash;
1220 unsigned char value[BtId];
1221 int flag, initit = 0;
1222 BtPageZero *pagezero;
1230 // determine sanity of page size and buffer pool
1232 if( bits > BT_maxbits )
1234 else if( bits < BT_minbits )
1237 mgr = calloc (1, sizeof(BtMgr));
1239 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1241 if( mgr->idx == -1 ) {
1242 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1243 return free(mgr), NULL;
1246 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1247 attr = FILE_ATTRIBUTE_NORMAL;
1248 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1250 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1251 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1252 return GlobalFree(mgr), NULL;
1257 pagezero = valloc (BT_maxpage);
1260 // read minimum page size to get root info
1261 // to support raw disk partition files
1262 // check if bits == 0 on the disk.
1264 if( size = lseek (mgr->idx, 0L, 2) )
1265 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1266 if( pagezero->alloc->bits )
1267 bits = pagezero->alloc->bits;
1271 return free(mgr), free(pagezero), NULL;
1275 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1276 size = GetFileSize(mgr->idx, amt);
1278 if( size || *amt ) {
1279 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1280 return bt_mgrclose (mgr), NULL;
1281 bits = pagezero->alloc->bits;
1286 mgr->page_size = 1 << bits;
1287 mgr->page_bits = bits;
1289 // calculate number of latch hash table entries
1291 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1293 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1294 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1295 mgr->latchtotal = nodemax;
1300 // initialize an empty b-tree with latch page, root page, page of leaves
1301 // and page(s) of latches and page pool cache
1303 memset (pagezero, 0, 1 << bits);
1304 pagezero->alloc->lvl = MIN_lvl - 1;
1305 pagezero->alloc->bits = mgr->page_bits;
1306 bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1307 pagezero->activepages = 2;
1309 // initialize left-most LEAF page in
1310 // alloc->left and count of active leaf pages.
1312 bt_putid (pagezero->alloc->left, LEAF_page);
1314 ftruncate (mgr->idx, REDO_page << mgr->page_bits);
1316 if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
1317 fprintf (stderr, "Unable to create btree page zero\n");
1318 return bt_mgrclose (mgr), NULL;
1321 memset (pagezero, 0, 1 << bits);
1322 pagezero->alloc->bits = mgr->page_bits;
1324 for( lvl=MIN_lvl; lvl--; ) {
1325 BtSlot *node = slotptr(pagezero->alloc, 1);
1326 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1327 key = keyptr(pagezero->alloc, 1);
1328 key->len = 2; // create stopper key
1332 bt_putid(value, MIN_lvl - lvl + 1);
1333 val = valptr(pagezero->alloc, 1);
1334 val->len = lvl ? BtId : 0;
1335 memcpy (val->value, value, val->len);
1337 pagezero->alloc->min = node->off;
1338 pagezero->alloc->lvl = lvl;
1339 pagezero->alloc->cnt = 1;
1340 pagezero->alloc->act = 1;
1341 pagezero->alloc->page_no = MIN_lvl - lvl;
1343 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
1344 fprintf (stderr, "Unable to create btree page\n");
1345 return bt_mgrclose (mgr), NULL;
1353 VirtualFree (pagezero, 0, MEM_RELEASE);
1356 // mlock the pagezero page
1358 flag = PROT_READ | PROT_WRITE;
1359 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1360 if( mgr->pagezero == MAP_FAILED ) {
1361 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1362 return bt_mgrclose (mgr), NULL;
1364 mlock (mgr->pagezero, mgr->page_size);
1366 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1367 if( mgr->pagepool == MAP_FAILED ) {
1368 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1369 return bt_mgrclose (mgr), NULL;
1371 if( mgr->redopages = redopages ) {
1372 ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
1373 mgr->redobuff = valloc (redopages * mgr->page_size);
1376 flag = PAGE_READWRITE;
1377 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1378 if( !mgr->halloc ) {
1379 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1380 return bt_mgrclose (mgr), NULL;
1383 flag = FILE_MAP_WRITE;
1384 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1385 if( !mgr->pagezero ) {
1386 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1387 return bt_mgrclose (mgr), NULL;
1390 flag = PAGE_READWRITE;
1391 size = (uid)mgr->nlatchpage << mgr->page_bits;
1392 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1394 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1395 return bt_mgrclose (mgr), NULL;
1398 flag = FILE_MAP_WRITE;
1399 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1400 if( !mgr->pagepool ) {
1401 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1402 return bt_mgrclose (mgr), NULL;
1404 if( mgr->redopages = redopages )
1405 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1408 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1409 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1410 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1415 // open BTree access method
1416 // based on buffer manager
1418 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1420 BtDb *bt = malloc (sizeof(*bt));
1422 memset (bt, 0, sizeof(*bt));
1426 bt->cursor = valloc (mgr->page_size);
1428 bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1431 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1433 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1438 // compare two keys, return > 0, = 0, or < 0
1439 // =0: keys are same
1442 // as the comparison value
1444 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1446 uint len1 = key1->len;
1449 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1460 // place write, read, or parent lock on requested page_no.
1462 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
1466 ReadLock (latch->readwr, thread_no);
1469 WriteLock (latch->readwr, thread_no);
1472 ReadLock (latch->access, thread_no);
1475 WriteLock (latch->access, thread_no);
1478 WriteOLock (latch->parent, thread_no);
1481 WriteOLock (latch->atomic, thread_no);
1483 case BtLockAtomic | BtLockRead:
1484 WriteOLock (latch->atomic, thread_no);
1485 ReadLock (latch->readwr, thread_no);
1490 // remove write, read, or parent lock on requested page
1492 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
1496 ReadRelease (latch->readwr);
1499 WriteRelease (latch->readwr);
1502 ReadRelease (latch->access);
1505 WriteRelease (latch->access);
1508 WriteORelease (latch->parent);
1511 WriteORelease (latch->atomic);
1513 case BtLockAtomic | BtLockRead:
1514 WriteORelease (latch->atomic);
1515 ReadRelease (latch->readwr);
1520 // allocate a new page
1521 // return with page latched, but unlocked.
1523 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1528 // lock allocation page
1530 bt_mutexlock(mgr->lock);
1532 // use empty chain first
1533 // else allocate empty page
1535 if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1536 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1537 set->page = bt_mappage (mgr, set->latch);
1539 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1541 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
1542 mgr->pagezero->activepages++;
1544 bt_releasemutex(mgr->lock);
1546 memcpy (set->page, contents, mgr->page_size);
1547 set->latch->dirty = 1;
1551 page_no = bt_getid(mgr->pagezero->alloc->right);
1552 bt_putid(mgr->pagezero->alloc->right, page_no+1);
1554 // unlock allocation latch and
1555 // extend file into new page.
1557 mgr->pagezero->activepages++;
1558 contents->page_no = page_no;
1560 pwrite (mgr->idx, contents, mgr->page_size, (uid)(page_no + 1) << mgr->page_bits);
1561 bt_releasemutex(mgr->lock);
1563 // don't load cache from btree page, load it from contents
1565 if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1566 set->page = bt_mappage (mgr, set->latch);
1573 // find slot in page for given key at a given level
1575 int bt_findslot (BtPage page, unsigned char *key, uint len)
1577 uint diff, higher = page->cnt, low = 1, slot;
1580 // make stopper key an infinite fence value
1582 if( bt_getid (page->right) )
1587 // low is the lowest candidate.
1588 // loop ends when they meet
1590 // higher is already
1591 // tested as .ge. the passed key.
1593 while( diff = higher - low ) {
1594 slot = low + ( diff >> 1 );
1595 if( keycmp (keyptr(page, slot), key, len) < 0 )
1598 higher = slot, good++;
1601 // return zero if key is on right link page
1603 return good ? higher : 0;
1606 // find and load page at given level for given key
1607 // leave page rd or wr locked as requested
1609 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1611 uid page_no = ROOT_page, prevpage = 0;
1612 uint drill = 0xff, slot;
1613 BtLatchSet *prevlatch;
1614 uint mode, prevmode;
1617 // start at root of btree and drill down
1620 // determine lock mode of drill level
1621 mode = (drill == lvl) ? lock : BtLockRead;
1623 if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1626 // obtain access lock using lock chaining with Access mode
1628 if( page_no > ROOT_page )
1629 bt_lockpage(BtLockAccess, set->latch, thread_no);
1631 set->page = bt_mappage (mgr, set->latch);
1633 // release & unpin parent or left sibling page
1636 bt_unlockpage(prevmode, prevlatch);
1637 bt_unpinlatch (mgr, prevlatch);
1641 // obtain mode lock using lock chaining through AccessLock
1643 bt_lockpage(mode, set->latch, thread_no);
1645 if( set->page->free )
1646 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1648 if( page_no > ROOT_page )
1649 bt_unlockpage(BtLockAccess, set->latch);
1651 // re-read and re-lock root after determining actual level of root
1653 if( set->page->lvl != drill) {
1654 if( set->latch->page_no != ROOT_page )
1655 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1657 drill = set->page->lvl;
1659 if( lock != BtLockRead && drill == lvl ) {
1660 bt_unlockpage(mode, set->latch);
1661 bt_unpinlatch (mgr, set->latch);
1666 prevpage = set->latch->page_no;
1667 prevlatch = set->latch;
1670 // find key on page at this level
1671 // and descend to requested level
1673 if( !set->page->kill )
1674 if( slot = bt_findslot (set->page, key, len) ) {
1678 // find next non-dead slot -- the fence key if nothing else
1680 while( slotptr(set->page, slot)->dead )
1681 if( slot++ < set->page->cnt )
1684 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1686 val = valptr(set->page, slot);
1688 if( val->len == BtId )
1689 page_no = bt_getid(valptr(set->page, slot)->value);
1691 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1697 // slide right into next page
1699 page_no = bt_getid(set->page->right);
1702 // return error on end of right chain
1704 mgr->line = __LINE__, mgr->err = BTERR_struct;
1705 return 0; // return error
1708 // return page to free list
1709 // page must be delete & write locked
1711 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1713 // lock allocation page
1715 bt_mutexlock (mgr->lock);
1719 memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1720 bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1721 set->latch->dirty = 1;
1722 set->page->free = 1;
1724 // decrement active page count
1725 // and unlock allocation page
1727 mgr->pagezero->activepages--;
1728 bt_releasemutex (mgr->lock);
1730 // unlock released page
1732 bt_unlockpage (BtLockDelete, set->latch);
1733 bt_unlockpage (BtLockWrite, set->latch);
1734 bt_unpinlatch (mgr, set->latch);
1737 // a fence key was deleted from a page
1738 // push new fence value upwards
1740 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1742 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1743 unsigned char value[BtId];
1747 // remove the old fence value
1749 ptr = keyptr(set->page, set->page->cnt);
1750 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1751 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1752 set->latch->dirty = 1;
1754 // cache new fence value
1756 ptr = keyptr(set->page, set->page->cnt);
1757 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1759 bt_lockpage (BtLockParent, set->latch, thread_no);
1760 bt_unlockpage (BtLockWrite, set->latch);
1762 // insert new (now smaller) fence key
1764 bt_putid (value, set->latch->page_no);
1765 ptr = (BtKey*)leftkey;
1767 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1770 // now delete old fence key
1772 ptr = (BtKey*)rightkey;
1774 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1777 bt_unlockpage (BtLockParent, set->latch);
1778 bt_unpinlatch(mgr, set->latch);
1782 // root has a single child
1783 // collapse a level from the tree
1785 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1792 // find the child entry and promote as new root contents
1795 for( idx = 0; idx++ < root->page->cnt; )
1796 if( !slotptr(root->page, idx)->dead )
1799 val = valptr(root->page, idx);
1801 if( val->len == BtId )
1802 page_no = bt_getid (valptr(root->page, idx)->value);
1804 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1806 if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1807 child->page = bt_mappage (mgr, child->latch);
1811 bt_lockpage (BtLockDelete, child->latch, thread_no);
1812 bt_lockpage (BtLockWrite, child->latch, thread_no);
1814 memcpy (root->page, child->page, mgr->page_size);
1815 root->latch->dirty = 1;
1817 bt_freepage (mgr, child);
1819 } while( root->page->lvl > 1 && root->page->act == 1 );
1821 bt_unlockpage (BtLockWrite, root->latch);
1822 bt_unpinlatch (mgr, root->latch);
1826 // delete a page and manage keys
1827 // call with page writelocked
1829 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1831 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1832 unsigned char value[BtId];
1833 uint lvl = set->page->lvl;
1838 // cache copy of fence key
1839 // to remove in parent
1841 ptr = keyptr(set->page, set->page->cnt);
1842 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1844 // obtain lock on right page
1846 page_no = bt_getid(set->page->right);
1848 if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1849 right->page = bt_mappage (mgr, right->latch);
1853 bt_lockpage (BtLockWrite, right->latch, thread_no);
1855 // cache copy of key to update
1857 ptr = keyptr(right->page, right->page->cnt);
1858 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1860 if( right->page->kill )
1861 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1863 // pull contents of right peer into our empty page
1865 memcpy (set->page, right->page, mgr->page_size);
1866 set->latch->dirty = 1;
1868 // mark right page deleted and point it to left page
1869 // until we can post parent updates that remove access
1870 // to the deleted page.
1872 bt_putid (right->page->right, set->latch->page_no);
1873 right->latch->dirty = 1;
1874 right->page->kill = 1;
1876 bt_lockpage (BtLockParent, right->latch, thread_no);
1877 bt_unlockpage (BtLockWrite, right->latch);
1879 bt_lockpage (BtLockParent, set->latch, thread_no);
1880 bt_unlockpage (BtLockWrite, set->latch);
1882 // redirect higher key directly to our new node contents
1884 bt_putid (value, set->latch->page_no);
1885 ptr = (BtKey*)higherfence;
1887 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1890 // delete old lower key to our node
1892 ptr = (BtKey*)lowerfence;
1894 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1897 // obtain delete and write locks to right node
1899 bt_unlockpage (BtLockParent, right->latch);
1900 bt_lockpage (BtLockDelete, right->latch, thread_no);
1901 bt_lockpage (BtLockWrite, right->latch, thread_no);
1902 bt_freepage (mgr, right);
1904 bt_unlockpage (BtLockParent, set->latch);
1908 // find and delete key on page by marking delete flag bit
1909 // if page becomes empty, delete it from the btree
1911 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1913 uint slot, idx, found, fence;
1919 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1920 node = slotptr(set->page, slot);
1921 ptr = keyptr(set->page, slot);
1925 // if librarian slot, advance to real slot
1927 if( node->type == Librarian ) {
1928 ptr = keyptr(set->page, ++slot);
1929 node = slotptr(set->page, slot);
1932 fence = slot == set->page->cnt;
1934 // delete the key, ignore request if already dead
1936 if( found = !keycmp (ptr, key, len) )
1937 if( found = node->dead == 0 ) {
1938 val = valptr(set->page,slot);
1939 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1942 // mark node type as delete
1944 node->type = Delete;
1947 // collapse empty slots beneath the fence
1948 // on interiour nodes
1951 while( idx = set->page->cnt - 1 )
1952 if( slotptr(set->page, idx)->dead ) {
1953 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1954 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1959 // did we delete a fence key in an upper level?
1961 if( found && lvl && set->page->act && fence )
1962 if( bt_fixfence (mgr, set, lvl, thread_no) )
1967 // do we need to collapse root?
1969 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1970 if( bt_collapseroot (mgr, set, thread_no) )
1975 // delete empty page
1977 if( !set->page->act ) {
1978 if( bt_deletepage (mgr, set, thread_no) )
1981 set->latch->dirty = 1;
1982 bt_unlockpage(BtLockWrite, set->latch);
1985 bt_unpinlatch (mgr, set->latch);
1989 // advance to next slot
1991 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1993 BtLatchSet *prevlatch;
1996 if( slot < set->page->cnt )
1999 prevlatch = set->latch;
2001 if( page_no = bt_getid(set->page->right) )
2002 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2003 set->page = bt_mappage (bt->mgr, set->latch);
2007 return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2009 // obtain access lock using lock chaining with Access mode
2011 bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
2013 bt_unlockpage(BtLockRead, prevlatch);
2014 bt_unpinlatch (bt->mgr, prevlatch);
2016 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2017 bt_unlockpage(BtLockAccess, set->latch);
2021 // find unique key == given key, or first duplicate key in
2022 // leaf level and return number of value bytes
2023 // or (-1) if not found.
2025 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2033 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2035 ptr = keyptr(set->page, slot);
2037 // skip librarian slot place holder
2039 if( slotptr(set->page, slot)->type == Librarian )
2040 ptr = keyptr(set->page, ++slot);
2042 // return actual key found
2044 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2047 if( slotptr(set->page, slot)->type == Duplicate )
2050 // not there if we reach the stopper key
2052 if( slot == set->page->cnt )
2053 if( !bt_getid (set->page->right) )
2056 // if key exists, return >= 0 value bytes copied
2057 // otherwise return (-1)
2059 if( slotptr(set->page, slot)->dead )
2063 if( !memcmp (ptr->key, key, len) ) {
2064 val = valptr (set->page,slot);
2065 if( valmax > val->len )
2067 memcpy (value, val->value, valmax);
2073 } while( slot = bt_findnext (bt, set, slot) );
2075 bt_unlockpage (BtLockRead, set->latch);
2076 bt_unpinlatch (bt->mgr, set->latch);
2080 // check page for space available,
2081 // clean if necessary and return
2082 // 0 - page needs splitting
2083 // >0 new slot value
2085 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2087 BtPage page = set->page, frame;
2088 uint nxt = mgr->page_size;
2089 uint cnt = 0, idx = 0;
2090 uint max = page->cnt;
2095 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2098 // skip cleanup and proceed to split
2099 // if there's not enough garbage
2102 if( page->garbage < nxt / 5 )
2105 frame = malloc (mgr->page_size);
2106 memcpy (frame, page, mgr->page_size);
2108 // skip page info and set rest of page to zero
2110 memset (page+1, 0, mgr->page_size - sizeof(*page));
2111 set->latch->dirty = 1;
2116 // clean up page first by
2117 // removing deleted keys
2119 while( cnt++ < max ) {
2123 if( cnt < max || frame->lvl )
2124 if( slotptr(frame,cnt)->dead )
2127 // copy the value across
2129 val = valptr(frame, cnt);
2130 nxt -= val->len + sizeof(BtVal);
2131 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2133 // copy the key across
2135 key = keyptr(frame, cnt);
2136 nxt -= key->len + sizeof(BtKey);
2137 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2139 // make a librarian slot
2141 slotptr(page, ++idx)->off = nxt;
2142 slotptr(page, idx)->type = Librarian;
2143 slotptr(page, idx)->dead = 1;
2147 slotptr(page, ++idx)->off = nxt;
2148 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2150 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2158 // see if page has enough space now, or does it need splitting?
2160 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2166 // split the root and raise the height of the btree
2168 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2170 unsigned char leftkey[BT_keyarray];
2171 uint nxt = mgr->page_size;
2172 unsigned char value[BtId];
2178 // save left page fence key for new root
2180 ptr = keyptr(root->page, root->page->cnt);
2181 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2183 // Obtain an empty page to use, and copy the current
2184 // root contents into it, e.g. lower keys
2186 if( bt_newpage(mgr, left, root->page, page_no) )
2189 left_page_no = left->latch->page_no;
2190 bt_unpinlatch (mgr, left->latch);
2192 // preserve the page info at the bottom
2193 // of higher keys and set rest to zero
2195 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2197 // insert stopper key at top of newroot page
2198 // and increase the root height
2200 nxt -= BtId + sizeof(BtVal);
2201 bt_putid (value, right->page_no);
2202 val = (BtVal *)((unsigned char *)root->page + nxt);
2203 memcpy (val->value, value, BtId);
2206 nxt -= 2 + sizeof(BtKey);
2207 slotptr(root->page, 2)->off = nxt;
2208 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2213 // insert lower keys page fence key on newroot page as first key
2215 nxt -= BtId + sizeof(BtVal);
2216 bt_putid (value, left_page_no);
2217 val = (BtVal *)((unsigned char *)root->page + nxt);
2218 memcpy (val->value, value, BtId);
2221 ptr = (BtKey *)leftkey;
2222 nxt -= ptr->len + sizeof(BtKey);
2223 slotptr(root->page, 1)->off = nxt;
2224 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2226 bt_putid(root->page->right, 0);
2227 root->page->min = nxt; // reset lowest used offset and key count
2228 root->page->cnt = 2;
2229 root->page->act = 2;
2232 mgr->pagezero->alloc->lvl = root->page->lvl;
2234 // release and unpin root pages
2236 bt_unlockpage(BtLockWrite, root->latch);
2237 bt_unpinlatch (mgr, root->latch);
2239 bt_unpinlatch (mgr, right);
2243 // split already locked full node
2245 // return pool entry for new right
2248 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2250 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
2251 BtPage frame = malloc (mgr->page_size);
2252 uint lvl = set->page->lvl;
2259 // split higher half of keys to frame
2261 memset (frame, 0, mgr->page_size);
2262 max = set->page->cnt;
2266 while( cnt++ < max ) {
2267 if( cnt < max || set->page->lvl )
2268 if( slotptr(set->page, cnt)->dead )
2271 src = valptr(set->page, cnt);
2272 nxt -= src->len + sizeof(BtVal);
2273 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2275 key = keyptr(set->page, cnt);
2276 nxt -= key->len + sizeof(BtKey);
2277 ptr = (BtKey*)((unsigned char *)frame + nxt);
2278 memcpy (ptr, key, key->len + sizeof(BtKey));
2280 // add librarian slot
2282 slotptr(frame, ++idx)->off = nxt;
2283 slotptr(frame, idx)->type = Librarian;
2284 slotptr(frame, idx)->dead = 1;
2288 slotptr(frame, ++idx)->off = nxt;
2289 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2291 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2295 frame->bits = mgr->page_bits;
2302 if( set->latch->page_no > ROOT_page )
2303 bt_putid (frame->right, bt_getid (set->page->right));
2305 // get new free page and write higher keys to it.
2307 if( bt_newpage(mgr, right, frame, thread_no) )
2310 // process lower keys
2312 memcpy (frame, set->page, mgr->page_size);
2313 memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2314 set->latch->dirty = 1;
2316 nxt = mgr->page_size;
2317 set->page->garbage = 0;
2323 if( slotptr(frame, max)->type == Librarian )
2326 // assemble page of smaller keys
2328 while( cnt++ < max ) {
2329 if( slotptr(frame, cnt)->dead )
2331 val = valptr(frame, cnt);
2332 nxt -= val->len + sizeof(BtVal);
2333 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2335 key = keyptr(frame, cnt);
2336 nxt -= key->len + sizeof(BtKey);
2337 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2339 // add librarian slot
2341 slotptr(set->page, ++idx)->off = nxt;
2342 slotptr(set->page, idx)->type = Librarian;
2343 slotptr(set->page, idx)->dead = 1;
2347 slotptr(set->page, ++idx)->off = nxt;
2348 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2352 bt_putid(set->page->right, right->latch->page_no);
2353 set->page->min = nxt;
2354 set->page->cnt = idx;
2356 return right->latch->entry;
2359 // fix keys for newly split page
2360 // call with page locked, return
2363 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2365 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2366 unsigned char value[BtId];
2367 uint lvl = set->page->lvl;
2371 // if current page is the root page, split it
2373 if( set->latch->page_no == ROOT_page )
2374 return bt_splitroot (mgr, set, right, thread_no);
2376 ptr = keyptr(set->page, set->page->cnt);
2377 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2379 page = bt_mappage (mgr, right);
2381 ptr = keyptr(page, page->cnt);
2382 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2384 // insert new fences in their parent pages
2386 bt_lockpage (BtLockParent, right, thread_no);
2388 bt_lockpage (BtLockParent, set->latch, thread_no);
2389 bt_unlockpage (BtLockWrite, set->latch);
2391 // insert new fence for reformulated left block of smaller keys
2393 bt_putid (value, set->latch->page_no);
2394 ptr = (BtKey *)leftkey;
2396 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2399 // switch fence for right block of larger keys to new right page
2401 bt_putid (value, right->page_no);
2402 ptr = (BtKey *)rightkey;
2404 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2407 bt_unlockpage (BtLockParent, set->latch);
2408 bt_unpinlatch (mgr, set->latch);
2410 bt_unlockpage (BtLockParent, right);
2411 bt_unpinlatch (mgr, right);
2415 // install new key and value onto page
2416 // page must already be checked for
2419 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2421 uint idx, librarian;
2426 // if found slot > desired slot and previous slot
2427 // is a librarian slot, use it
2430 if( slotptr(set->page, slot-1)->type == Librarian )
2433 // copy value onto page
2435 set->page->min -= vallen + sizeof(BtVal);
2436 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2437 memcpy (val->value, value, vallen);
2440 // copy key onto page
2442 set->page->min -= keylen + sizeof(BtKey);
2443 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2444 memcpy (ptr->key, key, keylen);
2447 // find first empty slot
2449 for( idx = slot; idx < set->page->cnt; idx++ )
2450 if( slotptr(set->page, idx)->dead )
2453 // now insert key into array before slot
2455 if( idx == set->page->cnt )
2456 idx += 2, set->page->cnt += 2, librarian = 2;
2460 set->latch->dirty = 1;
2463 while( idx > slot + librarian - 1 )
2464 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2466 // add librarian slot
2468 if( librarian > 1 ) {
2469 node = slotptr(set->page, slot++);
2470 node->off = set->page->min;
2471 node->type = Librarian;
2477 node = slotptr(set->page, slot);
2478 node->off = set->page->min;
2483 bt_unlockpage (BtLockWrite, set->latch);
2484 bt_unpinlatch (mgr, set->latch);
2490 // Insert new key into the btree at given level.
2491 // either add a new key or update/add an existing one
2493 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2495 uint slot, idx, len, entry;
2501 while( 1 ) { // find the page and slot for the current key
2502 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2503 node = slotptr(set->page, slot);
2504 ptr = keyptr(set->page, slot);
2507 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2511 // if librarian slot == found slot, advance to real slot
2513 if( node->type == Librarian )
2514 if( !keycmp (ptr, key, keylen) ) {
2515 ptr = keyptr(set->page, ++slot);
2516 node = slotptr(set->page, slot);
2519 // if inserting a duplicate key or unique
2520 // key that doesn't exist on the page,
2521 // check for adequate space on the page
2522 // and insert the new key before slot.
2527 if( keycmp (ptr, key, keylen) )
2528 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2529 return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2530 else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2532 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2537 // if key already exists, update value and return
2539 val = valptr(set->page, slot);
2541 if( val->len >= vallen ) {
2542 if( slotptr(set->page, slot)->dead )
2547 set->page->garbage += val->len - vallen;
2548 set->latch->dirty = 1;
2550 memcpy (val->value, value, vallen);
2551 bt_unlockpage(BtLockWrite, set->latch);
2552 bt_unpinlatch (mgr, set->latch);
2556 // new update value doesn't fit in existing value area
2557 // make sure page has room
2560 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2567 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2568 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2570 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2575 // copy key and value onto page and update slot
2577 set->page->min -= vallen + sizeof(BtVal);
2578 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2579 memcpy (val->value, value, vallen);
2582 set->latch->dirty = 1;
2583 set->page->min -= keylen + sizeof(BtKey);
2584 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2585 memcpy (ptr->key, key, keylen);
2588 node->off = set->page->min;
2589 bt_unlockpage(BtLockWrite, set->latch);
2590 bt_unpinlatch (mgr, set->latch);
2598 logseqno reqlsn; // redo log seq no required
2599 logseqno lsn; // redo log sequence number
2600 uint entry; // latch table entry number
2601 uint slot:31; // page slot number
2602 uint reuse:1; // reused previous page
2606 uid page_no; // page number for split leaf
2607 void *next; // next key to insert
2608 uint entry:29; // latch table entry number
2609 uint type:2; // 0 == insert, 1 == delete, 2 == free
2610 uint nounlock:1; // don't unlock ParentModification
2611 unsigned char leafkey[BT_keyarray];
2614 // determine actual page where key is located
2615 // return slot number
2617 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2619 BtKey *key = keyptr(source,src);
2620 uint slot = locks[src].slot;
2623 if( src > 1 && locks[src].reuse )
2624 entry = locks[src-1].entry, slot = 0;
2626 entry = locks[src].entry;
2629 set->latch = mgr->latchsets + entry;
2630 set->page = bt_mappage (mgr, set->latch);
2634 // is locks->reuse set? or was slot zeroed?
2635 // if so, find where our key is located
2636 // on current page or pages split on
2637 // same page txn operations.
2640 set->latch = mgr->latchsets + entry;
2641 set->page = bt_mappage (mgr, set->latch);
2643 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2644 if( slotptr(set->page, slot)->type == Librarian )
2646 if( locks[src].reuse )
2647 locks[src].entry = entry;
2650 } while( entry = set->latch->split );
2652 mgr->line = __LINE__, mgr->err = BTERR_atomic;
2656 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2658 BtKey *key = keyptr(source, src);
2659 BtVal *val = valptr(source, src);
2664 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2665 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2666 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2668 set->page->lsn = locks[src].lsn;
2672 if( entry = bt_splitpage (mgr, set, thread_no) )
2673 latch = mgr->latchsets + entry;
2677 // splice right page into split chain
2678 // and WriteLock it.
2680 bt_lockpage(BtLockWrite, latch, thread_no);
2681 latch->split = set->latch->split;
2682 set->latch->split = entry;
2683 locks[src].slot = 0;
2686 return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2689 // perform delete from smaller btree
2690 // insert a delete slot if not found there
2692 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2694 BtKey *key = keyptr(source, src);
2701 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2702 node = slotptr(set->page, slot);
2703 ptr = keyptr(set->page, slot);
2704 val = valptr(set->page, slot);
2706 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2708 // if slot is not found, insert a delete slot
2710 if( keycmp (ptr, key->key, key->len) )
2711 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2713 // if node is already dead,
2714 // ignore the request.
2719 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2720 set->latch->dirty = 1;
2721 set->page->lsn = locks[src].lsn;
2729 // delete an empty master page for a transaction
2731 // note that the far right page never empties because
2732 // it always contains (at least) the infinite stopper key
2733 // and that all pages that don't contain any keys are
2734 // deleted, or are being held under Atomic lock.
2736 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2738 BtPageSet right[1], temp[1];
2739 unsigned char value[BtId];
2743 bt_lockpage(BtLockWrite, prev->latch, thread_no);
2745 // grab the right sibling
2747 if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2748 right->page = bt_mappage (mgr, right->latch);
2752 bt_lockpage(BtLockAtomic, right->latch, thread_no);
2753 bt_lockpage(BtLockWrite, right->latch, thread_no);
2755 // and pull contents over empty page
2756 // while preserving master's left link
2758 memcpy (right->page->left, prev->page->left, BtId);
2759 memcpy (prev->page, right->page, mgr->page_size);
2761 // forward seekers to old right sibling
2762 // to new page location in set
2764 bt_putid (right->page->right, prev->latch->page_no);
2765 right->latch->dirty = 1;
2766 right->page->kill = 1;
2768 // remove pointer to right page for searchers by
2769 // changing right fence key to point to the master page
2771 ptr = keyptr(right->page,right->page->cnt);
2772 bt_putid (value, prev->latch->page_no);
2774 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
2777 // now that master page is in good shape we can
2778 // remove its locks.
2780 bt_unlockpage (BtLockAtomic, prev->latch);
2781 bt_unlockpage (BtLockWrite, prev->latch);
2783 // fix master's right sibling's left pointer
2784 // to remove scanner's poiner to the right page
2786 if( right_page_no = bt_getid (prev->page->right) ) {
2787 if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2788 temp->page = bt_mappage (mgr, temp->latch);
2792 bt_lockpage (BtLockWrite, temp->latch, thread_no);
2793 bt_putid (temp->page->left, prev->latch->page_no);
2794 temp->latch->dirty = 1;
2796 bt_unlockpage (BtLockWrite, temp->latch);
2797 bt_unpinlatch (mgr, temp->latch);
2798 } else { // master is now the far right page
2799 bt_mutexlock (mgr->lock);
2800 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2801 bt_releasemutex(mgr->lock);
2804 // now that there are no pointers to the right page
2805 // we can delete it after the last read access occurs
2807 bt_unlockpage (BtLockWrite, right->latch);
2808 bt_unlockpage (BtLockAtomic, right->latch);
2809 bt_lockpage (BtLockDelete, right->latch, thread_no);
2810 bt_lockpage (BtLockWrite, right->latch, thread_no);
2811 bt_freepage (mgr, right);
2815 // atomic modification of a batch of keys.
2817 // return -1 if BTERR is set
2818 // otherwise return slot number
2819 // causing the key constraint violation
2820 // or zero on successful completion.
2822 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2824 uint src, idx, slot, samepage, entry, que = 0;
2825 AtomicKey *head, *tail, *leaf;
2826 BtPageSet set[1], prev[1];
2827 unsigned char value[BtId];
2828 BtKey *key, *ptr, *key2;
2839 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2843 // stable sort the list of keys into order to
2844 // prevent deadlocks between threads.
2846 for( src = 1; src++ < source->cnt; ) {
2847 *temp = *slotptr(source,src);
2848 key = keyptr (source,src);
2850 for( idx = src; --idx; ) {
2851 key2 = keyptr (source,idx);
2852 if( keycmp (key, key2->key, key2->len) < 0 ) {
2853 *slotptr(source,idx+1) = *slotptr(source,idx);
2854 *slotptr(source,idx) = *temp;
2860 // add entries to redo log
2862 if( bt->mgr->redopages )
2863 if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
2864 for( src = 0; src++ < source->cnt; )
2865 locks[src].lsn = lsn;
2867 return bt->mgr->err;
2869 // Load the leaf page for each key
2870 // group same page references with reuse bit
2871 // and determine any constraint violations
2873 for( src = 0; src++ < source->cnt; ) {
2874 key = keyptr(source, src);
2877 // first determine if this modification falls
2878 // on the same page as the previous modification
2879 // note that the far right leaf page is a special case
2881 if( samepage = src > 1 )
2882 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2883 slot = bt_findslot(set->page, key->key, key->len);
2885 bt_unlockpage(BtLockRead, set->latch);
2888 if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
2889 set->latch->split = 0;
2891 return bt->mgr->err;
2893 if( slotptr(set->page, slot)->type == Librarian )
2894 ptr = keyptr(set->page, ++slot);
2896 ptr = keyptr(set->page, slot);
2899 locks[src].entry = set->latch->entry;
2900 locks[src].slot = slot;
2901 locks[src].reuse = 0;
2903 locks[src].entry = 0;
2904 locks[src].slot = 0;
2905 locks[src].reuse = 1;
2908 // capture current lsn for master page
2910 locks[src].reqlsn = set->page->lsn;
2913 // unlock last loadpage lock
2916 bt_unlockpage(BtLockRead, set->latch);
2918 // obtain write lock for each master page
2919 // sync flushed pages to disk
2921 for( src = 0; src++ < source->cnt; ) {
2922 if( locks[src].reuse )
2925 set->latch = bt->mgr->latchsets + locks[src].entry;
2926 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
2929 // insert or delete each key
2930 // process any splits or merges
2931 // release Write & Atomic latches
2932 // set ParentModifications and build
2933 // queue of keys to insert for split pages
2934 // or delete for deleted pages.
2936 // run through txn list backwards
2938 samepage = source->cnt + 1;
2940 for( src = source->cnt; src; src-- ) {
2941 if( locks[src].reuse )
2944 // perform the txn operations
2945 // from smaller to larger on
2948 for( idx = src; idx < samepage; idx++ )
2949 switch( slotptr(source,idx)->type ) {
2951 if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
2952 return bt->mgr->err;
2957 if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
2958 return bt->mgr->err;
2962 // after the same page operations have finished,
2963 // process master page for splits or deletion.
2965 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2966 prev->page = bt_mappage (bt->mgr, prev->latch);
2969 // pick-up all splits from master page
2970 // each one is already WriteLocked.
2972 entry = prev->latch->split;
2975 set->latch = bt->mgr->latchsets + entry;
2976 set->page = bt_mappage (bt->mgr, set->latch);
2977 entry = set->latch->split;
2979 // delete empty master page by undoing its split
2980 // (this is potentially another empty page)
2981 // note that there are no new left pointers yet
2983 if( !prev->page->act ) {
2984 memcpy (set->page->left, prev->page->left, BtId);
2985 memcpy (prev->page, set->page, bt->mgr->page_size);
2986 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
2987 bt_freepage (bt->mgr, set);
2989 prev->latch->split = set->latch->split;
2990 prev->latch->dirty = 1;
2994 // remove empty page from the split chain
2995 // and return it to the free list.
2997 if( !set->page->act ) {
2998 memcpy (prev->page->right, set->page->right, BtId);
2999 prev->latch->split = set->latch->split;
3000 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3001 bt_freepage (bt->mgr, set);
3005 // schedule prev fence key update
3007 ptr = keyptr(prev->page,prev->page->cnt);
3008 leaf = calloc (sizeof(AtomicKey), 1), que++;
3010 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3011 leaf->page_no = prev->latch->page_no;
3012 leaf->entry = prev->latch->entry;
3022 // splice in the left link into the split page
3024 bt_putid (set->page->left, prev->latch->page_no);
3025 bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3026 bt_unlockpage(BtLockWrite, prev->latch);
3030 // update left pointer in next right page from last split page
3031 // (if all splits were reversed, latch->split == 0)
3033 if( latch->split ) {
3034 // fix left pointer in master's original (now split)
3035 // far right sibling or set rightmost page in page zero
3037 if( right = bt_getid (prev->page->right) ) {
3038 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3039 set->page = bt_mappage (bt->mgr, set->latch);
3041 return bt->mgr->err;
3043 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3044 bt_putid (set->page->left, prev->latch->page_no);
3045 set->latch->dirty = 1;
3046 bt_unlockpage (BtLockWrite, set->latch);
3047 bt_unpinlatch (bt->mgr, set->latch);
3048 } else { // prev is rightmost page
3049 bt_mutexlock (bt->mgr->lock);
3050 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3051 bt_releasemutex(bt->mgr->lock);
3054 // Process last page split in chain
3056 ptr = keyptr(prev->page,prev->page->cnt);
3057 leaf = calloc (sizeof(AtomicKey), 1), que++;
3059 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3060 leaf->page_no = prev->latch->page_no;
3061 leaf->entry = prev->latch->entry;
3071 bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3072 bt_unlockpage(BtLockWrite, prev->latch);
3074 // remove atomic lock on master page
3076 bt_unlockpage(BtLockAtomic, latch);
3080 // finished if prev page occupied (either master or final split)
3082 if( prev->page->act ) {
3083 bt_unlockpage(BtLockWrite, latch);
3084 bt_unlockpage(BtLockAtomic, latch);
3085 bt_unpinlatch(bt->mgr, latch);
3089 // any and all splits were reversed, and the
3090 // master page located in prev is empty, delete it
3091 // by pulling over master's right sibling.
3093 // Remove empty master's fence key
3095 ptr = keyptr(prev->page,prev->page->cnt);
3097 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3098 return bt->mgr->err;
3100 // perform the remainder of the delete
3101 // from the FIFO queue
3103 leaf = calloc (sizeof(AtomicKey), 1), que++;
3105 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3106 leaf->page_no = prev->latch->page_no;
3107 leaf->entry = prev->latch->entry;
3118 // leave atomic lock in place until
3119 // deletion completes in next phase.
3121 bt_unlockpage(BtLockWrite, prev->latch);
3124 // add & delete keys for any pages split or merged during transaction
3128 set->latch = bt->mgr->latchsets + leaf->entry;
3129 set->page = bt_mappage (bt->mgr, set->latch);
3131 bt_putid (value, leaf->page_no);
3132 ptr = (BtKey *)leaf->leafkey;
3134 switch( leaf->type ) {
3135 case 0: // insert key
3136 if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
3137 return bt->mgr->err;
3141 case 1: // delete key
3142 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3143 return bt->mgr->err;
3147 case 2: // free page
3148 if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
3149 return bt->mgr->err;
3154 if( !leaf->nounlock )
3155 bt_unlockpage (BtLockParent, set->latch);
3157 bt_unpinlatch (bt->mgr, set->latch);
3160 } while( leaf = tail );
3162 // if number of active pages
3163 // is greater than the buffer pool
3164 // promote page into larger btree
3166 while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
3167 if( bt_txnpromote (bt) )
3168 return bt->mgr->err;
3176 // promote a page into the larger btree
3178 BTERR bt_txnpromote (BtDb *bt)
3180 uint entry, slot, idx;
3188 entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3190 entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3192 entry %= bt->mgr->latchtotal;
3197 set->latch = bt->mgr->latchsets + entry;
3198 idx = set->latch->page_no % bt->mgr->latchhash;
3200 if( !bt_mutextry(set->latch->modify) )
3203 // if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
3204 // bt_releasemutex(set->latch->modify);
3208 // skip this entry if it is pinned
3210 if( set->latch->pin & ~CLOCK_bit ) {
3211 bt_releasemutex(set->latch->modify);
3212 // bt_releasemutex(bt->mgr->hashtable[idx].latch);
3216 set->page = bt_mappage (bt->mgr, set->latch);
3218 // entry never used or has no right sibling
3220 if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3221 bt_releasemutex(set->latch->modify);
3222 // bt_releasemutex(bt->mgr->hashtable[idx].latch);
3226 bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
3227 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3228 bt_unlockpage(BtLockAccess, set->latch);
3230 // entry interiour node or being or was killed
3232 if( set->page->lvl || set->page->free || set->page->kill ) {
3233 bt_releasemutex(set->latch->modify);
3234 // bt_releasemutex(bt->mgr->hashtable[idx].latch);
3235 bt_unlockpage(BtLockWrite, set->latch);
3239 // pin the page for our useage
3242 bt_releasemutex(set->latch->modify);
3243 // bt_releasemutex(bt->mgr->hashtable[idx].latch);
3245 // if page is dirty, then
3246 // sync it to the disk first.
3248 if( set->latch->dirty )
3249 if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
3250 return bt->mgr->line = __LINE__, bt->mgr->err;
3252 set->latch->dirty = 0;
3254 // transfer slots in our selected page to larger btree
3255 if( !(set->latch->page_no % 100) )
3256 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
3258 for( slot = 0; slot++ < set->page->cnt; ) {
3259 ptr = keyptr (set->page, slot);
3260 val = valptr (set->page, slot);
3261 node = slotptr(set->page, slot);
3263 switch( node->type ) {
3266 if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
3267 return bt->main->err;
3272 if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
3273 return bt->main->err;
3279 // now delete the page
3281 if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3282 return bt->mgr->err;
3284 bt_unpinlatch (bt->mgr, set->latch);
3289 // set cursor to highest slot on highest page
3291 uint bt_lastkey (BtDb *bt)
3293 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3296 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3297 set->page = bt_mappage (bt->mgr, set->latch);
3301 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3302 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3303 bt_unlockpage(BtLockRead, set->latch);
3304 bt_unpinlatch (bt->mgr, set->latch);
3305 return bt->cursor->cnt;
3308 // return previous slot on cursor page
3310 uint bt_prevkey (BtDb *bt, uint slot)
3312 uid cursor_page = bt->cursor->page_no;
3313 uid ourright, next, us = cursor_page;
3319 ourright = bt_getid(bt->cursor->right);
3322 if( !(next = bt_getid(bt->cursor->left)) )
3328 if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3329 set->page = bt_mappage (bt->mgr, set->latch);
3333 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3334 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3335 bt_unlockpage(BtLockRead, set->latch);
3336 bt_unpinlatch (bt->mgr, set->latch);
3338 next = bt_getid (bt->cursor->right);
3340 if( bt->cursor->kill )
3344 if( next == ourright )
3349 return bt->cursor->cnt;
3352 // return next slot on cursor page
3353 // or slide cursor right into next page
3355 uint bt_nextkey (BtDb *bt, uint slot)
3361 right = bt_getid(bt->cursor->right);
3363 while( slot++ < bt->cursor->cnt )
3364 if( slotptr(bt->cursor,slot)->dead )
3366 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3374 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3375 set->page = bt_mappage (bt->mgr, set->latch);
3379 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3380 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3381 bt_unlockpage(BtLockRead, set->latch);
3382 bt_unpinlatch (bt->mgr, set->latch);
3387 return bt->mgr->err = 0;
3390 // cache page of keys into cursor and return starting slot for given key
3392 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3397 // cache page for retrieval
3399 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3400 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3404 bt_unlockpage(BtLockRead, set->latch);
3405 bt_unpinlatch (bt->mgr, set->latch);
3412 double getCpuTime(int type)
3415 FILETIME xittime[1];
3416 FILETIME systime[1];
3417 FILETIME usrtime[1];
3418 SYSTEMTIME timeconv[1];
3421 memset (timeconv, 0, sizeof(SYSTEMTIME));
3425 GetSystemTimeAsFileTime (xittime);
3426 FileTimeToSystemTime (xittime, timeconv);
3427 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3430 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3431 FileTimeToSystemTime (usrtime, timeconv);
3434 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3435 FileTimeToSystemTime (systime, timeconv);
3439 ans += (double)timeconv->wHour * 3600;
3440 ans += (double)timeconv->wMinute * 60;
3441 ans += (double)timeconv->wSecond;
3442 ans += (double)timeconv->wMilliseconds / 1000;
3447 #include <sys/resource.h>
3449 double getCpuTime(int type)
3451 struct rusage used[1];
3452 struct timeval tv[1];
3456 gettimeofday(tv, NULL);
3457 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3460 getrusage(RUSAGE_SELF, used);
3461 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3464 getrusage(RUSAGE_SELF, used);
3465 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3472 void bt_poolaudit (BtMgr *mgr)
3477 while( ++entry < mgr->latchtotal ) {
3478 latch = mgr->latchsets + entry;
3480 if( *latch->readwr->rin & MASK )
3481 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3483 if( *latch->access->rin & MASK )
3484 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3486 // if( *latch->parent->xcl->value )
3487 // fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3489 // if( *latch->atomic->xcl->value )
3490 // fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3492 // if( *latch->modify->value )
3493 // fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3495 if( latch->pin & ~CLOCK_bit )
3496 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3509 // standalone program to index file of keys
3510 // then list them onto std-out
3513 void *index_file (void *arg)
3515 uint __stdcall index_file (void *arg)
3518 int line = 0, found = 0, cnt = 0, idx;
3519 uid next, page_no = LEAF_page; // start on first page of leaves
3520 int ch, len = 0, slot, type = 0;
3521 unsigned char key[BT_maxkey];
3522 unsigned char txn[65536];
3523 ThreadArg *args = arg;
3532 bt = bt_open (args->mgr, args->main);
3535 if( args->idx < strlen (args->type) )
3536 ch = args->type[args->idx];
3538 ch = args->type[strlen(args->type) - 1];
3550 if( type == Delete )
3551 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3553 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3555 if( type == Delete )
3556 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3558 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3560 if( in = fopen (args->infile, "rb") )
3561 while( ch = getc(in), ch != EOF )
3567 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3568 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3574 memcpy (txn + nxt, key + 10, len - 10);
3576 txn[nxt] = len - 10;
3578 memcpy (txn + nxt, key, 10);
3581 slotptr(page,++cnt)->off = nxt;
3582 slotptr(page,cnt)->type = type;
3585 if( cnt < args->num )
3592 if( bt_atomictxn (bt, page) )
3593 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3598 else if( len < BT_maxkey )
3600 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3604 fprintf(stderr, "started indexing for %s\n", args->infile);
3605 if( in = fopen (args->infile, "r") )
3606 while( ch = getc(in), ch != EOF )
3611 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3612 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3615 else if( len < BT_maxkey )
3617 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3621 fprintf(stderr, "started finding keys for %s\n", args->infile);
3622 if( in = fopen (args->infile, "rb") )
3623 while( ch = getc(in), ch != EOF )
3627 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3629 else if( bt->mgr->err )
3630 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3633 else if( len < BT_maxkey )
3635 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3639 fprintf(stderr, "started scanning\n");
3642 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3643 set->page = bt_mappage (bt->mgr, set->latch);
3645 fprintf(stderr, "unable to obtain latch"), exit(1);
3647 bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3648 next = bt_getid (set->page->right);
3650 for( slot = 0; slot++ < set->page->cnt; )
3651 if( next || slot < set->page->cnt )
3652 if( !slotptr(set->page, slot)->dead ) {
3653 ptr = keyptr(set->page, slot);
3656 if( slotptr(set->page, slot)->type == Duplicate )
3659 fwrite (ptr->key, len, 1, stdout);
3660 val = valptr(set->page, slot);
3661 fwrite (val->value, val->len, 1, stdout);
3662 fputc ('\n', stdout);
3666 bt_unlockpage (BtLockRead, set->latch);
3667 bt_unpinlatch (bt->mgr, set->latch);
3668 } while( page_no = next );
3670 fprintf(stderr, " Total keys read %d\n", cnt);
3674 fprintf(stderr, "started reverse scan\n");
3675 if( slot = bt_lastkey (bt) )
3676 while( slot = bt_prevkey (bt, slot) ) {
3677 if( slotptr(bt->cursor, slot)->dead )
3680 ptr = keyptr(bt->cursor, slot);
3683 if( slotptr(bt->cursor, slot)->type == Duplicate )
3686 fwrite (ptr->key, len, 1, stdout);
3687 val = valptr(bt->cursor, slot);
3688 fwrite (val->value, val->len, 1, stdout);
3689 fputc ('\n', stdout);
3693 fprintf(stderr, " Total keys read %d\n", cnt);
3698 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3700 fprintf(stderr, "started counting\n");
3701 next = LEAF_page + bt->mgr->redopages + 1;
3703 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3704 if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3707 if( !bt->cursor->free && !bt->cursor->lvl )
3708 cnt += bt->cursor->act;
3714 cnt--; // remove stopper key
3715 fprintf(stderr, " Total keys counted %d\n", cnt);
3719 fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
3728 typedef struct timeval timer;
3730 int main (int argc, char **argv)
3732 int idx, cnt, len, slot, err;
3733 int segsize, bits = 16;
3753 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3754 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3755 fprintf (stderr, " where main_file is the name of the main btree file\n");
3756 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3757 fprintf (stderr, " page_bits is the page size in bits for the cache btree\n");
3758 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3759 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3760 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3761 fprintf (stderr, " main_bits is the page size of the main btree in bits\n");
3762 fprintf (stderr, " main_pool is the number of main pages in the main buffer pool\n");
3763 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3767 start = getCpuTime(0);
3770 bits = atoi(argv[4]);
3773 poolsize = atoi(argv[5]);
3776 fprintf (stderr, "Warning: no mapped_pool\n");
3779 num = atoi(argv[6]);
3782 recovery = atoi(argv[7]);
3785 mainbits = atoi(argv[8]);
3788 mainpool = atoi(argv[9]);
3792 threads = malloc (cnt * sizeof(pthread_t));
3794 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3796 args = malloc ((cnt + 1) * sizeof(ThreadArg));
3798 mgr = bt_mgr (argv[1], bits, poolsize, recovery);
3801 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3805 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3808 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3815 for( idx = 0; idx < cnt; idx++ ) {
3816 args[idx].infile = argv[idx + 10];
3817 args[idx].type = argv[3];
3818 args[idx].main = main;
3819 args[idx].mgr = mgr;
3820 args[idx].num = num;
3821 args[idx].idx = idx;
3823 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3824 fprintf(stderr, "Error creating thread %d\n", err);
3826 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3830 args[0].infile = argv[idx + 10];
3831 args[0].type = argv[3];
3832 args[0].main = main;
3839 // wait for termination
3842 for( idx = 0; idx < cnt; idx++ )
3843 pthread_join (threads[idx], NULL);
3845 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3847 for( idx = 0; idx < cnt; idx++ )
3848 CloseHandle(threads[idx]);
3856 elapsed = getCpuTime(0) - start;
3857 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3858 elapsed = getCpuTime(1);
3859 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3860 elapsed = getCpuTime(2);
3861 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);