1 // btree version threadskv10 futex version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // and LSM B-trees for write optimization
14 // author: karl malbrain, malbrain@cal.berkeley.edu
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
20 The original author is Karl Malbrain.
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
38 #include <linux/futex.h>
53 #define WIN32_LEAN_AND_MEAN
67 typedef unsigned long long uid;
68 typedef unsigned long long logseqno;
71 typedef unsigned long long off64_t;
72 typedef unsigned short ushort;
73 typedef unsigned int uint;
76 #define BT_ro 0x6f72 // ro
77 #define BT_rw 0x7772 // rw
79 #define BT_maxbits 26 // maximum page size in bits
80 #define BT_minbits 9 // minimum page size in bits
81 #define BT_minpage (1 << BT_minbits) // minimum page size
82 #define BT_maxpage (1 << BT_maxbits) // maximum page size
84 // BTree page number constants
85 #define ALLOC_page 0 // allocation page
86 #define ROOT_page 1 // root of the btree
87 #define LEAF_page 2 // first page of leaves
88 #define REDO_page 3 // first page of redo buffer
90 // Number of levels to create in a new BTree
95 There are six lock types for each node in four independent sets:
96 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
97 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
98 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
99 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
100 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
101 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
116 volatile uint xlock:1; // one writer has exclusive lock
117 volatile uint wrt:31; // count of other writers waiting
126 // definition for phase-fair reader/writer lock implementation
129 volatile ushort rin[1];
130 volatile ushort rout[1];
131 volatile ushort ticket[1];
132 volatile ushort serving[1];
133 volatile ushort tid[1];
134 volatile ushort dup[1];
141 volatile ushort tid[1];
142 volatile ushort dup[1];
150 // mode & definition for lite latch implementation
153 QueRd = 1, // reader queue
154 QueWr = 2 // writer queue
157 // hash table entries
160 uint entry; // Latch table entry at head of chain
161 BtMutexLatch latch[1];
164 // latch manager table structure
167 uid page_no; // latch set page number
168 RWLock readwr[1]; // read/write page lock
169 RWLock access[1]; // Access Intent/Page delete
170 WOLock parent[1]; // Posting of fence key in parent
171 WOLock atomic[1]; // Atomic update in progress
172 uint split; // right split page atomic insert
173 uint next; // next entry in hash table chain
174 uint prev; // prev entry in hash table chain
175 ushort pin; // number of accessing threads
176 unsigned char dirty; // page in cache is dirty (atomic set)
177 BtMutexLatch modify[1]; // modify entry lite latch
180 // Define the length of the page record numbers
184 // Page key slot definition.
186 // Keys are marked dead, but remain on the page until
187 // cleanup is called. The fence key (highest key) for
188 // a leaf page is always present, even after cleanup.
192 // In addition to the Unique keys that occupy slots
193 // there are Librarian and Duplicate key
194 // slots occupying the key slot array.
196 // The Librarian slots are dead keys that
197 // serve as filler, available to add new Unique
198 // or Dup slots that are inserted into the B-tree.
200 // The Duplicate slots have had their key bytes extended
201 // by 6 bytes to contain a binary duplicate key uniqueifier.
211 uint off:BT_maxbits; // page offset for key start
212 uint type:3; // type of slot
213 uint dead:1; // set for deleted slot
216 // The key structure occupies space at the upper end of
217 // each page. It's a length byte followed by the key
221 unsigned char len; // this can be changed to a ushort or uint
222 unsigned char key[0];
225 // the value structure also occupies space at the upper
226 // end of the page. Each key is immediately followed by a value.
229 unsigned char len; // this can be changed to a ushort or uint
230 unsigned char value[0];
233 #define BT_maxkey 255 // maximum number of bytes in a key
234 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
236 // The first part of an index page.
237 // It is immediately followed
238 // by the BtSlot array of keys.
240 // note that this structure size
241 // must be a multiple of 8 bytes
242 // in order to place PageZero correctly.
244 typedef struct BtPage_ {
245 uint cnt; // count of keys in page
246 uint act; // count of active keys
247 uint min; // next key offset
248 uint garbage; // page garbage in bytes
249 unsigned char bits:7; // page size in bits
250 unsigned char free:1; // page is on free chain
251 unsigned char lvl:7; // level of page
252 unsigned char kill:1; // page is being deleted
253 unsigned char right[BtId]; // page number to right
254 unsigned char left[BtId]; // page number to left
255 unsigned char filler[2]; // padding to multiple of 8
256 logseqno lsn; // log sequence number applied
257 uid page_no; // this page number
260 // The loadpage interface object
263 BtPage page; // current page pointer
264 BtLatchSet *latch; // current page latch set
267 // structure for latch manager on ALLOC_page
270 struct BtPage_ alloc[1]; // next page_no in right ptr
271 unsigned char freechain[BtId]; // head of free page_nos chain
272 unsigned long long activepages; // number of active pages
273 uint redopages; // number of redo pages in file
276 // The object structure for Btree access
279 uint page_size; // page size
280 uint page_bits; // page size in bits
286 BtPageZero *pagezero; // mapped allocation page
287 BtHashEntry *hashtable; // the buffer pool hash table entries
288 BtLatchSet *latchsets; // mapped latch set from buffer pool
289 unsigned char *pagepool; // mapped to the buffer pool pages
290 unsigned char *redobuff; // mapped recovery buffer pointer
291 logseqno lsn, flushlsn; // current & first lsn flushed
292 BtMutexLatch redo[1]; // redo area lite latch
293 BtMutexLatch lock[1]; // allocation area lite latch
294 BtMutexLatch maps[1]; // mapping segments lite latch
295 ushort thread_no[1]; // next thread number
296 uint nlatchpage; // number of latch pages at BT_latch
297 uint latchtotal; // number of page latch entries
298 uint latchhash; // number of latch hash table slots
299 uint latchvictim; // next latch entry to examine
300 uint latchpromote; // next latch entry to promote
301 uint redolast; // last msync size of recovery buff
302 uint redoend; // eof/end element in recovery buff
303 int err; // last error
304 int line; // last error line no
305 int found; // number of keys found by delete
306 int reads, writes; // number of reads and writes
308 HANDLE halloc; // allocation handle
309 HANDLE hpool; // buffer pool handle
311 uint segments; // number of memory mapped segments
312 unsigned char *pages[64000];// memory mapped segments of b-tree
316 BtMgr *mgr; // buffer manager for entire process
317 BtMgr *main; // buffer manager for main btree
318 BtPage cursor; // cached page frame for start/next
319 ushort thread_no; // thread number
320 unsigned char key[BT_keyarray]; // last found complete key
323 // Catastrophic errors
337 #define CLOCK_bit 0x8000
339 // recovery manager entry types
342 BTRM_eof = 0, // rest of buffer is empty
343 BTRM_add, // add a unique key-value to btree
344 BTRM_dup, // add a duplicate key-value to btree
345 BTRM_del, // delete a key-value from btree
346 BTRM_upd, // update a key with a new value
347 BTRM_new, // allocate a new empty page
348 BTRM_old // reuse an old empty page
351 // recovery manager entry
352 // structure followed by BtKey & BtVal
355 logseqno reqlsn; // log sequence number required
356 logseqno lsn; // log sequence number for entry
357 uint len; // length of entry
358 unsigned char type; // type of entry
359 unsigned char lvl; // level of btree entry pertains to
364 extern void bt_close (BtDb *bt);
365 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
366 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
367 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
368 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
369 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
370 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
371 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
373 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
375 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
376 extern uint bt_nextkey (BtDb *bt, uint slot);
377 extern uint bt_prevkey (BtDb *db, uint slot);
378 extern uint bt_lastkey (BtDb *db);
381 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint redopages);
382 extern void bt_mgrclose (BtMgr *mgr);
383 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
384 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
386 // transaction functions
387 BTERR bt_txnpromote (BtDb *bt);
389 // The page is allocated from low and hi ends.
390 // The key slots are allocated from the bottom,
391 // while the text and value of the key
392 // are allocated from the top. When the two
393 // areas meet, the page is split into two.
395 // A key consists of a length byte, two bytes of
396 // index number (0 - 65535), and up to 253 bytes
399 // Associated with each key is a value byte string
400 // containing any value desired.
402 // The b-tree root is always located at page 1.
403 // The first leaf page of level zero is always
404 // located on page 2.
406 // The b-tree pages are linked with next
407 // pointers to facilitate enumerators,
408 // and provide for concurrency.
410 // When the root page fills, it is split in two and
411 // the tree height is raised by a new root at page
412 // one with two keys.
414 // Deleted keys are marked with a dead bit until
415 // page cleanup. The fence key for a leaf node is
418 // To achieve maximum concurrency one page is locked at a time
419 // as the tree is traversed to find leaf key in question. The right
420 // page numbers are used in cases where the page is being split,
423 // Page 0 is dedicated to lock for new page extensions,
424 // and chains empty pages together for reuse. It also
425 // contains the latch manager hash table.
427 // The ParentModification lock on a node is obtained to serialize posting
428 // or changing the fence key for a node.
430 // Empty pages are chained together through the ALLOC page and reused.
432 // Access macros to address slot and key values from the page
433 // Page slots use 1 based indexing.
435 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
436 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
437 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
// Store the page id `id` into the byte array `dest`.
// Each step writes the low-order byte of id to dest[i] and then
// shifts id right by 8 bits.
// NOTE(review): the loop header is not present in this listing, so the
// iteration order cannot be confirmed here; bt_getid reads the most
// significant byte first, so presumably this fills dest from the last
// byte backwards -- verify against the complete source.
439 void bt_putid(unsigned char *dest, uid id)
444 dest[i] = (unsigned char)id, id >>= 8;
// Assemble a page id from the BtId bytes starting at `src`.
// The first byte is treated as the most significant: each step shifts
// the accumulated id left by 8 bits and ORs in the next byte.
447 uid bt_getid(unsigned char *src)
452 for( i = 0; i < BtId; i++ )
453 id <<= 8, id |= *src++;
458 // lite weight spin lock Latch Manager
// Thin wrapper around the raw futex system call: forwards all six
// arguments unchanged to syscall(SYS_futex, ...) and returns its result.
460 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
462 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
// Acquire the lite latch exclusively.
// An attempt atomically ORs XCL into the latch word and inspects the
// previous value: if xlock was clear there, this caller set the bit.
// Otherwise the thread waits with FUTEX_WAIT_BITSET on the QueWr
// bitset, passing *prev->value as the value the futex word is expected
// to hold.
// NOTE(review): the retry loop and the conditions guarding the WRT
// add/sub are not present in this listing -- presumably WRT adjusts
// the count of waiting writers; verify against the complete source.
465 void bt_mutexlock(BtMutexLatch *latch)
467 BtMutexLatch prev[1];
471 *prev->value = __sync_fetch_and_or(latch->value, XCL);
473 if( !prev->bits->xlock ) { // did we set XCL bit?
475 __sync_fetch_and_sub(latch->value, WRT);
481 __sync_fetch_and_add(latch->value, WRT);
484 sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
// Single attempt to take the lite latch.
// Atomically ORs XCL into the latch word; the result is 1 when the
// previous value had xlock clear (this caller now holds the latch)
// and 0 when it was already set. No futex wait is issued in the
// visible lines.
489 // try to obtain write lock
491 // return 1 if obtained,
494 int bt_mutextry(BtMutexLatch *latch)
496 BtMutexLatch prev[1];
498 *prev->value = __sync_fetch_and_or(latch->value, XCL);
500 // take write access if exclusive bit is clear
502 return !prev->bits->xlock;
// Release the lite latch.
// Atomically clears the XCL bit and captures the previous latch word;
// when that word shows a non-zero wrt field, one waiter is woken via
// FUTEX_WAKE_BITSET on the QueWr bitset.
507 void bt_releasemutex(BtMutexLatch *latch)
509 BtMutexLatch prev[1];
511 *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
513 if( prev->bits->wrt )
514 sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
517 // Write-Only Queue Lock
// Acquire a write-only lock on behalf of thread `tid`.
// The stored owner id (*lock->tid) is compared with tid first; the body
// of that branch is not present in this listing -- presumably it handles
// a repeated acquisition by the owning thread (TODO confirm).
// The lock's xcl lite latch is taken with bt_mutexlock.
519 void WriteOLock (WOLock *lock, ushort tid)
521 if( *lock->tid == tid ) {
526 bt_mutexlock(lock->xcl);
// Release a write-only lock by releasing its xcl lite latch.
// NOTE(review): several lines preceding the release are not present in
// this listing; any owner/duplicate-count bookkeeping cannot be
// confirmed from here.
530 void WriteORelease (WOLock *lock)
538 bt_releasemutex(lock->xcl);
541 // Phase-Fair reader/writer lock implementation
// Acquire the phase-fair lock for writing on behalf of thread `tid`.
// Visible steps:
//   1. compare the stored owner id with tid (branch body not shown);
//   2. take a ticket with an atomic fetch-and-add on lock->ticket and
//      loop until lock->serving[0] equals that ticket;
//   3. atomically add PRES | (tix & PHID) to lock->rin, capturing the
//      previous value r, and loop until r equals *lock->rout.
// Both a GCC (__sync_*) and an MSVC (_Interlocked*) form of each atomic
// appear; the preprocessor lines selecting between them are not present
// in this listing, nor are the loop bodies.
543 void WriteLock (RWLock *lock, ushort tid)
547 if( *lock->tid == tid ) {
552 tix = __sync_fetch_and_add (lock->ticket, 1);
554 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
556 // wait for our ticket to come up
558 while( tix != lock->serving[0] )
565 w = PRES | (tix & PHID);
567 r = __sync_fetch_and_add (lock->rin, w);
569 r = _InterlockedExchangeAdd16 (lock->rin, w);
571 while( r != *lock->rout )
// Release the phase-fair lock held for writing.
// The visible step atomically clears the MASK bits in lock->rin (GCC
// and MSVC forms both shown).
// NOTE(review): the lines before and after this step are not present in
// this listing; advancing lock->serving is presumably done there.
580 void WriteRelease (RWLock *lock)
589 __sync_fetch_and_and (lock->rin, ~MASK);
591 _InterlockedAnd16 (lock->rin, ~MASK);
// Attempt to take the phase-fair lock for reading on behalf of `tid`.
// Atomically adds RINC to lock->rin and keeps the MASK bits of the
// previous value in w. When w still equals the current MASK bits of
// lock->rin, the RINC is subtracted again.
// NOTE(review): the return statements and any additional condition on w
// are not present in this listing; presumably the back-out path is the
// "not obtained" result -- verify against the complete source.
596 // try to obtain read lock
597 // return 1 if successful
599 int ReadTry (RWLock *lock, ushort tid)
603 // OK if write lock already held by same thread
605 if( *lock->tid == tid ) {
610 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
612 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
615 if( w == (*lock->rin & MASK) ) {
617 __sync_fetch_and_add (lock->rin, -RINC);
619 _InterlockedExchangeAdd16 (lock->rin, -RINC);
// Acquire the phase-fair lock for reading on behalf of thread `tid`.
// After the owner-id comparison (branch body not shown), RINC is added
// atomically to lock->rin and the MASK bits of the previous value are
// kept in w; the caller then loops while w equals the current MASK bits
// of lock->rin.
// NOTE(review): the loop body and any guard on w are not present in
// this listing.
627 void ReadLock (RWLock *lock, ushort tid)
631 if( *lock->tid == tid ) {
636 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
638 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
641 while( w == (*lock->rin & MASK) )
// Release the phase-fair lock held for reading by atomically adding
// RINC to lock->rout (GCC and MSVC forms both shown).
// NOTE(review): the lines preceding the increment are not present in
// this listing.
649 void ReadRelease (RWLock *lock)
657 __sync_fetch_and_add (lock->rout, RINC);
659 _InterlockedExchangeAdd16 (lock->rout, RINC);
// Walk every buffer-pool entry from 1 to latchtotal-1 and write pool
// pages back to the btree, then msync every mapped segment.
//   mgr       - buffer manager whose pool is flushed
//   thread_no - thread number passed to bt_lockpage for the read lock
// Progress and msync failures are reported on stderr.
// NOTE(review): the condition guarding the write (presumably a test of
// latch->dirty) and the statement under the pin test are not present in
// this listing.
663 // recovery manager -- flush dirty pages
665 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
667 uint cnt3 = 0, cnt2 = 0, cnt = 0;
672 // flush dirty pool pages to the btree
674 fprintf(stderr, "Start flushlsn ");
675 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
// pool page for this entry lives at pagepool + entry * page size
676 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
677 latch = mgr->latchsets + entry;
// hold the entry's modify latch and a page read lock while writing
678 bt_mutexlock (latch->modify);
679 bt_lockpage(BtLockRead, latch, thread_no);
// NOTE(review): the result of bt_writepage is not examined here
682 bt_writepage(mgr, page, latch->page_no);
683 latch->dirty = 0, cnt++;
// pin count with the CLOCK bit masked off
685 if( latch->pin & ~CLOCK_bit )
687 bt_unlockpage(BtLockRead, latch);
688 bt_releasemutex (latch->modify);
690 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
691 fprintf(stderr, "begin sync");
// each mapped segment spans 65536 pages
692 for( segment = 0; segment < mgr->segments; segment++ )
693 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
694 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
695 fprintf(stderr, " end sync\n");
// Scan the recovery buffer at startup.
// The first log header in mgr->redobuff supplies mgr->flushlsn; headers
// are then read at successive offsets and dispatched on hdr->type.
// NOTE(review): the loop construct, the offset advance and the body of
// every case are not present in this listing, so what each entry type
// actually does during recovery cannot be stated from here.
698 // recovery manager -- process current recovery buff on startup
699 // this won't do much if previous session was properly closed.
701 BTERR bt_recoveryredo (BtMgr *mgr)
708 hdr = (BtLogHdr *)mgr->redobuff;
709 mgr->flushlsn = hdr->lsn;
712 hdr = (BtLogHdr *)(mgr->redobuff + offset);
713 switch( hdr->type ) {
717 case BTRM_add: // add a unique key-value to btree
719 case BTRM_dup: // add a duplicate key-value to btree
720 case BTRM_del: // delete a key-value from btree
721 case BTRM_upd: // update a key with a new value
722 case BTRM_new: // allocate a new empty page
723 case BTRM_old: // reuse an old empty page
// Append one entry (log header followed by key and value) to the redo
// buffer while holding the mgr->redo lite latch.
//   mgr       - buffer manager owning the redo buffer
//   type, lvl - entry type and btree level (how they are stored in the
//               header is not visible in this listing)
//   key, val  - key and value copied after the header
//   thread_no - passed to bt_flushlsn when the buffer overflows
// NOTE(review): the return statement is not present in this listing;
// presumably the new lsn is returned.
729 // recovery manager -- append new entry to recovery log
730 // flush dirty pages to disk when it overflows.
732 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
// usable bytes: all redo pages less room for one trailing header
734 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
735 uint amt = sizeof(BtLogHdr);
739 bt_mutexlock (mgr->redo);
742 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
744 // see if new entry fits in buffer
745 // flush and reset if it doesn't
747 if( amt > size - mgr->redoend ) {
748 mgr->flushlsn = mgr->lsn;
// sync from the 4096-byte-aligned start of the last synced region
749 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
750 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
753 bt_flushlsn(mgr, thread_no);
756 // fill in new entry & either eof or end block
758 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
763 hdr->lsn = ++mgr->lsn;
// zeroed header at the new end of the log
767 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
768 memset (eof, 0, sizeof(BtLogHdr));
770 // fill in key and value
// key (with its BtKey header) first, value (with its BtVal header) right after
773 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
774 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
777 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
778 memset (eof, 0, sizeof(BtLogHdr));
781 last = mgr->redolast & ~0xfff;
// msync only once at least 8192 bytes have accumulated since the last sync point
784 if( end - last + sizeof(BtLogHdr) >= 8192 )
785 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
786 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
790 bt_releasemutex(mgr->redo);
// Append every slot of the page `source` to the redo buffer as
// individual log entries, under the mgr->redo lite latch.
//   mgr       - buffer manager owning the redo buffer
//   source    - page whose slots 1..cnt supply the keys and values
//   thread_no - passed to bt_flushlsn when the buffer overflows
// The required space is summed over all slots before the latch is
// taken.
// NOTE(review): the lsn assignment, the switch case bodies, header
// field stores and the return statement are not present in this
// listing.
794 // recovery manager -- append transaction to recovery log
795 // flush dirty pages to disk when it overflows.
797 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
// usable bytes: all redo pages less room for one trailing header
799 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
800 uint amt = 0, src, type;
807 // determine amount of redo recovery log space required
// src runs 1..cnt (page slots are 1-based)
809 for( src = 0; src++ < source->cnt; ) {
810 key = keyptr(source,src);
811 val = valptr(source,src);
812 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
813 amt += sizeof(BtLogHdr);
816 bt_mutexlock (mgr->redo);
818 // see if new entry fits in buffer
819 // flush and reset if it doesn't
821 if( amt > size - mgr->redoend ) {
822 mgr->flushlsn = mgr->lsn;
823 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
824 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
827 bt_flushlsn (mgr, thread_no);
830 // assign new lsn to transaction
834 // fill in new entries
836 for( src = 0; src++ < source->cnt; ) {
837 key = keyptr(source, src);
838 val = valptr(source, src);
840 switch( slotptr(source, src)->type ) {
// amt is reused here as the size of this single entry
852 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
853 amt += sizeof(BtLogHdr);
855 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
861 // fill in key and value
863 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
864 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
// zeroed header at the new end of the log
869 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
870 memset (eof, 0, sizeof(BtLogHdr));
873 last = mgr->redolast & ~0xfff;
// msync only once at least 8192 bytes have accumulated since the last sync point
876 if( end - last + sizeof(BtLogHdr) >= 8192 )
877 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
878 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
882 bt_releasemutex(mgr->redo);
// Copy page `page_no` from its memory-mapped location in the btree file
// into the caller's buffer `page`.
// The segment index is page_no >> 16 and the page within the segment is
// page_no & 0xffff. When the segment is not yet mapped, the mgr->maps
// lite latch is taken, the segment count is re-checked, and a further
// segment of 65536 pages is mapped.
// NOTE(review): the statement taken when perm->page_no differs from
// page_no, the retry structure and the return statements are not
// present in this listing.
886 // read page into buffer pool from permanent location in Btree file
888 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
890 int flag = PROT_READ | PROT_WRITE;
891 uint segment = page_no >> 16;
895 if( segment < mgr->segments ) {
896 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
// sanity check: the stored page must carry its own page number
897 if( perm->page_no != page_no )
899 memcpy (page, perm, mgr->page_size);
904 bt_mutexlock (mgr->maps);
// another thread may have mapped the segment while we waited
906 if( segment < mgr->segments ) {
907 bt_releasemutex (mgr->maps);
// NOTE(review): mgr->segments is a uint, so the offset shift below is
// evaluated in uint width and may wrap for large files -- consider
// widening to uid before shifting; confirm.
911 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
914 bt_releasemutex (mgr->maps);
// Copy the caller's buffer `page` into the memory-mapped location of
// page `page_no` in the btree file.
// The segment index is page_no >> 16 and the page within the segment is
// page_no & 0xffff. When the segment is not yet mapped, the mgr->maps
// lite latch is taken, the segment count is re-checked, and a further
// segment of 65536 pages is mapped.
// NOTE(review): no update of a dirty flag is visible in the listed
// lines; bt_flushlsn and bt_mgrclose clear latch->dirty themselves
// after calling this routine. The statement taken on a page number
// mismatch and the return statements are not present in this listing.
918 // write page to permanent location in Btree file
919 // clear the dirty bit
921 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
923 int flag = PROT_READ | PROT_WRITE;
924 uint segment = page_no >> 16;
928 if( segment < mgr->segments ) {
929 perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
// the page-number check is skipped for pages 0 through LEAF_page
930 if( page_no > LEAF_page && perm->page_no != page_no)
932 memcpy (perm, page, mgr->page_size);
937 bt_mutexlock (mgr->maps);
// another thread may have mapped the segment while we waited
939 if( segment < mgr->segments ) {
940 bt_releasemutex (mgr->maps);
// NOTE(review): mgr->segments is a uint, so the offset shift below is
// evaluated in uint width and may wrap for large files -- consider
// widening to uid before shifting; confirm.
944 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
945 bt_releasemutex (mgr->maps);
// Unpin a latch set: under the entry's modify lite latch, the CLOCK bit
// is set in latch->pin.
// NOTE(review): the pin-count decrement named in the comment below is
// not present in this listing. The mgr parameter is not referenced in
// the visible lines.
950 // set CLOCK bit in latch
951 // decrement pin count
953 void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
955 bt_mutexlock(latch->modify);
956 latch->pin |= CLOCK_bit;
959 bt_releasemutex(latch->modify);
// Translate a latch set into the address of its buffer-pool page.
// The entry index is the latch's position within mgr->latchsets; the
// page sits at pagepool + (entry << page_bits).
// NOTE(review): the return statement is not present in this listing.
962 // return the btree cached page address
964 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
966 uid entry = latch - mgr->latchsets;
967 BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
// Pick the next buffer-pool entry that can be reused, returning with
// that entry's modify lite latch held (per the comment below).
// Candidates come from an atomically incremented mgr->latchvictim
// counter reduced modulo mgr->latchtotal. An entry whose modify latch
// cannot be taken with bt_mutextry is passed over. For an entry that is
// not returned, the CLOCK bit is cleared and the latch released.
// NOTE(review): the enclosing loop, the handling of entry 0, the pin
// test and the return statement are not present in this listing.
972 // return next available latch entry
973 // and with latch entry locked
975 uint bt_availnext (BtMgr *mgr)
982 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
984 entry = _InterlockedIncrement (&mgr->latchvictim);
986 entry %= mgr->latchtotal;
991 latch = mgr->latchsets + entry;
993 if( !bt_mutextry(latch->modify) )
996 // return this entry if it is not pinned
1001 // if the CLOCK bit is set
1002 // reset it to zero.
1004 latch->pin &= ~CLOCK_bit;
1005 bt_releasemutex(latch->modify);
// Locate or create the buffer-pool entry for `page_no` and pin it.
//   mgr       - buffer manager
//   page_no   - page to pin; hashed as page_no % mgr->latchhash
//   contents  - when used, copied into the pool page instead of reading
//               the page from the btree file (the selecting condition
//               is not visible in this listing)
//   thread_id - not referenced in the visible lines
// Returns the pinned latch set, or NULL with mgr->line set on a page
// I/O failure.
// Flow visible here: search the hash chain under the chain's lite
// latch; on a hit, mark the entry with CLOCK_bit under its modify
// latch; on a miss, take a victim from bt_availnext, unlink it from its
// old chain if that differs, write the old page back, load the new
// page, link the entry at the head of the new chain and set pin to
// CLOCK_bit | 1.
1009 // pin page in buffer pool
1010 // return with latchset pinned
1012 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
1014 uint hashidx = page_no % mgr->latchhash;
1019 // try to find our entry
1021 bt_mutexlock(mgr->hashtable[hashidx].latch);
// walk the chain; entry 0 terminates it
1023 if( entry = mgr->hashtable[hashidx].entry ) do
1025 latch = mgr->latchsets + entry;
1026 if( page_no == latch->page_no )
1028 } while( entry = latch->next );
1030 // found our entry: increment pin
1033 latch = mgr->latchsets + entry;
1034 bt_mutexlock(latch->modify);
1035 latch->pin |= CLOCK_bit;
1039 bt_releasemutex(latch->modify);
1040 bt_releasemutex(mgr->hashtable[hashidx].latch);
1044 // find and reuse unpinned entry
1048 entry = bt_availnext (mgr);
1049 latch = mgr->latchsets + entry;
// hash chain the victim currently belongs to
1051 idx = latch->page_no % mgr->latchhash;
1053 // if latch is on a different hash chain
1054 // unlink from the old page_no chain
1056 if( latch->page_no )
1057 if( idx != hashidx ) {
1059 // skip over this entry if latch not available
1061 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1062 bt_releasemutex(latch->modify);
// splice the victim out of its old doubly linked chain
1067 mgr->latchsets[latch->prev].next = latch->next;
1069 mgr->hashtable[idx].entry = latch->next;
1072 mgr->latchsets[latch->next].prev = latch->prev;
1074 bt_releasemutex (mgr->hashtable[idx].latch);
1077 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1079 // update permanent page area in btree from buffer pool
1080 // no read-lock is required since page is not pinned.
// NOTE(review): this error return directly follows the failed write;
// the victim's modify latch and the hash chain latch taken above appear
// to be still held at this point -- confirm.
1083 if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
1084 return mgr->line = __LINE__, NULL;
1089 memcpy (page, contents, mgr->page_size);
// NOTE(review): unlike the write failure above, the bt_readpage result
// is not stored in mgr->err before returning NULL, and the same latches
// appear to be still held -- confirm.
1091 } else if( bt_readpage (mgr, page, page_no) )
1092 return mgr->line = __LINE__, NULL;
1094 // link page as head of hash table chain
1095 // if this is a never before used entry,
1096 // or it was previously on a different
1097 // hash table chain. Otherwise, just
1098 // leave it in its current hash table
1101 if( !latch->page_no || hashidx != idx ) {
1102 if( latch->next = mgr->hashtable[hashidx].entry )
1103 mgr->latchsets[latch->next].prev = entry;
1105 mgr->hashtable[hashidx].entry = entry;
1109 // fill in latch structure
// one pin for the caller, CLOCK bit set
1111 latch->pin = CLOCK_bit | 1;
1112 latch->page_no = page_no;
1115 bt_releasemutex (latch->modify);
1116 bt_releasemutex (mgr->hashtable[hashidx].latch);
// Shut down a buffer manager: sync the btree file, terminate the redo
// log, write dirty pool pages back, reset the on-disk redo buffer, and
// release the mappings and handles.
// Both the Unix (munmap/fdatasync) and Windows (UnmapViewOfFile/
// CloseHandle/VirtualFree) release sequences appear below; the
// preprocessor lines selecting between them are not present in this
// listing.
1120 void bt_mgrclose (BtMgr *mgr)
1128 // flush previously written dirty pages
1129 // and write recovery buffer to disk
1131 fdatasync (mgr->idx);
// place a zeroed header at the end of the redo log
1133 if( mgr->redoend ) {
1134 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1135 memset (eof, 0, sizeof(BtLogHdr));
1138 // write remaining dirty pool pages to the btree
1140 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1141 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1142 latch = mgr->latchsets + slot;
1144 if( latch->dirty ) {
// NOTE(review): the result of bt_writepage is not examined here
1145 bt_writepage(mgr, page, latch->page_no);
1146 latch->dirty = 0, num++;
1150 // clear redo recovery buffer on disk.
// first header is zeroed, then stamped with the current lsn
1152 if( mgr->pagezero->redopages ) {
1153 eof = (BtLogHdr *)mgr->redobuff;
1154 memset (eof, 0, sizeof(BtLogHdr));
1155 eof->lsn = mgr->lsn;
1156 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1157 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1160 fprintf(stderr, "%d buffer pool pages flushed\n", num);
// unmap every file segment, last one first
1163 while( mgr->segments )
1164 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1166 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1167 munmap (mgr->pagezero, mgr->page_size);
1169 FlushViewOfFile(mgr->pagezero, 0);
1170 UnmapViewOfFile(mgr->pagezero);
1171 UnmapViewOfFile(mgr->pagepool);
1172 CloseHandle(mgr->halloc);
1173 CloseHandle(mgr->hpool);
1179 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1180 FlushFileBuffers(mgr->idx);
1181 CloseHandle(mgr->idx);
// Release the resources of a BtDb access handle.
// The only statement visible here is the Windows release of the cursor
// page frame with VirtualFree.
// NOTE(review): the Unix branch and the release of the BtDb itself are
// not present in this listing.
1186 // close and release memory
1188 void bt_close (BtDb *bt)
1195 VirtualFree (bt->cursor, 0, MEM_RELEASE);
// Open an existing btree file or create a new one, and build the buffer
// manager around it.
//   name      - path of the btree file
//   bits      - page size in bits; replaced by the value stored in the
//               file's page zero when that value is non-zero
//   nodemax   - number of buffer-pool pages / latch entries
//   redopages - number of redo pages recorded in a newly created file
// Returns the new BtMgr, or NULL after reporting the failure on stderr.
// Unix and Windows variants of each step appear one after the other;
// the preprocessor lines selecting between them are not present in this
// listing.
// NOTE(review): the older comment below mentions BT_openmode, which is
// not a parameter of this signature.
1200 // open/create new btree buffer manager
1202 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1203 // size of page pool (e.g. 262144)
1205 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1207 uint lvl, attr, last, slot, idx;
1208 uint nlatchpage, latchhash;
1209 unsigned char value[BtId];
1210 int flag, initit = 0;
1211 BtPageZero *pagezero;
1219 // determine sanity of page size and buffer pool
1221 if( bits > BT_maxbits )
1223 else if( bits < BT_minbits )
1226 mgr = calloc (1, sizeof(BtMgr));
1228 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1230 if( mgr->idx == -1 ) {
1231 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1232 return free(mgr), NULL;
1235 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1236 attr = FILE_ATTRIBUTE_NORMAL;
1237 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1239 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1240 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1241 return GlobalFree(mgr), NULL;
// scratch buffer large enough for the maximum page size
1246 pagezero = valloc (BT_maxpage);
1249 // read minimum page size to get root info
1250 // to support raw disk partition files
1251 // check if bits == 0 on the disk.
// lseek whence 2: position at end of file, yielding its size
1253 if( size = lseek (mgr->idx, 0L, 2) )
1254 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1255 if( pagezero->alloc->bits )
1256 bits = pagezero->alloc->bits;
1260 return free(mgr), free(pagezero), NULL;
1264 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1265 size = GetFileSize(mgr->idx, amt);
1267 if( size || *amt ) {
1268 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1269 return bt_mgrclose (mgr), NULL;
1270 bits = pagezero->alloc->bits;
1275 mgr->page_size = 1 << bits;
1276 mgr->page_bits = bits;
1278 // calculate number of latch hash table entries
// pages for nodemax/16 hash entries, rounded up to whole pages
1280 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1282 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
// plus pages for nodemax latch sets, rounded up to whole pages
1283 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1284 mgr->latchtotal = nodemax;
1289 // initialize an empty b-tree with latch page, root page, page of leaves
1290 // and page(s) of latches and page pool cache
1292 memset (pagezero, 0, 1 << bits);
1293 pagezero->alloc->lvl = MIN_lvl - 1;
1294 pagezero->alloc->bits = mgr->page_bits;
1295 pagezero->redopages = redopages;
// right pointer of the alloc page holds the next page number to allocate
1297 bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
1298 pagezero->activepages = 2;
1300 // initialize left-most LEAF page in
1301 // alloc->left and count of active leaf pages.
1303 bt_putid (pagezero->alloc->left, LEAF_page);
// NOTE(review): the result of ftruncate is not examined here
1304 ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
1306 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1307 fprintf (stderr, "Unable to create btree page zero\n");
1308 return bt_mgrclose (mgr), NULL;
1311 memset (pagezero, 0, 1 << bits);
1312 pagezero->alloc->bits = mgr->page_bits;
// build one page per level, highest level first; each is written
// to page number MIN_lvl - lvl
1314 for( lvl=MIN_lvl; lvl--; ) {
1315 BtSlot *node = slotptr(pagezero->alloc, 1);
// key and value are placed at the top end of the page
1316 node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1317 key = keyptr(pagezero->alloc, 1);
1318 key->len = 2; // create stopper key
// non-leaf levels store the page number of the next level down as the value
1322 bt_putid(value, MIN_lvl - lvl + 1);
1323 val = valptr(pagezero->alloc, 1);
1324 val->len = lvl ? BtId : 0;
1325 memcpy (val->value, value, val->len);
1327 pagezero->alloc->min = node->off;
1328 pagezero->alloc->lvl = lvl;
1329 pagezero->alloc->cnt = 1;
1330 pagezero->alloc->act = 1;
1331 pagezero->alloc->page_no = MIN_lvl - lvl;
1333 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1334 fprintf (stderr, "Unable to create btree page\n");
1335 return bt_mgrclose (mgr), NULL;
1343 VirtualFree (pagezero, 0, MEM_RELEASE);
1346 // mlock the first segment of 64K pages
1348 flag = PROT_READ | PROT_WRITE;
1349 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1352 if( mgr->pages[0] == MAP_FAILED ) {
1353 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1354 return bt_mgrclose (mgr), NULL;
// page zero is the first page of the first mapped segment
1357 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1358 mlock (mgr->pagezero, mgr->page_size);
// redo buffer begins at REDO_page within the same segment
1360 mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
1361 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1363 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1364 if( mgr->pagepool == MAP_FAILED ) {
1365 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1366 return bt_mgrclose (mgr), NULL;
1369 flag = PAGE_READWRITE;
1370 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1371 if( !mgr->halloc ) {
1372 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1373 return bt_mgrclose (mgr), NULL;
1376 flag = FILE_MAP_WRITE;
1377 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1378 if( !mgr->pagezero ) {
1379 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1380 return bt_mgrclose (mgr), NULL;
1383 flag = PAGE_READWRITE;
1384 size = (uid)mgr->nlatchpage << mgr->page_bits;
1385 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1387 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1388 return bt_mgrclose (mgr), NULL;
1391 flag = FILE_MAP_WRITE;
// NOTE(review): the mapping handle was stored in mgr->hpool just above,
// but mgr->pool is passed here -- looks like it should be mgr->hpool;
// confirm against the BtMgr definition.
1392 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1393 if( !mgr->pagepool ) {
1394 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1395 return bt_mgrclose (mgr), NULL;
// pool layout: latchtotal pages, then the latch set array, then the
// hash table occupying whatever space remains
1399 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1400 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1401 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1406 // open BTree access method
1407 // based on buffer manager
//
// Allocates a zero-filled BtDb handle, gives it a page-sized cursor
// buffer (mgr->page_size bytes) and assigns it a thread number taken
// from the counter at mgr->thread_no.
// NOTE(review): the malloc result is handed to memset without a visible NULL check.
1409 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1411 BtDb *bt = malloc (sizeof(*bt));
1413 memset (bt, 0, sizeof(*bt));
// cursor buffer, valloc variant
1417 bt->cursor = valloc (mgr->page_size);
// cursor buffer, VirtualAlloc variant
1419 bt->cursor = VirtualAlloc(NULL, mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
// thread number is the counter's previous value plus one
1422 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
// NOTE(review): _InterlockedIncrement16 is documented as taking only the
// pointer argument -- confirm this two-argument call builds on Windows.
1424 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1429 // compare two keys, return > 0, = 0, or < 0
1430 // =0: keys are same
1433 // as the comparison value
//
// key1 is a BtKey (length in key1->len, bytes in key1->key);
// key2/len2 is a raw byte string and its length.
1435 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1437 uint len1 = key1->len;
// compare only the common prefix, i.e. the shorter of the two lengths
1440 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1451 // place write, read, or parent lock on requested page_no.
//
// Dispatches on mode to one of the four locks held in the latch set
// (readwr, access, parent, atomic). thread_no is forwarded to the
// underlying lock primitive.
1453 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
// shared, then exclusive, acquisition of the readwr lock
1457 ReadLock (latch->readwr, thread_no);
1460 WriteLock (latch->readwr, thread_no);
// shared, then exclusive, acquisition of the access lock
1463 ReadLock (latch->access, thread_no);
1466 WriteLock (latch->access, thread_no);
// parent and atomic locks are taken with WriteOLock
1469 WriteOLock (latch->parent, thread_no);
1472 WriteOLock (latch->atomic, thread_no);
// combined mode: atomic lock first, then a shared readwr lock
1474 case BtLockAtomic | BtLockRead:
1475 WriteOLock (latch->atomic, thread_no);
1476 ReadLock (latch->readwr, thread_no);
1481 // remove write, read, or parent lock on requested page
//
// Mirror of bt_lockpage: dispatches on mode and releases the matching
// lock in the latch set.
1483 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
// readwr lock: shared release, exclusive release
1487 ReadRelease (latch->readwr);
1490 WriteRelease (latch->readwr);
// access lock: shared release, exclusive release
1493 ReadRelease (latch->access);
1496 WriteRelease (latch->access);
// parent and atomic locks are released with WriteORelease
1499 WriteORelease (latch->parent);
1502 WriteORelease (latch->atomic);
// combined mode: release the atomic lock, then the shared readwr lock
1504 case BtLockAtomic | BtLockRead:
1505 WriteORelease (latch->atomic);
1506 ReadRelease (latch->readwr);
1511 // allocate a new page
1512 // return with page latched, but unlocked.
//
// The chosen page number is stored into contents->page_no and the
// contents buffer supplies the new page image. On return set->latch and
// set->page refer to the new page. Page zero bookkeeping (free chain,
// next page number, activepages) is updated under mgr->lock.
1514 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_id)
1519 // lock allocation page
1521 bt_mutexlock(mgr->lock);
1523 // use empty chain first
1524 // else allocate empty page
1526 if( page_no = bt_getid(mgr->pagezero->freechain) ) {
1527 if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
1528 set->page = bt_mappage (mgr, set->latch);
// NOTE(review): this return lies between bt_mutexlock and bt_releasemutex;
// confirm mgr->lock is not left held on this error path.
1530 return mgr->line = __LINE__, mgr->err = BTERR_struct;
// pop the page from the free chain: its right link becomes the new chain head
1532 mgr->pagezero->activepages++;
1533 bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
// flush page zero synchronously; failure is only reported on stderr
1534 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1535 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1537 bt_releasemutex(mgr->lock);
// overwrite the recycled page with the caller's image and mark it dirty
1539 contents->page_no = page_no;
1540 memcpy (set->page, contents, mgr->page_size);
1541 set->latch->dirty = 1;
// free chain path not taken: use the next page number recorded in
// page zero's alloc->right and advance it by one
1545 page_no = bt_getid(mgr->pagezero->alloc->right);
1546 bt_putid(mgr->pagezero->alloc->right, page_no+1);
1548 // unlock allocation latch and
1549 // extend file into new page.
1551 mgr->pagezero->activepages++;
1552 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1553 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1554 bt_releasemutex(mgr->lock);
1556 contents->page_no = page_no;
// NOTE(review): the pwrite return value is not checked here.
1557 pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
1559 // don't load cache from btree page, load it from contents
1561 if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
1562 set->page = bt_mappage (mgr, set->latch);
1569 // find slot in page for given key at a given level
//
// Searches slots 1..page->cnt by repeatedly probing the midpoint of the
// remaining range with keycmp. Returns the slot found, or 0 (see the
// final comment) when the key belongs on the right-link page.
1571 int bt_findslot (BtPage page, unsigned char *key, uint len)
1573 uint diff, higher = page->cnt, low = 1, slot;
1576 // make stopper key an infinite fence value
1578 if( bt_getid (page->right) )
1583 // low is the lowest candidate.
1584 // loop ends when they meet
1586 // higher is already
1587 // tested as .ge. the passed key.
1589 while( diff = higher - low ) {
// probe the midpoint of [low, higher)
1590 slot = low + ( diff >> 1 );
1591 if( keycmp (keyptr(page, slot), key, len) < 0 )
// slot key is not below the search key: it becomes the new upper bound
1594 higher = slot, good++;
1597 // return zero if key is on right link page
1599 return good ? higher : 0;
1602 // find and load page at given level for given key
1603 // leave page rd or wr locked as requested
//
// Walks from ROOT_page towards level lvl, following child pointers
// stored as slot values and right links between siblings. On the error
// paths visible here it sets mgr->err = BTERR_struct, records mgr->line
// and returns 0.
1605 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1607 uid page_no = ROOT_page, prevpage = 0;
// drill starts at 0xff and is replaced by the root's real level below
1608 uint drill = 0xff, slot;
1609 BtLatchSet *prevlatch;
1610 uint mode, prevmode;
1613 // start at root of btree and drill down
1616 // determine lock mode of drill level
// only the target level gets the caller's lock mode; others are read-locked
1617 mode = (drill == lvl) ? lock : BtLockRead;
1619 if( !(set->latch = bt_pinlatch (mgr, page_no, NULL, thread_no)) )
1622 // obtain access lock using lock chaining with Access mode
// pages numbered at or below ROOT_page skip the Access lock
1624 if( page_no > ROOT_page )
1625 bt_lockpage(BtLockAccess, set->latch, thread_no);
1627 set->page = bt_mappage (mgr, set->latch);
1629 // release & unpin parent or left sibling page
1632 bt_unlockpage(prevmode, prevlatch);
1633 bt_unpinlatch (mgr, prevlatch);
1637 // obtain mode lock using lock chaining through AccessLock
1639 bt_lockpage(mode, set->latch, thread_no);
// landing on a page flagged free is reported as a structure error
1641 if( set->page->free )
1642 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1644 if( page_no > ROOT_page )
1645 bt_unlockpage(BtLockAccess, set->latch);
1647 // re-read and re-lock root after determining actual level of root
1649 if( set->page->lvl != drill) {
// a level mismatch is only tolerated on the root page
1650 if( set->latch->page_no != ROOT_page )
1651 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1653 drill = set->page->lvl;
// the root turned out to be the target level but was locked with the
// default mode: drop lock and pin here
1655 if( lock != BtLockRead && drill == lvl ) {
1656 bt_unlockpage(mode, set->latch);
1657 bt_unpinlatch (mgr, set->latch);
// remember this page so it can be released after the next one is locked
1662 prevpage = set->latch->page_no;
1663 prevlatch = set->latch;
1666 // find key on page at this level
1667 // and descend to requested level
// pages flagged kill are not searched
1669 if( !set->page->kill )
1670 if( slot = bt_findslot (set->page, key, len) ) {
1674 // find next non-dead slot -- the fence key if nothing else
1676 while( slotptr(set->page, slot)->dead )
1677 if( slot++ < set->page->cnt )
1680 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1682 val = valptr(set->page, slot);
// the slot value must be exactly BtId bytes: the child page number
1684 if( val->len == BtId )
1685 page_no = bt_getid(valptr(set->page, slot)->value);
1687 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1693 // slide right into next page
1695 page_no = bt_getid(set->page->right);
1698 // return error on end of right chain
1700 mgr->line = __LINE__, mgr->err = BTERR_struct;
1701 return 0; // return error
1704 // return page to free list
1705 // page must be delete & write locked
//
// Pushes the page onto the head of page zero's free chain under
// mgr->lock, then releases the page's Delete and Write locks and its pin.
1707 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1709 // lock allocation page
1711 bt_mutexlock (mgr->lock);
// link the page in front of the free chain: its right pointer takes the
// old chain head, and the chain head becomes this page
1715 memcpy(set->page->right, mgr->pagezero->freechain, BtId);
1716 bt_putid(mgr->pagezero->freechain, set->latch->page_no);
1717 set->latch->dirty = 1;
1718 set->page->free = 1;
1720 // decrement active page count
1722 mgr->pagezero->activepages--;
// flush page zero synchronously; failure is only reported on stderr
1723 if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1724 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1726 // unlock released page
1727 // and unlock allocation page
1729 bt_unlockpage (BtLockDelete, set->latch);
1730 bt_unlockpage (BtLockWrite, set->latch);
1731 bt_unpinlatch (mgr, set->latch);
1732 bt_releasemutex (mgr->lock);
1735 // a fence key was deleted from a page
1736 // push new fence value upwards
//
// Drops the last slot of the page, then at level lvl+1 inserts the new
// last key pointing at this page and deletes the old fence key.
// The page's Write lock is exchanged for a Parent lock while the parent
// level is updated; lock and pin are released at the end.
1738 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1740 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1741 unsigned char value[BtId];
1745 // remove the old fence value
// save the old fence key in rightkey, then zero its slot and shrink cnt
1747 ptr = keyptr(set->page, set->page->cnt);
1748 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1749 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1750 set->latch->dirty = 1;
1752 // cache new fence value
1754 ptr = keyptr(set->page, set->page->cnt);
1755 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// take the Parent lock before giving up the Write lock
1757 bt_lockpage (BtLockParent, set->latch, thread_no);
1758 bt_unlockpage (BtLockWrite, set->latch);
1760 // insert new (now smaller) fence key
1762 bt_putid (value, set->latch->page_no);
1763 ptr = (BtKey*)leftkey;
1765 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1768 // now delete old fence key
1770 ptr = (BtKey*)rightkey;
1772 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1775 bt_unlockpage (BtLockParent, set->latch);
1776 bt_unpinlatch(mgr, set->latch);
1780 // root has a single child
1781 // collapse a level from the tree
//
// Repeatedly copies the only live child over the root page and frees the
// child, for as long as the root is above level 1 and has one active key.
// Releases the root's Write lock and pin when done.
1783 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1790 // find the child entry and promote as new root contents
// scan slots 1..cnt for the first one that is not dead
1793 for( idx = 0; idx++ < root->page->cnt; )
1794 if( !slotptr(root->page, idx)->dead )
1797 val = valptr(root->page, idx);
// the slot value must be exactly BtId bytes: the child page number
1799 if( val->len == BtId )
1800 page_no = bt_getid (valptr(root->page, idx)->value);
1802 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1804 if( child->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1805 child->page = bt_mappage (mgr, child->latch);
// bt_freepage expects the page to be Delete and Write locked
1809 bt_lockpage (BtLockDelete, child->latch, thread_no);
1810 bt_lockpage (BtLockWrite, child->latch, thread_no);
// the child's whole page image replaces the root's
1812 memcpy (root->page, child->page, mgr->page_size);
1813 root->latch->dirty = 1;
1815 bt_freepage (mgr, child);
1817 } while( root->page->lvl > 1 && root->page->act == 1 );
1819 bt_unlockpage (BtLockWrite, root->latch);
1820 bt_unpinlatch (mgr, root->latch);
1824 // delete a page and manage keys
1825 // call with page writelocked
1827 // returns with page removed
1828 // from the page pool.
//
// The page's right sibling is copied over the page, the sibling is
// flagged kill and redirected back to this page, the parent level
// (lvl+1) is updated, and finally the sibling is freed.
1830 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1832 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1833 unsigned char value[BtId];
1834 uint lvl = set->page->lvl;
1840 // cache copy of fence key
1841 // to remove in parent
1843 ptr = keyptr(set->page, set->page->cnt);
1844 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1846 // obtain lock on right page
1848 page_no = bt_getid(set->page->right);
1850 if( right->latch = bt_pinlatch (mgr, page_no, NULL, thread_no) )
1851 right->page = bt_mappage (mgr, right->latch);
1855 bt_lockpage (BtLockWrite, right->latch, thread_no);
1857 // cache copy of key to update
1859 ptr = keyptr(right->page, right->page->cnt);
1860 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
// NOTE(review): this return comes after the right page was pinned and
// write-locked above; confirm whether those are released on this path.
1862 if( right->page->kill )
1863 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1865 // pull contents of right peer into our empty page
// the copy also overwrote page_no, so restore this page's own number
1867 memcpy (set->page, right->page, mgr->page_size);
1868 set->page->page_no = set->latch->page_no;
1869 set->latch->dirty = 1;
1871 // mark right page deleted and point it to left page
1872 // until we can post parent updates that remove access
1873 // to the deleted page.
1875 bt_putid (right->page->right, set->latch->page_no);
1876 right->latch->dirty = 1;
1877 right->page->kill = 1;
// on both pages, swap the Write lock for a Parent lock
1879 bt_lockpage (BtLockParent, right->latch, thread_no);
1880 bt_unlockpage (BtLockWrite, right->latch);
1882 bt_lockpage (BtLockParent, set->latch, thread_no);
1883 bt_unlockpage (BtLockWrite, set->latch);
1885 // redirect higher key directly to our new node contents
1887 bt_putid (value, set->latch->page_no);
1888 ptr = (BtKey*)higherfence;
1890 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1893 // delete old lower key to our node
1895 ptr = (BtKey*)lowerfence;
1897 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1900 // obtain delete and write locks to right node
1902 bt_unlockpage (BtLockParent, right->latch);
1903 bt_lockpage (BtLockDelete, right->latch, thread_no);
1904 bt_lockpage (BtLockWrite, right->latch, thread_no);
1905 bt_freepage (mgr, right);
1907 bt_unlockpage (BtLockParent, set->latch);
1908 bt_unpinlatch (mgr, set->latch);
1912 // find and delete key on page by marking delete flag bit
1913 // if page becomes empty, delete it from the btree
//
// Loads the page for key at level lvl under a Write lock, marks the
// matching slot deleted, and then hands off to bt_fixfence,
// bt_collapseroot or bt_deletepage when the conditions below apply.
1915 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
1917 uint slot, idx, found, fence;
1923 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
1924 node = slotptr(set->page, slot);
1925 ptr = keyptr(set->page, slot);
1929 // if librarian slot, advance to real slot
1931 if( node->type == Librarian ) {
1932 ptr = keyptr(set->page, ++slot);
1933 node = slotptr(set->page, slot);
// the key being removed is the fence when it occupies the last slot
1936 fence = slot == set->page->cnt;
1938 // delete the key, ignore request if already dead
1940 if( found = !keycmp (ptr, key, len) )
1941 if( found = node->dead == 0 ) {
1942 val = valptr(set->page,slot);
// the key and value bytes, with their headers, become reclaimable garbage
1943 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1946 // mark node type as delete
1948 node->type = Delete;
1951 // collapse empty slots beneath the fence
1952 // on interior nodes
1955 while( idx = set->page->cnt - 1 )
1956 if( slotptr(set->page, idx)->dead ) {
// move the last slot down over the dead one and clear the vacated slot
1957 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1958 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1963 // did we delete a fence key in an upper level?
1965 if( found && lvl && set->page->act && fence )
1966 if( bt_fixfence (mgr, set, lvl, thread_no) )
1971 // do we need to collapse root?
1973 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1974 if( bt_collapseroot (mgr, set, thread_no) )
1979 // delete empty page
1981 if( !set->page->act ) {
1982 if( bt_deletepage (mgr, set, thread_no) )
1985 set->latch->dirty = 1;
1986 bt_unlockpage(BtLockWrite, set->latch);
1987 bt_unpinlatch (mgr, set->latch);
1993 // advance to next slot
//
// While slot is below page->cnt the current page is still in use;
// otherwise the right sibling is pinned and read-locked and the
// previous page's Read lock and pin are released.
1995 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1997 BtLatchSet *prevlatch;
2000 if( slot < set->page->cnt )
2003 prevlatch = set->latch;
2005 if( page_no = bt_getid(set->page->right) )
2006 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
2007 set->page = bt_mappage (bt->mgr, set->latch);
2011 return bt->mgr->err = BTERR_struct, bt->mgr->line = __LINE__, 0;
2013 // obtain access lock using lock chaining with Access mode
2015 bt_lockpage(BtLockAccess, set->latch, bt->thread_no);
// the previous page is let go only after the Access lock on the next is held
2017 bt_unlockpage(BtLockRead, prevlatch);
2018 bt_unpinlatch (bt->mgr, prevlatch);
2020 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
2021 bt_unlockpage(BtLockAccess, set->latch);
2025 // find unique key == given key, or first duplicate key in
2026 // leaf level and return number of value bytes
2027 // or (-1) if not found.
//
// Looks the key up at level 0 under a Read lock. The key actually
// landed on is copied into bt->key; on a match up to valmax bytes of the
// value are copied into the caller's value buffer.
2029 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2037 if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
2039 ptr = keyptr(set->page, slot);
2041 // skip librarian slot place holder
2043 if( slotptr(set->page, slot)->type == Librarian )
2044 ptr = keyptr(set->page, ++slot);
2046 // return actual key found
2048 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2051 if( slotptr(set->page, slot)->type == Duplicate )
2054 // not there if we reach the stopper key
// the last slot of a page with no right link
2056 if( slot == set->page->cnt )
2057 if( !bt_getid (set->page->right) )
2060 // if key exists, return >= 0 value bytes copied
2061 // otherwise return (-1)
2063 if( slotptr(set->page, slot)->dead )
2067 if( !memcmp (ptr->key, key, len) ) {
2068 val = valptr (set->page,slot);
2069 if( valmax > val->len )
2071 memcpy (value, val->value, valmax);
2077 } while( slot = bt_findnext (bt, set, slot) );
2079 bt_unlockpage (BtLockRead, set->latch);
2080 bt_unpinlatch (bt->mgr, set->latch);
2084 // check page for space available,
2085 // clean if necessary and return
2086 // 0 - page needs splitting
2087 // >0 new slot value
//
// keylen/vallen are the sizes of the entry about to be added; slot is
// the caller's insertion slot. When the page is rebuilt, the surviving
// keys and values are repacked from the top of the page downwards and a
// dead Librarian slot is placed in front of each copied slot.
2089 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2091 BtPage page = set->page, frame;
2092 uint nxt = mgr->page_size;
2093 uint cnt = 0, idx = 0;
2094 uint max = page->cnt;
// space test: slot array with two more slots, page header, and the new
// key and value with their headers must fit below page->min
2099 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2102 // skip cleanup and proceed to split
2103 // if there's not enough garbage
// threshold is one fifth of the page size
2106 if( page->garbage < nxt / 5 )
// work from a private copy of the page
// NOTE(review): the malloc result is used without a visible NULL check,
// and no matching free appears in this block -- confirm.
2109 frame = malloc (mgr->page_size);
2110 memcpy (frame, page, mgr->page_size);
2112 // skip page info and set rest of page to zero
2114 memset (page+1, 0, mgr->page_size - sizeof(*page));
2115 set->latch->dirty = 1;
2120 // clean up page first by
2121 // removing deleted keys
2123 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level-0 page
2127 if( cnt < max || frame->lvl )
2128 if( slotptr(frame,cnt)->dead )
2131 // copy the value across
2133 val = valptr(frame, cnt);
2134 nxt -= val->len + sizeof(BtVal);
2135 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2137 // copy the key across
2139 key = keyptr(frame, cnt);
2140 nxt -= key->len + sizeof(BtKey);
2141 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2143 // make a librarian slot
// it shares the key offset of the real slot that follows it
2145 slotptr(page, ++idx)->off = nxt;
2146 slotptr(page, idx)->type = Librarian;
2147 slotptr(page, idx)->dead = 1;
// the real slot: same offset, with type and dead flag taken from the copy
2151 slotptr(page, ++idx)->off = nxt;
2152 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2154 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2162 // see if page has enough space now, or does it need splitting?
2164 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2170 // split the root and raise the height of the btree
//
// The current root contents move to a newly allocated left page; the
// root is then rebuilt with two entries: slot 1 holds the left page's
// fence key pointing at the left page, slot 2 holds the stopper key
// pointing at the right page.
// NOTE(review): the fourth parameter is named page_no but bt_splitkeys
// passes its thread_no here and it is forwarded to bt_newpage as the
// thread id -- the name looks misleading; confirm.
2172 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort page_no)
2174 unsigned char leftkey[BT_keyarray];
2175 uint nxt = mgr->page_size;
2176 unsigned char value[BtId];
// NOTE(review): malloc result is used without a visible NULL check, and
// no matching free appears in this block -- confirm.
2183 frame = malloc (mgr->page_size);
2184 memcpy (frame, root->page, mgr->page_size);
2186 // save left page fence key for new root
2188 ptr = keyptr(root->page, root->page->cnt);
2189 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2191 // Obtain an empty page to use, and copy the current
2192 // root contents into it, e.g. lower keys
2194 if( bt_newpage(mgr, left, frame, page_no) )
// only the new page's number is needed from here on
2197 left_page_no = left->latch->page_no;
2198 bt_unpinlatch (mgr, left->latch);
2201 // preserve the page info at the bottom
2202 // of higher keys and set rest to zero
2204 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2206 // insert stopper key at top of newroot page
2207 // and increase the root height
// value for slot 2: the right page number
2209 nxt -= BtId + sizeof(BtVal);
2210 bt_putid (value, right->page_no);
2211 val = (BtVal *)((unsigned char *)root->page + nxt);
2212 memcpy (val->value, value, BtId);
// reserve two key bytes plus the BtKey header for the stopper key
2215 nxt -= 2 + sizeof(BtKey);
2216 slotptr(root->page, 2)->off = nxt;
2217 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2222 // insert lower keys page fence key on newroot page as first key
// value for slot 1: the left page number
2224 nxt -= BtId + sizeof(BtVal);
2225 bt_putid (value, left_page_no);
2226 val = (BtVal *)((unsigned char *)root->page + nxt);
2227 memcpy (val->value, value, BtId);
2230 ptr = (BtKey *)leftkey;
2231 nxt -= ptr->len + sizeof(BtKey);
2232 slotptr(root->page, 1)->off = nxt;
2233 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the new root has no right sibling and exactly two active keys
2235 bt_putid(root->page->right, 0);
2236 root->page->min = nxt; // reset lowest used offset and key count
2237 root->page->cnt = 2;
2238 root->page->act = 2;
// record the root's level in page zero
2241 mgr->pagezero->alloc->lvl = root->page->lvl;
2243 // release and unpin root pages
2245 bt_unlockpage(BtLockWrite, root->latch);
2246 bt_unpinlatch (mgr, root->latch);
2248 bt_unpinlatch (mgr, right);
2252 // split already locked full node
2254 // return pool entry for new right
//
// Builds the upper keys in a scratch frame, writes that frame to a newly
// allocated right page, then rebuilds set->page from the lower keys and
// links it to the new right page. The return value is the new right
// page's index in mgr->latchsets.
2257 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
2259 uint cnt = 0, idx = 0, max, nxt = mgr->page_size;
// NOTE(review): malloc result is used without a visible NULL check, and
// no matching free appears in this block -- confirm.
2260 BtPage frame = malloc (mgr->page_size);
2261 uint lvl = set->page->lvl;
2268 // split higher half of keys to frame
2270 memset (frame, 0, mgr->page_size);
2271 max = set->page->cnt;
2275 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level-0 page
2276 if( cnt < max || set->page->lvl )
2277 if( slotptr(set->page, cnt)->dead )
// pack value, then key, downwards from the top of the frame
2280 src = valptr(set->page, cnt);
2281 nxt -= src->len + sizeof(BtVal);
2282 memcpy ((unsigned char *)frame + nxt, src, src->len + sizeof(BtVal));
2284 key = keyptr(set->page, cnt);
2285 nxt -= key->len + sizeof(BtKey);
2286 ptr = (BtKey*)((unsigned char *)frame + nxt);
2287 memcpy (ptr, key, key->len + sizeof(BtKey));
2289 // add librarian slot
// it shares the key offset of the real slot that follows it
2291 slotptr(frame, ++idx)->off = nxt;
2292 slotptr(frame, idx)->type = Librarian;
2293 slotptr(frame, idx)->dead = 1;
// the real slot: same offset, with type and dead flag copied over
2297 slotptr(frame, ++idx)->off = nxt;
2298 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2300 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2304 frame->bits = mgr->page_bits;
// the new right page inherits the old right link, except when splitting
// a page numbered at or below ROOT_page
2311 if( set->latch->page_no > ROOT_page )
2312 bt_putid (frame->right, bt_getid (set->page->right));
2314 // get new free page and write higher keys to it.
2316 if( bt_newpage(mgr, right, frame, thread_no) )
2319 // process lower keys
// frame now holds a copy of the original page; the page body is cleared
2321 memcpy (frame, set->page, mgr->page_size);
2322 memset (set->page+1, 0, mgr->page_size - sizeof(*set->page));
2323 set->latch->dirty = 1;
2325 nxt = mgr->page_size;
2326 set->page->garbage = 0;
2332 if( slotptr(frame, max)->type == Librarian )
2335 // assemble page of smaller keys
2337 while( cnt++ < max ) {
2338 if( slotptr(frame, cnt)->dead )
2340 val = valptr(frame, cnt);
2341 nxt -= val->len + sizeof(BtVal);
2342 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2344 key = keyptr(frame, cnt);
2345 nxt -= key->len + sizeof(BtKey);
2346 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2348 // add librarian slot
2350 slotptr(set->page, ++idx)->off = nxt;
2351 slotptr(set->page, idx)->type = Librarian;
2352 slotptr(set->page, idx)->dead = 1;
2356 slotptr(set->page, ++idx)->off = nxt;
2357 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
// link the rebuilt left page to the new right page
2361 bt_putid(set->page->right, right->latch->page_no);
2362 set->page->min = nxt;
2363 set->page->cnt = idx;
// latch table index of the new right page
2366 return right->latch - mgr->latchsets;
2369 // fix keys for newly split page
2370 // call with page locked, return
//
// After a split, posts both halves' fence keys at level lvl+1: the left
// page's last key pointing at set, and the right page's last key
// pointing at right. A split of ROOT_page is delegated to bt_splitroot.
2373 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2375 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2376 unsigned char value[BtId];
2377 uint lvl = set->page->lvl;
2381 // if current page is the root page, split it
2383 if( set->latch->page_no == ROOT_page )
2384 return bt_splitroot (mgr, set, right, thread_no);
// cache the last (fence) key of the left page
2386 ptr = keyptr(set->page, set->page->cnt);
2387 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// cache the last (fence) key of the right page
2389 page = bt_mappage (mgr, right);
2391 ptr = keyptr(page, page->cnt);
2392 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2394 // insert new fences in their parent pages
2396 bt_lockpage (BtLockParent, right, thread_no);
// on the left page the Write lock is swapped for a Parent lock
2398 bt_lockpage (BtLockParent, set->latch, thread_no);
2399 bt_unlockpage (BtLockWrite, set->latch);
2401 // insert new fence for reformulated left block of smaller keys
2403 bt_putid (value, set->latch->page_no);
2404 ptr = (BtKey *)leftkey;
2406 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2409 // switch fence for right block of larger keys to new right page
2411 bt_putid (value, right->page_no);
2412 ptr = (BtKey *)rightkey;
2414 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
// both pages give up their Parent lock and pin
2417 bt_unlockpage (BtLockParent, set->latch);
2418 bt_unpinlatch (mgr, set->latch);
2420 bt_unlockpage (BtLockParent, right);
2421 bt_unpinlatch (mgr, right);
2425 // install new key and value onto page
2426 // page must already be checked for
//
// Copies the value and then the key into the free area just below
// page->min, opens a gap in the slot array at slot, and points the new
// slot at the copied key. release is the caller's flag for the lock/pin
// release at the end (bt_insertkey passes 1, the atomic callers pass 0).
2429 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2431 uint idx, librarian;
2436 // if found slot > desired slot and previous slot
2437 // is a librarian slot, use it
2440 if( slotptr(set->page, slot-1)->type == Librarian )
2443 // copy value onto page
2445 set->page->min -= vallen + sizeof(BtVal);
2446 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2447 memcpy (val->value, value, vallen);
2450 // copy key onto page
2452 set->page->min -= keylen + sizeof(BtKey);
2453 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2454 memcpy (ptr->key, key, keylen);
2457 // find first empty slot
// a dead slot at or after the insertion point can be reused
2459 for( idx = slot; idx < set->page->cnt; idx++ )
2460 if( slotptr(set->page, idx)->dead )
2463 // now insert key into array before slot
// no dead slot found: grow the slot array by two (librarian + real slot)
2465 if( idx == set->page->cnt )
2466 idx += 2, set->page->cnt += 2, librarian = 2;
2470 set->latch->dirty = 1;
// shift slots upwards by `librarian` positions to open the gap
2473 while( idx > slot + librarian - 1 )
2474 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2476 // add librarian slot
2478 if( librarian > 1 ) {
2479 node = slotptr(set->page, slot++);
2480 node->off = set->page->min;
2481 node->type = Librarian;
// the real slot points at the key just copied
2487 node = slotptr(set->page, slot);
2488 node->off = set->page->min;
2493 bt_unlockpage (BtLockWrite, set->latch);
2494 bt_unpinlatch (mgr, set->latch);
2500 // Insert new key into the btree at given level.
2501 // either add a new key or update/add an existing one
//
// Loops: load the target page under a Write lock, then either insert a
// new slot (cleaning or splitting the page when it lacks room) or
// update the existing key's value, in place when the old value area is
// large enough and otherwise in freshly reserved page space.
2503 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2505 uint slot, idx, len, entry;
2511 while( 1 ) { // find the page and slot for the current key
2512 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2513 node = slotptr(set->page, slot);
2514 ptr = keyptr(set->page, slot);
2517 mgr->line = __LINE__, mgr->err = BTERR_ovflw;
2521 // if librarian slot == found slot, advance to real slot
2523 if( node->type == Librarian )
2524 if( !keycmp (ptr, key, keylen) ) {
2525 ptr = keyptr(set->page, ++slot);
2526 node = slotptr(set->page, slot);
2529 // if inserting a duplicate key or unique
2530 // key that doesn't exist on the page,
2531 // check for adequate space on the page
2532 // and insert the new key before slot.
// bt_cleanpage returning 0 means the page must be split first
2537 if( keycmp (ptr, key, keylen) )
2538 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2539 return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
2540 else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2542 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2547 // if key already exists, update value and return
2549 val = valptr(set->page, slot);
// the new value fits in the existing value area: overwrite in place
2551 if( val->len >= vallen ) {
2552 if( slotptr(set->page, slot)->dead )
// the bytes the shorter value no longer uses count as garbage
2557 set->page->garbage += val->len - vallen;
2558 set->latch->dirty = 1;
2560 memcpy (val->value, value, vallen);
2561 bt_unlockpage(BtLockWrite, set->latch);
2562 bt_unpinlatch (mgr, set->latch);
2566 // new update value doesn't fit in existing value area
2567 // make sure page has room
// the old key and value, with their headers, become garbage
2570 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2577 if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
2578 if( !(entry = bt_splitpage (mgr, set, thread_no)) )
2580 else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
2585 // copy key and value onto page and update slot
2587 set->page->min -= vallen + sizeof(BtVal);
2588 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2589 memcpy (val->value, value, vallen);
2592 set->latch->dirty = 1;
2593 set->page->min -= keylen + sizeof(BtKey);
2594 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2595 memcpy (ptr->key, key, keylen);
// repoint the existing slot at the freshly copied key
2598 node->off = set->page->min;
2599 bt_unlockpage(BtLockWrite, set->latch);
2600 bt_unpinlatch (mgr, set->latch);
// Per-source-key bookkeeping fields. The atomic routines index an
// `AtomicTxn *locks` array and use its .lsn, .entry, .slot and .reuse
// members, so these are presumably the AtomicTxn fields -- the enclosing
// declaration is not visible here; confirm.
2608 logseqno reqlsn; // redo log seq no required
2609 logseqno lsn; // redo log sequence number
2610 uint entry; // latch table entry number
2611 uint slot:31; // page slot number
2612 uint reuse:1; // reused previous page
// Fields of a linked work item (note the `next` pointer) carrying a page
// number, a latch table entry, an action type and a copy of a leaf key.
// Presumably the AtomicKey record used by bt_atomictxn -- the enclosing
// declaration is not visible here; confirm.
2616 uid page_no; // page number for split leaf
2617 void *next; // next key to insert
2618 uint entry:29; // latch table entry number
2619 uint type:2; // 0 == insert, 1 == delete, 2 == free
2620 uint nounlock:1; // don't unlock ParentModification
2621 unsigned char leafkey[BT_keyarray];
2624 // determine actual page where key is located
2625 // return slot number
//
// source/src identify the transaction key; locks[src] supplies the
// cached slot and latch table entry. When the cached slot cannot be
// used, the key is searched for along the chain of latch->split
// entries. If the chain is exhausted, mgr->err is set to BTERR_atomic.
2627 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2629 BtKey *key = keyptr(source,src);
2630 uint slot = locks[src].slot;
// a key flagged reuse shares the previous key's page: take that latch
// entry and force a fresh slot search
2633 if( src > 1 && locks[src].reuse )
2634 entry = locks[src-1].entry, slot = 0;
2636 entry = locks[src].entry;
2639 set->latch = mgr->latchsets + entry;
2640 set->page = bt_mappage (mgr, set->latch);
2644 // is locks->reuse set? or was slot zeroed?
2645 // if so, find where our key is located
2646 // on current page or pages split on
2647 // same page txn operations.
2650 set->latch = mgr->latchsets + entry;
2651 set->page = bt_mappage (mgr, set->latch);
2653 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2654 if( slotptr(set->page, slot)->type == Librarian )
// remember which page the reused key actually landed on
2656 if( locks[src].reuse )
2657 locks[src].entry = entry;
// follow the split chain to the next page produced by this transaction
2660 } while( entry = set->latch->split );
2662 mgr->line = __LINE__, mgr->err = BTERR_atomic;
// Insert the key/value at source slot src into the page located by
// bt_atomicpage. When the page has no room it is split, the new right
// page is write-locked and spliced into the latch split chain, and the
// lookup is retried. Falling out of the loop reports BTERR_atomic.
2666 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2668 BtKey *key = keyptr(source, src);
2669 BtVal *val = valptr(source, src);
2674 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2675 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
// release argument is 0: the page lock and pin are kept
2676 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
// stamp the page with this key's redo log sequence number
2678 set->page->lsn = locks[src].lsn;
2682 if( entry = bt_splitpage (mgr, set, thread_no) )
2683 latch = mgr->latchsets + entry;
2687 // splice right page into split chain
2688 // and WriteLock it.
2690 bt_lockpage(BtLockWrite, latch, thread_no);
2691 latch->split = set->latch->split;
2692 set->latch->split = entry;
// zero the cached slot so bt_atomicpage searches again
2693 locks[src].slot = 0;
2696 return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2699 // perform delete from smaller btree
2700 // insert a delete slot if not found there
//
// source/src identify the key to delete; the page is located through
// bt_atomicpage. A located key is accounted as garbage and the page is
// stamped with the key's lsn; mgr->found is incremented atomically.
2702 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
2704 BtKey *key = keyptr(source, src);
2711 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2712 node = slotptr(set->page, slot);
2713 ptr = keyptr(set->page, slot);
2714 val = valptr(set->page, slot);
2716 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2718 // if slot is not found, insert a delete slot
// the inserted slot has type Delete and no value; the page lock is kept
2720 if( keycmp (ptr, key->key, key->len) )
2721 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
2723 // if node is already dead,
2724 // ignore the request.
// the key and value bytes, with their headers, become garbage
2729 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2730 set->latch->dirty = 1;
2731 set->page->lsn = locks[src].lsn;
2735 __sync_fetch_and_add(&mgr->found, 1);
2739 // delete an empty master page for a transaction
2741 // note that the far right page never empties because
2742 // it always contains (at least) the infinite stopper key
2743 // and that all pages that don't contain any keys are
2744 // deleted, or are being held under Atomic lock.
//
// The right sibling's contents are copied over the empty page (prev),
// the sibling is flagged kill and redirected to prev, the sibling's
// fence key is repointed at prev in level 1, the following page's left
// link is fixed, and the old sibling page is freed.
// NOTE(review): prev's Atomic lock is released below but not acquired in
// this block -- presumably the caller holds it on entry; confirm.
2746 BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
2748 BtPageSet right[1], temp[1];
2749 unsigned char value[BtId];
2753 bt_lockpage(BtLockWrite, prev->latch, thread_no);
2755 // grab the right sibling
2757 if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
2758 right->page = bt_mappage (mgr, right->latch);
2762 bt_lockpage(BtLockAtomic, right->latch, thread_no);
2763 bt_lockpage(BtLockWrite, right->latch, thread_no);
2765 // and pull contents over empty page
2766 // while preserving master's left link
// prev's left link is written into right first so the whole-page copy
// carries it back into prev
2768 memcpy (right->page->left, prev->page->left, BtId);
2769 memcpy (prev->page, right->page, mgr->page_size);
2771 // forward seekers to old right sibling
2772 // to new page location in set
2774 bt_putid (right->page->right, prev->latch->page_no);
2775 right->latch->dirty = 1;
2776 right->page->kill = 1;
2778 // remove pointer to right page for searchers by
2779 // changing right fence key to point to the master page
2781 ptr = keyptr(right->page,right->page->cnt);
2782 bt_putid (value, prev->latch->page_no);
2784 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
2787 // now that master page is in good shape we can
2788 // remove its locks.
2790 bt_unlockpage (BtLockAtomic, prev->latch);
2791 bt_unlockpage (BtLockWrite, prev->latch);
2793 // fix master's right sibling's left pointer
2794 // to remove scanner's pointer to the right page
2796 if( right_page_no = bt_getid (prev->page->right) ) {
2797 if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
2798 temp->page = bt_mappage (mgr, temp->latch);
2802 bt_lockpage (BtLockWrite, temp->latch, thread_no);
2803 bt_putid (temp->page->left, prev->latch->page_no);
2804 temp->latch->dirty = 1;
2806 bt_unlockpage (BtLockWrite, temp->latch);
2807 bt_unpinlatch (mgr, temp->latch);
2808 } else { // master is now the far right page
// record prev in page zero's alloc->left under the allocation mutex
2809 bt_mutexlock (mgr->lock);
2810 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
2811 bt_releasemutex(mgr->lock);
2814 // now that there are no pointers to the right page
2815 // we can delete it after the last read access occurs
// swap Write/Atomic for Delete/Write, the lock pair bt_freepage releases
2817 bt_unlockpage (BtLockWrite, right->latch);
2818 bt_unlockpage (BtLockAtomic, right->latch);
2819 bt_lockpage (BtLockDelete, right->latch, thread_no);
2820 bt_lockpage (BtLockWrite, right->latch, thread_no);
2821 bt_freepage (mgr, right);
2825 // atomic modification of a batch of keys.
//
// bt_atomictxn (bt, source)
//   bt      database handle; bt->mgr is the btree the batch is applied to
//   source  page holding the batch: slots 1..source->cnt, each with a
//           key and a type that selects the insert or delete path below
//
// Phases visible below:
//   1. sort the source slots by key
//   2. write redo-log entries when pagezero->redopages is non-zero
//   3. load the leaf page for every key under Read + Atomic locks
//   4. take the Write lock on every distinct master page
//   5. apply the operations, walking the batch backwards page by page
//   6. collect split / emptied pages into a queue of AtomicKey entries
//   7. run the queue (insert key, delete key, free page)
//   8. promote pages while activepages exceeds latchtotal - 10
2827 // return -1 if BTERR is set
2828 // otherwise return slot number
2829 // causing the key constraint violation
2830 // or zero on successful completion.
// NOTE(review): every return statement visible in this listing returns
// bt->mgr->err; a slot-number return as described above is not visible
// here -- confirm against the full source.
2832 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2834 uint src, idx, slot, samepage, entry, que = 0;
2835 AtomicKey *head, *tail, *leaf;
2836 BtPageSet set[1], prev[1];
2837 unsigned char value[BtId];
2838 BtKey *key, *ptr, *key2;
// one AtomicTxn per source slot; index 0 is unused because slots are
// numbered from 1
// NOTE(review): no NULL check of this allocation and no free(locks) on
// the early-return paths is visible in this listing -- confirm.
2849 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2853 // stable sort the list of keys into order to
2854 // prevent deadlocks between threads.
// insertion sort: slot src (2..cnt) is moved down past every
// lower-numbered slot whose key compares greater
2856 for( src = 1; src++ < source->cnt; ) {
2857 *temp = *slotptr(source,src);
2858 key = keyptr (source,src);
2860 for( idx = src; --idx; ) {
2861 key2 = keyptr (source,idx);
2862 if( keycmp (key, key2->key, key2->len) < 0 ) {
2863 *slotptr(source,idx+1) = *slotptr(source,idx);
2864 *slotptr(source,idx) = *temp;
2870 // add entries to redo log
// a non-zero lsn from bt_txnredo is recorded on every lock entry
2872 if( bt->mgr->pagezero->redopages )
2873 if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
2874 for( src = 0; src++ < source->cnt; )
2875 locks[src].lsn = lsn;
2877 return bt->mgr->err;
2879 // Load the leaf page for each key
2880 // group same page references with reuse bit
2881 // and determine any constraint violations
2883 for( src = 0; src++ < source->cnt; ) {
2884 key = keyptr(source, src);
2887 // first determine if this modification falls
2888 // on the same page as the previous modification
2889 // note that the far right leaf page is a special case
// samepage holds when the current page has no right sibling, or its
// last key compares >= the new key
2891 if( samepage = src > 1 )
2892 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2893 slot = bt_findslot(set->page, key->key, key->len);
2895 bt_unlockpage(BtLockRead, set->latch);
2898 if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
2899 set->latch->split = 0;
2901 return bt->mgr->err;
// step over a Librarian slot to the slot that follows it
2903 if( slotptr(set->page, slot)->type == Librarian )
2904 ptr = keyptr(set->page, ++slot);
2906 ptr = keyptr(set->page, slot);
// two alternative recordings: reuse = 0 stores the latch entry and slot,
// reuse = 1 stores zeros
// (the condition choosing between them is not visible in this listing)
2909 locks[src].entry = set->latch - bt->mgr->latchsets;
2910 locks[src].slot = slot;
2911 locks[src].reuse = 0;
2913 locks[src].entry = 0;
2914 locks[src].slot = 0;
2915 locks[src].reuse = 1;
2918 // capture current lsn for master page
2920 locks[src].reqlsn = set->page->lsn;
2923 // unlock last loadpage lock
2926 bt_unlockpage(BtLockRead, set->latch);
2928 // obtain write lock for each master page
2929 // sync flushed pages to disk
2931 for( src = 0; src++ < source->cnt; ) {
2932 if( locks[src].reuse )
2935 set->latch = bt->mgr->latchsets + locks[src].entry;
2936 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
2939 // insert or delete each key
2940 // process any splits or merges
2941 // release Write & Atomic latches
2942 // set ParentModifications and build
2943 // queue of keys to insert for split pages
2944 // or delete for deleted pages.
2946 // run through txn list backwards
// samepage now marks one past the last source slot belonging to the
// page being processed
2948 samepage = source->cnt + 1;
2950 for( src = source->cnt; src; src-- ) {
2951 if( locks[src].reuse )
2954 // perform the txn operations
2955 // from smaller to larger on
2958 for( idx = src; idx < samepage; idx++ )
2959 switch( slotptr(source,idx)->type ) {
2961 if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
2962 return bt->mgr->err;
2967 if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
2968 return bt->mgr->err;
2972 // after the same page operations have finished,
2973 // process master page for splits or deletion.
// latch keeps the master page's latch; prev moves along the split chain
2975 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2976 prev->page = bt_mappage (bt->mgr, prev->latch);
2979 // pick-up all splits from master page
2980 // each one is already WriteLocked.
// the chain is followed through latch->split, an index into latchsets
2982 entry = prev->latch->split;
2985 set->latch = bt->mgr->latchsets + entry;
2986 set->page = bt_mappage (bt->mgr, set->latch);
2987 entry = set->latch->split;
2989 // delete empty master page by undoing its split
2990 // (this is potentially another empty page)
2991 // note that there are no new left pointers yet
2993 if( !prev->page->act ) {
2994 memcpy (set->page->left, prev->page->left, BtId);
2995 memcpy (prev->page, set->page, bt->mgr->page_size);
2996 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
2997 bt_freepage (bt->mgr, set);
2999 prev->latch->split = set->latch->split;
3000 prev->latch->dirty = 1;
3004 // remove empty page from the split chain
3005 // and return it to the free list.
3007 if( !set->page->act ) {
3008 memcpy (prev->page->right, set->page->right, BtId);
3009 prev->latch->split = set->latch->split;
3010 bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
3011 bt_freepage (bt->mgr, set);
3015 // schedule prev fence key update
// a zero-filled AtomicKey is allocated and que counts it
// NOTE(review): the allocation is used by the memcpy below with no
// NULL check visible in this listing. calloc's arguments are written
// as (size, count); the byte total is the same either way.
3017 ptr = keyptr(prev->page,prev->page->cnt);
3018 leaf = calloc (sizeof(AtomicKey), 1), que++;
3020 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3021 leaf->entry = prev->latch - bt->mgr->latchsets;
3022 leaf->page_no = prev->latch->page_no;
3032 // splice in the left link into the split page
// prev trades its Write lock for the Parent lock
3034 bt_putid (set->page->left, prev->latch->page_no);
3035 bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3036 bt_unlockpage(BtLockWrite, prev->latch);
3040 // update left pointer in next right page from last split page
3041 // (if all splits were reversed, latch->split == 0)
3043 if( latch->split ) {
3044 // fix left pointer in master's original (now split)
3045 // far right sibling or set rightmost page in page zero
3047 if( right = bt_getid (prev->page->right) ) {
3048 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3049 set->page = bt_mappage (bt->mgr, set->latch);
3051 return bt->mgr->err;
3053 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3054 bt_putid (set->page->left, prev->latch->page_no);
3055 set->latch->dirty = 1;
3056 bt_unlockpage (BtLockWrite, set->latch);
3057 bt_unpinlatch (bt->mgr, set->latch);
3058 } else { // prev is rightmost page
3059 bt_mutexlock (bt->mgr->lock);
3060 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3061 bt_releasemutex(bt->mgr->lock);
3064 // Process last page split in chain
3066 ptr = keyptr(prev->page,prev->page->cnt);
3067 leaf = calloc (sizeof(AtomicKey), 1), que++;
3069 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3070 leaf->entry = prev->latch - bt->mgr->latchsets;
3071 leaf->page_no = prev->latch->page_no;
3081 bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
3082 bt_unlockpage(BtLockWrite, prev->latch);
3084 // remove atomic lock on master page
3086 bt_unlockpage(BtLockAtomic, latch);
3090 // finished if prev page occupied (either master or final split)
3092 if( prev->page->act ) {
3093 bt_unlockpage(BtLockWrite, latch);
3094 bt_unlockpage(BtLockAtomic, latch);
3095 bt_unpinlatch(bt->mgr, latch);
3099 // any and all splits were reversed, and the
3100 // master page located in prev is empty, delete it
3101 // by pulling over master's right sibling.
3103 // Remove empty master's fence key
3105 ptr = keyptr(prev->page,prev->page->cnt);
3107 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3108 return bt->mgr->err;
3110 // perform the remainder of the delete
3111 // from the FIFO queue
3113 leaf = calloc (sizeof(AtomicKey), 1), que++;
3115 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3116 leaf->entry = prev->latch - bt->mgr->latchsets;
3117 leaf->page_no = prev->latch->page_no;
3128 // leave atomic lock in place until
3129 // deletion completes in next phase.
3131 bt_unlockpage(BtLockWrite, prev->latch);
3134 // add & delete keys for any pages split or merged during transaction
// each queued entry names a latch (leaf->entry), a page number used as
// the value, and a saved key (leaf->leafkey)
3138 set->latch = bt->mgr->latchsets + leaf->entry;
3139 set->page = bt_mappage (bt->mgr, set->latch);
3141 bt_putid (value, leaf->page_no);
3142 ptr = (BtKey *)leaf->leafkey;
3144 switch( leaf->type ) {
3145 case 0: // insert key
3146 if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
3147 return bt->mgr->err;
3151 case 1: // delete key
3152 if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
3153 return bt->mgr->err;
3157 case 2: // free page
3158 if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
3159 return bt->mgr->err;
// the Parent lock is released unless the entry is flagged nounlock
3164 if( !leaf->nounlock )
3165 bt_unlockpage (BtLockParent, set->latch);
3167 bt_unpinlatch (bt->mgr, set->latch);
3170 } while( leaf = tail );
3172 // if number of active pages
3173 // is greater than the buffer pool
3174 // promote page into larger btree
3177 while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
3178 if( bt_txnpromote (bt) )
3179 return bt->mgr->err;
3187 // promote a page into the larger btree
//
// bt_txnpromote (bt)
//   Picks a latch entry from bt->mgr using the latchpromote counter,
//   and if that entry passes the checks below, replays every slot of
//   its page into bt->main (insert or delete) and then deletes the page
//   from bt->mgr.
//   Visible returns hand back bt->main->err or bt->mgr->err when a
//   callee reports a non-zero result.
//   NOTE(review): the statements taken when an entry is skipped, and
//   the final return, are not visible in this listing.
3189 BTERR bt_txnpromote (BtDb *bt)
3191 uint entry, slot, idx;
// two alternative atomic post-increments of latchpromote: a GCC
// __sync builtin and an MSVC intrinsic
// (the preprocessor lines selecting one of them are not visible here)
3199 entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
3201 entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
3203 entry %= bt->mgr->latchtotal;
3208 set->latch = bt->mgr->latchsets + entry;
3209 idx = set->latch->page_no % bt->mgr->latchhash;
// non-blocking attempt on the latch's modify mutex
3211 if( !bt_mutextry(set->latch->modify) )
3214 // skip this entry if it is pinned
3216 if( set->latch->pin & ~CLOCK_bit ) {
3217 bt_releasemutex(set->latch->modify);
3221 set->page = bt_mappage (bt->mgr, set->latch);
3223 // entry never used or has no right sibling
3225 if( !set->latch->page_no || !bt_getid (set->page->right) ) {
3226 bt_releasemutex(set->latch->modify);
// Access is held only while the Write lock is being acquired
3230 bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
3231 bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
3232 bt_unlockpage(BtLockAccess, set->latch);
3234 // entry is an interior node, or is being or was killed
3236 if( set->page->lvl || set->page->free || set->page->kill ) {
3237 bt_releasemutex(set->latch->modify);
3238 bt_unlockpage(BtLockWrite, set->latch);
3242 // pin the page for our usage
3245 bt_releasemutex(set->latch->modify);
3247 // transfer slots in our selected page to larger btree
// progress message for page numbers divisible by 100
// NOTE(review): page_no is printed with %d; if it is the 64-bit uid
// type the format does not match the argument -- confirm its
// declaration.
3248 if( !(set->latch->page_no % 100) )
3249 fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
3251 for( slot = 0; slot++ < set->page->cnt; ) {
3252 ptr = keyptr (set->page, slot);
3253 val = valptr (set->page, slot);
3254 node = slotptr(set->page, slot);
// dispatch on the slot type
// (the case labels are not visible in this listing)
3256 switch( node->type ) {
3259 if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
3260 return bt->main->err;
3265 if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
3266 return bt->main->err;
3272 // now delete the page
3274 if( bt_deletepage (bt->mgr, set, bt->thread_no) )
3275 return bt->mgr->err;
3281 // set cursor to highest slot on highest page
//
// bt_lastkey (bt)
//   Reads the page number stored in pagezero->alloc->left, copies that
//   page into bt->cursor under a Read lock, and returns the copied
//   page's slot count (cursor->cnt).
//   NOTE(review): the path taken when bt_pinlatch returns zero is not
//   visible in this listing.
3283 uint bt_lastkey (BtDb *bt)
3285 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3288 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3289 set->page = bt_mappage (bt->mgr, set->latch);
// the cursor is a private copy, so lock and pin are dropped right after
3293 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3294 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3295 bt_unlockpage(BtLockRead, set->latch);
3296 bt_unpinlatch (bt->mgr, set->latch);
3297 return bt->cursor->cnt;
3300 // return previous slot on cursor page
//
// bt_prevkey (bt, slot)
//   bt    handle whose bt->cursor holds a private copy of a page
//   slot  current slot on the cursor page
//   When the cursor has to move to another page, the visible code takes
//   the cursor's left link, copies that page into bt->cursor under a
//   Read lock, and then examines the new page's right link and kill
//   flag.
//   The last visible return yields the new cursor page's slot count.
//   NOTE(review): the in-page decrement, the loop structure and the
//   actions on the kill / right-link tests are not visible in this
//   listing.
3302 uint bt_prevkey (BtDb *bt, uint slot)
3304 uid cursor_page = bt->cursor->page_no;
3305 uid ourright, next, us = cursor_page;
// remember the cursor page's right link before moving left
3311 ourright = bt_getid(bt->cursor->right);
// a zero left link is tested here
3314 if( !(next = bt_getid(bt->cursor->left)) )
3320 if( set->latch = bt_pinlatch (bt->mgr, next, NULL, bt->thread_no) )
3321 set->page = bt_mappage (bt->mgr, set->latch);
3325 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3326 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3327 bt_unlockpage(BtLockRead, set->latch);
3328 bt_unpinlatch (bt->mgr, set->latch);
// right link of the page just loaded, compared below with the right
// link saved from the page we came from
3330 next = bt_getid (bt->cursor->right);
3332 if( bt->cursor->kill )
3336 if( next == ourright )
3341 return bt->cursor->cnt;
3344 // return next slot on cursor page
3345 // or slide cursor right into next page
//
// bt_nextkey (bt, slot)
//   bt    handle whose bt->cursor holds a private copy of a page
//   slot  current slot on the cursor page
//   Scans forward over the cursor page, testing each slot's dead flag.
//   When the page is used up, the visible code copies the right
//   sibling into bt->cursor under a Read lock.
//   The last visible statement clears bt->mgr->err and returns 0.
//   NOTE(review): the statements executed for dead and live slots, and
//   the loop that restarts on the new page, are not visible in this
//   listing.
3347 uint bt_nextkey (BtDb *bt, uint slot)
3353 right = bt_getid(bt->cursor->right);
3355 while( slot++ < bt->cursor->cnt )
3356 if( slotptr(bt->cursor,slot)->dead )
// on a page with no right sibling the final slot is excluded
3358 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3366 if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
3367 set->page = bt_mappage (bt->mgr, set->latch);
3371 bt_lockpage(BtLockRead, set->latch, bt->thread_no);
3372 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3373 bt_unlockpage(BtLockRead, set->latch);
3374 bt_unpinlatch (bt->mgr, set->latch);
3379 return bt->mgr->err = 0;
3382 // cache page of keys into cursor and return starting slot for given key
//
// bt_startkey (bt, key, len)
//   key, len  key bytes and length to position on
//   Loads the page for the key via bt_loadpage with a Read lock, copies
//   it into bt->cursor when a non-zero slot comes back, then drops the
//   Read lock and the pin.
//   NOTE(review): the zero-slot path and the return statement are not
//   visible in this listing.
3384 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3389 // cache page for retrieval
3391 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3392 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3396 bt_unlockpage(BtLockRead, set->latch);
3397 bt_unpinlatch (bt->mgr, set->latch);
// getCpuTime (type) -- variant built on the Win32 FILETIME / SYSTEMTIME API
//   Returns a time in seconds as a double.
//   Three sources are visible: the system clock
//   (GetSystemTimeAsFileTime), the process user time and the process
//   system time (both from GetProcessTimes).
//   main calls this with 0, 1 and 2 and labels the results real, user
//   and sys.
//   NOTE(review): the statements that select a source from `type` are
//   not visible in this listing.
3404 double getCpuTime(int type)
3407 FILETIME xittime[1];
3408 FILETIME systime[1];
3409 FILETIME usrtime[1];
3410 SYSTEMTIME timeconv[1];
3413 memset (timeconv, 0, sizeof(SYSTEMTIME));
// system clock: the day-of-week field contributes whole days in seconds
3417 GetSystemTimeAsFileTime (xittime);
3418 FileTimeToSystemTime (xittime, timeconv);
3419 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// process user time
3422 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3423 FileTimeToSystemTime (usrtime, timeconv);
// process system (kernel) time
3426 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3427 FileTimeToSystemTime (systime, timeconv);
// hours, minutes, seconds and milliseconds are folded into seconds
3431 ans += (double)timeconv->wHour * 3600;
3432 ans += (double)timeconv->wMinute * 60;
3433 ans += (double)timeconv->wSecond;
3434 ans += (double)timeconv->wMilliseconds / 1000;
3439 #include <sys/resource.h>
// getCpuTime (type) -- variant built on gettimeofday / getrusage
//   Returns a time in seconds as a double, with microseconds as the
//   fraction.
//   Three sources are visible: wall clock (gettimeofday), user CPU
//   time (ru_utime) and system CPU time (ru_stime) of the calling
//   process.
//   main calls this with 0, 1 and 2 and labels the results real, user
//   and sys.
//   NOTE(review): the statements that select a source from `type` are
//   not visible in this listing.
3441 double getCpuTime(int type)
3443 struct rusage used[1];
3444 struct timeval tv[1];
// wall-clock time
3448 gettimeofday(tv, NULL);
3449 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time
3452 getrusage(RUSAGE_SELF, used);
3453 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time
3456 getrusage(RUSAGE_SELF, used);
3457 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// bt_poolaudit (mgr)
//   Diagnostic pass over the latch table: for each latch entry it
//   writes a line to stderr when the read/write lock or the access
//   lock shows bits under MASK, or when the pin count (pin without
//   CLOCK_bit) is non-zero.
//   The parent / atomic / modify checks are present only as
//   commented-out code.
//   NOTE(review): the initial value of `entry` is not visible in this
//   listing. page_no is printed with %d; confirm that matches its
//   declared type.
3464 void bt_poolaudit (BtMgr *mgr)
3469 while( ++entry < mgr->latchtotal ) {
3470 latch = mgr->latchsets + entry;
3472 if( *latch->readwr->rin & MASK )
3473 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3475 if( *latch->access->rin & MASK )
3476 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3478 // if( *latch->parent->xcl->value )
3479 // fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3481 // if( *latch->atomic->xcl->value )
3482 // fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3484 // if( *latch->modify->value )
3485 // fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3487 if( latch->pin & ~CLOCK_bit )
3488 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3501 // standalone program to index file of keys
3502 // then list them onto std-out
//
// index_file (arg) -- thread entry point; arg is a ThreadArg *
//   Opens a handle with bt_open (args->mgr, args->main), picks a
//   command character from args->type (position args->idx, or the last
//   character when idx is past the end), and runs one of the sections
//   below: pennysort insert/delete, plain key insert, find, forward
//   scan, reverse scan, or page count.
//   Two signatures follow (a pthread-style one and a __stdcall one).
//   NOTE(review): the preprocessor lines selecting a signature, the
//   switch/case lines selecting a section, and the per-character
//   line-assembly code are not visible in this listing.
//   NOTE(review): the error paths below print a message and then call
//   exit(0), so the process reports success to its parent -- confirm
//   whether a non-zero status is intended.
3505 void *index_file (void *arg)
3507 uint __stdcall index_file (void *arg)
3510 int line = 0, found = 0, cnt = 0, idx;
3511 uid next, page_no = LEAF_page; // start on first page of leaves
3512 int ch, len = 0, slot, type = 0;
3513 unsigned char key[BT_maxkey];
3514 unsigned char txn[65536];
3515 ThreadArg *args = arg;
3524 bt = bt_open (args->mgr, args->main);
// one command character per thread; extra threads reuse the last one
3527 if( args->idx < strlen (args->type) )
3528 ch = args->type[args->idx];
3530 ch = args->type[strlen(args->type) - 1];
// pennysort section: first 10 bytes of each line are the key, the
// remainder is the value
3542 if( type == Delete )
3543 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3545 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3547 if( type == Delete )
3548 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3550 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3552 if( in = fopen (args->infile, "rb") )
3553 while( ch = getc(in), ch != EOF )
3559 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3560 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
// transaction form: value bytes, a length byte and the 10-byte key are
// packed into txn[] and a slot of `page` is pointed at them
3566 memcpy (txn + nxt, key + 10, len - 10);
3568 txn[nxt] = len - 10;
3570 memcpy (txn + nxt, key, 10);
3573 slotptr(page,++cnt)->off = nxt;
3574 slotptr(page,cnt)->type = type;
// cnt is compared with the requested batch size (args->num)
3577 if( cnt < args->num )
3584 if( bt_atomictxn (bt, page) )
3585 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3590 else if( len < BT_maxkey )
3592 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
// plain insert section: whole line is the key, no value
3596 fprintf(stderr, "started indexing for %s\n", args->infile);
3597 if( in = fopen (args->infile, "r") )
3598 while( ch = getc(in), ch != EOF )
3603 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3604 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3607 else if( len < BT_maxkey )
3609 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
// find section: look each line up with bt_findkey
3613 fprintf(stderr, "started finding keys for %s\n", args->infile);
3614 if( in = fopen (args->infile, "rb") )
3615 while( ch = getc(in), ch != EOF )
3619 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3621 else if( bt->mgr->err )
3622 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3625 else if( len < BT_maxkey )
3627 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
// forward scan section: walk leaf pages through their right links,
// writing key and value of each live slot to stdout
3631 fprintf(stderr, "started scanning\n");
3634 if( set->latch = bt_pinlatch (bt->mgr, page_no, NULL, bt->thread_no) )
3635 set->page = bt_mappage (bt->mgr, set->latch);
3637 fprintf(stderr, "unable to obtain latch"), exit(1);
3639 bt_lockpage (BtLockRead, set->latch, bt->thread_no);
3640 next = bt_getid (set->page->right);
// the last slot of a page with no right sibling is excluded
3642 for( slot = 0; slot++ < set->page->cnt; )
3643 if( next || slot < set->page->cnt )
3644 if( !slotptr(set->page, slot)->dead ) {
3645 ptr = keyptr(set->page, slot);
3648 if( slotptr(set->page, slot)->type == Duplicate )
3651 fwrite (ptr->key, len, 1, stdout);
3652 val = valptr(set->page, slot);
3653 fwrite (val->value, val->len, 1, stdout);
3654 fputc ('\n', stdout);
3658 bt_unlockpage (BtLockRead, set->latch);
3659 bt_unpinlatch (bt->mgr, set->latch);
3660 } while( page_no = next );
3662 fprintf(stderr, " Total keys read %d\n", cnt);
// reverse scan section: start at bt_lastkey and step with bt_prevkey
3666 fprintf(stderr, "started reverse scan\n");
3667 if( slot = bt_lastkey (bt) )
3668 while( slot = bt_prevkey (bt, slot) ) {
3669 if( slotptr(bt->cursor, slot)->dead )
3672 ptr = keyptr(bt->cursor, slot);
3675 if( slotptr(bt->cursor, slot)->type == Duplicate )
3678 fwrite (ptr->key, len, 1, stdout);
3679 val = valptr(bt->cursor, slot);
3680 fwrite (val->value, val->len, 1, stdout);
3681 fputc ('\n', stdout);
3685 fprintf(stderr, " Total keys read %d\n", cnt);
// count section: read pages directly and total the active-key counts
// of leaf pages that are not free
3690 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3692 fprintf(stderr, "started counting\n");
3693 next = REDO_page + bt->mgr->pagezero->redopages;
3695 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3696 if( bt_readpage (bt->mgr, bt->cursor, page_no) )
3699 if( !bt->cursor->free && !bt->cursor->lvl )
3700 cnt += bt->cursor->act;
3706 cnt--; // remove stopper key
3707 fprintf(stderr, " Total keys counted %d\n", cnt);
3719 typedef struct timeval timer;
// main (argc, argv) -- driver for the standalone build
//   Argument positions read below:
//     argv[1]   cache btree file name
//     argv[2]   main btree file name
//     argv[3]   command string handed to every thread
//     argv[4]   cache page size in bits (bits starts at 16)
//     argv[5]   cache buffer pool size
//     argv[6]   transaction batch size
//     argv[7]   number of redo/recovery pages
//     argv[8]   main btree page bits
//     argv[9]   main buffer pool size
//     argv[10+] source files, one thread per file
//   Numeric arguments are converted with atoi, which gives no error
//   indication for malformed input.
//   NOTE(review): the argc guards around each argument, the
//   preprocessor lines choosing between pthread and Win32 calls, and
//   the end of the function are not visible in this listing.
3721 int main (int argc, char **argv)
3723 int idx, cnt, len, slot, err;
3724 int segsize, bits = 16;
3744 fprintf (stderr, "Usage: %s idx_file main_file cmds [page_bits buffer_pool_size txn_size recovery_pages main_bits main_pool src_file1 src_file2 ... ]\n", argv[0]);
3745 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3746 fprintf (stderr, " where main_file is the name of the main btree file\n");
3747 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3748 fprintf (stderr, " page_bits is the page size in bits for the cache btree\n");
3749 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool for the cache btree\n");
3750 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3751 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3752 fprintf (stderr, " main_bits is the page size of the main btree in bits\n");
3753 fprintf (stderr, " main_pool is the number of main pages in the main buffer pool\n");
3754 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// wall-clock start, used for the "real" figure printed at the end
3758 start = getCpuTime(0);
3761 bits = atoi(argv[4]);
3764 poolsize = atoi(argv[5]);
3767 fprintf (stderr, "Warning: no mapped_pool\n");
3770 num = atoi(argv[6]);
3773 redopages = atoi(argv[7]);
// only a warning is visible for an oversized recovery buffer
3775 if( redopages + REDO_page > 65535 )
3776 fprintf (stderr, "Warning: Recovery buffer too large\n");
3779 mainbits = atoi(argv[8]);
3782 mainpool = atoi(argv[9]);
// thread handle array (pthread and Win32 alternatives) and one
// ThreadArg per thread plus one spare
// NOTE(review): no check of these allocation results is visible in this
// listing.
3786 threads = malloc (cnt * sizeof(pthread_t));
3788 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3790 args = malloc ((cnt + 1) * sizeof(ThreadArg));
// open the cache btree (with redo pages) and the main btree (without)
3792 mgr = bt_mgr (argv[1], bits, poolsize, redopages);
3795 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3800 main = bt_mgr (argv[2], mainbits, mainpool, 0);
3803 fprintf(stderr, "Index Open Error %s\n", argv[2]);
// start one index_file thread per source file
3812 for( idx = 0; idx < cnt; idx++ ) {
3813 args[idx].infile = argv[idx + 10];
3814 args[idx].type = argv[3];
3815 args[idx].main = main;
3816 args[idx].mgr = mgr;
3817 args[idx].num = num;
3818 args[idx].idx = idx;
3820 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3821 fprintf(stderr, "Error creating thread %d\n", err);
3823 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
// args[0] is filled in again here
// (the surrounding condition and the call that uses it are not visible
// in this listing)
3827 args[0].infile = argv[idx + 10];
3828 args[0].type = argv[3];
3829 args[0].main = main;
3836 // wait for termination
3839 for( idx = 0; idx < cnt; idx++ )
3840 pthread_join (threads[idx], NULL);
3842 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3844 for( idx = 0; idx < cnt; idx++ )
3845 CloseHandle(threads[idx]);
3852 fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
// report elapsed real, user and system time as minutes + seconds
3858 elapsed = getCpuTime(0) - start;
3859 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3860 elapsed = getCpuTime(1);
3861 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3862 elapsed = getCpuTime(2);
3863 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);