1 // btree version threadskv10f futex version
2 // with reworked bt_deletekey code,
3 // phase-fair re-entrant reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // redo log for failure recovery
10 // LSM B-trees for write optimization
11 // larger sized leaf pages than non-leaf
12 // and LSM B-tree find operations
16 // author: karl malbrain, malbrain@cal.berkeley.edu
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
22 The original author is Karl Malbrain.
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
56 #define WIN32_LEAN_AND_MEAN
70 typedef unsigned long long uid;
71 typedef unsigned long long logseqno;
74 typedef unsigned long long off64_t;
75 typedef unsigned short ushort;
76 typedef unsigned int uint;
79 #define BT_ro 0x6f72 // ro
80 #define BT_rw 0x7772 // rw
82 #define BT_maxbits 26 // maximum page size in bits
83 #define BT_minbits 9 // minimum page size in bits
84 #define BT_minpage (1 << BT_minbits) // minimum page size
85 #define BT_maxpage (1 << BT_maxbits) // maximum page size
87 // BTree page number constants
88 #define ALLOC_page 0 // allocation page
89 #define ROOT_page 1 // root of the btree
90 #define LEAF_page 2 // first page of leaves
92 // Number of levels to create in a new BTree
97 There are six lock types for each node in four independent sets:
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification.
118 volatile unsigned char xcl[1];
119 volatile unsigned char filler;
120 volatile ushort waiters[1];
126 // definition for reader/writer reentrant lock implementation
132 ushort dup; // re-entrant locks
133 ushort tid; // owner thread-no
134 ushort line; // owner line #
137 // hash table entries
141 uint entry; // Latch table entry at head of chain
144 // latch manager table structure
147 uid page_no; // latch set page number
148 RWLock readwr[1]; // read/write page lock
149 RWLock access[1]; // Access Intent/Page delete
150 RWLock parent[1]; // Posting of fence key in parent
151 RWLock link[1]; // left link update in progress
152 MutexLatch modify[1]; // modify entry lite latch
153 uint split; // right split page atomic insert
154 uint next; // next entry in hash table chain
155 uint prev; // prev entry in hash table chain
156 volatile ushort pin; // number of accessing threads
157 unsigned char dirty; // page in cache is dirty
158 unsigned char leaf; // page in cache is a leaf
161 // Define the length of the page record numbers
165 // Page key slot definition.
167 // Keys are marked dead, but remain on the page until
168 // cleanup is called. The fence key (highest key) for
169 // a leaf page is always present, even after cleanup.
173 // In addition to the Unique keys that occupy slots
174 // there are Librarian and Duplicate key
175 // slots occupying the key slot array.
177 // The Librarian slots are dead keys that
178 // serve as filler, available to add new Unique
179 // or Dup slots that are inserted into the B-tree.
181 // The Duplicate slots have had their key bytes extended
182 // by 6 bytes to contain a binary duplicate key uniqueifier.
193 uint off:BT_maxbits; // page offset for key start
194 uint type:3; // type of slot
195 uint dead:1; // set for deleted slot
198 // The key structure occupies space at the upper end of
199 // each page. It's a length byte followed by the key
203 unsigned char len; // this can be changed to a ushort or uint
204 unsigned char key[0];
207 // the value structure also occupies space at the upper
208 // end of the page. Each key is immediately followed by a value.
211 unsigned char len; // this can be changed to a ushort or uint
212 unsigned char value[0];
215 #define BT_maxkey 255 // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
218 // The first part of an index page.
219 // It is immediately followed
220 // by the BtSlot array of keys.
222 typedef struct BtPage_ {
223 uint cnt; // count of keys in page
224 uint act; // count of active keys
225 uint min; // next key offset
226 uint garbage; // page garbage in bytes
227 unsigned char lvl; // level of page
228 unsigned char free; // page is on free chain
229 unsigned char kill; // page is being deleted
230 unsigned char nopromote; // page is being constructed
231 unsigned char filler1[6]; // padding to multiple of 8 bytes
232 unsigned char right[BtId]; // page number to right
233 unsigned char left[BtId]; // page number to left
234 unsigned char filler2[2]; // padding to multiple of 8 bytes
235 logseqno lsn; // log sequence number applied
236 uid page_no; // this page number
239 // The loadpage interface object
242 BtPage page; // current page pointer
243 BtLatchSet *latch; // current page latch set
246 // structure for latch manager on ALLOC_page
249 struct BtPage_ alloc[1]; // next page_no in right ptr
250 unsigned char freechain[BtId]; // head of free page_nos chain
251 unsigned char leafchain[BtId]; // head of leaf page_nos chain
252 unsigned long long leafpages; // number of active leaf pages
253 unsigned long long upperpages; // number of active upper pages
254 uint redopages; // number of redo pages in file
255 unsigned char leaf_xtra; // leaf page size in xtra bits
256 unsigned char page_bits; // base page size in bits
259 // The object structure for Btree access
262 uint page_size; // base page size
263 uint page_bits; // base page size in bits
264 uint leaf_xtra; // leaf xtra bits
270 BtPageZero *pagezero; // mapped allocation page
271 BtHashEntry *hashtable; // the buffer pool hash table entries
272 BtHashEntry *leaftable; // the buffer pool hash table entries
273 BtLatchSet *latchsets; // mapped latch set from buffer pool
274 BtLatchSet *leafsets; // mapped latch set from buffer pool
275 unsigned char *pagepool; // mapped to the buffer pool pages
276 unsigned char *leafpool; // mapped to the leaf pool pages
277 unsigned char *redobuff; // mapped recovery buffer pointer
278 logseqno lsn, flushlsn; // current & first lsn flushed
279 MutexLatch redo[1]; // redo area lite latch
280 MutexLatch lock[1]; // allocation area lite latch
281 MutexLatch maps[1]; // mapping segments lite latch
282 ushort thread_no[1]; // highest thread number issued
283 ushort err_thread; // error thread number
284 uint nlatchpage; // size of buffer pool & latchsets
285 uint latchtotal; // number of page latch entries
286 uint latchhash; // number of latch hash table slots
287 uint latchvictim; // next latch entry to examine
288 uint nleafpage; // size of leaf pool & leafsets
289 uint leaftotal; // number of leaf latch entries
290 uint leafhash; // number of leaf hash table slots
291 uint leafvictim; // next leaf entry to examine
292 uint leafpromote; // next leaf entry to promote
293 uint redopage; // page number of redo buffer
294 uint redolast; // last msync size of recovery buff
295 uint redoend; // eof/end element in recovery buff
296 int err; // last error
297 int line; // last error line no
298 int found; // number of keys found by delete
299 int reads, writes; // number of reads and writes
301 HANDLE halloc; // allocation handle
302 HANDLE hpool; // buffer pool handle
304 volatile uint segments; // number of memory mapped segments
305 unsigned char *pages[64000];// memory mapped segments of b-tree
309 BtMgr *mgr; // buffer manager for entire process
310 BtMgr *main; // buffer manager for main btree
311 BtPage cachecursor; // cached page frame for cache btree
312 BtPage maincursor; // cached page frame for main btree
313 ushort thread_no; // thread number
314 unsigned char key[BT_keyarray]; // last found complete key
317 // atomic txn structures
320 logseqno reqlsn; // redo log seq no required
321 uint entry:31; // latch table entry number
322 uint reuse:1; // reused previous page
323 uint slot; // slot on page
326 // Catastrophic errors
340 #define CLOCK_bit 0x8000
342 // recovery manager entry types
345 BTRM_eof = 0, // rest of buffer is emtpy
346 BTRM_add, // add a unique key-value to btree
347 BTRM_dup, // add a duplicate key-value to btree
348 BTRM_del, // delete a key-value from btree
349 BTRM_upd, // update a key with a new value
350 BTRM_new, // allocate a new empty page
351 BTRM_old // reuse an old empty page
354 // recovery manager entry
355 // structure followed by BtKey & BtVal
358 logseqno reqlsn; // log sequence number required
359 logseqno lsn; // log sequence number for entry
360 uint len; // length of entry
361 unsigned char type; // type of entry
362 unsigned char lvl; // level of btree entry pertains to
367 extern void bt_close (BtDb *bt);
368 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
369 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
370 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
371 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
372 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
373 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
374 extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
376 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
378 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
379 extern uint bt_nextkey (BtDb *bt, uint slot);
380 extern uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no);
381 extern uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no);
384 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
385 extern void bt_mgrclose (BtMgr *mgr);
386 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
387 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
389 // atomic transaction functions
390 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
391 BTERR bt_promote (BtDb *bt);
393 // The page is allocated from low and hi ends.
394 // The key slots are allocated from the bottom,
395 // while the text and value of the key
396 // are allocated from the top. When the two
397 // areas meet, the page is split into two.
399 // A key consists of a length byte, two bytes of
400 // index number (0 - 65535), and up to 253 bytes
403 // Associated with each key is a value byte string
404 // containing any value desired.
406 // The b-tree root is always located at page 1.
407 // The first leaf page of level zero is always
408 // located on page 2.
410 // The b-tree pages are linked with next
411 // pointers to facilitate enumerators,
412 // and provide for concurrency.
414 // When the root page fills, it is split in two and
415 // the tree height is raised by a new root at page
416 // one with two keys.
418 // Deleted keys are marked with a dead bit until
419 // page cleanup. The fence key for a leaf node is
422 // To achieve maximum concurrency one page is locked at a time
423 // as the tree is traversed to find leaf key in question. The right
424 // page numbers are used in cases where the page is being split,
427 // Page 0 is dedicated to lock for new page extensions,
428 // and chains empty pages together for reuse. It also
429 // contains the latch manager hash table.
431 // The ParentModification lock on a node is obtained to serialize posting
432 // or changing the fence key for a node.
434 // Empty pages are chained together through the ALLOC page and reused.
//	Access macros to address slot, key and value entries on a page.
//	Page slots use 1 based indexing.

//	slotptr: address of the BtSlot for a 1-based slot number; the slot
//	array starts immediately after the page header, hence (page) + 1.
//	Every macro argument is parenthesized so that expression arguments
//	(e.g. a conditional expression) expand with the intended precedence.

#define slotptr(page, slot) (((BtSlot *)((page) + 1)) + ((slot) - 1))

//	keyptr: address of the slot's key, located slot->off bytes
//	from the start of the page.

#define keyptr(page, slot) ((BtKey *)((unsigned char *)(page) + slotptr((page), (slot))->off))

//	valptr: address of the slot's value, which starts right after
//	the key->len key bytes.

#define valptr(page, slot) ((BtVal *)(keyptr((page), (slot))->key + keyptr((page), (slot))->len))
443 void bt_putid(unsigned char *dest, uid id)
448 dest[i] = (unsigned char)id, id >>= 8;
451 uid bt_getid(unsigned char *src)
456 for( i = 0; i < BtId; i++ )
457 id <<= 8, id |= *src++;
462 // lite weight spin lock Latch Manager
//	invoke the futex system call with the caller's arguments passed
//	through unchanged; the syscall result is returned narrowed to int

int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
{
	long rc = syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);

	return (int)rc;
}
469 void bt_mutexlock(MutexLatch *latch)
471 uint idx, waited = 0;
475 for( idx = 0; idx < 100; idx++ ) {
476 *prev->value = __sync_fetch_and_or (latch->value, 1);
477 if( !*prev->bits->xcl ) {
479 __sync_fetch_and_sub (latch->bits->waiters, 1);
485 __sync_fetch_and_add (latch->bits->waiters, 1);
486 *prev->bits->waiters += 1;
490 sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
494 int bt_mutextry(MutexLatch *latch)
496 return !__sync_lock_test_and_set (latch->bits->xcl, 1);
499 void bt_releasemutex(MutexLatch *latch)
503 *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
505 if( *prev->bits->waiters )
506 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
509 // reader/writer lock implementation
511 void WriteLock (RWLock *lock, ushort tid, uint line)
513 if( lock->tid == tid ) {
517 bt_mutexlock (lock->xcl);
518 bt_mutexlock (lock->wrt);
519 bt_releasemutex (lock->xcl);
525 void WriteRelease (RWLock *lock)
532 bt_releasemutex (lock->wrt);
535 void ReadLock (RWLock *lock, ushort tid)
537 bt_mutexlock (lock->xcl);
539 if( !__sync_fetch_and_add (&lock->readers, 1) )
540 bt_mutexlock (lock->wrt);
542 bt_releasemutex (lock->xcl);
545 void ReadRelease (RWLock *lock)
547 if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
548 bt_releasemutex (lock->wrt);
551 // recovery manager -- flush dirty pages
553 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
555 uint cnt3 = 0, cnt2 = 0, cnt = 0;
560 // flush dirty pool pages to the btree
562 fprintf(stderr, "Start flushlsn ");
563 for( entry = 1; entry < mgr->latchtotal; entry++ ) {
564 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
565 latch = mgr->latchsets + entry;
566 bt_mutexlock (latch->modify);
567 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
570 bt_writepage(mgr, page, latch->page_no, 0);
571 latch->dirty = 0, cnt++;
573 if( latch->pin & ~CLOCK_bit )
575 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
576 bt_releasemutex (latch->modify);
578 for( entry = 1; entry < mgr->leaftotal; entry++ ) {
579 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
580 latch = mgr->leafsets + entry;
581 bt_mutexlock (latch->modify);
582 bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
585 bt_writepage(mgr, page, latch->page_no, 1);
586 latch->dirty = 0, cnt++;
588 if( latch->pin & ~CLOCK_bit )
590 bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
591 bt_releasemutex (latch->modify);
593 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
594 fprintf(stderr, "begin sync");
595 for( segment = 0; segment < mgr->segments; segment++ )
596 if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
597 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
598 fprintf(stderr, " end sync\n");
601 // recovery manager -- process current recovery buff on startup
602 // this won't do much if previous session was properly closed.
604 BTERR bt_recoveryredo (BtMgr *mgr)
611 hdr = (BtLogHdr *)mgr->redobuff;
612 mgr->flushlsn = hdr->lsn;
615 hdr = (BtLogHdr *)(mgr->redobuff + offset);
616 switch( hdr->type ) {
620 case BTRM_add: // add a unique key-value to btree
622 case BTRM_dup: // add a duplicate key-value to btree
623 case BTRM_del: // delete a key-value from btree
624 case BTRM_upd: // update a key with a new value
625 case BTRM_new: // allocate a new empty page
626 case BTRM_old: // reuse an old empty page
632 // recovery manager -- append new entry to recovery log
633 // flush dirty pages to disk when it overflows.
635 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
637 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
638 uint amt = sizeof(BtLogHdr);
642 bt_mutexlock (mgr->redo);
645 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
647 // see if new entry fits in buffer
648 // flush and reset if it doesn't
650 if( amt > size - mgr->redoend ) {
651 mgr->flushlsn = mgr->lsn;
652 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
653 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
656 bt_flushlsn(mgr, thread_no);
659 // fill in new entry & either eof or end block
661 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
666 hdr->lsn = ++mgr->lsn;
670 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
671 memset (eof, 0, sizeof(BtLogHdr));
673 // fill in key and value
676 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
677 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
680 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
681 memset (eof, 0, sizeof(BtLogHdr));
684 last = mgr->redolast & ~0xfff;
687 if( end - last + sizeof(BtLogHdr) >= 32768 )
688 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
689 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
693 bt_releasemutex(mgr->redo);
697 // recovery manager -- append transaction to recovery log
698 // flush dirty pages to disk when it overflows.
700 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
702 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
703 uint amt = 0, src, type;
710 // determine amount of redo recovery log space required
712 for( src = 0; src++ < source->cnt; ) {
713 key = keyptr(source,src);
714 val = valptr(source,src);
715 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
716 amt += sizeof(BtLogHdr);
719 bt_mutexlock (mgr->redo);
721 // see if new entry fits in buffer
722 // flush and reset if it doesn't
724 if( amt > size - mgr->redoend ) {
725 mgr->flushlsn = mgr->lsn;
726 if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
727 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
730 bt_flushlsn (mgr, thread_no);
733 // assign new lsn to transaction
737 // fill in new entries
739 for( src = 0; src++ < source->cnt; ) {
740 key = keyptr(source, src);
741 val = valptr(source, src);
743 switch( slotptr(source, src)->type ) {
755 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
756 amt += sizeof(BtLogHdr);
758 hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
764 // fill in key and value
766 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
767 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
772 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
773 memset (eof, 0, sizeof(BtLogHdr));
776 last = mgr->redolast & ~0xfff;
779 if( end - last + sizeof(BtLogHdr) >= 32768 )
780 if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
781 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
785 bt_releasemutex(mgr->redo);
789 // sync a single btree page to disk
791 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
793 uint segment = latch->page_no >> 16;
794 uint page_size = mgr->page_size;
797 if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
801 page_size <<= mgr->leaf_xtra;
803 perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
805 if( msync (perm, page_size, MS_SYNC) < 0 )
806 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
812 // read page into buffer pool from permanent location in Btree file
814 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
816 int flag = PROT_READ | PROT_WRITE;
817 uint page_size = mgr->page_size;
818 uint off = 0, segment, fragment;
822 page_size <<= mgr->leaf_xtra;
824 fragment = page_no & 0xffff;
825 segment = page_no >> 16;
828 while( off < page_size ) {
830 segment++, fragment = 0;
832 if( segment < mgr->segments ) {
833 perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
835 memcpy ((unsigned char *)page + off, perm, mgr->page_size);
836 off += mgr->page_size;
841 bt_mutexlock (mgr->maps);
843 if( segment < mgr->segments ) {
844 bt_releasemutex (mgr->maps);
848 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
851 bt_releasemutex (mgr->maps);
857 // write page to permanent location in Btree file
859 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
861 int flag = PROT_READ | PROT_WRITE;
862 uint page_size = mgr->page_size;
863 uint off = 0, segment, fragment;
867 page_size <<= mgr->leaf_xtra;
869 fragment = page_no & 0xffff;
870 segment = page_no >> 16;
873 while( off < page_size ) {
875 segment++, fragment = 0;
877 if( segment < mgr->segments ) {
878 perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
879 memcpy (perm, (unsigned char *)page + off, mgr->page_size);
880 off += mgr->page_size;
885 bt_mutexlock (mgr->maps);
887 if( segment < mgr->segments ) {
888 bt_releasemutex (mgr->maps);
892 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
894 bt_releasemutex (mgr->maps);
900 // set CLOCK bit in latch
901 // decrement pin count
905 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
907 bt_mutexlock(latch->modify);
908 latch->pin |= CLOCK_bit;
911 bt_releasemutex(latch->modify);
914 // return the btree cached page address
916 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
918 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
922 page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
924 page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
929 // return next available leaf entry
930 // and with latch entry locked
932 uint bt_availleaf (BtMgr *mgr)
939 entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
941 entry = _InterlockedIncrement (&mgr->leafvictim);
943 entry %= mgr->leaftotal;
948 latch = mgr->leafsets + entry;
950 if( !bt_mutextry(latch->modify) )
953 // return this entry if it is not pinned
958 // if the CLOCK bit is set
961 latch->pin &= ~CLOCK_bit;
962 bt_releasemutex(latch->modify);
966 // return next available latch entry
967 // and with latch entry locked
969 uint bt_availnext (BtMgr *mgr)
976 entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
978 entry = _InterlockedIncrement (&mgr->latchvictim);
980 entry %= mgr->latchtotal;
985 latch = mgr->latchsets + entry;
987 if( !bt_mutextry(latch->modify) )
990 // return this entry if it is not pinned
995 // if the CLOCK bit is set
998 latch->pin &= ~CLOCK_bit;
999 bt_releasemutex(latch->modify);
1003 // pin leaf in leaf buffer pool
1004 // return with latchset pinned
1006 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
1008 uint hashidx = page_no % mgr->leafhash;
1013 // try to find our entry
1015 bt_mutexlock(mgr->leaftable[hashidx].latch);
1017 if( entry = mgr->leaftable[hashidx].entry )
1019 latch = mgr->leafsets + entry;
1021 if( page_no == latch->page_no )
1023 } while( entry = latch->next );
1025 // found our entry: increment pin
1028 latch = mgr->leafsets + entry;
1029 bt_mutexlock(latch->modify);
1030 latch->pin |= CLOCK_bit;
1033 bt_releasemutex(latch->modify);
1034 bt_releasemutex(mgr->leaftable[hashidx].latch);
1038 // find and reuse unpinned entry
1041 entry = bt_availleaf (mgr);
1042 latch = mgr->leafsets + entry;
1044 idx = latch->page_no % mgr->leafhash;
1046 // if latch is on a different hash chain
1047 // unlink from the old page_no chain
1049 if( latch->page_no )
1050 if( idx != hashidx ) {
1052 // skip over this entry if latch not available
1054 if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
1055 bt_releasemutex(latch->modify);
1060 mgr->leafsets[latch->prev].next = latch->next;
1062 mgr->leaftable[idx].entry = latch->next;
1065 mgr->leafsets[latch->next].prev = latch->prev;
1067 bt_releasemutex (mgr->leaftable[idx].latch);
1070 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1072 // update permanent page area in btree from buffer pool
1073 // no read-lock is required since page is not pinned.
1076 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
1077 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1081 if( bt_readpage (mgr, page, page_no, 1) )
1082 return mgr->line = __LINE__, NULL;
1084 // link page as head of hash table chain
1085 // if this is a never before used entry,
1086 // or it was previously on a different
1087 // hash table chain. Otherwise, just
1088 // leave it in its current hash table
1091 if( !latch->page_no || hashidx != idx ) {
1092 if( latch->next = mgr->leaftable[hashidx].entry )
1093 mgr->leafsets[latch->next].prev = entry;
1095 mgr->leaftable[hashidx].entry = entry;
1099 // fill in latch structure
1101 latch->pin = CLOCK_bit | 1;
1102 latch->page_no = page_no;
1105 bt_releasemutex (latch->modify);
1106 bt_releasemutex (mgr->leaftable[hashidx].latch);
1110 // pin page in non-leaf buffer pool
1111 // return with latchset pinned
1113 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1115 uint hashidx = page_no % mgr->latchhash;
1120 // try to find our entry
1122 bt_mutexlock(mgr->hashtable[hashidx].latch);
1124 if( entry = mgr->hashtable[hashidx].entry ) do
1126 latch = mgr->latchsets + entry;
1128 if( page_no == latch->page_no )
1130 } while( entry = latch->next );
1132 // found our entry: increment pin
1135 latch = mgr->latchsets + entry;
1136 bt_mutexlock(latch->modify);
1137 latch->pin |= CLOCK_bit;
1139 bt_releasemutex(latch->modify);
1140 bt_releasemutex(mgr->hashtable[hashidx].latch);
1144 // find and reuse unpinned entry
1147 entry = bt_availnext (mgr);
1148 latch = mgr->latchsets + entry;
1150 idx = latch->page_no % mgr->latchhash;
1152 // if latch is on a different hash chain
1153 // unlink from the old page_no chain
1155 if( latch->page_no )
1156 if( idx != hashidx ) {
1158 // skip over this entry if latch not available
1160 if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1161 bt_releasemutex(latch->modify);
1166 mgr->latchsets[latch->prev].next = latch->next;
1168 mgr->hashtable[idx].entry = latch->next;
1171 mgr->latchsets[latch->next].prev = latch->prev;
1173 bt_releasemutex (mgr->hashtable[idx].latch);
1176 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1178 // update permanent page area in btree from buffer pool
1179 // no read-lock is required since page is not pinned.
1182 if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1183 return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1187 if( bt_readpage (mgr, page, page_no, 0) )
1188 return mgr->line = __LINE__, NULL;
1190 // link page as head of hash table chain
1191 // if this is a never before used entry,
1192 // or it was previously on a different
1193 // hash table chain. Otherwise, just
1194 // leave it in its current hash table
1197 if( !latch->page_no || hashidx != idx ) {
1198 if( latch->next = mgr->hashtable[hashidx].entry )
1199 mgr->latchsets[latch->next].prev = entry;
1201 mgr->hashtable[hashidx].entry = entry;
1205 // fill in latch structure
1207 latch->pin = CLOCK_bit | 1;
1208 latch->page_no = page_no;
1211 bt_releasemutex (latch->modify);
1212 bt_releasemutex (mgr->hashtable[hashidx].latch);
1216 void bt_mgrclose (BtMgr *mgr)
1224 // flush previously written dirty pages
1225 // and write recovery buffer to disk
1227 fdatasync (mgr->idx);
1229 if( mgr->redoend ) {
1230 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1231 memset (eof, 0, sizeof(BtLogHdr));
1234 // write remaining dirty pool pages to the btree
1236 for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1237 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1238 latch = mgr->latchsets + slot;
1240 if( latch->dirty ) {
1241 bt_writepage(mgr, page, latch->page_no, 0);
1242 latch->dirty = 0, num++;
1246 // write remaining dirty leaf pages to the btree
1248 for( slot = 1; slot < mgr->leaftotal; slot++ ) {
1249 page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1250 latch = mgr->leafsets + slot;
1252 if( latch->dirty ) {
1253 bt_writepage(mgr, page, latch->page_no, 1);
1254 latch->dirty = 0, num++;
1258 // clear redo recovery buffer on disk.
1260 if( mgr->pagezero->redopages ) {
1261 eof = (BtLogHdr *)mgr->redobuff;
1262 memset (eof, 0, sizeof(BtLogHdr));
1263 eof->lsn = mgr->lsn;
1264 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1265 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1268 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1271 while( mgr->segments )
1272 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1274 munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1275 munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1276 munmap (mgr->pagezero, mgr->page_size);
1278 FlushViewOfFile(mgr->pagezero, 0);
1279 UnmapViewOfFile(mgr->pagezero);
1280 UnmapViewOfFile(mgr->pagepool);
1281 CloseHandle(mgr->halloc);
1282 CloseHandle(mgr->hpool);
1288 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1289 FlushFileBuffers(mgr->idx);
1290 CloseHandle(mgr->idx);
1295 // close and release memory
1297 void bt_close (BtDb *bt)
1300 if( bt->cachecursor )
1301 free (bt->cachecursor);
1302 if( bt->maincursor )
1303 free (bt->maincursor);
1305 if( bt->cachecursor)
1306 VirtualFree (bt->cachecursor, 0, MEM_RELEASE);
1308 VirtualFree (bt->maincursor, 0, MEM_RELEASE);
1313 // open/create new btree buffer manager
1315 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1316 // size of page pool (e.g. 262144) and leaf pool
// bt_mgr: open or create the btree file `name` and build its buffer manager.
//   pagebits  - log2 of the page size; must be >= BT_minbits
//   leafxtra  - extra size bits for leaf pages; pagebits + leafxtra must be
//               <= BT_maxbits
//   nodemax   - pages in the non-leaf buffer pool (stored as latchtotal)
//   leafmax   - leaf-sized entries in the leaf pool (stored as leaftotal)
//   redopages - redo page count recorded in page zero of a new file
// pagebits and leafxtra are replaced by the values stored on disk when the
// file already has a page zero with a non-zero page_bits.
// Visible failure paths return NULL; an invalid page size terminates the
// process through exit(1).
// NOTE(review): the success return and several declarations (mgr, size, amt,
// key, val) are not visible in this listing.
1318 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1320 uint lvl, attr, last, slot, idx;
1321 uint nlatchpage, latchhash;
1322 uint nleafpage, leafhash;
1323 unsigned char value[BtId];
1324 int flag, initit = 0;
1325 BtPageZero *pagezero;
1333 // determine sanity of page size and buffer pool
1335 if( leafxtra + pagebits > BT_maxbits )
1336 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1338 if( pagebits < BT_minbits )
1339 fprintf (stderr, "pagebits < minbits\n"), exit(1);
// POSIX variant: zero-filled manager structure plus open/create of the file.
// NOTE(review): the calloc result is dereferenced without a visible NULL check.
1342 mgr = calloc (1, sizeof(BtMgr));
1344 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1346 if( mgr->idx == -1 ) {
1347 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1348 return free(mgr), NULL;
// Windows variant of the same allocation and open
1351 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1352 attr = FILE_ATTRIBUTE_NORMAL;
1353 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1355 if( mgr->idx == INVALID_HANDLE_VALUE ) {
1356 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1357 return GlobalFree(mgr), NULL;
// scratch buffer large enough for the biggest permitted page
// NOTE(review): the valloc result is used without a visible NULL check.
1362 pagezero = valloc (BT_maxpage);
1365 // read minimum page size to get root info
1366 // to support raw disk partition files
1367 // check if page_bits == 0 on the disk.
// lseek whence 2 is SEEK_END, so size is the current file length
1369 if( size = lseek (mgr->idx, 0L, 2) )
1370 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1371 if( pagezero->page_bits ) {
1372 pagebits = pagezero->page_bits;
1373 leafxtra = pagezero->leaf_xtra;
// NOTE(review): no close of mgr->idx is visible on this failure path.
1377 return free(mgr), free(pagezero), NULL;
// Windows variant of the page zero probe
1381 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1382 size = GetFileSize(mgr->idx, amt);
1384 if( size || *amt ) {
1385 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1386 return bt_mgrclose (mgr), NULL;
1387 pagebits = pagezero->page_bits;
1388 leafxtra = pagezero->leaf_xtra;
1393 mgr->page_size = 1 << pagebits;
1394 mgr->page_bits = pagebits;
1395 mgr->leaf_xtra = leafxtra;
1397 // calculate number of latch hash table entries
// pool size in pages = hash table of nodemax/16 entries (rounded up to whole
// pages) + nodemax buffer pages + nodemax latch sets (rounded up)
1399 mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1401 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1402 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1403 mgr->latchtotal = nodemax;
1405 // calculate number of leaf hash table entries
// same layout for the leaf pool; each leaf entry spans 1 << leafxtra pages
1407 mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1409 mgr->nleafpage += leafmax << leafxtra; // size of the leaf pool in pages
1410 mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1411 mgr->leaftotal = leafmax;
// redo area starts at page number LEAF_page + (1 << leafxtra)
1413 mgr->redopage = LEAF_page + (1 << leafxtra);
1418 // initialize an empty b-tree with latch page, root page, page of leaves
1419 // and page(s) of latches and page pool cache
// NOTE(review): pagezero->redopages is read here, but it is only assigned
// from the redopages argument three statements below (after the memset); at
// this point the buffer holds whatever valloc/pread left in it. The file
// length likely should use the redopages argument -- confirm. The ftruncate
// return value is also ignored.
1421 ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1422 memset (pagezero, 0, 1 << pagebits);
1423 pagezero->alloc->lvl = MIN_lvl - 1;
1424 pagezero->redopages = redopages;
1425 pagezero->page_bits = mgr->page_bits;
1426 pagezero->leaf_xtra = leafxtra;
// alloc->right holds the next page number to allocate: first page past the redo area
1428 bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1429 pagezero->upperpages = 1;
1430 pagezero->leafpages = 1;
1432 // initialize left-most LEAF page in
1433 // alloc->left and count of active leaf pages.
1435 bt_putid (pagezero->alloc->left, LEAF_page);
1437 if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1438 fprintf (stderr, "Unable to create btree page zero\n");
1439 return bt_mgrclose (mgr), NULL;
1442 memset (pagezero, 0, 1 << pagebits);
// write one page per level, from lvl MIN_lvl-1 down to lvl 0;
// the page for level lvl is written as page number MIN_lvl - lvl
1444 for( lvl=MIN_lvl; lvl--; ) {
1445 BtSlot *node = slotptr(pagezero->alloc, 1);
1446 node->off = mgr->page_size;
1449 node->off <<= mgr->leaf_xtra;
// reserve room at the top of the page for the stopper key and its value
1451 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1452 node->type = Librarian;
1455 node->off = node[-1].off;
// NOTE(review): the two key assignments below are presumably alternative
// branches of a conditional whose lines are missing from this listing.
1456 key = keyptr(pagezero->alloc, 2);
1457 key = keyptr(pagezero->alloc, 1);
1458 key->len = 2; // create stopper key
// non-leaf levels store the page number of the next lower level as the value
1462 bt_putid(value, MIN_lvl - lvl + 1);
1463 val = valptr(pagezero->alloc, 1);
1464 val->len = lvl ? BtId : 0;
1465 memcpy (val->value, value, val->len);
1467 pagezero->alloc->min = node->off;
1468 pagezero->alloc->lvl = lvl;
1469 pagezero->alloc->cnt = 2;
1470 pagezero->alloc->act = 1;
1471 pagezero->alloc->page_no = MIN_lvl - lvl;
// last argument is non-zero only for the lvl 0 page
1473 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1474 fprintf (stderr, "Unable to create btree page\n");
1475 return bt_mgrclose (mgr), NULL;
1483 VirtualFree (pagezero, 0, MEM_RELEASE);
1486 // mlock the first segment of 64K pages
1488 flag = PROT_READ | PROT_WRITE;
1489 mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1492 if( mgr->pages[0] == MAP_FAILED ) {
1493 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1494 return bt_mgrclose (mgr), NULL;
// page zero is the start of the mapped file; mlock results are not checked
1497 mgr->pagezero = (BtPageZero *)mgr->pages[0];
1498 mlock (mgr->pagezero, mgr->page_size);
1500 mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
1501 mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1503 // allocate pool buffers
1505 mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1506 if( mgr->pagepool == MAP_FAILED ) {
1507 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1508 return bt_mgrclose (mgr), NULL;
1511 mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1512 if( mgr->leafpool == MAP_FAILED ) {
1513 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1514 return bt_mgrclose (mgr), NULL;
// carve each pool: buffer pages first, then the latch set array, then the
// hash table, which takes whatever space remains in the mapping
1517 mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1518 mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1519 mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1521 mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1522 mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1523 mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1528 // open BTree access method
1529 // based on buffer manager
// bt_open: allocate and zero a BtDb access handle, give it two leaf-sized
// cursor buffers, and assign it a thread number taken from mgr.
// NOTE(review): the malloc result is passed to memset without a visible NULL
// check, and the cursor allocations are not checked either. The use of `main`
// and the return statement are not visible in this listing.
1531 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1533 BtDb *bt = malloc (sizeof(*bt));
1535 memset (bt, 0, sizeof(*bt));
// each cursor buffer is one leaf page: page_size << leaf_xtra bytes
1539 bt->cachecursor = valloc (mgr->page_size << mgr->leaf_xtra);
1540 bt->maincursor = valloc (mgr->page_size << mgr->leaf_xtra);
// Windows variant of the cursor allocation
1542 bt->cachecursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
1543 bt->maincursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
// atomic increment of the shared counter; the handle gets the incremented value
1546 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
// Windows variant of the thread number assignment
1548 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1553 // compare two keys, return > 0, = 0, or < 0
1554 // =0: keys are same
1557 // as the comparison value
// keycmp: compare the key stored in key1 against the len2 bytes at key2.
// The visible part compares only the common prefix (the shorter of the two
// lengths) with memcmp.
// NOTE(review): the declaration of ans and the handling of unequal lengths
// are not visible in this listing.
1559 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1561 uint len1 = key1->len;
// a non-zero memcmp result over the common prefix decides the comparison
1564 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1575 // place write, read, or parent lock on requested page_no.
// bt_lockpage: acquire one lock on a latch set on behalf of thread_no.
// The visible calls cover four lock fields of the latch: readwr (read or
// write), access (read or write), parent (write) and link (write). `line`
// is forwarded to the WriteLock calls.
// NOTE(review): the dispatch on `mode` is not visible in this listing;
// presumably each call below belongs to one BtLock case -- confirm.
1577 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1581 ReadLock (latch->readwr, thread_no);
1584 WriteLock (latch->readwr, thread_no, line);
1587 ReadLock (latch->access, thread_no);
1590 WriteLock (latch->access, thread_no, line);
1593 WriteLock (latch->parent, thread_no, line);
1596 WriteLock (latch->link, thread_no, line);
1601 // remove write, read, or parent lock on requested page
// bt_unlockpage: release one lock on a latch set; the counterpart of
// bt_lockpage, with the same six calls over readwr, access, parent and link.
// thread_no and line are not used by any visible call.
// NOTE(review): the dispatch on `mode` is not visible in this listing.
1603 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1607 ReadRelease (latch->readwr);
1610 WriteRelease (latch->readwr);
1613 ReadRelease (latch->access);
1616 WriteRelease (latch->access);
1619 WriteRelease (latch->parent);
1622 WriteRelease (latch->link);
1627 // allocate a new page
1628 // return with page latched, but unlocked.
// bt_newpage: obtain a page number for `contents`, copy the contents into
// that page and return it pinned in `set`.
// A page is taken from the free chain for its kind (freechain for
// contents->lvl != 0, leafchain for leaves) when the chain is non-empty;
// otherwise the file is extended at pagezero->alloc->right.
// Visible error paths return a non-zero BTERR value taken from or stored
// in mgr->err.
// NOTE(review): the declaration of page_no and the success returns are not
// visible in this listing.
1630 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1632 uint page_size = mgr->page_size, page_xtra = 0;
1633 unsigned char *freechain;
// NOTE(review): the page counters in pagezero are incremented here, before
// bt_mutexlock(mgr->lock) is taken below -- confirm this is intended.
1636 if( contents->lvl ) {
1637 freechain = mgr->pagezero->freechain;
1638 mgr->pagezero->upperpages++;
1640 freechain = mgr->pagezero->leafchain;
1641 mgr->pagezero->leafpages++;
// leaf pages are page_size << leaf_xtra bytes
1642 page_xtra = mgr->leaf_xtra;
1643 page_size <<= page_xtra;
1646 // lock allocation page
1648 bt_mutexlock(mgr->lock);
1650 // use empty chain first
1651 // else allocate new page
1653 if( page_no = bt_getid(freechain) ) {
1654 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1655 set->page = bt_mappage (mgr, set->latch);
// NOTE(review): no bt_releasemutex is visible before this error return.
1657 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
// pop the page: the chain head becomes the freed page's right link
1659 bt_putid(freechain, bt_getid(set->page->right));
1661 // the page is currently nopromote and this
1662 // will keep bt_promote out.
1664 // contents will replace this bit
1665 // and pin will keep bt_promote out
1667 contents->page_no = page_no;
1668 contents->nopromote = 0;
1669 set->latch->dirty = 1;
1671 memcpy (set->page, contents, page_size);
1673 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1674 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1676 bt_releasemutex(mgr->lock);
// free chain empty: take the next page number and advance alloc->right by
// the number of base pages this page occupies
1680 page_no = bt_getid(mgr->pagezero->alloc->right);
1681 bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
1683 // unlock allocation latch and
1684 // extend file into new page.
1686 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1687 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1688 bt_releasemutex(mgr->lock);
1690 // keep bt_promote out of this page
1691 contents->nopromote = 1;
1692 contents->page_no = page_no;
// a short or failed write is only reported on stderr; execution continues
1693 if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1694 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1695 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1696 set->page = bt_mappage (mgr, set->latch);
1698 return mgr->err_thread = thread_no, mgr->err;
1700 // now pin will keep bt_promote out
1702 set->page->nopromote = 0;
1703 set->latch->dirty = 1;
1707 // find slot in page for given key at a given level
// bt_findslot: binary search of a page's slots (1 .. page->cnt) for the
// first slot whose key is not less than the given key.
// Returns that slot number, or 0 when no slot was confirmed as >= key
// (see the final return); the caller then follows the right link.
// NOTE(review): the declaration of `good` and the branch bodies that adjust
// low/higher are only partly visible in this listing.
1709 int bt_findslot (BtPage page, unsigned char *key, uint len)
1711 uint diff, higher = page->cnt, low = 1, slot;
1714 // make stopper key an infinite fence value
1716 if( bt_getid (page->right) )
1721 // low is the lowest candidate.
1722 // loop ends when they meet
1724 // higher is already
1725 // tested as .ge. the passed key.
// halve the [low, higher) interval until it is empty
1727 while( diff = higher - low ) {
1728 slot = low + ( diff >> 1 );
1729 if( keycmp (keyptr(page, slot), key, len) < 0 )
1732 higher = slot, good++;
1735 // return zero if key is on right link page
1737 return good ? higher : 0;
1740 // find and load page at given level for given key
1741 // leave page rd or wr locked as requested
// bt_loadpage: walk from the root towards level `lvl`, following child
// pointers and right/left sibling links, until the page that should hold
// `key` is found; that page is left locked with `lock`.
// All visible error paths set mgr->err to BTERR_struct (or leave an earlier
// error in place) and return 0.
// NOTE(review): the loop header, the success return and the declarations of
// ptr, val and prevpage are not visible in this listing.
1743 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1745 uid page_no = ROOT_page, prevpage_no = 0;
// 0xff marks the root level as not yet known; it is set from the root page below
1746 uint drill = 0xff, slot;
1747 BtLatchSet *prevlatch;
1748 uint mode, prevmode;
1753 // start at root of btree and drill down
1756 // determine lock mode of drill level
1757 mode = (drill == lvl) ? lock : BtLockRead;
// drill == 0 means a leaf page, which is pinned from the leaf pool
1759 if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1760 set->page = bt_mappage (mgr, set->latch);
1764 // obtain access lock using lock chaining with Access mode
1766 if( page_no > ROOT_page )
1767 bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1769 // release & unpin parent or left sibling page
1772 bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1773 bt_unpinlatch (prevlatch, thread_no, __LINE__);
1777 // obtain mode lock using lock coupling through AccessLock
1779 bt_lockpage(mode, set->latch, thread_no, __LINE__);
1781 // grab our fence key
1783 ptr=keyptr(set->page,set->page->cnt);
// reaching a page marked free is reported as a structure error
1785 if( set->page->free )
1786 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1788 if( page_no > ROOT_page )
1789 bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1791 // re-read and re-lock root after determining actual level of root
// a level mismatch is only legal on the root page (first visit)
1793 if( set->page->lvl != drill) {
1794 if( set->latch->page_no != ROOT_page )
1795 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1797 drill = set->page->lvl;
// the root was locked for read; when the caller wants a stronger lock at
// this level, drop the page so it can be re-acquired
1799 if( lock != BtLockRead && drill == lvl ) {
1800 bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1801 bt_unpinlatch (set->latch, thread_no, __LINE__);
// remember this page so it can be released after the next one is latched
1806 prevpage_no = set->latch->page_no;
1807 prevlatch = set->latch;
1808 prevpage = set->page;
1811 // if requested key is beyond our fence,
1812 // slide to the right
1814 if( keycmp (ptr, key, len) < 0 )
1815 if( page_no = bt_getid(set->page->right) )
1818 // if page is part of a delete operation,
1819 // slide to the left;
// the left link is read under the Link lock
1821 if( set->page->kill ) {
1822 bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1823 page_no = bt_getid(set->page->left);
1824 bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1828 // find key on page at this level
1829 // and descend to requested level
1831 if( slot = bt_findslot (set->page, key, len) ) {
1835 // find next non-dead slot -- the fence key if nothing else
1837 while( slotptr(set->page, slot)->dead )
1838 if( slot++ < set->page->cnt )
1841 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1843 val = valptr(set->page, slot);
// a child pointer must be exactly BtId bytes long
1845 if( val->len == BtId )
1846 page_no = bt_getid(val->value);
1848 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1854 // slide right into next page
1856 page_no = bt_getid(set->page->right);
1860 // return error on end of right chain
1862 mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1863 return 0; // return error
1866 // return page to free list
1867 // page must be delete & write locked
1868 // and have no keys pointing to it.
// bt_freepage: push the page in `set` onto the free chain for its kind
// (freechain for lvl != 0, leafchain for leaves), mark it free and dirty,
// then release its Delete and Write locks and its pin.
1870 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1872 unsigned char *freechain;
// NOTE(review): the page counters are decremented before mgr->lock is taken
// below -- confirm this is intended.
1874 if( set->page->lvl ) {
1875 freechain = mgr->pagezero->freechain;
1876 mgr->pagezero->upperpages--;
1878 freechain = mgr->pagezero->leafchain;
1879 mgr->pagezero->leafpages--;
1882 // lock allocation page
1884 bt_mutexlock (mgr->lock);
// link in at the head: page->right takes the old chain head, and the chain
// head takes this page's number
1888 memcpy(set->page->right, freechain, BtId);
1889 bt_putid(freechain, set->latch->page_no);
1890 set->latch->dirty = 1;
1891 set->page->free = 1;
1893 // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1894 // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1896 // unlock released page
1897 // and unlock allocation page
1899 bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1900 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1901 bt_unpinlatch (set->latch, thread_no, __LINE__);
1902 bt_releasemutex (mgr->lock);
1905 // a fence key was deleted from a page
1906 // push new fence value upwards
// bt_fixfence: the last (fence) slot of the page in `set` is removed; the
// new, smaller fence key is inserted at level lvl+1 pointing at this page,
// and the old fence key is deleted from level lvl+1.
// The page's Write lock is traded for a Parent lock while the parent level
// is updated; the page is unlocked and unpinned at the end.
// Returns mgr->err when the insert or delete reports an error.
// NOTE(review): the declaration of ptr and the success return are not
// visible in this listing. The two error returns happen while the Parent
// lock and pin taken here are still held -- confirm this is intended.
1908 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1910 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1911 unsigned char value[BtId];
1915 // remove the old fence value
// save the old fence key, then clear its slot and shrink cnt
1917 ptr = keyptr(set->page, set->page->cnt);
1918 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1919 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1920 set->latch->dirty = 1;
1922 // cache new fence value
1924 ptr = keyptr(set->page, set->page->cnt);
1925 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1927 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1928 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1930 // insert new (now smaller) fence key
1932 bt_putid (value, set->latch->page_no);
1933 ptr = (BtKey*)leftkey;
1935 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1936 return mgr->err_thread = thread_no, mgr->err;
1938 // now delete old fence key
1940 ptr = (BtKey*)rightkey;
1942 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1943 return mgr->err_thread = thread_no, mgr->err;
1945 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1946 bt_unpinlatch(set->latch, thread_no, __LINE__);
1950 // root has a single child
1951 // collapse a level from the tree
// bt_collapseroot: while the root is above level 1 and has a single active
// key, copy its only child over the root page and free the child.
// The root's Write lock and pin are released at the end.
// Returns a BTERR value on the visible error paths.
// NOTE(review): the declarations of child, idx, val and page_no, the start
// of the do-loop and the success return are not visible in this listing.
1953 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1960 // find the child entry and promote as new root contents
// scan slots 1..cnt for the first one that is not dead
1963 for( idx = 0; idx++ < root->page->cnt; )
1964 if( !slotptr(root->page, idx)->dead )
1967 val = valptr(root->page, idx);
// a child pointer must be exactly BtId bytes long
1969 if( val->len == BtId )
1970 page_no = bt_getid (valptr(root->page, idx)->value);
1972 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1974 if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1975 child->page = bt_mappage (mgr, child->latch);
1977 return mgr->err_thread = thread_no, mgr->err;
// bt_freepage releases both of these locks and the pin
1979 bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1980 bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1982 memcpy (root->page, child->page, mgr->page_size);
1983 root->latch->dirty = 1;
1985 bt_freepage (mgr, child, thread_no);
1987 } while( root->page->lvl > 1 && root->page->act == 1 );
1989 bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1990 bt_unpinlatch (root->latch, thread_no, __LINE__);
1994 // delete a page and manage key
1995 // call with page writelocked
1997 // returns with page unpinned
1998 // from the page pool.
// bt_deletepage: remove the page in `set` from its level by pulling the
// contents of its right sibling into it, fixing the left link of the page
// beyond the sibling, redirecting the sibling's fence key in the parent
// level (lvl+1) to this page, deleting this page's old fence key from the
// parent level, and finally freeing the right sibling.
// NOTE(review): the declaration of ptr, several error returns and the
// success return are not visible in this listing.
2000 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
2002 unsigned char lowerfence[BT_keyarray];
2003 uint page_size = mgr->page_size;
2004 BtPageSet right[1], temp[1];
2005 unsigned char value[BtId];
2006 uid page_no, right2;
// NOTE(review): presumably guarded by a leaf-level test on a missing line
2010 page_size <<= mgr->leaf_xtra;
2012 // cache copy of original fence key
2013 // that is being deleted.
2015 ptr = keyptr(set->page, set->page->cnt);
2016 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2018 // obtain locks on right page
2020 page_no = bt_getid(set->page->right);
2022 if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
2023 right->page = bt_mappage (mgr, right->latch);
2027 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
// NOTE(review): this error return leaves the right page write-locked and
// pinned, and does not set mgr->err_thread -- confirm.
2029 if( right->page->kill )
2030 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2032 // pull contents of right peer into our empty page
2033 // preserving our left page number, and its right page number.
// page_no is reused to carry our left link across the memcpy
2035 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
2036 page_no = bt_getid(set->page->left);
2037 memcpy (set->page, right->page, page_size);
2038 bt_putid (set->page->left, page_no);
2039 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
2041 set->page->page_no = set->latch->page_no;
2042 set->latch->dirty = 1;
2044 // fix left link from far right page
2046 if( right2 = bt_getid (right->page->right) ) {
2047 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2048 temp->page = bt_mappage (mgr, temp->latch);
2052 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2053 bt_putid (temp->page->left, set->latch->page_no);
2054 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2055 bt_unpinlatch (temp->latch, thread_no, __LINE__);
2056 } else { // our page is now rightmost
2057 bt_mutexlock (mgr->lock);
2058 bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
2059 bt_releasemutex(mgr->lock);
2062 // mark right page deleted and release lock
2064 right->latch->dirty = 1;
2065 right->page->kill = 1;
2066 bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2068 // redirect higher key directly to our new node contents
2070 ptr = keyptr(right->page, right->page->cnt);
2071 bt_putid (value, set->latch->page_no);
2073 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
2076 // delete our original fence key in parent
2077 // and unlock our page.
2079 ptr = (BtKey *)lowerfence;
2081 if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2084 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2085 bt_unpinlatch (set->latch, thread_no, __LINE__);
2087 // obtain delete and write locks to right node
// bt_freepage releases both locks and the pin on the right page
2089 bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2090 bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2091 bt_freepage (mgr, right, thread_no);
2095 // find and delete key on page by marking delete flag bit
2096 // if page becomes empty, delete it from the btree
// bt_deletekey: locate `key` at level `lvl` and mark its slot deleted.
// Follow-up work visible below: push a new fence key upwards when an
// upper-level fence was deleted, collapse the root when it is left with one
// active key, and delete the page when it has no active keys.
// Returns mgr->err on the visible error paths.
// NOTE(review): the declarations of set, node, ptr and val, the statement
// that sets node->dead and adjusts page->act, and the success return are
// not visible in this listing.
2098 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2100 uint slot, idx, found, fence;
2106 if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2107 node = slotptr(set->page, slot);
2108 ptr = keyptr(set->page, slot);
2110 return mgr->err_thread = thread_no, mgr->err;
2112 // if librarian slot, advance to real slot
2114 if( node->type == Librarian ) {
2115 ptr = keyptr(set->page, ++slot);
2116 node = slotptr(set->page, slot);
// remember whether the target is the page's last (fence) slot
2119 fence = slot == set->page->cnt;
2121 // delete the key, ignore request if already dead
2123 if( found = !keycmp (ptr, key, len) )
2124 if( found = node->dead == 0 ) {
2125 val = valptr(set->page,slot);
// account the key and value bytes as reclaimable space
2126 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2129 // mark node type as delete
2131 node->type = Delete;
2134 // collapse empty slots beneath the fence
2135 // on interior nodes
// while the slot just below the fence is dead, move the fence slot down
// over it and shrink cnt
2138 while( idx = set->page->cnt - 1 )
2139 if( slotptr(set->page, idx)->dead ) {
2140 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2141 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2149 // did we delete a fence key in an upper level?
2151 if( lvl && set->page->act && fence )
2152 if( bt_fixfence (mgr, set, lvl, thread_no) )
2153 return mgr->err_thread = thread_no, mgr->err;
2157 // do we need to collapse root?
2159 if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2160 if( bt_collapseroot (mgr, set, thread_no) )
2161 return mgr->err_thread = thread_no, mgr->err;
2165 // delete empty page
2167 if( !set->page->act )
2168 return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2170 set->latch->dirty = 1;
2171 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2172 bt_unpinlatch (set->latch, thread_no, __LINE__);
2176 // check page for space available,
2177 // clean if necessary and return
2178 // 0 - page needs splitting
2179 // >0 new slot value
// bt_cleanpage: decide whether the page in `set` can take one more key of
// keylen bytes with a value of vallen bytes.
// When free space is already sufficient, nothing is rebuilt. Otherwise,
// provided at least a fifth of the page is garbage, the page is rebuilt from
// a heap copy with dead slots dropped, and the space test is repeated.
// NOTE(review): the return statements, the declarations of key and val, the
// update of `slot`, and the release of `frame` are not visible in this
// listing. The malloc result is used without a visible NULL check.
2181 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2183 uint page_size = mgr->page_size;
2184 BtPage page = set->page, frame;
2185 uint cnt = 0, idx = 0;
2186 uint max = page->cnt;
// leaf pages are page_size << leaf_xtra bytes
2191 if( !set->page->lvl )
2192 page_size <<= mgr->leaf_xtra;
// room needed: the slot array with two more slots, the page header, and the
// new key and value with their length headers
2194 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2197 // skip cleanup and proceed to split
2198 // if there's not enough garbage
2201 if( page->garbage < page_size / 5 )
2204 frame = malloc (page_size);
2205 memcpy (frame, page, page_size);
2207 // skip page info and set rest of page to zero
2209 memset (page+1, 0, page_size - sizeof(*page));
2210 set->latch->dirty = 1;
2212 page->min = page_size;
2216 // clean up page first by
2217 // removing deleted keys
2219 while( cnt++ < max ) {
// the dead-slot test is skipped for the last slot of a leaf page
2223 if( cnt < max || frame->lvl )
2224 if( slotptr(frame,cnt)->dead )
2227 // copy the value across
2229 val = valptr(frame, cnt);
2230 page->min -= val->len + sizeof(BtVal);
2231 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2233 // copy the key across
2235 key = keyptr(frame, cnt);
2236 page->min -= key->len + sizeof(BtKey);
2237 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2239 // make a librarian slot
// the librarian slot is dead and shares the offset of the slot that follows
2241 slotptr(page, ++idx)->off = page->min;
2242 slotptr(page, idx)->type = Librarian;
2243 slotptr(page, idx)->dead = 1;
2247 slotptr(page, ++idx)->off = page->min;
2248 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2250 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2257 // see if page has enough space now, or does it need splitting?
2259 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2265 // split the root and raise the height of the btree
// bt_splitroot: raise the tree height by one level. The current root
// contents (the lower keys) are copied to a newly allocated page, and the
// root page is rebuilt with two entries: slot 1 holds the left page's fence
// key pointing at the new left page, slot 2 holds a stopper key pointing at
// `right`. The root's Write lock and pin and the pin on `right` are
// released at the end.
// NOTE(review): the declarations of frame, left, page, ptr, val and
// left_page_no, the statements that fill val->len and the stopper key
// bytes, the increment of root->page->lvl, the release of `frame` and the
// success return are not visible in this listing. The malloc result is
// used without a visible NULL check.
2267 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2269 unsigned char leftkey[BT_keyarray];
// nxt is the running offset at which the next item is placed, filling the
// page from its end downwards
2270 uint nxt = mgr->page_size;
2271 unsigned char value[BtId];
2278 frame = malloc (mgr->page_size);
2279 memcpy (frame, root->page, mgr->page_size);
2281 // save left page fence key for new root
2283 ptr = keyptr(root->page, root->page->cnt);
2284 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2286 // Obtain an empty page to use, and copy the current
2287 // root contents into it, e.g. lower keys
2289 if( bt_newpage(mgr, left, frame, thread_no) )
2290 return mgr->err_thread = thread_no, mgr->err;
2292 left_page_no = left->latch->page_no;
2293 bt_unpinlatch (left->latch, thread_no, __LINE__);
2296 // left link the pages together
2298 page = bt_mappage (mgr, right);
2299 bt_putid (page->left, left_page_no);
2301 // preserve the page info at the bottom
2302 // of higher keys and set rest to zero
2304 memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2306 // insert stopper key at top of newroot page
2307 // and increase the root height
2309 nxt -= BtId + sizeof(BtVal);
2310 bt_putid (value, right->page_no);
2311 val = (BtVal *)((unsigned char *)root->page + nxt);
2312 memcpy (val->value, value, BtId);
// the stopper key occupies 2 bytes plus its BtKey header
2315 nxt -= 2 + sizeof(BtKey);
2316 slotptr(root->page, 2)->off = nxt;
2317 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2322 // insert lower keys page fence key on newroot page as first key
2324 nxt -= BtId + sizeof(BtVal);
2325 bt_putid (value, left_page_no);
2326 val = (BtVal *)((unsigned char *)root->page + nxt);
2327 memcpy (val->value, value, BtId);
2330 ptr = (BtKey *)leftkey;
2331 nxt -= ptr->len + sizeof(BtKey);
2332 slotptr(root->page, 1)->off = nxt;
2333 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2335 bt_putid(root->page->right, 0);
2336 root->page->min = nxt; // reset lowest used offset and key count
2337 root->page->cnt = 2;
2338 root->page->act = 2;
// record the root's level in page zero
2341 mgr->pagezero->alloc->lvl = root->page->lvl;
2343 // release and unpin root pages
2345 bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2346 bt_unpinlatch (root->latch, thread_no, __LINE__);
2348 bt_unpinlatch (right, thread_no, __LINE__);
2352 // split already locked full node
2354 // return pool entry for new right
2355 // page, pinned & unlocked
// bt_splitpage: split the full page in `set`. The upper part of its keys is
// assembled in a heap frame and written to a newly allocated right page;
// the original page is then rebuilt with the lower keys, and its right link
// is set to the new page. When `linkleft` is non-zero and the page is not
// the root, the page beyond the new right page gets its left link fixed
// (or pagezero->alloc->left is updated when there is none).
// `entry` is computed as the index of the new right latch within its latch
// set array.
// NOTE(review): the declarations of frame, key, val, right2 and entry, the
// computation of the split point, the act counters, the release of `frame`
// and all return statements are not visible in this listing. The malloc
// result is used without a visible NULL check.
2357 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2359 uint page_size = mgr->page_size;
2360 BtPageSet right[1], temp[1];
2361 uint cnt = 0, idx = 0, max;
2362 uint lvl = set->page->lvl;
// leaf pages are page_size << leaf_xtra bytes
2370 if( !set->page->lvl )
2371 page_size <<= mgr->leaf_xtra;
2373 // split higher half of keys to frame
2375 frame = malloc (page_size);
2376 memset (frame, 0, page_size);
2377 frame->min = page_size;
2378 max = set->page->cnt;
2382 while( cnt++ < max ) {
// the dead-slot test is skipped for the last slot of a leaf page
2383 if( cnt < max || set->page->lvl )
2384 if( slotptr(set->page, cnt)->dead )
2387 val = valptr(set->page, cnt);
2388 frame->min -= val->len + sizeof(BtVal);
2389 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2391 key = keyptr(set->page, cnt);
2392 frame->min -= key->len + sizeof(BtKey);
2393 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2395 // add librarian slot
2397 slotptr(frame, ++idx)->off = frame->min;
2398 slotptr(frame, idx)->type = Librarian;
2399 slotptr(frame, idx)->dead = 1;
2403 slotptr(frame, ++idx)->off = frame->min;
2404 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2406 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
// a non-root page hands its right link to the new right page
2415 if( set->latch->page_no > ROOT_page ) {
2416 right2 = bt_getid (set->page->right);
2417 bt_putid (frame->right, right2);
2420 bt_putid (frame->left, set->latch->page_no);
2423 // get new free page and write higher keys to it.
2425 if( bt_newpage(mgr, right, frame, thread_no) )
2428 // link far right's left pointer to new page
2430 if( linkleft && set->latch->page_no > ROOT_page )
2432 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2433 temp->page = bt_mappage (mgr, temp->latch);
2437 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2438 bt_putid (temp->page->left, right->latch->page_no);
2439 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2440 bt_unpinlatch (temp->latch, thread_no, __LINE__);
2441 } else { // page is rightmost
2442 bt_mutexlock (mgr->lock);
2443 bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2444 bt_releasemutex(mgr->lock);
2447 // process lower keys
// reuse frame as a copy of the original page, then clear the original
// below its header
2449 memcpy (frame, set->page, page_size);
2450 memset (set->page+1, 0, page_size - sizeof(*set->page));
2451 set->latch->dirty = 1;
2453 set->page->min = page_size;
2454 set->page->garbage = 0;
2460 // assemble page of smaller keys
2462 while( cnt++ < max ) {
2463 if( slotptr(frame, cnt)->dead )
2465 val = valptr(frame, cnt);
2466 set->page->min -= val->len + sizeof(BtVal);
2467 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2469 key = keyptr(frame, cnt);
2470 set->page->min -= key->len + sizeof(BtKey);
2471 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2473 // add librarian slot
2475 slotptr(set->page, ++idx)->off = set->page->min;
2476 slotptr(set->page, idx)->type = Librarian;
2477 slotptr(set->page, idx)->dead = 1;
2481 slotptr(set->page, ++idx)->off = set->page->min;
2482 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2486 bt_putid(set->page->right, right->latch->page_no);
2487 set->page->cnt = idx;
// index of the new right latch within the latch set array for its kind
2490 entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2494 // fix keys for newly split page
2495 // call with both pages pinned & locked
2496 // return unlocked and unpinned
// bt_splitkeys: after a split, publish the fence keys of the left page
// (`set`) and the new right page (`right`) in the parent level (lvl+1).
// A root split is delegated to bt_splitroot. Otherwise the page beyond the
// right page gets its left link pointed at `right`, both pages take Parent
// locks, the two fence keys are inserted at lvl+1, and both pages are
// unlocked and unpinned.
// Returns mgr->err when an insert reports an error.
// NOTE(review): the declarations of temp, page, ptr and right2, the release
// of the right page's Write lock and the success return are not visible in
// this listing. The two error returns happen while the Parent locks and
// pins are still held -- confirm this is intended.
2498 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2500 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2501 unsigned char value[BtId];
2502 uint lvl = set->page->lvl;
2508 // if current page is the root page, split it
2510 if( set->latch->page_no == ROOT_page )
2511 return bt_splitroot (mgr, set, right, thread_no);
// capture both fence keys (the last key on each page) before unlocking
2513 ptr = keyptr(set->page, set->page->cnt);
2514 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2516 page = bt_mappage (mgr, right);
2518 ptr = keyptr(page, page->cnt);
2519 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2521 // splice in far right page's left page_no
2523 if( right2 = bt_getid (page->right) ) {
2524 if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2525 temp->page = bt_mappage (mgr, temp->latch);
2529 bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2530 bt_putid (temp->page->left, right->page_no);
2531 bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2532 bt_unpinlatch (temp->latch, thread_no, __LINE__);
2533 } else { // right page is far right page
2534 bt_mutexlock (mgr->lock);
2535 bt_putid (mgr->pagezero->alloc->left, right->page_no);
2536 bt_releasemutex(mgr->lock);
2538 // insert new fences in their parent pages
2540 bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2542 bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2543 bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2545 // insert new fence for reformulated left block of smaller keys
2547 bt_putid (value, set->latch->page_no);
2548 ptr = (BtKey *)leftkey;
2550 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2551 return mgr->err_thread = thread_no, mgr->err;
2553 // switch fence for right block of larger keys to new right page
2555 bt_putid (value, right->page_no);
2556 ptr = (BtKey *)rightkey;
2558 if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2559 return mgr->err_thread = thread_no, mgr->err;
2561 bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2562 bt_unpinlatch (set->latch, thread_no, __LINE__);
2564 bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2565 bt_unpinlatch (right, thread_no, __LINE__);
2569 // install new key and value onto page
2570 // page must already be checked for
// bt_insertslot: store a new key and value on the page in `set` and make
// slot number `slot` refer to them. The value and key are written just
// below page->min. The slot array is then opened up at `slot`, reusing a
// dead slot when one exists at or above `slot`, or growing the array (with
// extra librarian slots interleaved) when the scan reaches the end.
// The caller must already have checked that the page has room.
// NOTE(review): the declarations of node, ptr, val and rate, the statements
// that set val->len, ptr->len, node->type and node->dead, the page->act
// update and the return statement are not visible in this listing.
2573 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2575 uint idx, librarian;
2581 // if previous slot is a librarian slot, use it
2584 if( slotptr(set->page, slot-1)->type == Librarian )
2587 // copy value onto page
2589 set->page->min -= vallen + sizeof(BtVal);
2590 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2591 memcpy (val->value, value, vallen);
2594 // copy key onto page
2596 set->page->min -= keylen + sizeof(BtKey);
2597 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2598 memcpy (ptr->key, key, keylen);
2601 // find first empty slot at or above our insert slot
2603 for( idx = slot; idx < set->page->cnt; idx++ )
2604 if( slotptr(set->page, idx)->dead )
2607 // now insert key into array before slot.
2609 // if we're going all the way to the top,
2610 // add as many librarian slots as
// avail: slots that fit in four fifths of the space below page->min after
// the header and the (incremented) slot array
2613 if( idx == set->page->cnt ) {
2614 int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2616 librarian = ++idx - slot;
2617 avail /= sizeof(BtSlot);
// NOTE(review): librarian is unsigned and avail is int, so this comparison
// is done in unsigned arithmetic; a negative avail would compare as a very
// large value -- confirm the missing lines guard against that.
2622 if( librarian > avail )
2626 rate = (idx - slot) / librarian;
2627 set->page->cnt += librarian;
2632 librarian = 0, rate = 0;
2634 // transfer slots and add librarian slots
// move slots upwards from the top down to `slot`, skipping over the
// positions reserved for the librarian slots still to be placed
2636 while( idx > slot ) {
2637 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2639 // add librarian slot per rate
2642 if( (idx - slot)/2 <= librarian * rate ) {
2643 node = slotptr(set->page, --idx);
2644 node->off = node[1].off;
2645 node->type = Librarian;
2653 set->latch->dirty = 1;
// point the insert slot at the newly written key
2658 node = slotptr(set->page, slot);
2659 node->off = set->page->min;
// bt_insertkey: store key/value in the tree at level `lvl`.
//
//	key/keylen, value/vallen: bytes to store; type: slot type;
//	thread_no: caller's thread number, passed to the lock/pin helpers.
//	On the visible error paths mgr->err_thread is set and mgr->err
//	is returned.
//
// Visible flow: bt_loadpage finds the page and slot under BtLockWrite,
// a Librarian slot is stepped over, and then either a new slot is
// inserted (bt_cleanpage + bt_insertslot) or the existing record is
// updated.  bt_splitpage/bt_splitkeys are called from the same paths —
// presumably when bt_cleanpage reports no room; the linking else/continue
// lines are not visible here, TODO confirm.
2665 // Insert new key into the btree at given level.
2666 // either add a new key or update/add an existing one
2668 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2670 uint slot, idx, len, entry;
2676 while( 1 ) { // find the page and slot for the current key
2677 if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2678 node = slotptr(set->page, slot);
2679 ptr = keyptr(set->page, slot);
2683 // if librarian slot == found slot, advance to real slot
2685 if( node->type == Librarian ) {
2686 node = slotptr(set->page, ++slot);
2687 ptr = keyptr(set->page, slot);
2690 // if inserting a duplicate key or unique
2691 // key that doesn't exist on the page,
2692 // check for adequate space on the page
2693 // and insert the new key before slot.
2698 if( !keycmp (ptr, key, keylen) )
2701 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2702 if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
// the split entry is an index into the upper-level latch pool for
// lvl > 0 and into the leaf pool for lvl == 0
2707 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2708 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2711 return mgr->err_thread = thread_no, mgr->err;
2714 if( keycmp (ptr, key, keylen) )
2720 // if key already exists, update value and return
2722 val = valptr(set->page, slot);
// the existing value area is large enough: overwrite it in place and
// count the unused remainder as garbage
2724 if( val->len >= vallen ) {
2730 set->page->garbage += val->len - vallen;
2731 set->latch->dirty = 1;
2733 memcpy (val->value, value, vallen);
2737 // new update value doesn't fit in existing value area
2738 // make sure page has room
// the whole old record (key and value, with their headers) becomes garbage
2741 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2748 if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2751 if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2752 if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2755 return mgr->err_thread = thread_no, mgr->err;
2758 // copy key and value onto page and update slot
2760 set->page->min -= vallen + sizeof(BtVal);
2761 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2762 memcpy (val->value, value, vallen);
2765 set->latch->dirty = 1;
2766 set->page->min -= keylen + sizeof(BtKey);
2767 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2768 memcpy (ptr->key, key, keylen);
// repoint the existing slot at the fresh copy of the key
2771 slotptr(set->page,slot)->off = set->page->min;
2774 bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2775 bt_unpinlatch (set->latch, thread_no, __LINE__);
// bt_atomicpage: find the leaf page and slot for transaction slot `src`.
//
//	source: page holding the transaction's keys
//	locks:  per-slot AtomicTxn records (entry, slot, reuse)
//	set:    receives the latch and mapped page that were examined
//
// The search starts at the latch entry recorded in locks[] and follows
// latch->split from page to page.  After the chain is exhausted
// mgr->err is set to BTERR_atomic.
2779 // determine actual page where key is located
2780 // return slot number
2782 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2784 BtKey *key = keyptr(source,src), *ptr;
2785 uint slot = locks[src].slot;
// a slot flagged reuse takes its latch entry from the preceding txn slot
2788 if( locks[src].reuse )
2789 entry = locks[src-1].entry;
2791 entry = locks[src].entry;
2794 set->latch = mgr->leafsets + entry;
2795 set->page = bt_mappage (mgr, set->latch);
2799 // find where our key is located
2800 // on current page or pages split on
2801 // same page txn operations.
2804 set->latch = mgr->leafsets + entry;
2805 set->page = bt_mappage (mgr, set->latch);
2807 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2808 if( slotptr(set->page, slot)->type == Librarian )
// remember which page in the chain held the key for a reuse slot
2810 if( locks[src].reuse )
2811 locks[src].entry = entry;
// move to the next page in the split chain; zero ends the search
2814 } while( entry = set->latch->split );
2816 mgr->line = __LINE__, mgr->err = BTERR_atomic;
// bt_atomicinsert: insert transaction slot `src` of `source` into the
// leaf page found by bt_atomicpage, splitting that page when
// bt_cleanpage yields no slot — presumably; the linking lines are not
// visible in this listing, TODO confirm.
//
//	lsn: log sequence number stamped on the page after the insert.
//	A new right page is locked and linked into the latch split chain;
//	the visible code does not unlock it here.
//	Sets mgr->err to BTERR_atomic when bt_atomicpage finds no slot.
2820 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2822 BtKey *key = keyptr(source, src);
2823 BtVal *val = valptr(source, src);
2828 while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2829 if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2830 if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2831 return mgr->err_thread = thread_no, mgr->err;
2833 set->page->lsn = lsn;
// last argument is 0 here; bt_insertkey passes 1 to the same call
2839 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2840 latch = mgr->leafsets + entry;
2844 // splice right page into split chain
// the new page takes over the old successor, then becomes the successor
2847 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2848 latch->split = set->latch->split;
2849 set->latch->split = entry;
2851 // clear slot number for atomic page
2853 locks[src].slot = 0;
2856 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
// bt_atomicdelete: apply the delete for transaction slot `src`.
//
//	The page and slot come from bt_atomicpage; BTERR_struct is
//	recorded on the failure path.  When the key at that slot differs
//	from the requested key, a slot of type Delete with an empty value
//	is inserted instead.  Otherwise the record's bytes are added to
//	the page's garbage count, the latch is marked dirty and the page
//	is stamped with `lsn`.  mgr->found is incremented atomically.
// NOTE(review): the lines that mark the slot dead and adjust the active
// count are not visible in this listing.
2859 // perform delete from smaller btree
2860 // insert a delete slot if not found there
2862 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2864 BtKey *key = keyptr(source, src);
2871 if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2872 node = slotptr(set->page, slot);
2873 ptr = keyptr(set->page, slot);
2874 val = valptr(set->page, slot);
2876 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2878 // if slot is not found, insert a delete slot
2880 if( keycmp (ptr, key->key, key->len) )
2881 if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2884 // if node is already dead,
2885 // ignore the request.
// key and value bytes, with their headers, are counted as garbage
2890 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2891 set->latch->dirty = 1;
2892 set->page->lsn = lsn;
2896 __sync_fetch_and_add(&mgr->found, 1);
// bt_atomicrelease: unlock (BtLockWrite) and unpin the leaf latch at
// index `entry` and the latches reachable through latch->split.
// The recursive call comes first, so pages further along the split
// chain are released before this one.
// NOTE(review): the condition guarding the recursion is not visible in
// this listing.
2900 // release master's splits from right to left
2902 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2904 BtLatchSet *latch = mgr->leafsets + entry;
2907 bt_atomicrelease (mgr, latch->split, thread_no);
2910 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2911 bt_unpinlatch(latch, thread_no, __LINE__);
// qsortcmp: three-argument comparison callback; bt_atomictxn passes it
// to qsort_r with the transaction page as the context argument.
//	slot1, slot2: slots to compare
//	page: page the slot offsets are relative to
// Each slot is resolved to its key through slot->off and the result of
// keycmp on the two keys is returned.
2914 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2916 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2917 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2919 return keycmp (key1, key2->key, key2->len);
// bt_atomictxn: apply the batch of keys held in `source` as one unit.
//
//	Steps visible here: order the slots of `source` by key, write the
//	batch to the redo log when redo pages are configured, run the
//	batch with bt_atomicexec, then call bt_promote while the leaf page
//	count exceeds the pool budget.
//	Returns bt->mgr->err on the visible error paths.
2921 // atomic modification of a batch of keys.
2923 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2925 uint src, idx, slot, samepage, entry, que = 0;
2926 BtKey *key, *ptr, *key2;
2932 // stable sort the list of keys into order to
2933 // prevent deadlocks between threads.
// insertion sort: the post-increment in the loop test means the body
// first runs with src == 2
2935 for( src = 1; src++ < source->cnt; ) {
2936 *temp = *slotptr(source,src);
2937 key = keyptr (source,src);
// idx walks from src-1 down to 1, moving larger keys up one slot
2939 for( idx = src; --idx; ) {
2940 key2 = keyptr (source,idx);
2941 if( keycmp (key, key2->key, key2->len) < 0 ) {
2942 *slotptr(source,idx+1) = *slotptr(source,idx);
2943 *slotptr(source,idx) = *temp;
// NOTE(review): both the insertion sort above and this qsort_r call are
// visible; presumably only one is compiled in — confirm against the
// preprocessor lines that are not visible here.
2949 qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2950 // add entries to redo log
2952 if( bt->mgr->pagezero->redopages )
2953 lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2957 // perform the individual actions in the transaction
2959 if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2960 return bt->mgr->err;
2962 // if number of active pages
2963 // is greater than the buffer pool
2964 // promote page into larger btree
// the budget is leaftotal less the value stored at mgr->thread_no
2967 while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2968 if( bt_promote (bt) )
2969 return bt->mgr->err;
// bt_atomicexec: run the inserts/deletes listed in `source` against the
// tree managed by `mgr`.
//
//	source:    page whose slots 1..cnt are the operations, already in
//	           key order (bt_atomictxn sorts before calling)
//	lsn:       log sequence number handed to the per-key operations
//	lsm:       flag supplied by the caller; its use is not visible in
//	           this listing
//	thread_no: caller's thread number for the lock/pin helpers
//
// Phase 1 loads and write-locks the leaf page for each key, flagging
// keys that land on the same page as their predecessor (reuse).
// Phase 2 walks the list backwards one page group at a time, applies
// the operations, then repairs links and fence keys for any pages that
// were split or emptied, and releases the locks.
// NOTE(review): many lines (braces, continue/return, the switch labels)
// are missing from this listing; comments cover only what is visible.
2976 // execute the source list of inserts/deletes
2978 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2980 uint slot, src, idx, samepage, entry;
2981 BtPageSet set[1], prev[1];
2982 unsigned char value[BtId];
// one zeroed record per slot, indexed 1..cnt like the slots themselves
// NOTE(review): no check of the calloc result is visible here.
2990 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2992 // Load the leaf page for each key
2993 // group same page references with reuse bit
2995 for( src = 0; src++ < source->cnt; ) {
2996 key = keyptr(source, src);
2998 // first determine if this modification falls
2999 // on the same page as the previous modification
3000 // note that the far right leaf page is a special case
// ptr is the highest key of the page loaded for an earlier slot; the
// key stays on that page when the page has no right sibling or the
// key does not exceed ptr
3002 if( samepage = src > 1 )
3003 samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
// a freshly loaded page starts with an empty split chain
3006 if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
3007 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
3014 if( slotptr(set->page, slot)->type == Librarian )
// latch index within the leaf pool
3017 entry = set->latch - mgr->leafsets;
3018 locks[src].reuse = samepage;
3019 locks[src].entry = entry;
3020 locks[src].slot = slot;
3022 // capture current lsn for master page
3024 locks[src].reqlsn = set->page->lsn;
3027 // insert or delete each key
3028 // process any splits or merges
3029 // run through txn list backwards
// samepage now serves as the exclusive upper bound of the current group
3031 samepage = source->cnt + 1;
3033 for( src = source->cnt; src; src-- ) {
3034 if( locks[src].reuse )
3037 // perform the txn operations
3038 // from smaller to larger on
3041 for( idx = src; idx < samepage; idx++ )
3042 switch( slotptr(source,idx)->type ) {
3044 if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3050 if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
// remaining case: only locate the page; the returned slot is unused
3055 bt_atomicpage (mgr, source, locks, idx, set);
3059 // after the same page operations have finished,
3060 // process master page for splits or deletion.
3062 latch = prev->latch = mgr->leafsets + locks[src].entry;
3063 prev->page = bt_mappage (mgr, prev->latch);
3066 // pick-up all splits from master page
3067 // each one is already pinned & WriteLocked.
3069 while( entry = prev->latch->split ) {
3070 set->latch = mgr->leafsets + entry;
3071 set->page = bt_mappage (mgr, set->latch);
3073 // delete empty master page by undoing its split
3074 // (this is potentially another empty page)
3075 // note that there are no pointers to it yet
// the split page's contents replace the master's (keeping the master's
// left link); the split page itself is then freed
3077 if( !prev->page->act ) {
3078 memcpy (set->page->left, prev->page->left, BtId);
3079 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3080 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3081 prev->latch->split = set->latch->split;
3082 prev->latch->dirty = 1;
3083 bt_freepage (mgr, set, thread_no);
3087 // remove empty split page from the split chain
3088 // and return it to the free list. No other
3089 // thread has its page number yet.
3091 if( !set->page->act ) {
3092 memcpy (prev->page->right, set->page->right, BtId);
3093 prev->latch->split = set->latch->split;
3095 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3096 bt_freepage (mgr, set, thread_no);
3100 // update prev's fence key
// prev's highest key is posted at level 1 with prev's page number
3102 ptr = keyptr(prev->page,prev->page->cnt);
3103 bt_putid (value, prev->latch->page_no);
3105 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3108 // splice in the left link into the split page
3110 bt_putid (set->page->left, prev->latch->page_no);
3113 bt_syncpage (mgr, prev->page, prev->latch);
3118 // update left pointer in next right page from last split page
3119 // (if all splits were reversed or none occurred, latch->split == 0)
3121 if( latch->split ) {
3122 // fix left pointer in master's original (now split)
3123 // far right sibling or set rightmost page in page zero
3125 if( right_page_no = bt_getid (prev->page->right) ) {
3126 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3127 set->page = bt_mappage (mgr, set->latch);
3131 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3132 bt_putid (set->page->left, prev->latch->page_no);
3133 set->latch->dirty = 1;
3135 bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3136 bt_unpinlatch (set->latch, thread_no, __LINE__);
3137 } else { // prev is rightmost page
// the rightmost page number is kept in page zero, under mgr->lock
3138 bt_mutexlock (mgr->lock);
3139 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3140 bt_releasemutex(mgr->lock);
3143 // switch the original fence key from the
3144 // master page to the last split page.
// posted with slot type Update, unlike the Unique posts above
3146 ptr = keyptr(prev->page,prev->page->cnt);
3147 bt_putid (value, prev->latch->page_no);
3149 if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3153 bt_syncpage (mgr, prev->page, prev->latch);
3155 // unlock and unpin the split pages
3157 bt_atomicrelease (mgr, latch->split, thread_no);
3159 // unlock and unpin the master page
3162 bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3163 bt_unpinlatch(latch, thread_no, __LINE__);
3167 // since there are no splits remaining, we're
3168 // finished if master page occupied
3170 if( prev->page->act ) {
3171 bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3174 bt_syncpage (mgr, prev->page, prev->latch);
3176 bt_unpinlatch(prev->latch, thread_no, __LINE__);
3180 // any and all splits were reversed, and the
3181 // master page located in prev is empty, delete it
3183 if( bt_deletepage (mgr, prev, thread_no, 0) )
// bt_promote: choose one leaf page of the cache btree (bt->mgr), replay
// its slots into the main btree (bt->main) with bt_atomicexec, then
// delete the page from the cache btree.
//
//	The candidate index comes from an atomically incremented counter
//	(mgr->leafpromote) reduced modulo leaftotal.  The visible tests
//	release the modify mutex for candidates that are pinned, have no
//	right sibling, are not at level 0, or have nopromote/kill set.
//	Returns bt->main->err when the replay fails, otherwise
//	bt->mgr->err on the visible return.
// NOTE(review): the retry/continue lines and the pin step are not
// visible in this listing.
3191 // pick & promote a page into the larger btree
3193 BTERR bt_promote (BtDb *bt)
3195 uint entry, slot, idx;
// two spellings of fetch-and-increment are visible; presumably one per
// platform under conditional compilation — TODO confirm
3203 entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3205 entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3207 entry %= bt->mgr->leaftotal;
3212 set->latch = bt->mgr->leafsets + entry;
3214 // skip this entry if it has never been used
3216 if( !set->latch->page_no )
3219 if( !bt_mutextry(set->latch->modify) )
3222 // skip this entry if it is pinned
// CLOCK_bit is masked off before testing the pin count
3224 if( set->latch->pin & ~CLOCK_bit ) {
3225 bt_releasemutex(set->latch->modify);
3229 set->page = bt_mappage (bt->mgr, set->latch);
3231 // entry has no right sibling
3233 if( !bt_getid (set->page->right) ) {
3234 bt_releasemutex(set->latch->modify);
3238 // entry is an interior node, or is being killed or constructed
3240 if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3241 bt_releasemutex(set->latch->modify);
3245 // pin the page for our access
3246 // and leave it locked for the
3247 // duration of the promotion.
3250 bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3251 bt_releasemutex(set->latch->modify);
3253 // transfer slots in our selected page to the main btree
// progress message for every 100th pool entry
// NOTE(review): page_no is printed with %d; confirm the field's type
// matches (uid is unsigned long long in this file).
3254 if( !(entry % 100) )
3255 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
// replay against the main tree with lsn 0; lsm is 1 when the cache tree
// has redo pages configured, else 0
3257 if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3258 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3259 return bt->main->err;
3262 // now delete the page
3264 if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3265 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3267 return bt->mgr->err;
// bt_findkey: look the key up first in the cache btree (bt->mgr) and
// then in the main btree (bt->main), at leaf level under BtLockRead.
//
//	value/valmax: destination buffer and its capacity; value bytes
//	are copied with memcpy.
//	Every visible exit path unlocks and unpins the page it examined.
// NOTE(review): index_file calls this with value == NULL and valmax == 0,
// so the memcpy below can run with a null destination and zero length —
// confirm that is acceptable on the target platforms.
3271 // find unique key == given key, or first duplicate key in
3272 // leaf level and return number of value bytes
3273 // or (-1) if not found.
3275 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
// type 0 searches the cache tree, type 1 the main tree
3284 for( type = 0; type < 2; type++ )
3285 if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3286 node = slotptr(set->page, slot);
3288 // skip librarian slot place holder
3290 if( node->type == Librarian )
3291 node = slotptr(set->page, ++slot);
3293 ptr = keyptr(set->page, slot);
3295 // not there if we reach the stopper key
3296 // or the key doesn't match what's on the page.
// the last slot of a page with no right sibling is the stopper
3298 if( slot == set->page->cnt )
3299 if( !bt_getid (set->page->right) ) {
3300 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3301 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3305 if( keycmp (ptr, key, keylen) ) {
3306 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3307 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3311 // key matches, return >= 0 value bytes copied
3312 // or -1 if not there.
3314 if( node->type == Delete || node->dead ) {
3319 val = valptr (set->page,slot);
// presumably valmax is clamped to val->len on the line that is not
// visible here — TODO confirm
3321 if( valmax > val->len )
3324 memcpy (value, val->value, valmax);
3333 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3334 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
// bt_lastkey: copy the rightmost leaf page into `cursor`.
//
//	The page number is read from pagezero->alloc->left.  The page is
//	pinned and mapped, then copied whole (page_size << leaf_xtra
//	bytes) under BtLockRead, then unlocked and unpinned.
// NOTE(review): the return statement is not visible in this listing.
3339 // set cursor to highest slot on highest page
3341 uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no)
3343 uid page_no = bt_getid (mgr->pagezero->alloc->left);
3346 if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3347 set->page = bt_mappage (mgr, set->latch);
3351 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3352 memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
3353 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3354 bt_unpinlatch (set->latch, thread_no, __LINE__);
// bt_prevkey: step the cursor one slot backwards, moving to the left
// neighbour page when needed.
//
//	The left neighbour (cursor->left) is pinned, mapped and copied
//	whole into `cursor` under BtLockRead.  The copied page's right
//	link is then compared with the right link the cursor had before.
// NOTE(review): the slot arithmetic, the loop around the neighbour
// lookup and the return values are not visible in this listing.
3358 // return previous slot on cursor page
3360 uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no)
3362 uid cursor_page = cursor->page_no;
3363 uid ourright, next, us = cursor_page;
3369 ourright = bt_getid(cursor->right);
// a zero left link means there is no page to the left
3372 if( !(next = bt_getid(cursor->left)) )
3378 if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3379 set->page = bt_mappage (mgr, set->latch);
3383 bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3384 memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
3385 bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3386 bt_unpinlatch (set->latch, thread_no, __LINE__);
3388 next = bt_getid (cursor->right);
3394 if( next == ourright )
// bt_nextkey: advance past `slot` on the cached cursor page
// (bt->cachecursor), testing each following slot's dead flag.  On the
// page without a right sibling the final slot (the stopper) is not
// accepted.  The right sibling is loaded into the cursor by pin, map
// and whole-page copy under BtLockRead.
// NOTE(review): the continue/return lines and the end-of-tree test are
// not visible in this listing; the visible final statement clears
// bt->mgr->err and returns 0.
3402 // return next slot on cursor page
3403 // or slide cursor right into next page
3405 uint bt_nextkey (BtDb *bt, uint slot)
3411 right = bt_getid(bt->cachecursor->right);
3413 while( slot++ < bt->cachecursor->cnt )
3414 if( slotptr(bt->cachecursor,slot)->dead )
3416 else if( right || (slot < bt->cachecursor->cnt) ) // skip infinite stopper
3424 if( set->latch = bt_pinleaf (bt->mgr, right, bt->thread_no) )
3425 set->page = bt_mappage (bt->mgr, set->latch);
3429 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3430 memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3431 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3432 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3437 return bt->mgr->err = 0;
// bt_startkey: load the leaf page for `key` from the cache btree
// (bt->mgr) under BtLockRead and copy it whole into bt->cachecursor,
// then unlock and unpin the page.
// NOTE(review): the failure branch and the return statement are not
// visible in this listing.
3440 // cache page of keys into cursor and return starting slot for given key
3442 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3447 // cache page for retrieval
3449 if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
3450 memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
3454 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3455 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
// getCpuTime (variant built on FILETIME/SYSTEMTIME): return a time in
// seconds selected by `type`.
//
//	One path reads the system clock with GetSystemTimeAsFileTime and
//	seeds the answer from the day of the week; the other two read
//	process times with GetProcessTimes (user time, then system time).
//	In every case the FILETIME is converted with FileTimeToSystemTime
//	and the hour/minute/second/millisecond fields are summed.
// NOTE(review): the switch labels mapping `type` to each path, and the
// declaration of crtime, are not visible in this listing.
3462 double getCpuTime(int type)
3465 FILETIME xittime[1];
3466 FILETIME systime[1];
3467 FILETIME usrtime[1];
3468 SYSTEMTIME timeconv[1];
3471 memset (timeconv, 0, sizeof(SYSTEMTIME));
3475 GetSystemTimeAsFileTime (xittime);
3476 FileTimeToSystemTime (xittime, timeconv);
// wall-clock path only: whole days are taken from the weekday number
3477 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3480 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3481 FileTimeToSystemTime (usrtime, timeconv);
3484 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3485 FileTimeToSystemTime (systime, timeconv);
3489 ans += (double)timeconv->wHour * 3600;
3490 ans += (double)timeconv->wMinute * 60;
3491 ans += (double)timeconv->wSecond;
3492 ans += (double)timeconv->wMilliseconds / 1000;
3497 #include <sys/resource.h>
// getCpuTime (variant built on gettimeofday/getrusage): return a time in
// seconds, as a double, selected by `type`.
//
//	Three sources are visible: wall-clock time from gettimeofday,
//	user CPU time (ru_utime) and system CPU time (ru_stime) of this
//	process from getrusage(RUSAGE_SELF).  Microseconds are folded in
//	as a fraction.
//	main() prints type 0 as "real", 1 as "user" and 2 as "sys".
// NOTE(review): the switch labels are not visible in this listing.
3499 double getCpuTime(int type)
3501 struct rusage used[1];
3502 struct timeval tv[1];
3506 gettimeofday(tv, NULL);
3507 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3510 getrusage(RUSAGE_SELF, used);
3511 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3514 getrusage(RUSAGE_SELF, used);
3515 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// bt_poolaudit: print diagnostics about the latch pools of `mgr` to
// stderr; `type` is a label placed at the start of every message.
//
//	Entry zero of latchsets and of leafsets is compared with an
//	all-zero BtLatchSet and reported when it differs.  Entries from 1
//	up to the pool size are reported when the modify mutex value is
//	nonzero or when the pin count, with CLOCK_bit masked off, is
//	nonzero.
// NOTE(review): page_no is printed with %d; confirm the field's type
// matches (uid is unsigned long long in this file).
3522 void bt_poolaudit (BtMgr *mgr, char *type)
3524 BtLatchSet *latch, test[1];
3527 memset (test, 0, sizeof(test));
3529 if( memcmp (test, mgr->latchsets, sizeof(test)) )
3530 fprintf(stderr, "%s latchset zero overwritten\n", type);
3532 if( memcmp (test, mgr->leafsets, sizeof(test)) )
3533 fprintf(stderr, "%s leafset zero overwritten\n", type);
// the pre-increment in the test starts the scan at entry 1
3535 for( entry = 0; ++entry < mgr->latchtotal; ) {
3536 latch = mgr->latchsets + entry;
3538 if( *latch->modify->value )
3539 fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3541 if( latch->pin & ~CLOCK_bit )
3542 fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
// same checks for the leaf pool
3545 for( entry = 0; ++entry < mgr->leaftotal; ) {
3546 latch = mgr->leafsets + entry;
3548 if( *latch->modify->value )
3549 fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3551 if( latch->pin & ~CLOCK_bit )
3552 fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
// index_file: thread entry point for the standalone driver.
//
//	arg is a ThreadArg: the two managers (mgr, main), the command
//	string (type), this thread's index (idx), the input file name
//	(infile) and the transaction batch size (num).
//	The command character is args->type[args->idx], or the last
//	character of the string when idx is past its end.
//	Visible commands: pennysort insert/delete (optionally batched
//	through bt_atomictxn), plain indexing, key lookup, forward scan
//	and reverse scan.  Scans write key and value bytes to stdout.
// NOTE(review): two signatures are visible (pointer-returning and
// __stdcall); presumably one per platform.  The switch labels and the
// line-assembly code are not visible in this listing.
// NOTE(review): the error paths call exit(0), i.e. a success status.
3565 // standalone program to index file of keys
3566 // then list them onto std-out
3569 void *index_file (void *arg)
3571 uint __stdcall index_file (void *arg)
3574 int line = 0, found = 0, cnt = 0, idx;
3575 uid next, page_no = LEAF_page; // start on first page of leaves
3576 int ch, len = 0, slot, type = 0;
3577 unsigned char key[BT_maxkey];
3578 unsigned char txn[65536];
3579 ThreadArg *args = arg;
3588 bt = bt_open (args->mgr, args->main);
3591 if( args->idx < strlen (args->type) )
3592 ch = args->type[args->idx];
3594 ch = args->type[strlen(args->type) - 1];
3606 if( type == Delete )
3607 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3609 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3611 if( type == Delete )
3612 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3614 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3616 if( in = fopen (args->infile, "rb") )
3617 while( ch = getc(in), ch != EOF )
// pennysort record: first 10 bytes are the key, the rest is the value
3623 if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3624 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
// batched path: value bytes, a one-byte value length and the 10-byte
// key are laid into the txn buffer and a slot is pointed at them
3630 memcpy (txn + nxt, key + 10, len - 10);
3632 txn[nxt] = len - 10;
3634 memcpy (txn + nxt, key, 10);
3637 slotptr(page,++cnt)->off = nxt;
3638 slotptr(page,cnt)->type = type;
3641 if( cnt < args->num )
3648 if( bt_atomictxn (bt, page) )
3649 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3654 else if( len < BT_maxkey )
3656 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3660 fprintf(stderr, "started indexing for %s\n", args->infile);
3661 if( in = fopen (args->infile, "r") )
3662 while( ch = getc(in), ch != EOF )
// plain indexing stores the whole line as the key with an empty value
3667 if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3668 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3671 else if( len < BT_maxkey )
3673 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3677 fprintf(stderr, "started finding keys for %s\n", args->infile);
3678 if( in = fopen (args->infile, "rb") )
3679 while( ch = getc(in), ch != EOF )
// no value buffer is supplied, so a return of 0 is tested for a hit
3683 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3685 else if( bt->mgr->err )
3686 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3689 else if( len < BT_maxkey )
3691 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3695 fprintf(stderr, "started forward scan\n");
3696 if( slot = bt_startkey (bt, NULL, 0) )
3698 if( slotptr(bt->cachecursor, slot)->type == Librarian )
3701 ptr = keyptr(bt->cachecursor, slot);
3704 if( slotptr(bt->cachecursor, slot)->type == Duplicate )
3707 fwrite (ptr->key, len, 1, stdout);
3708 val = valptr(bt->cachecursor, slot);
3709 fwrite (val->value, val->len, 1, stdout);
3710 fputc ('\n', stdout);
3712 } while( slot = bt_nextkey (bt, slot) );
3714 fprintf(stderr, " Total keys read %d\n", cnt);
3718 fprintf(stderr, "started reverse scan\n");
3719 if( slot = bt_lastkey (bt->mgr, bt->cachecursor, bt->thread_no) )
3720 while( slot = bt_prevkey (bt->mgr, bt->cachecursor, slot, bt->thread_no) ) {
3721 if( slotptr(bt->cachecursor, slot)->dead )
3724 ptr = keyptr(bt->cachecursor, slot);
3727 if( slotptr(bt->cachecursor, slot)->type == Duplicate )
3730 fwrite (ptr->key, len, 1, stdout);
3731 val = valptr(bt->cachecursor, slot);
3732 fwrite (val->value, val->len, 1, stdout);
3733 fputc ('\n', stdout);
3737 fprintf(stderr, " Total keys read %d\n", cnt);
3749 typedef struct timeval timer;
// main: command-line driver.
//
//	Arguments, per the usage text: argv[1] cache btree file, argv[2]
//	main btree file, argv[3] command string, argv[4..13] numeric
//	settings read with atoi, argv[14..] source files.
//	One worker running index_file is started per source file, each
//	with its own ThreadArg.  After the workers finish, both pools are
//	audited, page and I/O counters are printed, and real/user/sys
//	times are reported from getCpuTime(0/1/2).
// NOTE(review): the argc tests guarding each atoi, and the platform
// conditionals around the thread calls, are not visible in this listing.
// NOTE(review): the usage text names mainleafbits in its synopsis but
// describes only mainbits, mainpool and "mainleaf" below it.
3751 int main (int argc, char **argv)
3753 int idx, cnt, len, slot, err;
3761 uint mainleafpool = 0;
3762 uint mainleafxtra = 0;
3778 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3779 fprintf (stderr, " where idx_file is the name of the cache btree file\n");
3780 fprintf (stderr, " where main_file is the name of the main btree file\n");
3781 fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
3782 fprintf (stderr, " pagebits is the page size in bits for the cache btree\n");
3783 fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n");
3784 fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n");
3785 fprintf (stderr, " leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3786 fprintf (stderr, " txnsize = n to block transactions into n units, or zero for no transactions\n");
3787 fprintf (stderr, " redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3788 fprintf (stderr, " mainbits is the page size of the main btree in bits\n");
3789 fprintf (stderr, " mainpool is the number of main pages in the main buffer pool\n");
3790 fprintf (stderr, " mainleaf is the number of main pages in the main leaf pool\n");
3791 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// wall-clock start, used for the "real" figure at the end
3795 start = getCpuTime(0);
3798 bits = atoi(argv[4]);
3801 leafxtra = atoi(argv[5]);
3804 poolsize = atoi(argv[6]);
3807 fprintf (stderr, "no page pool\n"), exit(1);
3810 leafpool = atoi(argv[7]);
3813 num = atoi(argv[8]);
3816 redopages = atoi(argv[9]);
3818 if( redopages > 65535 )
3819 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3822 mainbits = atoi(argv[10]);
3825 mainleafxtra = atoi(argv[11]);
3828 mainpool = atoi(argv[12]);
3831 mainleafpool = atoi(argv[13]);
// NOTE(review): no check of these allocation results is visible here
3835 threads = malloc (cnt * sizeof(pthread_t));
3837 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3839 args = malloc ((cnt + 1) * sizeof(ThreadArg));
// cache btree manager; the redo page count applies to this one only
3841 mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3844 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// main btree manager, opened with zero redo pages
3849 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3852 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3861 for( idx = 0; idx < cnt; idx++ ) {
3862 args[idx].infile = argv[idx + 14];
3863 args[idx].type = argv[3];
3864 args[idx].main = main;
3865 args[idx].mgr = mgr;
3866 args[idx].num = num;
3867 args[idx].idx = idx;
3869 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3870 fprintf(stderr, "Error creating thread %d\n", err);
3872 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
// NOTE(review): this path indexes argv with idx + 10 while the loop
// above uses idx + 14 for the first source file — confirm which offset
// is intended.
3876 args[0].infile = argv[idx + 10];
3877 args[0].type = argv[3];
3878 args[0].main = main;
3885 // wait for termination
3888 for( idx = 0; idx < cnt; idx++ )
3889 pthread_join (threads[idx], NULL);
3891 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3893 for( idx = 0; idx < cnt; idx++ )
3894 CloseHandle(threads[idx]);
3896 bt_poolaudit(mgr, "cache");
3899 bt_poolaudit(main, "main");
3901 fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
3903 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
// each figure is printed as whole minutes plus remaining seconds
3909 elapsed = getCpuTime(0) - start;
3910 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3911 elapsed = getCpuTime(1);
3912 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3913 elapsed = getCpuTime(2);
3914 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
// bt_key: function wrapper around keyptr; returns the key for slot
// number `slot` of `page`.
3917 BtKey *bt_key (BtPage page, uint slot)
3919 return keyptr(page,slot);
// bt_slot: function wrapper around slotptr; returns the slot entry for
// slot number `slot` of `page`.
3922 BtSlot *bt_slot (BtPage page, uint slot)
3924 return slotptr(page,slot);