1 // btree version threadskv9c FUTEX version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // and redo log for failure recovery
13 // author: karl malbrain, malbrain@cal.berkeley.edu
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
19 The orginal author is Karl Malbrain.
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
37 #include <linux/futex.h>
52 #define WIN32_LEAN_AND_MEAN
66 typedef unsigned long long uid;
67 typedef unsigned long long logseqno;
70 typedef unsigned long long off64_t;
71 typedef unsigned short ushort;
72 typedef unsigned int uint;
75 #define BT_ro 0x6f72 // ro
76 #define BT_rw 0x7772 // rw
78 #define BT_maxbits 24 // maximum page size in bits
79 #define BT_minbits 9 // minimum page size in bits
80 #define BT_minpage (1 << BT_minbits) // minimum page size
81 #define BT_maxpage (1 << BT_maxbits) // maximum page size
83 // BTree page number constants
84 #define ALLOC_page 0 // allocation page
85 #define ROOT_page 1 // root of the btree
86 #define LEAF_page 2 // first page of leaves
87 #define REDO_page 3 // first page of redo buffer
89 // Number of levels to create in a new BTree
94 There are six lock types for each node in four independent sets:
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
112 // definition for phase-fair reader/writer lock implementation
115 volatile ushort rin[1];
116 volatile ushort rout[1];
117 volatile ushort ticket[1];
118 volatile ushort serving[1];
126 volatile uint exclusive[1];
138 // exclusive is set for write access
141 volatile uint exclusive[1];
146 // mode & definition for lite latch implementation
149 QueRd = 1, // reader queue
150 QueWr = 2 // writer queue
153 // hash table entries
156 volatile uint entry; // Latch table entry at head of chain
157 BtMutexLatch latch[1];
160 // latch manager table structure
163 uid page_no; // latch set page number
164 RWLock readwr[1]; // read/write page lock
165 RWLock access[1]; // Access Intent/Page delete
166 WOLock parent[1]; // Posting of fence key in parent
167 WOLock atomic[1]; // Atomic update in progress
168 uint split; // right split page atomic insert
169 uint entry; // entry slot in latch table
170 uint next; // next entry in hash table chain
171 uint prev; // prev entry in hash table chain
172 BtMutexLatch modify[1]; // modify entry lite latch
173 volatile ushort pin; // number of accessing threads
174 volatile unsigned char dirty; // page in cache is dirty (atomic set)
175 volatile unsigned char avail; // page is an available entry
178 // Define the length of the page record numbers
182 // Page key slot definition.
184 // Keys are marked dead, but remain on the page until
185 // it cleanup is called. The fence key (highest key) for
186 // a leaf page is always present, even after cleanup.
190 // In addition to the Unique keys that occupy slots
191 // there are Librarian and Duplicate key
192 // slots occupying the key slot array.
194 // The Librarian slots are dead keys that
195 // serve as filler, available to add new Unique
196 // or Dup slots that are inserted into the B-tree.
198 // The Duplicate slots have had their key bytes extended
199 // by 6 bytes to contain a binary duplicate key uniqueifier.
210 uint off:BT_maxbits; // page offset for key start
211 uint type:3; // type of slot
212 uint dead:1; // set for deleted slot
215 // The key structure occupies space at the upper end of
216 // each page. It's a length byte followed by the key
220 unsigned char len; // this can be changed to a ushort or uint
221 unsigned char key[0];
224 // the value structure also occupies space at the upper
225 // end of the page. Each key is immediately followed by a value.
228 unsigned char len; // this can be changed to a ushort or uint
229 unsigned char value[0];
232 #define BT_maxkey 255 // maximum number of bytes in a key
233 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
235 // The first part of an index page.
236 // It is immediately followed
237 // by the BtSlot array of keys.
239 // note that this structure size
240 // must be a multiple of 8 bytes
241 // in order to place dups correctly.
243 typedef struct BtPage_ {
244 uint cnt; // count of keys in page
245 uint act; // count of active keys
246 uint min; // next key offset
247 uint garbage; // page garbage in bytes
248 unsigned char bits:7; // page size in bits
249 unsigned char free:1; // page is on free chain
250 unsigned char lvl:7; // level of page
251 unsigned char kill:1; // page is being deleted
252 unsigned char right[BtId]; // page number to right
253 unsigned char left[BtId]; // page number to left
254 unsigned char filler[2]; // padding to multiple of 8
255 logseqno lsn; // log sequence number applied
258 // The loadpage interface object
261 BtPage page; // current page pointer
262 BtLatchSet *latch; // current page latch set
265 // structure for latch manager on ALLOC_page
268 struct BtPage_ alloc[1]; // next page_no in right ptr
269 unsigned long long dups[1]; // global duplicate key uniqueifier
270 unsigned char chain[BtId]; // head of free page_nos chain
273 // The object structure for Btree access
276 uint page_size; // page size
277 uint page_bits; // page size in bits
283 BtPageZero *pagezero; // mapped allocation page
284 BtHashEntry *hashtable; // the buffer pool hash table entries
285 BtLatchSet *latchsets; // mapped latch set from buffer pool
286 unsigned char *pagepool; // mapped to the buffer pool pages
287 unsigned char *redobuff; // mapped recovery buffer pointer
288 logseqno lsn, flushlsn; // current & first lsn flushed
289 BtMutexLatch dump[1]; // redo dump lite latch
290 BtMutexLatch redo[1]; // redo area lite latch
291 BtMutexLatch lock[1]; // allocation area lite latch
292 ushort thread_no[1]; // next thread number
293 uint nlatchpage; // number of latch pages at BT_latch
294 uint latchtotal; // number of page latch entries
295 uint latchhash; // number of latch hash table slots
296 uint latchvictim; // next latch entry to examine
297 uint latchavail; // next available latch entry
298 uint availlock[1]; // latch available chain commitments
299 uint available; // size of latch available chain
300 uint redopages; // size of recovery buff in pages
301 uint redoend; // eof/end element in recovery buff
303 HANDLE halloc; // allocation handle
304 HANDLE hpool; // buffer pool handle
309 BtMgr *mgr; // buffer manager for thread
310 BtPage cursor; // cached frame for start/next (never mapped)
311 BtPage frame; // spare frame for the page split (never mapped)
312 uid cursor_page; // current cursor page number
313 unsigned char *mem; // frame, cursor, page memory buffer
314 unsigned char key[BT_keyarray]; // last found complete key
315 int found; // last delete or insert was found
316 int err; // last error
317 int line; // last error line no
318 int reads, writes; // number of reads and writes from the btree
319 ushort thread_no; // thread number
322 // Catastrophic errors
337 #define CLOCK_bit 0x8000
339 // recovery manager entry types
342 BTRM_eof = 0, // rest of buffer is emtpy
343 BTRM_add, // add a unique key-value to btree
344 BTRM_dup, // add a duplicate key-value to btree
345 BTRM_del, // delete a key-value from btree
346 BTRM_upd, // update a key with a new value
347 BTRM_new, // allocate a new empty page
348 BTRM_old // reuse an old empty page
351 // recovery manager entry
352 // structure followed by BtKey & BtVal
355 logseqno reqlsn; // log sequence number required
356 logseqno lsn; // log sequence number for entry
357 uint len; // length of entry
358 unsigned char type; // type of entry
359 unsigned char lvl; // level of btree entry pertains to
364 extern void bt_close (BtDb *bt);
365 extern BtDb *bt_open (BtMgr *mgr);
366 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
367 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
368 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
369 extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
370 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
371 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
372 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
373 extern BtKey *bt_foundkey (BtDb *bt);
374 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
375 extern uint bt_nextkey (BtDb *bt, uint slot);
378 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
379 extern void bt_mgrclose (BtMgr *mgr);
380 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
381 extern logseqno bt_txnredo (BtDb *bt, BtPage page);
383 // Helper functions to return slot values
384 // from the cursor page.
386 extern BtKey *bt_key (BtDb *bt, uint slot);
387 extern BtVal *bt_val (BtDb *bt, uint slot);
389 // The page is allocated from low and hi ends.
390 // The key slots are allocated from the bottom,
391 // while the text and value of the key
392 // are allocated from the top. When the two
393 // areas meet, the page is split into two.
395 // A key consists of a length byte, two bytes of
396 // index number (0 - 65535), and up to 253 bytes
399 // Associated with each key is a value byte string
400 // containing any value desired.
402 // The b-tree root is always located at page 1.
403 // The first leaf page of level zero is always
404 // located on page 2.
406 // The b-tree pages are linked with next
407 // pointers to facilitate enumerators,
408 // and provide for concurrency.
410 // When to root page fills, it is split in two and
411 // the tree height is raised by a new root at page
412 // one with two keys.
414 // Deleted keys are marked with a dead bit until
415 // page cleanup. The fence key for a leaf node is
418 // To achieve maximum concurrency one page is locked at a time
419 // as the tree is traversed to find leaf key in question. The right
420 // page numbers are used in cases where the page is being split,
423 // Page 0 is dedicated to lock for new page extensions,
424 // and chains empty pages together for reuse. It also
425 // contains the latch manager hash table.
427 // The ParentModification lock on a node is obtained to serialize posting
428 // or changing the fence key for a node.
430 // Empty pages are chained together through the ALLOC page and reused.
432 // Access macros to address slot and key values from the page
433 // Page slots use 1 based indexing.
435 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
436 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
437 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
439 void bt_putid(unsigned char *dest, uid id)
444 dest[i] = (unsigned char)id, id >>= 8;
447 uid bt_getid(unsigned char *src)
452 for( i = 0; i < BtId; i++ )
453 id <<= 8, id |= *src++;
458 uid bt_newdup (BtDb *bt)
461 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
463 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
467 // Write-Only Queue Lock
469 void WriteOLock (WOLock *lock, ushort tid)
473 if( lock->tid == tid ) {
480 prev = __sync_fetch_and_or (lock->exclusive, 1);
482 prev = _InterlockedExchangeOr (lock->exclusive, 1);
484 if( !(prev & XCL) ) {
489 sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
496 void WriteORelease (WOLock *lock)
503 *lock->exclusive = 0;
506 sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
510 // Phase-Fair reader/writer lock implementation
512 void WriteLock (RWLock *lock, ushort tid)
516 if( lock->tid == tid ) {
521 tix = __sync_fetch_and_add (lock->ticket, 1);
523 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
525 // wait for our ticket to come up
527 while( tix != lock->serving[0] )
534 w = PRES | (tix & PHID);
536 r = __sync_fetch_and_add (lock->rin, w);
538 r = _InterlockedExchangeAdd16 (lock->rin, w);
540 while( r != *lock->rout )
549 void WriteRelease (RWLock *lock)
558 __sync_fetch_and_and (lock->rin, ~MASK);
560 _InterlockedAnd16 (lock->rin, ~MASK);
565 // try to obtain read lock
566 // return 1 if successful
568 int ReadTry (RWLock *lock, ushort tid)
572 // OK if write lock already held by same thread
574 if( lock->tid == tid ) {
579 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
581 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
584 if( w == (*lock->rin & MASK) ) {
586 __sync_fetch_and_add (lock->rin, -RINC);
588 _InterlockedExchangeAdd16 (lock->rin, -RINC);
596 void ReadLock (RWLock *lock, ushort tid)
599 if( lock->tid == tid ) {
604 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
606 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
609 while( w == (*lock->rin & MASK) )
617 void ReadRelease (RWLock *lock)
625 __sync_fetch_and_add (lock->rout, RINC);
627 _InterlockedExchangeAdd16 (lock->rout, RINC);
631 // lite weight spin lock Latch Manager
633 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
635 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
638 void bt_mutexlock(BtMutexLatch *latch)
644 prev = __sync_fetch_and_or(latch->exclusive, XCL);
646 prev = _InterlockedOr(latch->exclusive, XCL);
651 sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
658 // try to obtain write lock
660 // return 1 if obtained,
663 int bt_mutextry(BtMutexLatch *latch)
668 prev = __sync_fetch_and_or(latch->exclusive, XCL);
670 prev = _InterlockedOr(latch->exclusive, XCL);
672 // take write access if exclusive bit is clear
674 return !(prev & XCL);
679 void bt_releasemutex(BtMutexLatch *latch)
681 *latch->exclusive = 0;
683 sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
687 // recovery manager -- flush dirty pages
689 void bt_flushlsn (BtDb *bt)
691 uint cnt3 = 0, cnt2 = 0, cnt = 0;
696 // flush dirty pool pages to the btree that were
697 // dirty before the start of this redo recovery buffer
698 fprintf(stderr, "Start flushlsn\n");
699 for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
700 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
701 latch = bt->mgr->latchsets + entry;
702 bt_mutexlock (latch->modify);
703 bt_lockpage(bt, BtLockRead, latch);
706 bt_writepage(bt->mgr, page, latch->page_no);
707 latch->dirty = 0, bt->writes++, cnt++;
711 if( latch->pin & ~CLOCK_bit )
713 bt_unlockpage(bt, BtLockRead, latch);
714 bt_releasemutex (latch->modify);
716 fprintf(stderr, "End flushlsn %d pages %d pinned %d available\n", cnt, cnt2, cnt3);
719 // recovery manager -- process current recovery buff on startup
720 // this won't do much if previous session was properly closed.
722 BTERR bt_recoveryredo (BtMgr *mgr)
729 pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
731 hdr = (BtLogHdr *)mgr->redobuff;
732 mgr->flushlsn = hdr->lsn;
735 hdr = (BtLogHdr *)(mgr->redobuff + offset);
736 switch( hdr->type ) {
740 case BTRM_add: // add a unique key-value to btree
742 case BTRM_dup: // add a duplicate key-value to btree
743 case BTRM_del: // delete a key-value from btree
744 case BTRM_upd: // update a key with a new value
745 case BTRM_new: // allocate a new empty page
746 case BTRM_old: // reuse an old empty page
752 // recovery manager -- dump current recovery buff & flush dirty pages
753 // in preparation for next recovery buffer.
755 BTERR bt_dumpredo (BtDb *bt)
758 fprintf(stderr, "Flush pages ");
760 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
761 memset (eof, 0, sizeof(BtLogHdr));
763 // flush pages written at beginning of this redo buffer
764 // then write the redo buffer out to disk
766 fdatasync (bt->mgr->idx);
768 fprintf(stderr, "Dump ReDo: %d bytes\n", bt->mgr->redoend);
769 pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
771 sync_file_range (bt->mgr->idx, REDO_page << bt->mgr->page_bits, bt->mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
773 bt->mgr->flushlsn = bt->mgr->lsn;
774 bt->mgr->redoend = 0;
776 eof = (BtLogHdr *)(bt->mgr->redobuff);
777 memset (eof, 0, sizeof(BtLogHdr));
778 eof->lsn = bt->mgr->lsn;
782 // recovery manager -- append new entry to recovery log
783 // flush to disk when it overflows.
785 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
787 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
788 uint amt = sizeof(BtLogHdr);
792 bt_mutexlock (bt->mgr->redo);
795 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
797 // see if new entry fits in buffer
798 // flush and reset if it doesn't
800 if( flush = amt > size - bt->mgr->redoend ) {
801 bt_mutexlock (bt->mgr->dump);
803 if( bt_dumpredo (bt) )
807 // fill in new entry & either eof or end block
809 hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
814 hdr->lsn = ++bt->mgr->lsn;
816 bt->mgr->redoend += amt;
818 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
819 memset (eof, 0, sizeof(BtLogHdr));
821 // fill in key and value
824 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
825 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
828 bt_releasemutex(bt->mgr->redo);
832 bt_releasemutex(bt->mgr->dump);
838 // recovery manager -- append transaction to recovery log
839 // flush to disk when it overflows.
841 logseqno bt_txnredo (BtDb *bt, BtPage source)
843 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
844 uint amt = 0, src, type;
851 // determine amount of redo recovery log space required
853 for( src = 0; src++ < source->cnt; ) {
854 key = keyptr(source,src);
855 val = valptr(source,src);
856 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
857 amt += sizeof(BtLogHdr);
860 bt_mutexlock (bt->mgr->redo);
862 // see if new entry fits in buffer
863 // flush and reset if it doesn't
865 if( flush = amt > size - bt->mgr->redoend ) {
866 bt_mutexlock (bt->mgr->dump);
868 if( bt_dumpredo (bt) )
872 // assign new lsn to transaction
874 lsn = ++bt->mgr->lsn;
876 // fill in new entries
878 for( src = 0; src++ < source->cnt; ) {
879 key = keyptr(source, src);
880 val = valptr(source, src);
882 switch( slotptr(source, src)->type ) {
897 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
898 amt += sizeof(BtLogHdr);
900 hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
906 // fill in key and value
908 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
909 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
911 bt->mgr->redoend += amt;
914 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
915 memset (eof, 0, sizeof(BtLogHdr));
917 bt_releasemutex(bt->mgr->redo);
921 bt_releasemutex(bt->mgr->dump);
927 // read page into buffer pool from permanent location in Btree file
929 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
931 off64_t off = page_no << mgr->page_bits;
934 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
935 fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
942 memset (ovl, 0, sizeof(OVERLAPPED));
944 ovl->OffsetHigh = off >> 32;
946 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
947 fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
950 if( *amt < mgr->page_size ) {
951 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
958 // write page to permanent location in Btree file
959 // clear the dirty bit
961 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
963 off64_t off = page_no << mgr->page_bits;
966 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
972 memset (ovl, 0, sizeof(OVERLAPPED));
974 ovl->OffsetHigh = off >> 32;
976 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
979 if( *amt < mgr->page_size )
985 // set CLOCK bit in latch
986 // decrement pin count
988 void bt_unpinlatch (BtLatchSet *latch)
990 bt_mutexlock(latch->modify);
991 latch->pin |= CLOCK_bit;
993 bt_releasemutex(latch->modify);
996 // return the btree cached page address
997 // if page is dirty and has not yet been
998 // flushed to disk for the current redo
999 // recovery buffer, write it out.
1001 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1003 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1008 // return next available latch entry
1009 // and with latch entry locked
1011 uint bt_availnext (BtDb *bt)
1018 entry = __sync_fetch_and_add (&bt->mgr->latchavail, 1) + 1;
1020 entry = _InterlockedIncrement (&bt->mgr->latchavail);
1022 entry %= bt->mgr->latchtotal;
1027 latch = bt->mgr->latchsets + entry;
1032 bt_mutexlock(latch->modify);
1034 if( !latch->avail ) {
1035 bt_releasemutex(latch->modify);
1043 // find and add the next available latch entry
1046 BTERR bt_availlatch (BtDb *bt)
1054 // find and reuse previous entry on victim
1056 startattempt = bt->mgr->latchvictim;
1060 entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
1062 entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
1064 // skip entry if it has outstanding pins
1066 entry %= bt->mgr->latchtotal;
1071 // only go around one time before
1072 // flushing redo recovery buffer,
1073 // and the buffer pool to free up entries.
1075 if( bt->mgr->redopages )
1076 if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
1077 if( bt_mutextry (bt->mgr->dump) ) {
1078 if( bt_dumpredo (bt) )
1081 // synchronize the various threads running into this condition
1082 // so that only one thread does the dump and flush
1084 bt_mutexlock(bt->mgr->dump);
1086 startattempt = bt->mgr->latchvictim;
1087 bt_releasemutex(bt->mgr->dump);
1090 latch = bt->mgr->latchsets + entry;
1095 bt_mutexlock(latch->modify);
1097 // skip if already an available entry
1099 if( latch->avail ) {
1100 bt_releasemutex(latch->modify);
1104 // skip this entry if it is pinned
1105 // if the CLOCK bit is set
1106 // reset it to zero.
1109 latch->pin &= ~CLOCK_bit;
1110 bt_releasemutex(latch->modify);
1114 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1116 // if dirty page has lsn >= last redo recovery buffer
1117 // then hold page in the buffer pool until next redo
1118 // recovery buffer is being written to disk.
1121 if( page->lsn >= bt->mgr->flushlsn ) {
1122 bt_releasemutex(latch->modify);
1126 // entry is available
1128 __sync_fetch_and_add (&bt->mgr->available, 1);
1130 _InterlockedIncrement(&bt->mgr->available);
1133 bt_releasemutex(latch->modify);
1138 // release available latch requests
1140 void bt_availrelease (BtDb *bt, uint count)
1143 __sync_fetch_and_add(bt->mgr->availlock, -count);
1145 _InterlockedAdd(bt->mgr->availlock, -count);
1149 // commit available chain entries
1150 // find available entries as required
1152 void bt_availrequest (BtDb *bt, uint count)
1155 __sync_fetch_and_add(bt->mgr->availlock, count);
1157 _InterlockedAdd(bt->mgr->availlock, count);
1160 while( *bt->mgr->availlock > bt->mgr->available )
1164 // find available latchset
1165 // return with latchset pinned
1167 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, BtPage loadit)
1169 uint hashidx = page_no % bt->mgr->latchhash;
1174 // try to find our entry
1176 bt_mutexlock(bt->mgr->hashtable[hashidx].latch);
1178 if( entry = bt->mgr->hashtable[hashidx].entry ) do
1180 latch = bt->mgr->latchsets + entry;
1181 if( page_no == latch->page_no )
1183 } while( entry = latch->next );
1185 // found our entry: increment pin
1186 // remove from available status
1189 latch = bt->mgr->latchsets + entry;
1190 bt_mutexlock(latch->modify);
1193 __sync_fetch_and_add (&bt->mgr->available, -1);
1195 _InterlockedDecrement(&bt->mgr->available);
1198 latch->pin |= CLOCK_bit;
1201 bt_releasemutex(latch->modify);
1202 bt_releasemutex(bt->mgr->hashtable[hashidx].latch);
1206 // find and reuse entry from available set
1210 if( entry = bt_availnext (bt) )
1211 latch = bt->mgr->latchsets + entry;
1213 return bt->line = __LINE__, bt->err = BTERR_avail, NULL;
1215 idx = latch->page_no % bt->mgr->latchhash;
1217 // if latch is on a different hash chain
1218 // unlink from the old page_no chain
1220 if( latch->page_no )
1221 if( idx != hashidx ) {
1223 // skip over this entry if latch not available
1225 if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
1226 bt_releasemutex(latch->modify);
1231 bt->mgr->latchsets[latch->prev].next = latch->next;
1233 bt->mgr->hashtable[idx].entry = latch->next;
1236 bt->mgr->latchsets[latch->next].prev = latch->prev;
1238 bt_releasemutex (bt->mgr->hashtable[idx].latch);
1241 // remove available status
1245 __sync_fetch_and_add (&bt->mgr->available, -1);
1247 _InterlockedDecrement(&bt->mgr->available);
1249 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1252 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1253 return bt->line = __LINE__, NULL;
1255 latch->dirty = 0, bt->writes++;
1258 memcpy (page, loadit, bt->mgr->page_size);
1261 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
1262 return bt->line = __LINE__, NULL;
1266 // link page as head of hash table chain
1267 // if this is a never before used entry,
1268 // or it was previously on a different
1269 // hash table chain. Otherwise, just
1270 // leave it in its current hash table
1273 if( !latch->page_no || hashidx != idx ) {
1274 if( latch->next = bt->mgr->hashtable[hashidx].entry )
1275 bt->mgr->latchsets[latch->next].prev = entry;
1277 bt->mgr->hashtable[hashidx].entry = entry;
1281 // fill in latch structure
1283 latch->pin = CLOCK_bit | 1;
1284 latch->page_no = page_no;
1285 latch->entry = entry;
1288 bt_releasemutex (latch->modify);
1289 bt_releasemutex (bt->mgr->hashtable[hashidx].latch);
1293 void bt_mgrclose (BtMgr *mgr)
1301 // flush previously written dirty pages
1302 // and write recovery buffer to disk
1304 fdatasync (mgr->idx);
1306 if( mgr->redoend ) {
1307 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1308 memset (eof, 0, sizeof(BtLogHdr));
1310 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1313 // write remaining dirty pool pages to the btree
1315 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1316 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1317 latch = mgr->latchsets + slot;
1319 if( latch->dirty ) {
1320 bt_writepage(mgr, page, latch->page_no);
1321 latch->dirty = 0, num++;
1325 // flush last batch to disk and clear
1326 // redo recovery buffer on disk.
1328 fdatasync (mgr->idx);
1330 eof = (BtLogHdr *)mgr->redobuff;
1331 memset (eof, 0, sizeof(BtLogHdr));
1332 eof->lsn = mgr->lsn;
1334 pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1336 sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
1338 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1341 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1342 munmap (mgr->pagezero, mgr->page_size);
1344 FlushViewOfFile(mgr->pagezero, 0);
1345 UnmapViewOfFile(mgr->pagezero);
1346 UnmapViewOfFile(mgr->pagepool);
1347 CloseHandle(mgr->halloc);
1348 CloseHandle(mgr->hpool);
1351 free (mgr->redobuff);
1355 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1356 FlushFileBuffers(mgr->idx);
1357 CloseHandle(mgr->idx);
1362 // close and release memory
1364 void bt_close (BtDb *bt)
1371 VirtualFree (bt->mem, 0, MEM_RELEASE);
1376 // open/create new btree buffer manager
1378 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1379 // size of page pool (e.g. 262144)
1381 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1383 uint lvl, attr, last, slot, idx;
1384 uint nlatchpage, latchhash;
1385 unsigned char value[BtId];
1386 int flag, initit = 0;
1387 BtPageZero *pagezero;
1395 // determine sanity of page size and buffer pool
1397 if( bits > BT_maxbits )
1399 else if( bits < BT_minbits )
1402 if( nodemax < 16 ) {
1403 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
1408 mgr = calloc (1, sizeof(BtMgr));
1410 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1412 if( mgr->idx == -1 ) {
1413 fprintf (stderr, "Unable to open btree file\n");
1414 return free(mgr), NULL;
1417 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1418 attr = FILE_ATTRIBUTE_NORMAL;
1419 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1421 if( mgr->idx == INVALID_HANDLE_VALUE )
1422 return GlobalFree(mgr), NULL;
1426 pagezero = valloc (BT_maxpage);
1429 // read minimum page size to get root info
1430 // to support raw disk partition files
1431 // check if bits == 0 on the disk.
1433 if( size = lseek (mgr->idx, 0L, 2) )
1434 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1435 if( pagezero->alloc->bits )
1436 bits = pagezero->alloc->bits;
1440 return free(mgr), free(pagezero), NULL;
1444 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1445 size = GetFileSize(mgr->idx, amt);
1447 if( size || *amt ) {
1448 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1449 return bt_mgrclose (mgr), NULL;
1450 bits = pagezero->alloc->bits;
1455 mgr->page_size = 1 << bits;
1456 mgr->page_bits = bits;
1458 // calculate number of latch hash table entries
1460 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1462 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1463 mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
1464 mgr->latchtotal = nodemax;
1469 // initialize an empty b-tree with latch page, root page, page of leaves
1470 // and page(s) of latches and page pool cache
1472 memset (pagezero, 0, 1 << bits);
1473 pagezero->alloc->bits = mgr->page_bits;
1474 bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1476 // initialize left-most LEAF page in
1479 bt_putid (pagezero->alloc->left, LEAF_page);
1481 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1482 fprintf (stderr, "Unable to create btree page zero\n");
1483 return bt_mgrclose (mgr), NULL;
1486 memset (pagezero, 0, 1 << bits);
1487 pagezero->alloc->bits = mgr->page_bits;
1489 for( lvl=MIN_lvl; lvl--; ) {
1490 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1491 key = keyptr(pagezero->alloc, 1);
1492 key->len = 2; // create stopper key
1496 bt_putid(value, MIN_lvl - lvl + 1);
1497 val = valptr(pagezero->alloc, 1);
1498 val->len = lvl ? BtId : 0;
1499 memcpy (val->value, value, val->len);
1501 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1502 pagezero->alloc->lvl = lvl;
1503 pagezero->alloc->cnt = 1;
1504 pagezero->alloc->act = 1;
1506 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1507 fprintf (stderr, "Unable to create btree page zero\n");
1508 return bt_mgrclose (mgr), NULL;
1516 VirtualFree (pagezero, 0, MEM_RELEASE);
1519 // mlock the pagezero page
1521 flag = PROT_READ | PROT_WRITE;
1522 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1523 if( mgr->pagezero == MAP_FAILED ) {
1524 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1525 return bt_mgrclose (mgr), NULL;
1527 mlock (mgr->pagezero, mgr->page_size);
1529 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1530 if( mgr->pagepool == MAP_FAILED ) {
1531 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1532 return bt_mgrclose (mgr), NULL;
1534 if( mgr->redopages = redopages )
1535 mgr->redobuff = valloc (redopages * mgr->page_size);
1537 flag = PAGE_READWRITE;
1538 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1539 if( !mgr->halloc ) {
1540 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1541 return bt_mgrclose (mgr), NULL;
1544 flag = FILE_MAP_WRITE;
1545 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1546 if( !mgr->pagezero ) {
1547 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1548 return bt_mgrclose (mgr), NULL;
1551 flag = PAGE_READWRITE;
1552 size = (uid)mgr->nlatchpage << mgr->page_bits;
1553 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1555 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1556 return bt_mgrclose (mgr), NULL;
1559 flag = FILE_MAP_WRITE;
1560 mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1561 if( !mgr->pagepool ) {
1562 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1563 return bt_mgrclose (mgr), NULL;
1565 if( mgr->redopages = redopages )
1566 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1569 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1570 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1571 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1573 // mark all pool entries as available
1575 for( idx = 1; idx < mgr->latchtotal; idx++ ) {
1576 latch = mgr->latchsets + idx;
1584 // open BTree access method
1585 // based on buffer manager
1587 BtDb *bt_open (BtMgr *mgr)
1589 BtDb *bt = malloc (sizeof(*bt));
1591 memset (bt, 0, sizeof(*bt));
1594 bt->mem = valloc (2 *mgr->page_size);
1596 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1598 bt->frame = (BtPage)bt->mem;
1599 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1601 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1603 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1608 // compare two keys, return > 0, = 0, or < 0
1609 // =0: keys are same
1612 // as the comparison value
1614 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1616 uint len1 = key1->len;
1619 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1630 // place write, read, or parent lock on requested page_no.
1632 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1636 ReadLock (latch->readwr, bt->thread_no);
1639 WriteLock (latch->readwr, bt->thread_no);
1642 ReadLock (latch->access, bt->thread_no);
1645 WriteLock (latch->access, bt->thread_no);
1648 WriteOLock (latch->parent, bt->thread_no);
1651 WriteOLock (latch->atomic, bt->thread_no);
1653 case BtLockAtomic | BtLockRead:
1654 WriteOLock (latch->atomic, bt->thread_no);
1655 ReadLock (latch->readwr, bt->thread_no);
1660 // remove write, read, or parent lock on requested page
1662 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1666 ReadRelease (latch->readwr);
1669 WriteRelease (latch->readwr);
1672 ReadRelease (latch->access);
1675 WriteRelease (latch->access);
1678 WriteORelease (latch->parent);
1681 WriteORelease (latch->atomic);
1683 case BtLockAtomic | BtLockRead:
1684 WriteORelease (latch->atomic);
1685 ReadRelease (latch->readwr);
1690 // allocate a new page
1691 // return with page latched, but unlocked.
1693 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1698 // lock allocation page
1700 bt_mutexlock(bt->mgr->lock);
1702 // use empty chain first
1703 // else allocate empty page
1705 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1706 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
1707 set->page = bt_mappage (bt, set->latch);
1709 return bt->err = BTERR_struct, bt->line = __LINE__, -1;
1711 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1712 bt_releasemutex(bt->mgr->lock);
1714 memcpy (set->page, contents, bt->mgr->page_size);
1715 set->latch->dirty = 1;
1719 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1720 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1722 // unlock allocation latch and
1723 // extend file into new page.
1725 bt_releasemutex(bt->mgr->lock);
1727 // don't load cache from btree page
1729 if( set->latch = bt_pinlatch (bt, page_no, contents) )
1730 set->page = bt_mappage (bt, set->latch);
1734 set->latch->dirty = 1;
1738 // find slot in page for given key at a given level
1740 int bt_findslot (BtPage page, unsigned char *key, uint len)
1742 uint diff, higher = page->cnt, low = 1, slot;
1745 // make stopper key an infinite fence value
1747 if( bt_getid (page->right) )
1752 // low is the lowest candidate.
1753 // loop ends when they meet
1755 // higher is already
1756 // tested as .ge. the passed key.
1758 while( diff = higher - low ) {
1759 slot = low + ( diff >> 1 );
1760 if( keycmp (keyptr(page, slot), key, len) < 0 )
1763 higher = slot, good++;
1766 // return zero if key is on right link page
1768 return good ? higher : 0;
1771 // find and load page at given level for given key
1772 // leave page rd or wr locked as requested
1774 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1776 uid page_no = ROOT_page, prevpage = 0;
1777 uint drill = 0xff, slot;
1778 BtLatchSet *prevlatch;
1779 uint mode, prevmode;
1782 // start at root of btree and drill down
1785 // determine lock mode of drill level
1786 mode = (drill == lvl) ? lock : BtLockRead;
1788 if( !(set->latch = bt_pinlatch (bt, page_no, NULL)) )
1791 // obtain access lock using lock chaining with Access mode
1793 if( page_no > ROOT_page )
1794 bt_lockpage(bt, BtLockAccess, set->latch);
1796 set->page = bt_mappage (bt, set->latch);
1798 // release & unpin parent or left sibling page
1801 bt_unlockpage(bt, prevmode, prevlatch);
1802 bt_unpinlatch (prevlatch);
1806 // obtain mode lock using lock chaining through AccessLock
1808 bt_lockpage(bt, mode, set->latch);
1810 if( set->page->free )
1811 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1813 if( page_no > ROOT_page )
1814 bt_unlockpage(bt, BtLockAccess, set->latch);
1816 // re-read and re-lock root after determining actual level of root
1818 if( set->page->lvl != drill) {
1819 if( set->latch->page_no != ROOT_page )
1820 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1822 drill = set->page->lvl;
1824 if( lock != BtLockRead && drill == lvl ) {
1825 bt_unlockpage(bt, mode, set->latch);
1826 bt_unpinlatch (set->latch);
1831 prevpage = set->latch->page_no;
1832 prevlatch = set->latch;
1835 // find key on page at this level
1836 // and descend to requested level
1838 if( !set->page->kill )
1839 if( slot = bt_findslot (set->page, key, len) ) {
1843 // find next non-dead slot -- the fence key if nothing else
1845 while( slotptr(set->page, slot)->dead )
1846 if( slot++ < set->page->cnt )
1849 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1851 val = valptr(set->page, slot);
1853 if( val->len == BtId )
1854 page_no = bt_getid(valptr(set->page, slot)->value);
1856 return bt->line = __LINE__, bt->err = BTERR_struct, 0;
1862 // slide right into next page
1864 page_no = bt_getid(set->page->right);
1867 // return error on end of right chain
1869 bt->line = __LINE__, bt->err = BTERR_struct;
1870 return 0; // return error
1873 // return page to free list
1874 // page must be delete & write locked
1876 void bt_freepage (BtDb *bt, BtPageSet *set)
1878 // lock allocation page
1880 bt_mutexlock (bt->mgr->lock);
1884 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1885 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1886 set->latch->dirty = 1;
1887 set->page->free = 1;
1889 // unlock released page
1891 bt_unlockpage (bt, BtLockDelete, set->latch);
1892 bt_unlockpage (bt, BtLockWrite, set->latch);
1893 bt_unpinlatch (set->latch);
1895 // unlock allocation page
1897 bt_releasemutex (bt->mgr->lock);
1900 // a fence key was deleted from a page
1901 // push new fence value upwards
1903 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1905 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1906 unsigned char value[BtId];
1910 // remove the old fence value
1912 ptr = keyptr(set->page, set->page->cnt);
1913 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1914 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1915 set->latch->dirty = 1;
1917 // cache new fence value
1919 ptr = keyptr(set->page, set->page->cnt);
1920 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1922 bt_lockpage (bt, BtLockParent, set->latch);
1923 bt_unlockpage (bt, BtLockWrite, set->latch);
1925 // insert new (now smaller) fence key
1927 bt_putid (value, set->latch->page_no);
1928 ptr = (BtKey*)leftkey;
1930 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1933 // now delete old fence key
1935 ptr = (BtKey*)rightkey;
1937 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1940 bt_unlockpage (bt, BtLockParent, set->latch);
1941 bt_unpinlatch(set->latch);
1945 // root has a single child
1946 // collapse a level from the tree
1948 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1955 // find the child entry and promote as new root contents
1958 for( idx = 0; idx++ < root->page->cnt; )
1959 if( !slotptr(root->page, idx)->dead )
1962 val = valptr(root->page, idx);
1964 if( val->len == BtId )
1965 page_no = bt_getid (valptr(root->page, idx)->value);
1967 return bt->line = __LINE__, bt->err = BTERR_struct;
1969 if( child->latch = bt_pinlatch (bt, page_no, NULL) )
1970 child->page = bt_mappage (bt, child->latch);
1974 bt_lockpage (bt, BtLockDelete, child->latch);
1975 bt_lockpage (bt, BtLockWrite, child->latch);
1977 memcpy (root->page, child->page, bt->mgr->page_size);
1978 root->latch->dirty = 1;
1980 bt_freepage (bt, child);
1982 } while( root->page->lvl > 1 && root->page->act == 1 );
1984 bt_unlockpage (bt, BtLockWrite, root->latch);
1985 bt_unpinlatch (root->latch);
1989 // delete a page and manage keys
1990 // call with page writelocked
1991 // returns with page unpinned
1993 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1995 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1996 unsigned char value[BtId];
1997 uint lvl = set->page->lvl;
2002 // cache copy of fence key
2003 // to post in parent
2005 ptr = keyptr(set->page, set->page->cnt);
2006 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2008 // obtain lock on right page
2010 page_no = bt_getid(set->page->right);
2012 if( right->latch = bt_pinlatch (bt, page_no, NULL) )
2013 right->page = bt_mappage (bt, right->latch);
2017 bt_lockpage (bt, BtLockWrite, right->latch);
2019 // cache copy of key to update
2021 ptr = keyptr(right->page, right->page->cnt);
2022 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
2024 if( right->page->kill )
2025 return bt->line = __LINE__, bt->err = BTERR_struct;
2027 // pull contents of right peer into our empty page
2029 memcpy (set->page, right->page, bt->mgr->page_size);
2030 set->latch->dirty = 1;
2032 // mark right page deleted and point it to left page
2033 // until we can post parent updates that remove access
2034 // to the deleted page.
2036 bt_putid (right->page->right, set->latch->page_no);
2037 right->latch->dirty = 1;
2038 right->page->kill = 1;
2040 bt_lockpage (bt, BtLockParent, right->latch);
2041 bt_unlockpage (bt, BtLockWrite, right->latch);
2043 bt_lockpage (bt, BtLockParent, set->latch);
2044 bt_unlockpage (bt, BtLockWrite, set->latch);
2046 // redirect higher key directly to our new node contents
2048 bt_putid (value, set->latch->page_no);
2049 ptr = (BtKey*)higherfence;
2051 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2054 // delete old lower key to our node
2056 ptr = (BtKey*)lowerfence;
2058 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
2061 // obtain delete and write locks to right node
2063 bt_unlockpage (bt, BtLockParent, right->latch);
2064 bt_lockpage (bt, BtLockDelete, right->latch);
2065 bt_lockpage (bt, BtLockWrite, right->latch);
2066 bt_freepage (bt, right);
2068 bt_unlockpage (bt, BtLockParent, set->latch);
2069 bt_unpinlatch (set->latch);
2073 // find and delete key on page by marking delete flag bit
2074 // if page becomes empty, delete it from the btree
2076 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
2078 uint slot, idx, found, fence;
2083 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2084 ptr = keyptr(set->page, slot);
2088 // if librarian slot, advance to real slot
2090 if( slotptr(set->page, slot)->type == Librarian )
2091 ptr = keyptr(set->page, ++slot);
2093 fence = slot == set->page->cnt;
2095 // if key is found delete it, otherwise ignore request
2097 if( found = !keycmp (ptr, key, len) )
2098 if( found = slotptr(set->page, slot)->dead == 0 ) {
2099 val = valptr(set->page,slot);
2100 slotptr(set->page, slot)->dead = 1;
2101 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2104 // collapse empty slots beneath the fence
2106 while( idx = set->page->cnt - 1 )
2107 if( slotptr(set->page, idx)->dead ) {
2108 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2109 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2114 // did we delete a fence key in an upper level?
2116 if( found && lvl && set->page->act && fence )
2117 if( bt_fixfence (bt, set, lvl) )
2122 // do we need to collapse root?
2124 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2125 if( bt_collapseroot (bt, set) )
2130 // delete empty page
2132 if( !set->page->act )
2133 return bt_deletepage (bt, set);
2135 set->latch->dirty = 1;
2136 bt_unlockpage(bt, BtLockWrite, set->latch);
2137 bt_unpinlatch (set->latch);
2141 BtKey *bt_foundkey (BtDb *bt)
2143 return (BtKey*)(bt->key);
2146 // advance to next slot
2148 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
2150 BtLatchSet *prevlatch;
2153 if( slot < set->page->cnt )
2156 prevlatch = set->latch;
2158 if( page_no = bt_getid(set->page->right) )
2159 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
2160 set->page = bt_mappage (bt, set->latch);
2164 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
2166 // obtain access lock using lock chaining with Access mode
2168 bt_lockpage(bt, BtLockAccess, set->latch);
2170 bt_unlockpage(bt, BtLockRead, prevlatch);
2171 bt_unpinlatch (prevlatch);
2173 bt_lockpage(bt, BtLockRead, set->latch);
2174 bt_unlockpage(bt, BtLockAccess, set->latch);
2178 // find unique key or first duplicate key in
2179 // leaf level and return number of value bytes
2180 // or (-1) if not found. Setup key for bt_foundkey
2182 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2190 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
2192 ptr = keyptr(set->page, slot);
2194 // skip librarian slot place holder
2196 if( slotptr(set->page, slot)->type == Librarian )
2197 ptr = keyptr(set->page, ++slot);
2199 // return actual key found
2201 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
2204 if( slotptr(set->page, slot)->type == Duplicate )
2207 // not there if we reach the stopper key
2209 if( slot == set->page->cnt )
2210 if( !bt_getid (set->page->right) )
2213 // if key exists, return >= 0 value bytes copied
2214 // otherwise return (-1)
2216 if( slotptr(set->page, slot)->dead )
2220 if( !memcmp (ptr->key, key, len) ) {
2221 val = valptr (set->page,slot);
2222 if( valmax > val->len )
2224 memcpy (value, val->value, valmax);
2230 } while( slot = bt_findnext (bt, set, slot) );
2232 bt_unlockpage (bt, BtLockRead, set->latch);
2233 bt_unpinlatch (set->latch);
2237 // check page for space available,
2238 // clean if necessary and return
2239 // 0 - page needs splitting
2240 // >0 new slot value
2242 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
2244 uint nxt = bt->mgr->page_size;
2245 BtPage page = set->page;
2246 uint cnt = 0, idx = 0;
2247 uint max = page->cnt;
2252 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2255 // skip cleanup and proceed to split
2256 // if there's not enough garbage
2259 if( page->garbage < nxt / 5 )
2262 memcpy (bt->frame, page, bt->mgr->page_size);
2264 // skip page info and set rest of page to zero
2266 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2267 set->latch->dirty = 1;
2272 // clean up page first by
2273 // removing deleted keys
2275 while( cnt++ < max ) {
2279 if( cnt < max || bt->frame->lvl )
2280 if( slotptr(bt->frame,cnt)->dead )
2283 // copy the value across
2285 val = valptr(bt->frame, cnt);
2286 nxt -= val->len + sizeof(BtVal);
2287 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2289 // copy the key across
2291 key = keyptr(bt->frame, cnt);
2292 nxt -= key->len + sizeof(BtKey);
2293 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2295 // make a librarian slot
2297 slotptr(page, ++idx)->off = nxt;
2298 slotptr(page, idx)->type = Librarian;
2299 slotptr(page, idx)->dead = 1;
2303 slotptr(page, ++idx)->off = nxt;
2304 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2306 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2313 // see if page has enough space now, or does it need splitting?
2315 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2321 // split the root and raise the height of the btree
2323 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2325 unsigned char leftkey[BT_keyarray];
2326 uint nxt = bt->mgr->page_size;
2327 unsigned char value[BtId];
2333 // save left page fence key for new root
2335 ptr = keyptr(root->page, root->page->cnt);
2336 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2338 // Obtain an empty page to use, and copy the current
2339 // root contents into it, e.g. lower keys
2341 if( bt_newpage(bt, left, root->page) )
2344 left_page_no = left->latch->page_no;
2345 bt_unpinlatch (left->latch);
2347 // preserve the page info at the bottom
2348 // of higher keys and set rest to zero
2350 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2352 // insert stopper key at top of newroot page
2353 // and increase the root height
2355 nxt -= BtId + sizeof(BtVal);
2356 bt_putid (value, right->page_no);
2357 val = (BtVal *)((unsigned char *)root->page + nxt);
2358 memcpy (val->value, value, BtId);
2361 nxt -= 2 + sizeof(BtKey);
2362 slotptr(root->page, 2)->off = nxt;
2363 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2368 // insert lower keys page fence key on newroot page as first key
2370 nxt -= BtId + sizeof(BtVal);
2371 bt_putid (value, left_page_no);
2372 val = (BtVal *)((unsigned char *)root->page + nxt);
2373 memcpy (val->value, value, BtId);
2376 ptr = (BtKey *)leftkey;
2377 nxt -= ptr->len + sizeof(BtKey);
2378 slotptr(root->page, 1)->off = nxt;
2379 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2381 bt_putid(root->page->right, 0);
2382 root->page->min = nxt; // reset lowest used offset and key count
2383 root->page->cnt = 2;
2384 root->page->act = 2;
2387 // release and unpin root pages
2389 bt_unlockpage(bt, BtLockWrite, root->latch);
2390 bt_unpinlatch (root->latch);
2392 bt_unpinlatch (right);
2396 // split already locked full node
2398 // return pool entry for new right
2401 uint bt_splitpage (BtDb *bt, BtPageSet *set)
2403 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2404 uint lvl = set->page->lvl;
2411 // split higher half of keys to bt->frame
2413 memset (bt->frame, 0, bt->mgr->page_size);
2414 max = set->page->cnt;
2418 while( cnt++ < max ) {
2419 if( cnt < max || set->page->lvl )
2420 if( slotptr(set->page, cnt)->dead )
2423 src = valptr(set->page, cnt);
2424 nxt -= src->len + sizeof(BtVal);
2425 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2427 key = keyptr(set->page, cnt);
2428 nxt -= key->len + sizeof(BtKey);
2429 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2430 memcpy (ptr, key, key->len + sizeof(BtKey));
2432 // add librarian slot
2434 slotptr(bt->frame, ++idx)->off = nxt;
2435 slotptr(bt->frame, idx)->type = Librarian;
2436 slotptr(bt->frame, idx)->dead = 1;
2440 slotptr(bt->frame, ++idx)->off = nxt;
2441 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2443 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2447 bt->frame->bits = bt->mgr->page_bits;
2448 bt->frame->min = nxt;
2449 bt->frame->cnt = idx;
2450 bt->frame->lvl = lvl;
2454 if( set->latch->page_no > ROOT_page )
2455 bt_putid (bt->frame->right, bt_getid (set->page->right));
2457 // get new free page and write higher keys to it.
2459 if( bt_newpage(bt, right, bt->frame) )
2462 // process lower keys
2464 memcpy (bt->frame, set->page, bt->mgr->page_size);
2465 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2466 set->latch->dirty = 1;
2468 nxt = bt->mgr->page_size;
2469 set->page->garbage = 0;
2475 if( slotptr(bt->frame, max)->type == Librarian )
2478 // assemble page of smaller keys
2480 while( cnt++ < max ) {
2481 if( slotptr(bt->frame, cnt)->dead )
2483 val = valptr(bt->frame, cnt);
2484 nxt -= val->len + sizeof(BtVal);
2485 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2487 key = keyptr(bt->frame, cnt);
2488 nxt -= key->len + sizeof(BtKey);
2489 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2491 // add librarian slot
2493 slotptr(set->page, ++idx)->off = nxt;
2494 slotptr(set->page, idx)->type = Librarian;
2495 slotptr(set->page, idx)->dead = 1;
2499 slotptr(set->page, ++idx)->off = nxt;
2500 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2504 bt_putid(set->page->right, right->latch->page_no);
2505 set->page->min = nxt;
2506 set->page->cnt = idx;
2508 return right->latch->entry;
2511 // fix keys for newly split page
2512 // call with page locked, return
2515 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2517 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2518 unsigned char value[BtId];
2519 uint lvl = set->page->lvl;
2523 // if current page is the root page, split it
2525 if( set->latch->page_no == ROOT_page )
2526 return bt_splitroot (bt, set, right);
2528 ptr = keyptr(set->page, set->page->cnt);
2529 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2531 page = bt_mappage (bt, right);
2533 ptr = keyptr(page, page->cnt);
2534 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2536 // insert new fences in their parent pages
2538 bt_lockpage (bt, BtLockParent, right);
2540 bt_lockpage (bt, BtLockParent, set->latch);
2541 bt_unlockpage (bt, BtLockWrite, set->latch);
2543 // insert new fence for reformulated left block of smaller keys
2545 bt_putid (value, set->latch->page_no);
2546 ptr = (BtKey *)leftkey;
2548 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2551 // switch fence for right block of larger keys to new right page
2553 bt_putid (value, right->page_no);
2554 ptr = (BtKey *)rightkey;
2556 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2559 bt_unlockpage (bt, BtLockParent, set->latch);
2560 bt_unpinlatch (set->latch);
2562 bt_unlockpage (bt, BtLockParent, right);
2563 bt_unpinlatch (right);
2567 // install new key and value onto page
2568 // page must already be checked for
2571 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2573 uint idx, librarian;
2578 // if found slot > desired slot and previous slot
2579 // is a librarian slot, use it
2582 if( slotptr(set->page, slot-1)->type == Librarian )
2585 // copy value onto page
2587 set->page->min -= vallen + sizeof(BtVal);
2588 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2589 memcpy (val->value, value, vallen);
2592 // copy key onto page
2594 set->page->min -= keylen + sizeof(BtKey);
2595 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2596 memcpy (ptr->key, key, keylen);
2599 // find first empty slot
2601 for( idx = slot; idx < set->page->cnt; idx++ )
2602 if( slotptr(set->page, idx)->dead )
2605 // now insert key into array before slot
2607 if( idx == set->page->cnt )
2608 idx += 2, set->page->cnt += 2, librarian = 2;
2612 set->latch->dirty = 1;
2615 while( idx > slot + librarian - 1 )
2616 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2618 // add librarian slot
2620 if( librarian > 1 ) {
2621 node = slotptr(set->page, slot++);
2622 node->off = set->page->min;
2623 node->type = Librarian;
2629 node = slotptr(set->page, slot);
2630 node->off = set->page->min;
2635 bt_unlockpage (bt, BtLockWrite, set->latch);
2636 bt_unpinlatch (set->latch);
2642 // Insert new key into the btree at given level.
2643 // either add a new key or update/add an existing one
2645 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2647 unsigned char newkey[BT_keyarray];
2648 uint slot, idx, len, entry;
2655 // set up the key we're working on
2657 ins = (BtKey*)newkey;
2658 memcpy (ins->key, key, keylen);
2661 // is this a non-unique index value?
2667 sequence = bt_newdup (bt);
2668 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2672 while( 1 ) { // find the page and slot for the current key
2673 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2674 ptr = keyptr(set->page, slot);
2677 bt->line = __LINE__, bt->err = BTERR_ovflw;
2681 // if librarian slot == found slot, advance to real slot
2683 if( slotptr(set->page, slot)->type == Librarian )
2684 if( !keycmp (ptr, key, keylen) )
2685 ptr = keyptr(set->page, ++slot);
2689 if( slotptr(set->page, slot)->type == Duplicate )
2692 // if inserting a duplicate key or unique key
2693 // check for adequate space on the page
2694 // and insert the new key before slot.
2696 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2697 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2698 if( !(entry = bt_splitpage (bt, set)) )
2700 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2705 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2708 // if key already exists, update value and return
2710 val = valptr(set->page, slot);
2712 if( val->len >= vallen ) {
2713 if( slotptr(set->page, slot)->dead )
2715 set->page->garbage += val->len - vallen;
2716 set->latch->dirty = 1;
2717 slotptr(set->page, slot)->dead = 0;
2719 memcpy (val->value, value, vallen);
2720 bt_unlockpage(bt, BtLockWrite, set->latch);
2721 bt_unpinlatch (set->latch);
2725 // new update value doesn't fit in existing value area
2727 if( !slotptr(set->page, slot)->dead )
2728 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2730 slotptr(set->page, slot)->dead = 0;
2734 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2735 if( !(entry = bt_splitpage (bt, set)) )
2737 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2742 set->page->min -= vallen + sizeof(BtVal);
2743 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2744 memcpy (val->value, value, vallen);
2747 set->latch->dirty = 1;
2748 set->page->min -= keylen + sizeof(BtKey);
2749 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2750 memcpy (ptr->key, key, keylen);
2753 slotptr(set->page, slot)->off = set->page->min;
2754 bt_unlockpage(bt, BtLockWrite, set->latch);
2755 bt_unpinlatch (set->latch);
2762 logseqno reqlsn; // redo log seq no required
2763 logseqno lsn; // redo log sequence number
2764 uint entry; // latch table entry number
2765 uint slot:31; // page slot number
2766 uint reuse:1; // reused previous page
2770 uid page_no; // page number for split leaf
2771 void *next; // next key to insert
2772 uint entry:29; // latch table entry number
2773 uint type:2; // 0 == insert, 1 == delete, 2 == free
2774 uint nounlock:1; // don't unlock ParentModification
2775 unsigned char leafkey[BT_keyarray];
2778 // determine actual page where key is located
2779 // return slot number
2781 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2783 BtKey *key = keyptr(source,src);
2784 uint slot = locks[src].slot;
2787 if( src > 1 && locks[src].reuse )
2788 entry = locks[src-1].entry, slot = 0;
2790 entry = locks[src].entry;
2793 set->latch = bt->mgr->latchsets + entry;
2794 set->page = bt_mappage (bt, set->latch);
2798 // is locks->reuse set? or was slot zeroed?
2799 // if so, find where our key is located
2800 // on current page or pages split on
2801 // same page txn operations.
2804 set->latch = bt->mgr->latchsets + entry;
2805 set->page = bt_mappage (bt, set->latch);
2807 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2808 if( slotptr(set->page, slot)->type == Librarian )
2810 if( locks[src].reuse )
2811 locks[src].entry = entry;
2814 } while( entry = set->latch->split );
2816 bt->line = __LINE__, bt->err = BTERR_atomic;
2820 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2822 BtKey *key = keyptr(source, src);
2823 BtVal *val = valptr(source, src);
2828 while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2829 if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
2830 if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2832 set->page->lsn = locks[src].lsn;
2836 if( entry = bt_splitpage (bt, set) )
2837 latch = bt->mgr->latchsets + entry;
2841 // splice right page into split chain
2842 // and WriteLock it.
2844 bt_lockpage(bt, BtLockWrite, latch);
2845 latch->split = set->latch->split;
2846 set->latch->split = entry;
2847 locks[src].slot = 0;
2850 return bt->line = __LINE__, bt->err = BTERR_atomic;
2853 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2855 BtKey *key = keyptr(source, src);
2861 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2862 ptr = keyptr(set->page, slot);
2864 return bt->line = __LINE__, bt->err = BTERR_struct;
2866 if( !keycmp (ptr, key->key, key->len) )
2867 if( !slotptr(set->page, slot)->dead )
2868 slotptr(set->page, slot)->dead = 1;
2874 val = valptr(set->page, slot);
2875 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2876 set->latch->dirty = 1;
2877 set->page->lsn = locks[src].lsn;
2883 // delete an empty master page for a transaction
2885 // note that the far right page never empties because
2886 // it always contains (at least) the infinite stopper key
2887 // and that all pages that don't contain any keys are
2888 // deleted, or are being held under Atomic lock.
2890 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2892 BtPageSet right[1], temp[1];
2893 unsigned char value[BtId];
2897 bt_lockpage(bt, BtLockWrite, prev->latch);
2899 // grab the right sibling
2901 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), NULL) )
2902 right->page = bt_mappage (bt, right->latch);
2906 bt_lockpage(bt, BtLockAtomic, right->latch);
2907 bt_lockpage(bt, BtLockWrite, right->latch);
2909 // and pull contents over empty page
2910 // while preserving master's left link
2912 memcpy (right->page->left, prev->page->left, BtId);
2913 memcpy (prev->page, right->page, bt->mgr->page_size);
2915 // forward seekers to old right sibling
2916 // to new page location in set
2918 bt_putid (right->page->right, prev->latch->page_no);
2919 right->latch->dirty = 1;
2920 right->page->kill = 1;
2922 // remove pointer to right page for searchers by
2923 // changing right fence key to point to the master page
2925 ptr = keyptr(right->page,right->page->cnt);
2926 bt_putid (value, prev->latch->page_no);
2928 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2931 // now that master page is in good shape we can
2932 // remove its locks.
2934 bt_unlockpage (bt, BtLockAtomic, prev->latch);
2935 bt_unlockpage (bt, BtLockWrite, prev->latch);
2937 // fix master's right sibling's left pointer
2938 // to remove scanner's poiner to the right page
2940 if( right_page_no = bt_getid (prev->page->right) ) {
2941 if( temp->latch = bt_pinlatch (bt, right_page_no, NULL) )
2942 temp->page = bt_mappage (bt, temp->latch);
2944 bt_lockpage (bt, BtLockWrite, temp->latch);
2945 bt_putid (temp->page->left, prev->latch->page_no);
2946 temp->latch->dirty = 1;
2948 bt_unlockpage (bt, BtLockWrite, temp->latch);
2949 bt_unpinlatch (temp->latch);
2950 } else { // master is now the far right page
2951 bt_mutexlock (bt->mgr->lock);
2952 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2953 bt_releasemutex(bt->mgr->lock);
2956 // now that there are no pointers to the right page
2957 // we can delete it after the last read access occurs
2959 bt_unlockpage (bt, BtLockWrite, right->latch);
2960 bt_unlockpage (bt, BtLockAtomic, right->latch);
2961 bt_lockpage (bt, BtLockDelete, right->latch);
2962 bt_lockpage (bt, BtLockWrite, right->latch);
2963 bt_freepage (bt, right);
2967 // atomic modification of a batch of keys.
2969 // return -1 if BTERR is set
2970 // otherwise return slot number
2971 // causing the key constraint violation
2972 // or zero on successful completion.
2974 int bt_atomictxn (BtDb *bt, BtPage source)
2976 uint src, idx, slot, samepage, entry, avail, que = 0;
2977 AtomicKey *head, *tail, *leaf;
2978 BtPageSet set[1], prev[1];
2979 unsigned char value[BtId];
2980 BtKey *key, *ptr, *key2;
2991 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2995 // stable sort the list of keys into order to
2996 // prevent deadlocks between threads.
2998 for( src = 1; src++ < source->cnt; ) {
2999 *temp = *slotptr(source,src);
3000 key = keyptr (source,src);
3002 for( idx = src; --idx; ) {
3003 key2 = keyptr (source,idx);
3004 if( keycmp (key, key2->key, key2->len) < 0 ) {
3005 *slotptr(source,idx+1) = *slotptr(source,idx);
3006 *slotptr(source,idx) = *temp;
3012 // reserve enough buffer pool entries
3014 avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
3015 bt_availrequest (bt, avail);
3017 // Load the leaf page for each key
3018 // group same page references with reuse bit
3019 // and determine any constraint violations
3021 for( src = 0; src++ < source->cnt; ) {
3022 key = keyptr(source, src);
3025 // first determine if this modification falls
3026 // on the same page as the previous modification
3027 // note that the far right leaf page is a special case
3029 if( samepage = src > 1 )
3030 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
3031 slot = bt_findslot(set->page, key->key, key->len);
3033 bt_unlockpage(bt, BtLockRead, set->latch);
3036 if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
3037 set->latch->split = 0;
3041 if( slotptr(set->page, slot)->type == Librarian )
3042 ptr = keyptr(set->page, ++slot);
3044 ptr = keyptr(set->page, slot);
3047 locks[src].entry = set->latch->entry;
3048 locks[src].slot = slot;
3049 locks[src].reuse = 0;
3051 locks[src].entry = 0;
3052 locks[src].slot = 0;
3053 locks[src].reuse = 1;
3056 // capture current lsn for master page
3058 locks[src].reqlsn = set->page->lsn;
3060 // perform constraint checks
3062 switch( slotptr(source, src)->type ) {
3065 if( !slotptr(set->page, slot)->dead )
3066 if( slot < set->page->cnt || bt_getid (set->page->right) )
3067 if( !keycmp (ptr, key->key, key->len) ) {
3069 // return constraint violation if key already exists
3071 bt_unlockpage(bt, BtLockRead, set->latch);
3075 if( locks[src].entry ) {
3076 set->latch = bt->mgr->latchsets + locks[src].entry;
3077 bt_unlockpage(bt, BtLockAtomic, set->latch);
3078 bt_unpinlatch (set->latch);
3089 // unlock last loadpage lock
3092 bt_unlockpage(bt, BtLockRead, set->latch);
3094 // and add entries to redo log
3096 if( bt->mgr->redopages )
3097 if( lsn = bt_txnredo (bt, source) )
3098 for( src = 0; src++ < source->cnt; )
3099 locks[src].lsn = lsn;
3103 // obtain write lock for each master page
3105 for( src = 0; src++ < source->cnt; ) {
3106 if( locks[src].reuse )
3109 bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
3112 // insert or delete each key
3113 // process any splits or merges
3114 // release Write & Atomic latches
3115 // set ParentModifications and build
3116 // queue of keys to insert for split pages
3117 // or delete for deleted pages.
3119 // run through txn list backwards
3121 samepage = source->cnt + 1;
3123 for( src = source->cnt; src; src-- ) {
3124 if( locks[src].reuse )
3127 // perform the txn operations
3128 // from smaller to larger on
3131 for( idx = src; idx < samepage; idx++ )
3132 switch( slotptr(source,idx)->type ) {
3134 if( bt_atomicdelete (bt, source, locks, idx) )
3140 if( bt_atomicinsert (bt, source, locks, idx) )
3145 // after the same page operations have finished,
3146 // process master page for splits or deletion.
3148 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
3149 prev->page = bt_mappage (bt, prev->latch);
3152 // pick-up all splits from master page
3153 // each one is already WriteLocked.
3155 entry = prev->latch->split;
3158 set->latch = bt->mgr->latchsets + entry;
3159 set->page = bt_mappage (bt, set->latch);
3160 entry = set->latch->split;
3162 // delete empty master page by undoing its split
3163 // (this is potentially another empty page)
3164 // note that there are no new left pointers yet
3166 if( !prev->page->act ) {
3167 memcpy (set->page->left, prev->page->left, BtId);
3168 memcpy (prev->page, set->page, bt->mgr->page_size);
3169 bt_lockpage (bt, BtLockDelete, set->latch);
3170 bt_freepage (bt, set);
3172 prev->latch->split = set->latch->split;
3173 prev->latch->dirty = 1;
3177 // remove empty page from the split chain
3178 // and return it to the free list.
3180 if( !set->page->act ) {
3181 memcpy (prev->page->right, set->page->right, BtId);
3182 prev->latch->split = set->latch->split;
3183 bt_lockpage (bt, BtLockDelete, set->latch);
3184 bt_freepage (bt, set);
3188 // schedule prev fence key update
3190 ptr = keyptr(prev->page,prev->page->cnt);
3191 leaf = calloc (sizeof(AtomicKey), 1), que++;
3193 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3194 leaf->page_no = prev->latch->page_no;
3195 leaf->entry = prev->latch->entry;
3205 // splice in the left link into the split page
3207 bt_putid (set->page->left, prev->latch->page_no);
3208 bt_lockpage(bt, BtLockParent, prev->latch);
3209 bt_unlockpage(bt, BtLockWrite, prev->latch);
3213 // update left pointer in next right page from last split page
3214 // (if all splits were reversed, latch->split == 0)
3216 if( latch->split ) {
3217 // fix left pointer in master's original (now split)
3218 // far right sibling or set rightmost page in page zero
3220 if( right = bt_getid (prev->page->right) ) {
3221 if( set->latch = bt_pinlatch (bt, right, NULL) )
3222 set->page = bt_mappage (bt, set->latch);
3226 bt_lockpage (bt, BtLockWrite, set->latch);
3227 bt_putid (set->page->left, prev->latch->page_no);
3228 set->latch->dirty = 1;
3229 bt_unlockpage (bt, BtLockWrite, set->latch);
3230 bt_unpinlatch (set->latch);
3231 } else { // prev is rightmost page
3232 bt_mutexlock (bt->mgr->lock);
3233 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3234 bt_releasemutex(bt->mgr->lock);
3237 // Process last page split in chain
3239 ptr = keyptr(prev->page,prev->page->cnt);
3240 leaf = calloc (sizeof(AtomicKey), 1), que++;
3242 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3243 leaf->page_no = prev->latch->page_no;
3244 leaf->entry = prev->latch->entry;
3254 bt_lockpage(bt, BtLockParent, prev->latch);
3255 bt_unlockpage(bt, BtLockWrite, prev->latch);
3257 // remove atomic lock on master page
3259 bt_unlockpage(bt, BtLockAtomic, latch);
3263 // finished if prev page occupied (either master or final split)
3265 if( prev->page->act ) {
3266 bt_unlockpage(bt, BtLockWrite, latch);
3267 bt_unlockpage(bt, BtLockAtomic, latch);
3268 bt_unpinlatch(latch);
3272 // any and all splits were reversed, and the
3273 // master page located in prev is empty, delete it
3274 // by pulling over master's right sibling.
3276 // Remove empty master's fence key
3278 ptr = keyptr(prev->page,prev->page->cnt);
3280 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3283 // perform the remainder of the delete
3284 // from the FIFO queue
3286 leaf = calloc (sizeof(AtomicKey), 1), que++;
3288 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3289 leaf->page_no = prev->latch->page_no;
3290 leaf->entry = prev->latch->entry;
3301 // leave atomic lock in place until
3302 // deletion completes in next phase.
3304 bt_unlockpage(bt, BtLockWrite, prev->latch);
3307 bt_availrelease (bt, avail);
3309 que *= bt->mgr->pagezero->alloc->lvl;
3310 bt_availrequest (bt, que);
3312 // add & delete keys for any pages split or merged during transaction
3316 set->latch = bt->mgr->latchsets + leaf->entry;
3317 set->page = bt_mappage (bt, set->latch);
3319 bt_putid (value, leaf->page_no);
3320 ptr = (BtKey *)leaf->leafkey;
3322 switch( leaf->type ) {
3323 case 0: // insert key
3324 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3329 case 1: // delete key
3330 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3335 case 2: // free page
3336 if( bt_atomicfree (bt, set) )
3342 if( !leaf->nounlock )
3343 bt_unlockpage (bt, BtLockParent, set->latch);
3345 bt_unpinlatch (set->latch);
3348 } while( leaf = tail );
3350 bt_availrelease (bt, que);
3360 // set cursor to highest slot on highest page
3362 uint bt_lastkey (BtDb *bt)
3364 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3367 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3368 set->page = bt_mappage (bt, set->latch);
3372 bt_lockpage(bt, BtLockRead, set->latch);
3373 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3374 bt_unlockpage(bt, BtLockRead, set->latch);
3375 bt_unpinlatch (set->latch);
3377 bt->cursor_page = page_no;
3378 return bt->cursor->cnt;
3381 // return previous slot on cursor page
3383 uint bt_prevkey (BtDb *bt, uint slot)
3385 uid ourright, next, us = bt->cursor_page;
3391 ourright = bt_getid(bt->cursor->right);
3394 if( !(next = bt_getid(bt->cursor->left)) )
3398 bt->cursor_page = next;
3400 if( set->latch = bt_pinlatch (bt, next, NULL) )
3401 set->page = bt_mappage (bt, set->latch);
3405 bt_lockpage(bt, BtLockRead, set->latch);
3406 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3407 bt_unlockpage(bt, BtLockRead, set->latch);
3408 bt_unpinlatch (set->latch);
3410 next = bt_getid (bt->cursor->right);
3412 if( bt->cursor->kill )
3416 if( next == ourright )
3421 return bt->cursor->cnt;
3424 // return next slot on cursor page
3425 // or slide cursor right into next page
3427 uint bt_nextkey (BtDb *bt, uint slot)
3433 right = bt_getid(bt->cursor->right);
3435 while( slot++ < bt->cursor->cnt )
3436 if( slotptr(bt->cursor,slot)->dead )
3438 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
3446 bt->cursor_page = right;
3448 if( set->latch = bt_pinlatch (bt, right, NULL) )
3449 set->page = bt_mappage (bt, set->latch);
3453 bt_lockpage(bt, BtLockRead, set->latch);
3455 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3457 bt_unlockpage(bt, BtLockRead, set->latch);
3458 bt_unpinlatch (set->latch);
3466 // cache page of keys into cursor and return starting slot for given key
3468 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3473 // cache page for retrieval
3475 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3476 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3480 bt->cursor_page = set->latch->page_no;
3482 bt_unlockpage(bt, BtLockRead, set->latch);
3483 bt_unpinlatch (set->latch);
3487 BtKey *bt_key(BtDb *bt, uint slot)
3489 return keyptr(bt->cursor, slot);
3492 BtVal *bt_val(BtDb *bt, uint slot)
3494 return valptr(bt->cursor,slot);
3500 double getCpuTime(int type)
3503 FILETIME xittime[1];
3504 FILETIME systime[1];
3505 FILETIME usrtime[1];
3506 SYSTEMTIME timeconv[1];
3509 memset (timeconv, 0, sizeof(SYSTEMTIME));
3513 GetSystemTimeAsFileTime (xittime);
3514 FileTimeToSystemTime (xittime, timeconv);
3515 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3518 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3519 FileTimeToSystemTime (usrtime, timeconv);
3522 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3523 FileTimeToSystemTime (systime, timeconv);
3527 ans += (double)timeconv->wHour * 3600;
3528 ans += (double)timeconv->wMinute * 60;
3529 ans += (double)timeconv->wSecond;
3530 ans += (double)timeconv->wMilliseconds / 1000;
3535 #include <sys/resource.h>
3537 double getCpuTime(int type)
3539 struct rusage used[1];
3540 struct timeval tv[1];
3544 gettimeofday(tv, NULL);
3545 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3548 getrusage(RUSAGE_SELF, used);
3549 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3552 getrusage(RUSAGE_SELF, used);
3553 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3560 void bt_poolaudit (BtMgr *mgr)
3565 while( ++entry < mgr->latchtotal ) {
3566 latch = mgr->latchsets + entry;
3568 if( *latch->readwr->rin & MASK )
3569 fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
3571 if( *latch->access->rin & MASK )
3572 fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
3574 if( *latch->parent->exclusive )
3575 fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
3577 if( *latch->atomic->exclusive )
3578 fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
3580 if( *latch->modify->exclusive )
3581 fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
3583 if( latch->pin & ~CLOCK_bit )
3584 fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
3596 // standalone program to index file of keys
3597 // then list them onto std-out
3600 void *index_file (void *arg)
3602 uint __stdcall index_file (void *arg)
3605 int line = 0, found = 0, cnt = 0, idx;
3606 uid next, page_no = LEAF_page; // start on first page of leaves
3607 int ch, len = 0, slot, type = 0;
3608 unsigned char key[BT_maxkey];
3609 unsigned char txn[65536];
3610 ThreadArg *args = arg;
3619 bt = bt_open (args->mgr);
3622 if( args->idx < strlen (args->type) )
3623 ch = args->type[args->idx];
3625 ch = args->type[strlen(args->type) - 1];
3637 if( type == Delete )
3638 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3640 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3642 if( type == Delete )
3643 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3645 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3647 if( in = fopen (args->infile, "rb") )
3648 while( ch = getc(in), ch != EOF )
3654 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3655 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3661 memcpy (txn + nxt, key + 10, len - 10);
3663 txn[nxt] = len - 10;
3665 memcpy (txn + nxt, key, 10);
3668 slotptr(page,++cnt)->off = nxt;
3669 slotptr(page,cnt)->type = type;
3672 if( cnt < args->num )
3679 if( bt_atomictxn (bt, page) )
3680 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3685 else if( len < BT_maxkey )
3687 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3691 fprintf(stderr, "started indexing for %s\n", args->infile);
3692 if( in = fopen (args->infile, "r") )
3693 while( ch = getc(in), ch != EOF )
3698 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3699 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3702 else if( len < BT_maxkey )
3704 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3708 fprintf(stderr, "started finding keys for %s\n", args->infile);
3709 if( in = fopen (args->infile, "rb") )
3710 while( ch = getc(in), ch != EOF )
3714 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3717 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
3720 else if( len < BT_maxkey )
3722 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3726 fprintf(stderr, "started scanning\n");
3729 if( set->latch = bt_pinlatch (bt, page_no, NULL) )
3730 set->page = bt_mappage (bt, set->latch);
3732 fprintf(stderr, "unable to obtain latch"), exit(1);
3733 bt_lockpage (bt, BtLockRead, set->latch);
3734 next = bt_getid (set->page->right);
3736 for( slot = 0; slot++ < set->page->cnt; )
3737 if( next || slot < set->page->cnt )
3738 if( !slotptr(set->page, slot)->dead ) {
3739 ptr = keyptr(set->page, slot);
3742 if( slotptr(set->page, slot)->type == Duplicate )
3745 fwrite (ptr->key, len, 1, stdout);
3746 val = valptr(set->page, slot);
3747 fwrite (val->value, val->len, 1, stdout);
3748 fputc ('\n', stdout);
3752 set->latch->avail = 1;
3753 bt_unlockpage (bt, BtLockRead, set->latch);
3754 bt_unpinlatch (set->latch);
3755 } while( page_no = next );
3757 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3761 fprintf(stderr, "started reverse scan\n");
3762 if( slot = bt_lastkey (bt) )
3763 while( slot = bt_prevkey (bt, slot) ) {
3764 if( slotptr(bt->cursor, slot)->dead )
3767 ptr = keyptr(bt->cursor, slot);
3770 if( slotptr(bt->cursor, slot)->type == Duplicate )
3773 fwrite (ptr->key, len, 1, stdout);
3774 val = valptr(bt->cursor, slot);
3775 fwrite (val->value, val->len, 1, stdout);
3776 fputc ('\n', stdout);
3780 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3785 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3787 fprintf(stderr, "started counting\n");
3788 page_no = LEAF_page;
3790 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3791 if( bt_readpage (bt->mgr, bt->frame, page_no) )
3794 if( !bt->frame->free && !bt->frame->lvl )
3795 cnt += bt->frame->act;
3801 cnt--; // remove stopper key
3802 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3814 typedef struct timeval timer;
3816 int main (int argc, char **argv)
3818 int idx, cnt, len, slot, err;
3819 int segsize, bits = 16;
3837 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3838 fprintf (stderr, " where idx_file is the name of the btree file\n");
3839 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3840 fprintf (stderr, " page_bits is the page size in bits\n");
3841 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3842 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3843 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3844 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3848 start = getCpuTime(0);
3851 bits = atoi(argv[3]);
3854 poolsize = atoi(argv[4]);
3857 fprintf (stderr, "Warning: no mapped_pool\n");
3860 num = atoi(argv[5]);
3863 recovery = atoi(argv[6]);
3867 threads = malloc (cnt * sizeof(pthread_t));
3869 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3871 args = malloc (cnt * sizeof(ThreadArg));
3873 mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3876 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3882 for( idx = 0; idx < cnt; idx++ ) {
3883 args[idx].infile = argv[idx + 7];
3884 args[idx].type = argv[2];
3885 args[idx].mgr = mgr;
3886 args[idx].num = num;
3887 args[idx].idx = idx;
3889 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3890 fprintf(stderr, "Error creating thread %d\n", err);
3892 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3896 // wait for termination
3899 for( idx = 0; idx < cnt; idx++ )
3900 pthread_join (threads[idx], NULL);
3902 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3904 for( idx = 0; idx < cnt; idx++ )
3905 CloseHandle(threads[idx]);
3911 elapsed = getCpuTime(0) - start;
3912 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3913 elapsed = getCpuTime(1);
3914 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3915 elapsed = getCpuTime(2);
3916 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);