1 // btree version threadskv9 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // and redo log for failure recovery
13 // author: karl malbrain, malbrain@cal.berkeley.edu
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
19 The original author is Karl Malbrain.
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
49 #define WIN32_LEAN_AND_MEAN
63 typedef unsigned long long uid;
64 typedef unsigned long long logseqno;
67 typedef unsigned long long off64_t;
68 typedef unsigned short ushort;
69 typedef unsigned int uint;
72 #define BT_ro 0x6f72 // ro
73 #define BT_rw 0x7772 // rw
75 #define BT_maxbits 24 // maximum page size in bits
76 #define BT_minbits 9 // minimum page size in bits
77 #define BT_minpage (1 << BT_minbits) // minimum page size
78 #define BT_maxpage (1 << BT_maxbits) // maximum page size
80 // BTree page number constants
81 #define ALLOC_page 0 // allocation page
82 #define ROOT_page 1 // root of the btree
83 #define LEAF_page 2 // first page of leaves
84 #define REDO_page 3 // first page of redo buffer
86 // Number of levels to create in a new BTree
91 There are six lock types for each node in four independent sets:
92 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
93 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
94 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
95 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
96 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
97 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
109 // definition for phase-fair reader/writer lock implementation
112 volatile ushort rin[1];
113 volatile ushort rout[1];
114 volatile ushort ticket[1];
115 volatile ushort serving[1];
120 // write only queue lock
123 volatile ushort ticket[1];
124 volatile ushort serving[1];
134 // definition for spin latch implementation
136 // exclusive is set for write access
137 // share is count of read accessors
138 // grant write lock when share == 0
140 volatile typedef struct {
151 // hash table entries
154 volatile uint slot; // Latch table entry at head of chain
155 BtSpinLatch latch[1];
158 // latch manager table structure
161 uid page_no; // latch set page number
162 RWLock readwr[1]; // read/write page lock
163 RWLock access[1]; // Access Intent/Page delete
164 WOLock parent[1]; // Posting of fence key in parent
165 WOLock atomic[1]; // Atomic update in progress
166 uint split; // right split page atomic insert
167 uint entry; // entry slot in latch table
168 uint next; // next entry in hash table chain
169 uint prev; // prev entry in hash table chain
170 volatile ushort pin; // number of outstanding threads
171 ushort dirty:1; // page in cache is dirty
174 // Define the length of the page record numbers
178 // Page key slot definition.
180 // Keys are marked dead, but remain on the page until
181 // cleanup is called. The fence key (highest key) for
182 // a leaf page is always present, even after cleanup.
186 // In addition to the Unique keys that occupy slots
187 // there are Librarian and Duplicate key
188 // slots occupying the key slot array.
190 // The Librarian slots are dead keys that
191 // serve as filler, available to add new Unique
192 // or Dup slots that are inserted into the B-tree.
194 // The Duplicate slots have had their key bytes extended
195 // by 6 bytes to contain a binary duplicate key uniqueifier.
206 uint off:BT_maxbits; // page offset for key start
207 uint type:3; // type of slot
208 uint dead:1; // set for deleted slot
211 // The key structure occupies space at the upper end of
212 // each page. It's a length byte followed by the key
216 unsigned char len; // this can be changed to a ushort or uint
217 unsigned char key[0];
220 // the value structure also occupies space at the upper
221 // end of the page. Each key is immediately followed by a value.
224 unsigned char len; // this can be changed to a ushort or uint
225 unsigned char value[0];
228 #define BT_maxkey 255 // maximum number of bytes in a key
229 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
231 // The first part of an index page.
232 // It is immediately followed
233 // by the BtSlot array of keys.
235 // note that this structure size
236 // must be a multiple of 8 bytes
237 // in order to place dups correctly.
239 typedef struct BtPage_ {
240 uint cnt; // count of keys in page
241 uint act; // count of active keys
242 uint min; // next key offset
243 uint garbage; // page garbage in bytes
244 unsigned char bits:7; // page size in bits
245 unsigned char free:1; // page is on free chain
246 unsigned char lvl:7; // level of page
247 unsigned char kill:1; // page is being deleted
248 unsigned char right[BtId]; // page number to right
249 unsigned char left[BtId]; // page number to left
250 unsigned char filler[2]; // padding to multiple of 8
251 logseqno lsn; // last LogSeqNo applied to page
254 // The loadpage interface object
257 BtPage page; // current page pointer
258 BtLatchSet *latch; // current page latch set
261 // structure for latch manager on ALLOC_page
264 struct BtPage_ alloc[1]; // next page_no in right ptr
265 unsigned long long dups[1]; // global duplicate key uniqueifier
266 unsigned char chain[BtId]; // head of free page_nos chain
269 // The object structure for Btree access
272 uint page_size; // page size
273 uint page_bits; // page size in bits
279 BtPageZero *pagezero; // mapped allocation page
280 BtHashEntry *hashtable; // the buffer pool hash table entries
281 BtLatchSet *latchsets; // mapped latch set from buffer pool
282 unsigned char *pagepool; // mapped to the buffer pool pages
283 unsigned char *redobuff; // mapped recovery buffer pointer
284 logseqno flushlsn; // first lsn flushed w/msync
285 BtSpinLatch redo[1]; // redo area lite latch
286 BtSpinLatch lock[1]; // allocation area lite latch
287 ushort thread_no[1]; // next thread number
288 uint latchdeployed; // highest number of latch entries deployed
289 uint nlatchpage; // number of latch pages at BT_latch
290 uint latchtotal; // number of page latch entries
291 uint latchhash; // number of latch hash table slots
292 uint latchvictim; // next latch entry to examine
293 uint redopages; // size of recovery buff in pages
294 uint redoend; // eof/end element in recovery buff
296 HANDLE halloc; // allocation handle
297 HANDLE hpool; // buffer pool handle
302 BtMgr *mgr; // buffer manager for thread
303 BtPage cursor; // cached frame for start/next (never mapped)
304 BtPage frame; // spare frame for the page split (never mapped)
305 uid cursor_page; // current cursor page number
306 unsigned char *mem; // frame, cursor, page memory buffer
307 unsigned char key[BT_keyarray]; // last found complete key
308 int found; // last delete or insert was found
309 int err; // last error
310 int reads, writes; // number of reads and writes from the btree
311 ushort thread_no; // thread number
314 // Catastrophic errors
328 #define CLOCK_bit 0x8000
330 // recovery manager entry types
333 BTRM_eof = 0, // rest of buffer is emtpy
334 BTRM_add, // add a unique key-value to btree
335 BTRM_dup, // add a duplicate key-value to btree
336 BTRM_del, // delete a key-value from btree
337 BTRM_upd, // update a key with a new value
338 BTRM_new, // allocate a new empty page
339 BTRM_old, // reuse an old empty page
340 BTRM_end = 255 // circular buffer inter-gap
343 // recovery manager entry
344 // structure followed by BtKey & BtVal
347 logseqno lsn; // log sequence number for entry
348 uint len; // length of entry
349 unsigned char type; // type of entry
350 unsigned char lvl; // level of btree entry pertains to
355 extern void bt_close (BtDb *bt);
356 extern BtDb *bt_open (BtMgr *mgr);
357 extern void bt_flushlsn (BtDb *bt);
358 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
359 extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
360 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
361 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
362 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
363 extern BtKey *bt_foundkey (BtDb *bt);
364 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
365 extern uint bt_nextkey (BtDb *bt, uint slot);
368 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
369 extern void bt_mgrclose (BtMgr *mgr);
370 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
372 // Helper functions to return slot values
373 // from the cursor page.
375 extern BtKey *bt_key (BtDb *bt, uint slot);
376 extern BtVal *bt_val (BtDb *bt, uint slot);
378 // The page is allocated from low and hi ends.
379 // The key slots are allocated from the bottom,
380 // while the text and value of the key
381 // are allocated from the top. When the two
382 // areas meet, the page is split into two.
384 // A key consists of a length byte, two bytes of
385 // index number (0 - 65535), and up to 253 bytes
388 // Associated with each key is a value byte string
389 // containing any value desired.
391 // The b-tree root is always located at page 1.
392 // The first leaf page of level zero is always
393 // located on page 2.
395 // The b-tree pages are linked with next
396 // pointers to facilitate enumerators,
397 // and provide for concurrency.
399 // When the root page fills, it is split in two and
400 // the tree height is raised by a new root at page
401 // one with two keys.
403 // Deleted keys are marked with a dead bit until
404 // page cleanup. The fence key for a leaf node is
407 // To achieve maximum concurrency one page is locked at a time
408 // as the tree is traversed to find leaf key in question. The right
409 // page numbers are used in cases where the page is being split,
412 // Page 0 is dedicated to lock for new page extensions,
413 // and chains empty pages together for reuse. It also
414 // contains the latch manager hash table.
416 // The ParentModification lock on a node is obtained to serialize posting
417 // or changing the fence key for a node.
419 // Empty pages are chained together through the ALLOC page and reused.
421 // Access macros to address slot and key values from the page
422 // Page slots use 1 based indexing.
424 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
425 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
426 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
428 void bt_putid(unsigned char *dest, uid id)
433 dest[i] = (unsigned char)id, id >>= 8;
436 uid bt_getid(unsigned char *src)
441 for( i = 0; i < BtId; i++ )
442 id <<= 8, id |= *src++;
447 uid bt_newdup (BtDb *bt)
450 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
452 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
456 // Write-Only Queue Lock
458 void WriteOLock (WOLock *lock, ushort tid)
462 if( lock->tid == tid ) {
467 tix = __sync_fetch_and_add (lock->ticket, 1);
469 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
471 // wait for our ticket to come up
473 while( tix != lock->serving[0] )
482 void WriteORelease (WOLock *lock)
493 // Phase-Fair reader/writer lock implementation
495 void WriteLock (RWLock *lock, ushort tid)
499 if( lock->tid == tid ) {
504 tix = __sync_fetch_and_add (lock->ticket, 1);
506 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
508 // wait for our ticket to come up
510 while( tix != lock->serving[0] )
517 w = PRES | (tix & PHID);
519 r = __sync_fetch_and_add (lock->rin, w);
521 r = _InterlockedExchangeAdd16 (lock->rin, w);
523 while( r != *lock->rout )
532 void WriteRelease (RWLock *lock)
541 __sync_fetch_and_and (lock->rin, ~MASK);
543 _InterlockedAnd16 (lock->rin, ~MASK);
548 void ReadLock (RWLock *lock, ushort tid)
551 if( lock->tid == tid ) {
556 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
558 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
561 while( w == (*lock->rin & MASK) )
569 void ReadRelease (RWLock *lock)
577 __sync_fetch_and_add (lock->rout, RINC);
579 _InterlockedExchangeAdd16 (lock->rout, RINC);
583 // Spin Latch Manager
585 // wait until write lock mode is clear
586 // and add 1 to the share count
588 void bt_spinreadlock(BtSpinLatch *latch)
594 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
596 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
598 // see if exclusive request is granted or pending
603 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
605 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
608 } while( sched_yield(), 1 );
610 } while( SwitchToThread(), 1 );
614 // wait for other read and write latches to relinquish
616 void bt_spinwritelock(BtSpinLatch *latch)
622 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
624 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
627 if( !(prev & ~BOTH) )
631 __sync_fetch_and_and ((ushort *)latch, ~XCL);
633 _InterlockedAnd16((ushort *)latch, ~XCL);
636 } while( sched_yield(), 1 );
638 } while( SwitchToThread(), 1 );
642 // try to obtain write lock
644 // return 1 if obtained,
647 int bt_spinwritetry(BtSpinLatch *latch)
652 prev = __sync_fetch_and_or((ushort *)latch, XCL);
654 prev = _InterlockedOr16((ushort *)latch, XCL);
656 // take write access if all bits are clear
659 if( !(prev & ~BOTH) )
663 __sync_fetch_and_and ((ushort *)latch, ~XCL);
665 _InterlockedAnd16((ushort *)latch, ~XCL);
672 void bt_spinreleasewrite(BtSpinLatch *latch)
675 __sync_fetch_and_and((ushort *)latch, ~BOTH);
677 _InterlockedAnd16((ushort *)latch, ~BOTH);
681 // decrement reader count
683 void bt_spinreleaseread(BtSpinLatch *latch)
686 __sync_fetch_and_add((ushort *)latch, -SHARE);
688 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
692 // recovery manager -- dump current recovery buff & flush
694 BTERR bt_dumpredo (BtDb *bt)
698 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
699 memset (eof, 0, sizeof(BtLogHdr));
701 pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
703 // flush pages written at beginning of this redo buffer
704 // along with the redo buffer out to disk
706 fdatasync (bt->mgr->idx);
708 bt->mgr->flushlsn = bt->mgr->pagezero->alloc->lsn;
709 bt->mgr->redoend = 0;
713 // recovery manager -- append new entry to recovery log
714 // flush to disk when it overflows.
716 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
718 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
719 uint amt = sizeof(BtLogHdr);
723 bt_spinwritelock (bt->mgr->redo);
726 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
728 // see if new entry fits in buffer
729 // flush and reset if it doesn't
731 if( flush = amt > size - bt->mgr->redoend )
732 if( bt_dumpredo (bt) )
735 // fill in new entry & either eof or end block
737 hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
742 hdr->lsn = ++bt->mgr->pagezero->alloc->lsn;
744 bt->mgr->redoend += amt;
746 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
747 memset (eof, 0, sizeof(BtLogHdr));
749 // fill in key and value
752 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
753 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
756 bt_spinreleasewrite(bt->mgr->redo);
764 // read page into buffer pool from permanent location in Btree file
766 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
768 off64_t off = page_no << mgr->page_bits;
771 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
772 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
779 memset (ovl, 0, sizeof(OVERLAPPED));
781 ovl->OffsetHigh = off >> 32;
783 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
784 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
787 if( *amt < mgr->page_size ) {
788 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
795 // write page to permanent location in Btree file
796 // clear the dirty bit
798 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
800 off64_t off = page_no << mgr->page_bits;
803 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
809 memset (ovl, 0, sizeof(OVERLAPPED));
811 ovl->OffsetHigh = off >> 32;
813 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
816 if( *amt < mgr->page_size )
822 // link latch table entry into head of latch hash table
824 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
826 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
827 BtLatchSet *latch = bt->mgr->latchsets + slot;
829 if( latch->next = bt->mgr->hashtable[hashidx].slot )
830 bt->mgr->latchsets[latch->next].prev = slot;
832 bt->mgr->hashtable[hashidx].slot = slot;
833 latch->page_no = page_no;
840 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
848 // set CLOCK bit in latch
849 // decrement pin count
851 void bt_unpinlatch (BtLatchSet *latch)
854 if( ~latch->pin & CLOCK_bit )
855 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
856 __sync_fetch_and_add(&latch->pin, -1);
858 if( ~latch->pin & CLOCK_bit )
859 _InterlockedOr16 (&latch->pin, CLOCK_bit);
860 _InterlockedDecrement16 (&latch->pin);
864 // return the btree cached page address
865 // if page is dirty and has not yet been
866 // flushed to disk for the current redo
867 // recovery buffer, write it out.
869 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
871 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
874 if( page->lsn < bt->mgr->flushlsn )
875 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
878 latch->dirty = 0, bt->writes++;
883 // find existing latchset or inspire new one
884 // return with latchset pinned
886 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
888 uint hashidx = page_no % bt->mgr->latchhash;
897 // try to find our entry
899 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
901 if( slot = bt->mgr->hashtable[hashidx].slot ) do
903 latch = bt->mgr->latchsets + slot;
904 if( page_no == latch->page_no )
906 } while( slot = latch->next );
912 latch = bt->mgr->latchsets + slot;
914 __sync_fetch_and_add(&latch->pin, 1);
916 _InterlockedIncrement16 (&latch->pin);
918 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
922 // see if there are any unused pool entries
924 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
926 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
929 if( slot < bt->mgr->latchtotal ) {
930 latch = bt->mgr->latchsets + slot;
931 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
933 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
938 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
940 _InterlockedDecrement (&bt->mgr->latchdeployed);
942 // find and reuse previous entry on victim
946 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
948 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
950 // try to get write lock on hash chain
951 // skip entry if not obtained
952 // or has outstanding pins
954 slot %= bt->mgr->latchtotal;
959 // only go around two times before
960 // flushing redo recovery buffer,
961 // and the buffer pool.
963 if( bt->mgr->redopages )
964 if( attempts++ > 2 * bt->mgr->latchtotal ) {
965 if( bt_dumpredo (bt) )
971 latch = bt->mgr->latchsets + slot;
972 idx = latch->page_no % bt->mgr->latchhash;
974 // see if we are on same chain as hashidx
979 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
982 // skip this slot if it is pinned
983 // or the CLOCK bit is set
986 if( latch->pin & CLOCK_bit ) {
988 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
990 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
993 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
997 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
999 // if dirty page has lsn >= last redo recovery buffer
1000 // then hold page in the buffer pool until redo
1001 // recovery buffer is written to disk.
1004 if( page->lsn >= bt->mgr->flushlsn ) {
1005 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
1009 // update permanent page area in btree from buffer pool
1010 // no read-lock is required since page is not pinned.
1013 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1016 latch->dirty = 0, bt->writes++;
1018 // unlink our available slot from its hash chain
1021 bt->mgr->latchsets[latch->prev].next = latch->next;
1023 bt->mgr->hashtable[idx].slot = latch->next;
1026 bt->mgr->latchsets[latch->next].prev = latch->prev;
1028 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
1030 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
1033 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
1040 void bt_flushlsn (BtDb *bt)
1048 // flush dirty pool pages to the btree that were
1049 // dirty before the start of this redo recovery buffer
1051 for( slot = 1; slot <= bt->mgr->latchdeployed; slot++ ) {
1052 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
1053 latch = bt->mgr->latchsets + slot;
1054 // hashidx = latch->page_no % bt->mgr->latchhash;
1056 // if( !bt_spinwritetry (bt->mgr->hashtable[hashidx].latch) )
1059 if( latch->dirty ) {
1060 bt_lockpage(bt, BtLockRead, latch);
1061 bt_writepage(bt->mgr, page, latch->page_no);
1062 latch->dirty = 0, bt->writes++;
1063 bt_unlockpage(bt, BtLockRead, latch);
1066 // bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
1070 void bt_mgrclose (BtMgr *mgr)
1078 // flush recovery buffer to disk
1080 if( mgr->redoend ) {
1081 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1082 memset (eof, 0, sizeof(BtLogHdr));
1084 pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
1087 // flush dirty pool pages to the btree
1089 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
1090 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1091 latch = mgr->latchsets + slot;
1093 if( latch->dirty ) {
1094 bt_writepage(mgr, page, latch->page_no);
1095 latch->dirty = 0, num++;
1099 fprintf(stderr, "%d buffer pool pages flushed\n", num);
1102 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
1103 munmap (mgr->pagezero, mgr->page_size);
1105 FlushViewOfFile(mgr->pagezero, 0);
1106 UnmapViewOfFile(mgr->pagezero);
1107 UnmapViewOfFile(mgr->hashtable);
1108 CloseHandle(mgr->halloc);
1109 CloseHandle(mgr->hpool);
1112 free (mgr->redobuff);
1116 VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1117 FlushFileBuffers(mgr->idx);
1118 CloseHandle(mgr->idx);
1123 // close and release memory
1125 void bt_close (BtDb *bt)
1132 VirtualFree (bt->mem, 0, MEM_RELEASE);
1137 // open/create new btree buffer manager
1139 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1140 // size of page pool (e.g. 262144)
1142 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1144 uint lvl, attr, last, slot, idx;
1145 uint nlatchpage, latchhash;
1146 unsigned char value[BtId];
1147 int flag, initit = 0;
1148 BtPageZero *pagezero;
1155 // determine sanity of page size and buffer pool
1157 if( bits > BT_maxbits )
1159 else if( bits < BT_minbits )
1162 if( nodemax < 16 ) {
1163 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
1168 mgr = calloc (1, sizeof(BtMgr));
1170 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1172 if( mgr->idx == -1 ) {
1173 fprintf (stderr, "Unable to open btree file\n");
1174 return free(mgr), NULL;
1177 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1178 attr = FILE_ATTRIBUTE_NORMAL;
1179 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1181 if( mgr->idx == INVALID_HANDLE_VALUE )
1182 return GlobalFree(mgr), NULL;
1186 pagezero = valloc (BT_maxpage);
1189 // read minimum page size to get root info
1190 // to support raw disk partition files
1191 // check if bits == 0 on the disk.
1193 if( size = lseek (mgr->idx, 0L, 2) )
1194 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1195 if( pagezero->alloc->bits )
1196 bits = pagezero->alloc->bits;
1200 return free(mgr), free(pagezero), NULL;
1204 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1205 size = GetFileSize(mgr->idx, amt);
1207 if( size || *amt ) {
1208 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1209 return bt_mgrclose (mgr), NULL;
1210 bits = pagezero->alloc->bits;
1215 mgr->page_size = 1 << bits;
1216 mgr->page_bits = bits;
1218 // calculate number of latch hash table entries
1220 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1221 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
1223 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1224 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1225 mgr->latchtotal = nodemax;
1230 // initialize an empty b-tree with latch page, root page, page of leaves
1231 // and page(s) of latches and page pool cache
1233 memset (pagezero, 0, 1 << bits);
1234 pagezero->alloc->bits = mgr->page_bits;
1235 bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1237 // initialize left-most LEAF page in
1240 bt_putid (pagezero->alloc->left, LEAF_page);
1242 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1243 fprintf (stderr, "Unable to create btree page zero\n");
1244 return bt_mgrclose (mgr), NULL;
1247 memset (pagezero, 0, 1 << bits);
1248 pagezero->alloc->bits = mgr->page_bits;
1250 for( lvl=MIN_lvl; lvl--; ) {
1251 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1252 key = keyptr(pagezero->alloc, 1);
1253 key->len = 2; // create stopper key
1257 bt_putid(value, MIN_lvl - lvl + 1);
1258 val = valptr(pagezero->alloc, 1);
1259 val->len = lvl ? BtId : 0;
1260 memcpy (val->value, value, val->len);
1262 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1263 pagezero->alloc->lvl = lvl;
1264 pagezero->alloc->cnt = 1;
1265 pagezero->alloc->act = 1;
1267 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1268 fprintf (stderr, "Unable to create btree page zero\n");
1269 return bt_mgrclose (mgr), NULL;
1277 VirtualFree (pagezero, 0, MEM_RELEASE);
1280 // mlock the pagezero page
1282 flag = PROT_READ | PROT_WRITE;
1283 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1284 if( mgr->pagezero == MAP_FAILED ) {
1285 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1286 return bt_mgrclose (mgr), NULL;
1288 mlock (mgr->pagezero, mgr->page_size);
1290 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1291 if( mgr->hashtable == MAP_FAILED ) {
1292 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1293 return bt_mgrclose (mgr), NULL;
1295 if( mgr->redopages = redopages )
1296 mgr->redobuff = valloc (redopages * mgr->page_size);
1298 flag = PAGE_READWRITE;
1299 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1300 if( !mgr->halloc ) {
1301 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1302 return bt_mgrclose (mgr), NULL;
1305 flag = FILE_MAP_WRITE;
1306 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1307 if( !mgr->pagezero ) {
1308 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1309 return bt_mgrclose (mgr), NULL;
1312 flag = PAGE_READWRITE;
1313 size = (uid)mgr->nlatchpage << mgr->page_bits;
1314 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1316 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1317 return bt_mgrclose (mgr), NULL;
1320 flag = FILE_MAP_WRITE;
1321 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1322 if( !mgr->hashtable ) {
1323 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1324 return bt_mgrclose (mgr), NULL;
1326 if( mgr->redopages = redopages )
1327 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
1330 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1331 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1336 // open BTree access method
1337 // based on buffer manager
1339 BtDb *bt_open (BtMgr *mgr)
1341 BtDb *bt = malloc (sizeof(*bt));
1343 memset (bt, 0, sizeof(*bt));
1346 bt->mem = valloc (2 *mgr->page_size);
1348 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1350 bt->frame = (BtPage)bt->mem;
1351 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1353 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1355 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1360 // compare two keys, return > 0, = 0, or < 0
1361 // =0: keys are same
1364 // as the comparison value
1366 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1368 uint len1 = key1->len;
1371 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1382 // place write, read, or parent lock on requested page_no.
1384 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1388 ReadLock (latch->readwr, bt->thread_no);
1391 WriteLock (latch->readwr, bt->thread_no);
1394 ReadLock (latch->access, bt->thread_no);
1397 WriteLock (latch->access, bt->thread_no);
1400 WriteOLock (latch->parent, bt->thread_no);
1403 WriteOLock (latch->atomic, bt->thread_no);
1405 case BtLockAtomic | BtLockRead:
1406 WriteOLock (latch->atomic, bt->thread_no);
1407 ReadLock (latch->readwr, bt->thread_no);
1412 // remove write, read, or parent lock on requested page
1414 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1418 ReadRelease (latch->readwr);
1421 WriteRelease (latch->readwr);
1424 ReadRelease (latch->access);
1427 WriteRelease (latch->access);
1430 WriteORelease (latch->parent);
1433 WriteORelease (latch->atomic);
1435 case BtLockAtomic | BtLockRead:
1436 WriteORelease (latch->atomic);
1437 ReadRelease (latch->readwr);
1442 // allocate a new page
1443 // return with page latched, but unlocked.
1445 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1450 // lock allocation page
1452 bt_spinwritelock(bt->mgr->lock);
1454 // use empty chain first
1455 // else allocate empty page
1457 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1458 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1459 set->page = bt_mappage (bt, set->latch);
1461 return bt->err = BTERR_struct, -1;
1463 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1464 bt_spinreleasewrite(bt->mgr->lock);
1466 memcpy (set->page, contents, bt->mgr->page_size);
1467 set->latch->dirty = 1;
1471 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1472 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1474 // unlock allocation latch
1476 bt_spinreleasewrite(bt->mgr->lock);
1478 // don't load cache from btree page
1480 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1481 set->page = bt_mappage (bt, set->latch);
1483 return bt->err = BTERR_struct;
1485 memcpy (set->page, contents, bt->mgr->page_size);
1486 set->latch->dirty = 1;
1490 // find slot in page for given key at a given level
// Binary search over the page's slots, which are numbered from 1.
// page: page to search; key/len: raw key bytes being looked for.
// Returns the slot number the search converges on, or zero when no
// acceptable slot was seen (see the final comment).
1492 int bt_findslot (BtPage page, unsigned char *key, uint len)
// higher starts at the last slot, low at the first
1494 uint diff, higher = page->cnt, low = 1, slot;
1497 // make stopper key an infinite fence value
// the handling differs depending on whether the page has a right sibling
1499 if( bt_getid (page->right) )
1504 // low is the lowest candidate.
1505 // loop ends when they meet
1507 // higher is already
1508 // tested as .ge. the passed key.
// probe the midpoint of [low, higher) and compare its key to the target
1510 while( diff = higher - low ) {
1511 slot = low + ( diff >> 1 );
1512 if( keycmp (keyptr(page, slot), key, len) < 0 )
// slot's key is not below the target: pull the upper bound down and
// record, via good, that an acceptable slot has been seen
1515 higher = slot, good++;
1518 // return zero if key is on right link page
1520 return good ? higher : 0;
1523 // find and load page at given level for given key
1524 // leave page rd or wr locked as requested
// bt:      btree handle
// set:     receives the pinned latch and mapped page that was found
// key/len: key being searched for
// lvl:     tree level at which to stop
// lock:    lock mode wanted on the page at that level; pages visited
//          on the way down are locked with BtLockRead
// Returns zero with bt->err = BTERR_struct on the error paths shown
// below. NOTE(review): presumably returns the found slot on success
// (callers test the result as a slot number) -- confirm.
1526 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1528 uid page_no = ROOT_page, prevpage = 0;
// drill is the level currently being visited; 0xff stands in until the
// root's real level has been read
1529 uint drill = 0xff, slot;
1530 BtLatchSet *prevlatch;
1531 uint mode, prevmode;
1533 // start at root of btree and drill down
1536 // determine lock mode of drill level
1537 mode = (drill == lvl) ? lock : BtLockRead;
1539 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1542 // obtain access lock using lock chaining with Access mode
// the root page is never taken under the Access lock
1544 if( page_no > ROOT_page )
1545 bt_lockpage(bt, BtLockAccess, set->latch);
1547 set->page = bt_mappage (bt, set->latch);
1549 // release & unpin parent or left sibling page
1552 bt_unlockpage(bt, prevmode, prevlatch);
1553 bt_unpinlatch (prevlatch);
1557 // obtain mode lock using lock chaining through AccessLock
1559 bt_lockpage(bt, mode, set->latch);
// a page on the free list must never be reached by a search
// NOTE(review): this return leaves the mode lock, the Access lock
// (below the root) and the pin in place -- confirm callers treat
// BTERR_struct as fatal.
1561 if( set->page->free )
1562 return bt->err = BTERR_struct, 0;
1564 if( page_no > ROOT_page )
1565 bt_unlockpage(bt, BtLockAccess, set->latch);
1567 // re-read and re-lock root after determining actual level of root
// a level mismatch is only legitimate on the root page
1569 if( set->page->lvl != drill) {
1570 if( set->latch->page_no != ROOT_page )
1571 return bt->err = BTERR_struct, 0;
1573 drill = set->page->lvl;
// the root turned out to be the requested level but was only locked
// with BtLockRead: drop it so it can be re-locked in the wanted mode
1575 if( lock != BtLockRead && drill == lvl ) {
1576 bt_unlockpage(bt, mode, set->latch);
1577 bt_unpinlatch (set->latch);
// remember this page so it can be released once the next one is held
1582 prevpage = set->latch->page_no;
1583 prevlatch = set->latch;
1586 // find key on page at this level
1587 // and descend to requested level
// pages marked kill are not searched
1589 if( !set->page->kill )
1590 if( slot = bt_findslot (set->page, key, len) ) {
1594 // find next non-dead slot -- the fence key if nothing else
1596 while( slotptr(set->page, slot)->dead )
1597 if( slot++ < set->page->cnt )
1600 return bt->err = BTERR_struct, 0;
// the slot's value holds the child page number to descend into
1602 page_no = bt_getid(valptr(set->page, slot)->value);
1607 // or slide right into next page
1609 page_no = bt_getid(set->page->right);
1612 // return error on end of right chain
1614 bt->err = BTERR_struct;
1615 return 0; // return error
1618 // return page to free list
1619 // page must be delete & write locked
// set identifies the page being freed. On return its Delete and Write
// locks have been released and its latch unpinned.
1621 void bt_freepage (BtDb *bt, BtPageSet *set)
1623 // lock allocation page
1625 bt_spinwritelock (bt->mgr->lock);
// push the page onto the head of the free chain: its right pointer
// takes the old chain head and page zero's chain takes this page number
1629 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1630 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1631 set->latch->dirty = 1;
// flag the page as free (bt_loadpage rejects pages with this flag set)
1632 set->page->free = 1;
1634 // unlock released page
1636 bt_unlockpage (bt, BtLockDelete, set->latch);
1637 bt_unlockpage (bt, BtLockWrite, set->latch);
1638 bt_unpinlatch (set->latch);
1640 // unlock allocation page
1642 bt_spinreleasewrite (bt->mgr->lock);
1645 // a fence key was deleted from a page
1646 // push new fence value upwards
// set is the page whose highest slot is being dropped; lvl is that
// page's level, so the parent entries are maintained at lvl+1.
// The Write lock on the page is traded for a Parent lock before the
// parent level is touched; the page is unlocked and unpinned at the end.
1648 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1650 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1651 unsigned char value[BtId];
1655 // remove the old fence value
// save the key of the last slot, then zero that slot and shrink cnt
1657 ptr = keyptr(set->page, set->page->cnt);
1658 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1659 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1660 set->latch->dirty = 1;
1662 // cache new fence value
// the key of what is now the last slot
1664 ptr = keyptr(set->page, set->page->cnt);
1665 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// take the Parent lock before giving up the Write lock
1667 bt_lockpage (bt, BtLockParent, set->latch);
1668 bt_unlockpage (bt, BtLockWrite, set->latch);
1670 // insert new (now smaller) fence key
// the value stored with the key is this page's page number
1672 bt_putid (value, set->latch->page_no);
1673 ptr = (BtKey*)leftkey;
1675 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1678 // now delete old fence key
1680 ptr = (BtKey*)rightkey;
1682 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1685 bt_unlockpage (bt, BtLockParent, set->latch);
1686 bt_unpinlatch(set->latch);
1690 // root has a single child
1691 // collapse a level from the tree
// root is the root page set. Its contents are replaced by its only
// live child's contents, and the child page is freed; this repeats
// while the root is above level 1 and still has a single active key.
// The root's Write lock is released and its latch unpinned at the end.
1693 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1699 // find the child entry and promote as new root contents
// scan slots 1..cnt for one that is not dead
1702 for( idx = 0; idx++ < root->page->cnt; )
1703 if( !slotptr(root->page, idx)->dead )
// the slot's value is the child's page number
1706 page_no = bt_getid (valptr(root->page, idx)->value);
1708 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1709 child->page = bt_mappage (bt, child->latch);
// bt_freepage requires the page to be Delete and Write locked
1713 bt_lockpage (bt, BtLockDelete, child->latch);
1714 bt_lockpage (bt, BtLockWrite, child->latch);
// overwrite the whole root page with the child's page image
1716 memcpy (root->page, child->page, bt->mgr->page_size);
1717 root->latch->dirty = 1;
1719 bt_freepage (bt, child);
1721 } while( root->page->lvl > 1 && root->page->act == 1 );
1723 bt_unlockpage (bt, BtLockWrite, root->latch);
1724 bt_unpinlatch (root->latch);
1728 // delete a page and manage keys
1729 // call with page writelocked
1730 // returns with page unpinned
// The empty page (set) is not itself released. The contents of its
// right sibling are pulled into it, the parent level is updated so the
// sibling's fence key points at set, and the right sibling page is
// freed instead.
1732 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1734 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1735 unsigned char value[BtId];
1736 uint lvl = set->page->lvl;
1741 // cache copy of fence key
1742 // to post in parent
1744 ptr = keyptr(set->page, set->page->cnt);
1745 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1747 // obtain lock on right page
1749 page_no = bt_getid(set->page->right);
1751 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1752 right->page = bt_mappage (bt, right->latch);
1756 bt_lockpage (bt, BtLockWrite, right->latch);
1758 // cache copy of key to update
1760 ptr = keyptr(right->page, right->page->cnt);
1761 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
// a right sibling that is itself being deleted is a structure error
// NOTE(review): this return leaves the right page write-locked and
// pinned, and set still write-locked -- confirm BTERR_struct is fatal.
1763 if( right->page->kill )
1764 return bt->err = BTERR_struct;
1766 // pull contents of right peer into our empty page
1768 memcpy (set->page, right->page, bt->mgr->page_size);
1769 set->latch->dirty = 1;
1771 // mark right page deleted and point it to left page
1772 // until we can post parent updates that remove access
1773 // to the deleted page.
1775 bt_putid (right->page->right, set->latch->page_no);
1776 right->latch->dirty = 1;
1777 right->page->kill = 1;
// swap the Write locks on both pages for Parent locks before
// touching the parent level
1779 bt_lockpage (bt, BtLockParent, right->latch);
1780 bt_unlockpage (bt, BtLockWrite, right->latch);
1782 bt_lockpage (bt, BtLockParent, set->latch);
1783 bt_unlockpage (bt, BtLockWrite, set->latch);
1785 // redirect higher key directly to our new node contents
1787 bt_putid (value, set->latch->page_no);
1788 ptr = (BtKey*)higherfence;
1790 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1793 // delete old lower key to our node
1795 ptr = (BtKey*)lowerfence;
1797 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1800 // obtain delete and write locks to right node
// bt_freepage releases both of these locks and unpins the page
1802 bt_unlockpage (bt, BtLockParent, right->latch);
1803 bt_lockpage (bt, BtLockDelete, right->latch);
1804 bt_lockpage (bt, BtLockWrite, right->latch);
1805 bt_freepage (bt, right);
1807 bt_unlockpage (bt, BtLockParent, set->latch);
1808 bt_unpinlatch (set->latch);
1812 // find and delete key on page by marking delete flag bit
1813 // if page becomes empty, delete it from the btree
// key/len: key to delete; lvl: tree level holding the key.
// The page is loaded write-locked. After the delete the function also
// handles a removed fence key (bt_fixfence), a root left with a single
// entry (bt_collapseroot) and an emptied page (bt_deletepage).
1815 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1817 uint slot, idx, found, fence;
1822 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1823 ptr = keyptr(set->page, slot);
1827 // if librarian slot, advance to real slot
1829 if( slotptr(set->page, slot)->type == Librarian )
1830 ptr = keyptr(set->page, ++slot);
// remember whether the target is the page's last (fence) slot
1832 fence = slot == set->page->cnt;
1834 // if key is found delete it, otherwise ignore request
// found ends up non-zero only for an exact match on a live slot
1836 if( found = !keycmp (ptr, key, len) )
1837 if( found = slotptr(set->page, slot)->dead == 0 ) {
1838 val = valptr(set->page,slot);
1839 slotptr(set->page, slot)->dead = 1;
// the key and value bytes become reclaimable garbage
1840 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1843 // collapse empty slots beneath the fence
// a dead slot just below the fence is overwritten by the fence slot,
// then the old last slot is zeroed and cnt shrinks
1845 while( idx = set->page->cnt - 1 )
1846 if( slotptr(set->page, idx)->dead ) {
1847 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1848 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1853 // did we delete a fence key in an upper level?
// only for non-leaf levels (lvl != 0) on pages that still have active keys
1855 if( found && lvl && set->page->act && fence )
1856 if( bt_fixfence (bt, set, lvl) )
1861 // do we need to collapse root?
1863 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1864 if( bt_collapseroot (bt, set) )
1869 // delete empty page
1871 if( !set->page->act )
1872 return bt_deletepage (bt, set);
// normal exit: mark the page dirty, release the Write lock and unpin
1874 set->latch->dirty = 1;
1875 bt_unlockpage(bt, BtLockWrite, set->latch);
1876 bt_unpinlatch (set->latch);
1880 BtKey *bt_foundkey (BtDb *bt)
1882 return (BtKey*)(bt->key);
1885 // advance to next slot
// set: current page set, read-locked; slot: current slot number.
// When the current page is exhausted the function moves set to the
// right sibling, handing the read lock over via the Access lock.
// Returns zero with bt->err = BTERR_struct when the sibling's latch
// cannot be obtained.
// NOTE(review): the success return values are presumably the next
// slot number -- confirm.
1887 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1889 BtLatchSet *prevlatch;
// still slots left on this page
1892 if( slot < set->page->cnt )
1895 prevlatch = set->latch;
// follow the right link and pin/map the sibling page
1897 if( page_no = bt_getid(set->page->right) )
1898 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1899 set->page = bt_mappage (bt, set->latch);
1903 return bt->err = BTERR_struct, 0;
1905 // obtain access lock using lock chaining with Access mode
1907 bt_lockpage(bt, BtLockAccess, set->latch);
// the previous page can be let go once the Access lock is held
1909 bt_unlockpage(bt, BtLockRead, prevlatch);
1910 bt_unpinlatch (prevlatch);
// take the read lock on the new page, then drop the Access lock
1912 bt_lockpage(bt, BtLockRead, set->latch);
1913 bt_unlockpage(bt, BtLockAccess, set->latch);
1917 // find unique key or first duplicate key in
1918 // leaf level and return number of value bytes
1919 // or (-1) if not found. Setup key for bt_foundkey
// key/keylen: key to look up
// value/valmax: caller's buffer and its capacity; at most valmax
//               bytes of the stored value are copied into it
1921 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
// search the leaf level (level 0) under a read lock
1929 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1931 ptr = keyptr(set->page, slot);
1933 // skip librarian slot place holder
1935 if( slotptr(set->page, slot)->type == Librarian )
1936 ptr = keyptr(set->page, ++slot);
1938 // return actual key found
// copied into the handle's key buffer, which bt_foundkey exposes
1940 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
// NOTE(review): presumably the length used for comparison is adjusted
// for Duplicate slots here -- confirm.
1943 if( slotptr(set->page, slot)->type == Duplicate )
1946 // not there if we reach the stopper key
// i.e. the last slot of a page that has no right sibling
1948 if( slot == set->page->cnt )
1949 if( !bt_getid (set->page->right) )
1952 // if key exists, return >= 0 value bytes copied
1953 // otherwise return (-1)
1955 if( slotptr(set->page, slot)->dead )
1959 if( !memcmp (ptr->key, key, len) ) {
1960 val = valptr (set->page,slot);
// NOTE(review): presumably valmax is clamped to the stored value
// length before the copy -- confirm.
1961 if( valmax > val->len )
1963 memcpy (value, val->value, valmax);
// keep going onto following slots/pages while bt_findnext yields one
1969 } while( slot = bt_findnext (bt, set, slot) );
1971 bt_unlockpage (bt, BtLockRead, set->latch);
1972 bt_unpinlatch (set->latch);
1976 // check page for space available,
1977 // clean if necessary and return
1978 // 0 - page needs splitting
1979 // >0 new slot value
// set:    page to check (and possibly compact in place)
// keylen: length of the key about to be inserted
// slot:   slot the caller intends to insert at
// vallen: length of the value about to be inserted
// Compaction copies the page to bt->frame, clears the page body, and
// rebuilds it from the frame copy.
1981 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
// nxt is the write offset; data is packed downward from the page end
1983 uint nxt = bt->mgr->page_size;
1984 BtPage page = set->page;
1985 uint cnt = 0, idx = 0;
1986 uint max = page->cnt;
// space test: the lowest used offset must leave room for the page
// header, the existing slots plus two more, and the new key and value
1991 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1994 // skip cleanup and proceed to split
1995 // if there's not enough garbage
// threshold is one fifth of the page size
1998 if( page->garbage < nxt / 5 )
// keep a full copy of the page to rebuild from
2001 memcpy (bt->frame, page, bt->mgr->page_size);
2003 // skip page info and set rest of page to zero
2005 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2006 set->latch->dirty = 1;
2010 // clean up page first by
2011 // removing deleted keys
2013 while( cnt++ < max ) {
// dead-slot test applies to every slot except the last one on a
// leaf page (lvl == 0)
2017 if( cnt < max || bt->frame->lvl )
2018 if( slotptr(bt->frame,cnt)->dead )
2021 // copy the value across
2023 val = valptr(bt->frame, cnt);
2024 nxt -= val->len + sizeof(BtVal);
2025 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2027 // copy the key across
2029 key = keyptr(bt->frame, cnt);
2030 nxt -= key->len + sizeof(BtKey);
2031 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2033 // make a librarian slot
// a dead placeholder slot sharing the same offset as the real slot
2035 slotptr(page, ++idx)->off = nxt;
2036 slotptr(page, idx)->type = Librarian;
2037 slotptr(page, idx)->dead = 1;
// the real slot: same offset, original type and dead flag
2041 slotptr(page, ++idx)->off = nxt;
2042 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2044 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2051 // see if page has enough space now, or does it need splitting?
2053 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2059 // split the root and raise the height of the btree
// root:  root page set, which already holds only the lower half of the keys
// right: latch of the new page holding the upper half
// The root's contents move to a freshly allocated left page, and the
// root is rewritten with two entries: slot 1 is the left page's fence
// key, slot 2 a stopper key pointing at the right page.
2061 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2063 unsigned char leftkey[BT_keyarray];
// nxt is the write offset; entries are packed downward from the page end
2064 uint nxt = bt->mgr->page_size;
2065 unsigned char value[BtId];
2071 // save left page fence key for new root
2073 ptr = keyptr(root->page, root->page->cnt);
2074 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2076 // Obtain an empty page to use, and copy the current
2077 // root contents into it, e.g. lower keys
2079 if( bt_newpage(bt, left, root->page) )
// only the page number is needed from here on
2082 left_page_no = left->latch->page_no;
2083 bt_unpinlatch (left->latch);
2085 // preserve the page info at the bottom
2086 // of higher keys and set rest to zero
2088 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2090 // insert stopper key at top of newroot page
2091 // and increase the root height
// value of slot 2: page number of the right page
2093 nxt -= BtId + sizeof(BtVal);
2094 bt_putid (value, right->page_no);
2095 val = (BtVal *)((unsigned char *)root->page + nxt);
2096 memcpy (val->value, value, BtId);
// key of slot 2: room for a two-byte stopper key
2099 nxt -= 2 + sizeof(BtKey);
2100 slotptr(root->page, 2)->off = nxt;
2101 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2106 // insert lower keys page fence key on newroot page as first key
// value of slot 1: page number of the new left page
2108 nxt -= BtId + sizeof(BtVal);
2109 bt_putid (value, left_page_no);
2110 val = (BtVal *)((unsigned char *)root->page + nxt);
2111 memcpy (val->value, value, BtId);
// key of slot 1: the saved left fence key
2114 ptr = (BtKey *)leftkey;
2115 nxt -= ptr->len + sizeof(BtKey);
2116 slotptr(root->page, 1)->off = nxt;
2117 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root has no right sibling
2119 bt_putid(root->page->right, 0);
2120 root->page->min = nxt; // reset lowest used offset and key count
2121 root->page->cnt = 2;
2122 root->page->act = 2;
2125 // release and unpin root pages
2127 bt_unlockpage(bt, BtLockWrite, root->latch);
2128 bt_unpinlatch (root->latch);
2130 bt_unpinlatch (right);
2134 // split already locked full node
2136 // return pool entry for new right
// set is the full page. The upper part of its keys is assembled in
// bt->frame and written to a newly allocated right page; the lower
// part is then rebuilt in place. Returns the latch table entry number
// of the new right page.
// NOTE(review): presumably returns zero when bt_newpage fails
// (callers test for a zero entry) -- confirm.
2139 uint bt_splitpage (BtDb *bt, BtPageSet *set)
2141 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2142 uint lvl = set->page->lvl;
2149 // split higher half of keys to bt->frame
2151 memset (bt->frame, 0, bt->mgr->page_size);
2152 max = set->page->cnt;
2156 while( cnt++ < max ) {
// dead-slot test applies to every slot except the last one on a
// leaf page (lvl == 0)
2157 if( cnt < max || set->page->lvl )
2158 if( slotptr(set->page, cnt)->dead )
// copy the value, then the key, packing downward from the frame end
2161 src = valptr(set->page, cnt);
2162 nxt -= src->len + sizeof(BtVal);
2163 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2165 key = keyptr(set->page, cnt);
2166 nxt -= key->len + sizeof(BtKey);
2167 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2168 memcpy (ptr, key, key->len + sizeof(BtKey));
2170 // add librarian slot
// a dead placeholder slot sharing the real slot's offset
2172 slotptr(bt->frame, ++idx)->off = nxt;
2173 slotptr(bt->frame, idx)->type = Librarian;
2174 slotptr(bt->frame, idx)->dead = 1;
// the real slot: same offset, original type and dead flag
2178 slotptr(bt->frame, ++idx)->off = nxt;
2179 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2181 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
// fill in the header of the new right page image
2185 bt->frame->bits = bt->mgr->page_bits;
2186 bt->frame->min = nxt;
2187 bt->frame->cnt = idx;
2188 bt->frame->lvl = lvl;
// a non-root page hands its right link to the new right page
2192 if( set->latch->page_no > ROOT_page )
2193 bt_putid (bt->frame->right, bt_getid (set->page->right));
2195 // get new free page and write higher keys to it.
2197 if( bt_newpage(bt, right, bt->frame) )
// reuse the frame as a copy of the old page and clear the page body
2200 memcpy (bt->frame, set->page, bt->mgr->page_size);
2201 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2202 set->latch->dirty = 1;
2204 nxt = bt->mgr->page_size;
2205 set->page->garbage = 0;
2211 if( slotptr(bt->frame, max)->type == Librarian )
2214 // assemble page of smaller keys
2216 while( cnt++ < max ) {
2217 if( slotptr(bt->frame, cnt)->dead )
2219 val = valptr(bt->frame, cnt);
2220 nxt -= val->len + sizeof(BtVal);
2221 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2223 key = keyptr(bt->frame, cnt);
2224 nxt -= key->len + sizeof(BtKey);
2225 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2227 // add librarian slot
2229 slotptr(set->page, ++idx)->off = nxt;
2230 slotptr(set->page, idx)->type = Librarian;
2231 slotptr(set->page, idx)->dead = 1;
2235 slotptr(set->page, ++idx)->off = nxt;
2236 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
// link the rebuilt left page to the new right page
2240 bt_putid(set->page->right, right->latch->page_no);
2241 set->page->min = nxt;
2242 set->page->cnt = idx;
2244 return right->latch->entry;
2247 // fix keys for newly split page
2248 // call with page locked, return
// set:   left page of the split, still write-locked
// right: latch of the new right page
// Posts both pages' fence keys into the parent level (lvl+1), then
// unlocks and unpins both pages. A root split is delegated to
// bt_splitroot.
2251 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2253 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2254 unsigned char value[BtId];
2255 uint lvl = set->page->lvl;
2259 // if current page is the root page, split it
2261 if( set->latch->page_no == ROOT_page )
2262 return bt_splitroot (bt, set, right);
// fence key of the left page: the key of its last slot
2264 ptr = keyptr(set->page, set->page->cnt);
2265 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2267 page = bt_mappage (bt, right);
// fence key of the right page
2269 ptr = keyptr(page, page->cnt);
2270 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2272 // insert new fences in their parent pages
2274 bt_lockpage (bt, BtLockParent, right);
// trade the left page's Write lock for a Parent lock
2276 bt_lockpage (bt, BtLockParent, set->latch);
2277 bt_unlockpage (bt, BtLockWrite, set->latch);
2279 // insert new fence for reformulated left block of smaller keys
2281 bt_putid (value, set->latch->page_no);
2282 ptr = (BtKey *)leftkey;
2284 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2287 // switch fence for right block of larger keys to new right page
2289 bt_putid (value, right->page_no);
2290 ptr = (BtKey *)rightkey;
2292 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2295 bt_unlockpage (bt, BtLockParent, set->latch);
2296 bt_unpinlatch (set->latch);
2298 bt_unlockpage (bt, BtLockParent, right);
2299 bt_unpinlatch (right);
2303 // install new key and value onto page
2304 // page must already be checked for
// set:          target page, write-locked by the caller
// slot:         slot before which the new key is inserted
// key/keylen:   key bytes to store
// value/vallen: value bytes to store
// type:         slot type for the new entry (TODO confirm: the
//               assignment is not among the lines shown)
// release:      NOTE(review): presumably selects whether the Write
//               lock is released and the latch unpinned -- confirm
2307 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2309 uint idx, librarian;
2314 // if found slot > desired slot and previous slot
2315 // is a librarian slot, use it
2318 if( slotptr(set->page, slot-1)->type == Librarian )
2321 // copy value onto page
// data grows downward: min is lowered, then the bytes are written there
2323 set->page->min -= vallen + sizeof(BtVal);
2324 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2325 memcpy (val->value, value, vallen);
2328 // copy key onto page
2330 set->page->min -= keylen + sizeof(BtKey);
2331 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2332 memcpy (ptr->key, key, keylen);
2335 // find first empty slot
// a dead slot at or after the insertion point can absorb the shift
2337 for( idx = slot; idx < set->page->cnt; idx++ )
2338 if( slotptr(set->page, idx)->dead )
2341 // now insert key into array before slot
// no dead slot found: grow the slot array by two, leaving room for
// the new slot and a librarian slot
2343 if( idx == set->page->cnt )
2344 idx += 2, set->page->cnt += 2, librarian = 2;
2348 set->latch->dirty = 1;
// shift slots up by librarian positions to open the gap
2351 while( idx > slot + librarian - 1 )
2352 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2354 // add librarian slot
2356 if( librarian > 1 ) {
2357 node = slotptr(set->page, slot++);
2358 node->off = set->page->min;
2359 node->type = Librarian;
// the real slot points at the key just written
2365 node = slotptr(set->page, slot);
2366 node->off = set->page->min;
2371 bt_unlockpage (bt, BtLockWrite, set->latch);
2372 bt_unpinlatch (set->latch);
2378 // Insert new key into the btree at given level.
2379 // either add a new key or update/add an existing one
// key/keylen:   key bytes
// lvl:          tree level to insert at
// value/vallen: value bytes
// unique:       non-zero to update the value of an existing key with
//               the same bytes; zero to always add a new entry
2381 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2383 unsigned char newkey[BT_keyarray];
2384 uint slot, idx, len, entry;
2391 // set up the key we're working on
// NOTE(review): keylen is copied into the fixed-size newkey buffer
// without a bounds check visible here -- confirm callers limit keylen.
2393 ins = (BtKey*)newkey;
2394 memcpy (ins->key, key, keylen);
2397 // is this a non-unique index value?
// a fresh sequence number from bt_newdup is written into the key buffer
2403 sequence = bt_newdup (bt);
2404 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2408 while( 1 ) { // find the page and slot for the current key
2409 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2410 ptr = keyptr(set->page, slot);
2413 bt->err = BTERR_ovflw;
2417 // if librarian slot == found slot, advance to real slot
2419 if( slotptr(set->page, slot)->type == Librarian )
2420 if( !keycmp (ptr, key, keylen) )
2421 ptr = keyptr(set->page, ++slot);
2425 if( slotptr(set->page, slot)->type == Duplicate )
2428 // if inserting a duplicate key or unique key
2429 // check for adequate space on the page
2430 // and insert the new key before slot.
// taken when the key is not already present, or always when !unique
2432 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
// no room even after cleaning: split the page and post the new fence
// keys. NOTE(review): presumably the search is then retried -- confirm.
2433 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2434 if( !(entry = bt_splitpage (bt, set)) )
2436 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2441 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2444 // if key already exists, update value and return
2446 val = valptr(set->page, slot);
// the new value fits in the existing value area: overwrite in place
2448 if( val->len >= vallen ) {
2449 if( slotptr(set->page, slot)->dead )
// the unused tail of the old value area becomes garbage
2451 set->page->garbage += val->len - vallen;
2452 set->latch->dirty = 1;
2453 slotptr(set->page, slot)->dead = 0;
2455 memcpy (val->value, value, vallen);
2456 bt_unlockpage(bt, BtLockWrite, set->latch);
2457 bt_unpinlatch (set->latch);
2461 // new update value doesn't fit in existing value area
// the old key and value bytes of a live slot are abandoned as garbage
2463 if( !slotptr(set->page, slot)->dead )
2464 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2466 slotptr(set->page, slot)->dead = 0;
2470 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2471 if( !(entry = bt_splitpage (bt, set)) )
2473 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
// write a fresh copy of the value, then the key, below the current
// low-water mark and repoint the slot at it
2478 set->page->min -= vallen + sizeof(BtVal);
2479 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2480 memcpy (val->value, value, vallen);
2483 set->latch->dirty = 1;
2484 set->page->min -= keylen + sizeof(BtKey);
2485 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2486 memcpy (ptr->key, key, keylen);
2489 slotptr(set->page, slot)->off = set->page->min;
2490 bt_unlockpage(bt, BtLockWrite, set->latch);
2491 bt_unpinlatch (set->latch);
// Per-key bookkeeping for an atomic transaction; bt_atomictxn
// allocates one entry per source slot and indexes it by slot number.
// NOTE(review): presumably the members of AtomicTxn -- confirm.
2498 logseqno lsn; // redo log sequence number
2499 uint entry; // latch table entry number
2500 uint slot:31; // page slot number
2501 uint reuse:1; // reused previous page
// Queue node describing a fence-key operation scheduled by
// bt_atomictxn; leafkey holds a copy of the BtKey involved.
// NOTE(review): presumably the members of AtomicKey -- confirm.
2505 uid page_no; // page number for split leaf
2506 void *next; // next key to insert
2507 uint entry:29; // latch table entry number
2508 uint type:2; // 0 == insert, 1 == delete, 2 == free
2509 uint nounlock:1; // don't unlock ParentModification
2510 unsigned char leafkey[BT_keyarray];
2513 // determine actual page where key is located
2514 // return slot number
// source: page holding the transaction's keys
// locks:  per-key bookkeeping array, indexed by source slot number
// src:    source slot of the key being located
// set:    receives the latch and mapped page where the key belongs
// Sets bt->err = BTERR_atomic when the key cannot be placed on the
// master page or any page in its split chain.
2516 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2518 BtKey *key = keyptr(source,src);
2519 uint slot = locks[src].slot;
// a key that shares the previous key's page starts from that key's
// latch entry, and its slot must be searched for again
2522 if( src > 1 && locks[src].reuse )
2523 entry = locks[src-1].entry, slot = 0;
2525 entry = locks[src].entry;
2528 set->latch = bt->mgr->latchsets + entry;
2529 set->page = bt_mappage (bt, set->latch);
2533 // is locks->reuse set? or was slot zeroed?
2534 // if so, find where our key is located
2535 // on current page or pages split on
2536 // same page txn operations.
2539 set->latch = bt->mgr->latchsets + entry;
2540 set->page = bt_mappage (bt, set->latch);
2542 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2543 if( slotptr(set->page, slot)->type == Librarian )
// remember which page of the chain the key landed on
2545 if( locks[src].reuse )
2546 locks[src].entry = entry;
// follow the chain of pages split off during this transaction
2549 } while( entry = set->latch->split );
2551 bt->err = BTERR_atomic;
// insert one key of an atomic transaction
// source/src identify the key and value to insert; locks is the
// per-key bookkeeping array. The target page is found with
// bt_atomicpage; when it lacks room it is split and the new right
// page is write-locked and linked into the master page's split chain
// before the insert is retried.
// Returns bt->err = BTERR_atomic when the loop ends without an insert.
2555 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2557 BtKey *key = keyptr(source, src);
2558 BtVal *val = valptr(source, src);
2563 while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2564 if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
// release argument is 0 here
2565 if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
// stamp the page with this key's redo log sequence number
2567 set->page->lsn = locks[src].lsn;
2571 if( entry = bt_splitpage (bt, set) )
2572 latch = bt->mgr->latchsets + entry;
2576 // splice right page into split chain
2577 // and WriteLock it.
2579 bt_lockpage(bt, BtLockWrite, latch);
2580 latch->split = set->latch->split;
2581 set->latch->split = entry;
// force bt_atomicpage to search for the slot again
2582 locks[src].slot = 0;
2585 return bt->err = BTERR_atomic;
// delete one key of an atomic transaction by marking its slot dead
// source/src identify the key; locks is the per-key bookkeeping
// array. Returns bt->err = BTERR_struct when bt_atomicpage cannot
// locate a page and slot for the key.
2588 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2590 BtKey *key = keyptr(source, src);
2591 uint idx, entry, slot;
2596 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2597 ptr = keyptr(set->page, slot);
2599 return bt->err = BTERR_struct;
// only an exact match on a live slot is marked dead
2601 if( !keycmp (ptr, key->key, key->len) )
2602 if( !slotptr(set->page, slot)->dead )
2603 slotptr(set->page, slot)->dead = 1;
// account for the freed key/value bytes and stamp the page with this
// key's redo log sequence number
2609 val = valptr(set->page, slot);
2610 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2611 set->page->lsn = locks[src].lsn;
2612 set->latch->dirty = 1;
2618 // delete an empty master page for a transaction
2620 // note that the far right page never empties because
2621 // it always contains (at least) the infinite stopper key
2622 // and that all pages that don't contain any keys are
2623 // deleted, or are being held under Atomic lock.
// prev is the empty master page. Its right sibling's contents are
// pulled over it, the sibling's fence key in the level 1 parent is
// redirected to prev, the left pointer of the page that now follows
// prev is fixed, and the old right sibling page is freed.
// NOTE(review): prev's Atomic lock is released below but is not taken
// here; presumably the caller holds it -- confirm.
2625 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2627 BtPageSet right[1], temp[1];
2628 unsigned char value[BtId];
2632 bt_lockpage(bt, BtLockWrite, prev->latch);
2634 // grab the right sibling
2636 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2637 right->page = bt_mappage (bt, right->latch);
2641 bt_lockpage(bt, BtLockAtomic, right->latch);
2642 bt_lockpage(bt, BtLockWrite, right->latch);
2644 // and pull contents over empty page
2645 // while preserving master's left link
2647 memcpy (right->page->left, prev->page->left, BtId);
2648 memcpy (prev->page, right->page, bt->mgr->page_size);
2650 // forward seekers to old right sibling
2651 // to new page location in set
2653 bt_putid (right->page->right, prev->latch->page_no);
2654 right->latch->dirty = 1;
2655 right->page->kill = 1;
2657 // remove pointer to right page for searchers by
2658 // changing right fence key to point to the master page
2660 ptr = keyptr(right->page,right->page->cnt);
2661 bt_putid (value, prev->latch->page_no);
2663 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2666 // now that master page is in good shape we can
2667 // remove its locks.
2669 bt_unlockpage (bt, BtLockAtomic, prev->latch);
2670 bt_unlockpage (bt, BtLockWrite, prev->latch);
2672 // fix master's right sibling's left pointer
2673 // to remove scanner's pointer to the right page
2675 if( right_page_no = bt_getid (prev->page->right) ) {
2676 if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2677 temp->page = bt_mappage (bt, temp->latch);
2679 bt_lockpage (bt, BtLockWrite, temp->latch);
2680 bt_putid (temp->page->left, prev->latch->page_no);
2681 temp->latch->dirty = 1;
2683 bt_unlockpage (bt, BtLockWrite, temp->latch);
2684 bt_unpinlatch (temp->latch);
2685 } else { // master is now the far right page
// record the new far right page in page zero under the allocation lock
2686 bt_spinwritelock (bt->mgr->lock);
2687 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2688 bt_spinreleasewrite(bt->mgr->lock);
2691 // now that there are no pointers to the right page
2692 // we can delete it after the last read access occurs
// swap the Write/Atomic locks for the Delete + Write pair that
// bt_freepage expects
2694 bt_unlockpage (bt, BtLockWrite, right->latch);
2695 bt_unlockpage (bt, BtLockAtomic, right->latch);
2696 bt_lockpage (bt, BtLockDelete, right->latch);
2697 bt_lockpage (bt, BtLockWrite, right->latch);
2698 bt_freepage (bt, right);
2702 // atomic modification of a batch of keys.
2704 // return -1 if BTERR is set
2705 // otherwise return slot number
2706 // causing the key constraint violation
2707 // or zero on successful completion.
2709 int bt_atomictxn (BtDb *bt, BtPage source)
2711 uint src, idx, slot, samepage, entry;
2712 AtomicKey *head, *tail, *leaf;
2713 BtPageSet set[1], prev[1];
2714 unsigned char value[BtId];
2715 BtKey *key, *ptr, *key2;
2725 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2729 // stable sort the list of keys into order to
2730 // prevent deadlocks between threads.
2732 for( src = 1; src++ < source->cnt; ) {
2733 *temp = *slotptr(source,src);
2734 key = keyptr (source,src);
2736 for( idx = src; --idx; ) {
2737 key2 = keyptr (source,idx);
2738 if( keycmp (key, key2->key, key2->len) < 0 ) {
2739 *slotptr(source,idx+1) = *slotptr(source,idx);
2740 *slotptr(source,idx) = *temp;
2746 // Load the leaf page for each key
2747 // group same page references with reuse bit
2748 // and determine any constraint violations
2750 for( src = 0; src++ < source->cnt; ) {
2751 key = keyptr(source, src);
2754 // first determine if this modification falls
2755 // on the same page as the previous modification
2756 // note that the far right leaf page is a special case
2758 if( samepage = src > 1 )
2759 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2760 slot = bt_findslot(set->page, key->key, key->len);
2762 bt_unlockpage(bt, BtLockRead, set->latch);
2765 if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
2766 set->latch->split = 0;
2770 if( slotptr(set->page, slot)->type == Librarian )
2771 ptr = keyptr(set->page, ++slot);
2773 ptr = keyptr(set->page, slot);
2776 locks[src].entry = set->latch->entry;
2777 locks[src].slot = slot;
2778 locks[src].reuse = 0;
2780 locks[src].entry = 0;
2781 locks[src].slot = 0;
2782 locks[src].reuse = 1;
2785 switch( slotptr(source, src)->type ) {
2788 if( !slotptr(set->page, slot)->dead )
2789 if( slot < set->page->cnt || bt_getid (set->page->right) )
2790 if( !keycmp (ptr, key->key, key->len) ) {
2792 // return constraint violation if key already exists
2794 bt_unlockpage(bt, BtLockRead, set->latch);
2798 if( locks[src].entry ) {
2799 set->latch = bt->mgr->latchsets + locks[src].entry;
2800 bt_unlockpage(bt, BtLockAtomic, set->latch);
2801 bt_unpinlatch (set->latch);
2812 // unlock last loadpage lock
2815 bt_unlockpage(bt, BtLockRead, set->latch);
2817 // and add entries to redo log
2819 for( src = 0; src++ < source->cnt; ) {
2820 key = keyptr(source, src);
2821 val = valptr(source, src);
2822 switch( slotptr(source, src)->type ) {
2837 if( locks[src].lsn = bt_newredo (bt, type, 0, key, val) )
2843 // obtain write lock for each master page
2845 for( src = 0; src++ < source->cnt; ) {
2846 if( locks[src].reuse )
2849 bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2852 // insert or delete each key
2853 // process any splits or merges
2854 // release Write & Atomic latches
2855 // set ParentModifications and build
2856 // queue of keys to insert for split pages
2857 // or delete for deleted pages.
2859 // run through txn list backwards
2861 samepage = source->cnt + 1;
2863 for( src = source->cnt; src; src-- ) {
2864 if( locks[src].reuse )
2867 // perform the txn operations
2868 // from smaller to larger on
2871 for( idx = src; idx < samepage; idx++ )
2872 switch( slotptr(source,idx)->type ) {
2874 if( bt_atomicdelete (bt, source, locks, idx) )
2880 if( bt_atomicinsert (bt, source, locks, idx) )
2885 // after the same page operations have finished,
2886 // process master page for splits or deletion.
2888 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2889 prev->page = bt_mappage (bt, prev->latch);
2892 // pick-up all splits from master page
2893 // each one is already WriteLocked.
2895 entry = prev->latch->split;
2898 set->latch = bt->mgr->latchsets + entry;
2899 set->page = bt_mappage (bt, set->latch);
2900 entry = set->latch->split;
2902 // delete empty master page by undoing its split
2903 // (this is potentially another empty page)
2904 // note that there are no new left pointers yet
2906 if( !prev->page->act ) {
2907 memcpy (set->page->left, prev->page->left, BtId);
2908 memcpy (prev->page, set->page, bt->mgr->page_size);
2909 bt_lockpage (bt, BtLockDelete, set->latch);
2910 bt_freepage (bt, set);
2912 prev->latch->split = set->latch->split;
2913 prev->latch->dirty = 1;
2917 // remove empty page from the split chain
2918 // and return it to the free list.
2920 if( !set->page->act ) {
2921 memcpy (prev->page->right, set->page->right, BtId);
2922 prev->latch->split = set->latch->split;
2923 bt_lockpage (bt, BtLockDelete, set->latch);
2924 bt_freepage (bt, set);
2928 // schedule prev fence key update
2930 ptr = keyptr(prev->page,prev->page->cnt);
2931 leaf = calloc (sizeof(AtomicKey), 1);
2933 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2934 leaf->page_no = prev->latch->page_no;
2935 leaf->entry = prev->latch->entry;
2945 // splice in the left link into the split page
2947 bt_putid (set->page->left, prev->latch->page_no);
2948 bt_lockpage(bt, BtLockParent, prev->latch);
2949 bt_unlockpage(bt, BtLockWrite, prev->latch);
2953 // update left pointer in next right page from last split page
2954 // (if all splits were reversed, latch->split == 0)
2956 if( latch->split ) {
2957 // fix left pointer in master's original (now split)
2958 // far right sibling or set rightmost page in page zero
2960 if( right = bt_getid (prev->page->right) ) {
2961 if( set->latch = bt_pinlatch (bt, right, 1) )
2962 set->page = bt_mappage (bt, set->latch);
2966 bt_lockpage (bt, BtLockWrite, set->latch);
2967 bt_putid (set->page->left, prev->latch->page_no);
2968 set->latch->dirty = 1;
2969 bt_unlockpage (bt, BtLockWrite, set->latch);
2970 bt_unpinlatch (set->latch);
2971 } else { // prev is rightmost page
2972 bt_spinwritelock (bt->mgr->lock);
2973 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2974 bt_spinreleasewrite(bt->mgr->lock);
2977 // Process last page split in chain
2979 ptr = keyptr(prev->page,prev->page->cnt);
2980 leaf = calloc (sizeof(AtomicKey), 1);
2982 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2983 leaf->page_no = prev->latch->page_no;
2984 leaf->entry = prev->latch->entry;
2994 bt_lockpage(bt, BtLockParent, prev->latch);
2995 bt_unlockpage(bt, BtLockWrite, prev->latch);
2997 // remove atomic lock on master page
2999 bt_unlockpage(bt, BtLockAtomic, latch);
3003 // finished if prev page occupied (either master or final split)
3005 if( prev->page->act ) {
3006 bt_unlockpage(bt, BtLockWrite, latch);
3007 bt_unlockpage(bt, BtLockAtomic, latch);
3008 bt_unpinlatch(latch);
3012 // any and all splits were reversed, and the
3013 // master page located in prev is empty, delete it
3014 // by pulling over master's right sibling.
3016 // Remove empty master's fence key
3018 ptr = keyptr(prev->page,prev->page->cnt);
3020 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3023 // perform the remainder of the delete
3024 // from the FIFO queue
3026 leaf = calloc (sizeof(AtomicKey), 1);
3028 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3029 leaf->page_no = prev->latch->page_no;
3030 leaf->entry = prev->latch->entry;
3041 // leave atomic lock in place until
3042 // deletion completes in next phase.
3044 bt_unlockpage(bt, BtLockWrite, prev->latch);
3047 // add & delete keys for any pages split or merged during transaction
3051 set->latch = bt->mgr->latchsets + leaf->entry;
3052 set->page = bt_mappage (bt, set->latch);
3054 bt_putid (value, leaf->page_no);
3055 ptr = (BtKey *)leaf->leafkey;
3057 switch( leaf->type ) {
3058 case 0: // insert key
3059 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3064 case 1: // delete key
3065 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3070 case 2: // free page
3071 if( bt_atomicfree (bt, set) )
3077 if( !leaf->nounlock )
3078 bt_unlockpage (bt, BtLockParent, set->latch);
3080 bt_unpinlatch (set->latch);
3083 } while( leaf = tail );
3093 // set cursor to highest slot on highest page
//
// Loads the page whose number is kept in pagezero->alloc->left into
// bt->cursor and records that page number in bt->cursor_page.
// Returns the slot count (cnt) of the page now held in the cursor.
// NOTE(review): the embedded line numbers skip here, so the declaration of
// `set` and whatever follows a failed bt_pinlatch are not visible in this
// listing; confirm the failure return value against the full file.
3095 uint bt_lastkey (BtDb *bt)
3097 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
// pin the latch for that page and, on success, map its page frame
3100 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3101 set->page = bt_mappage (bt, set->latch);
// copy the whole page into the cursor while holding a read lock,
// then drop both the lock and the pin
3105 bt_lockpage(bt, BtLockRead, set->latch);
3106 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3107 bt_unlockpage(bt, BtLockRead, set->latch);
3108 bt_unpinlatch (set->latch);
// the cursor now describes page_no; its last slot is cnt
3110 bt->cursor_page = page_no;
3111 return bt->cursor->cnt;
3114 // return previous slot on cursor page
//
// When the cursor has to move, the visible code follows the cursor page's
// left link, copies that left sibling into bt->cursor under a read lock,
// and returns the sibling's slot count (its highest slot).
// NOTE(review): the embedded line numbers skip, so the handling of `slot`
// within the current page, the declaration of `set`, the retry/loop
// structure and the statements guarded by the tests on lines 3127, 3145 and
// 3149 are not visible in this listing; confirm them against the full file.
3116 uint bt_prevkey (BtDb *bt, uint slot)
// remember the page we start from (us) and, below, its right neighbour
3118 uid ourright, next, us = bt->cursor_page;
3124 ourright = bt_getid(bt->cursor->right);
// a zero left link means there is no page to the left
3127 if( !(next = bt_getid(bt->cursor->left)) )
3131 bt->cursor_page = next;
// pin, map and snapshot the left sibling into the cursor
3133 if( set->latch = bt_pinlatch (bt, next, 1) )
3134 set->page = bt_mappage (bt, set->latch);
3138 bt_lockpage(bt, BtLockRead, set->latch);
3139 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3140 bt_unlockpage(bt, BtLockRead, set->latch);
3141 bt_unpinlatch (set->latch);
// inspect the page just loaded: its right link, its kill flag, and whether
// its right link still equals the right link of the page we came from
3143 next = bt_getid (bt->cursor->right);
3145 if( bt->cursor->kill )
3149 if( next == ourright )
3154 return bt->cursor->cnt;
3157 // return next slot on cursor page
3158 // or slide cursor right into next page
//
// Scans forward from `slot` over the page cached in bt->cursor, testing each
// slot's dead flag; when the cursor moves, the page named by the right link
// is copied into bt->cursor under a read lock and becomes bt->cursor_page.
// NOTE(review): the embedded line numbers skip, so the return statements,
// the outer loop, the declarations of `set` and `right`, and the action
// taken for dead slots are not visible in this listing; confirm them
// against the full file.
3160 uint bt_nextkey (BtDb *bt, uint slot)
// right link of the cached page (zero when there is no right sibling)
3166 right = bt_getid(bt->cursor->right);
// walk the remaining slots; the last slot of a page without a right
// sibling is the stopper and is not accepted by the test on line 3171
3168 while( slot++ < bt->cursor->cnt )
3169 if( slotptr(bt->cursor,slot)->dead )
3171 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
// move the cursor to the right sibling
3179 bt->cursor_page = right;
// pin and map the right sibling
3181 if( set->latch = bt_pinlatch (bt, right, 1) )
3182 set->page = bt_mappage (bt, set->latch);
// snapshot it into the cursor under a read lock, then release lock and pin
3186 bt_lockpage(bt, BtLockRead, set->latch);
3188 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3190 bt_unlockpage(bt, BtLockRead, set->latch);
3191 bt_unpinlatch (set->latch);
3199 // cache page of keys into cursor and return starting slot for given key
//
// bt  - database handle whose cursor buffer is filled
// key - key bytes to look up
// len - length of key in bytes
// NOTE(review): the embedded line numbers skip, so the declarations of
// `set` and `slot`, the branch taken when bt_loadpage yields 0, and the
// final return are not visible in this listing; confirm them against the
// full file.
3201 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3206 // cache page for retrieval
// bt_loadpage is asked for a read lock (BtLockRead) at level 0; a non-zero
// slot means a page was found, and that page is copied into the cursor
3208 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3209 memcpy (bt->cursor, set->page, bt->mgr->page_size);
// remember which page the cursor copy came from
3213 bt->cursor_page = set->latch->page_no;
// release the read lock and the pin taken on the page
3215 bt_unlockpage(bt, BtLockRead, set->latch);
3216 bt_unpinlatch (set->latch);
// Return a pointer to the key stored at `slot` of the page cached in
// bt->cursor. The pointer refers into the cursor buffer, which is
// overwritten whenever another page is copied into the cursor.
3220 BtKey *bt_key(BtDb *bt, uint slot)
3222 return keyptr(bt->cursor, slot);
// Return a pointer to the value stored at `slot` of the page cached in
// bt->cursor. The pointer refers into the cursor buffer, which is
// overwritten whenever another page is copied into the cursor.
3225 BtVal *bt_val(BtDb *bt, uint slot)
3227 return valptr(bt->cursor,slot);
// getCpuTime (Win32 API variant): return a time reading in seconds.
// Three readings are visible: one taken from GetSystemTimeAsFileTime and two
// taken from GetProcessTimes (decoded from usrtime and from systime). main
// calls this function with 0, 1 and 2 and prints the results as real, user
// and sys.
// NOTE(review): the embedded line numbers skip, so the lines that select a
// reading from `type`, the declarations of `crtime` and `ans`, and the
// return are not visible in this listing; the 0/1/2 mapping is inferred
// from main's usage - confirm against the full file.
3233 double getCpuTime(int type)
3236 FILETIME xittime[1];
3237 FILETIME systime[1];
3238 FILETIME usrtime[1];
3239 SYSTEMTIME timeconv[1];
3242 memset (timeconv, 0, sizeof(SYSTEMTIME));
// system-clock reading: start from the day of the week expressed in seconds.
// NOTE(review): a reading built from wDayOfWeek restarts every week, so a
// difference of two readings taken across that boundary would be negative.
3246 GetSystemTimeAsFileTime (xittime);
3247 FileTimeToSystemTime (xittime, timeconv);
3248 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// process user time, decoded into calendar-style fields
3251 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3252 FileTimeToSystemTime (usrtime, timeconv);
// process system (kernel) time, decoded the same way
3255 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3256 FileTimeToSystemTime (systime, timeconv);
// fold hours, minutes, seconds and milliseconds into seconds
3260 ans += (double)timeconv->wHour * 3600;
3261 ans += (double)timeconv->wMinute * 60;
3262 ans += (double)timeconv->wSecond;
3263 ans += (double)timeconv->wMilliseconds / 1000;
3268 #include <sys/resource.h>
// getCpuTime (gettimeofday/getrusage variant): return a time reading in
// seconds as a double, built from whole seconds plus microseconds / 1e6.
// Three readings are visible: gettimeofday (wall clock), ru_utime (user CPU
// time of this process) and ru_stime (system CPU time of this process).
// main calls this function with 0, 1 and 2 and prints the results as real,
// user and sys.
// NOTE(review): the embedded line numbers skip, so the lines that select a
// reading from `type` and any fall-through return are not visible in this
// listing; the 0/1/2 mapping is inferred from main's usage - confirm.
3270 double getCpuTime(int type)
3272 struct rusage used[1];
3273 struct timeval tv[1];
// wall-clock time
3277 gettimeofday(tv, NULL);
3278 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time consumed by the calling process
3281 getrusage(RUSAGE_SELF, used);
3282 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time consumed by the calling process
3285 getrusage(RUSAGE_SELF, used);
3286 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// bt_poolaudit: walk latch sets 1..mgr->latchdeployed, report on stderr any
// that still look locked or pinned, and zero their lock words.
// NOTE(review): the embedded line numbers skip, so the declarations of
// `latch` and `slot` and the statements that follow the "pinned" report are
// not visible in this listing.
// NOTE(review): page_no is printed with %.8x, which expects an unsigned
// int; confirm the type of latch->page_no.
3293 void bt_poolaudit (BtMgr *mgr)
// slot is incremented before use, so the first entry examined is index 1
3298 while( slot++ < mgr->latchdeployed ) {
3299 latch = mgr->latchsets + slot;
// read/write lock still held?  report, then clear it unconditionally
3301 if( *latch->readwr->rin & MASK )
3302 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3303 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
// access lock still held?  report, then clear it unconditionally
3305 if( *latch->access->rin & MASK )
3306 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3307 memset ((ushort *)latch->access, 0, sizeof(RWLock));
// parent lock is judged by ticket != serving rather than by rin.
// NOTE(review): it is nevertheless cleared with sizeof(RWLock); if parent
// is a smaller lock type than RWLock this memset also overwrites whatever
// follows it in the latch set - confirm against the struct definition.
3309 if( *latch->parent->ticket != *latch->parent->serving )
3310 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3311 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// any pin bits other than CLOCK_bit mean the latch set is still pinned
3313 if( latch->pin & ~CLOCK_bit ) {
3314 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
// bt_latchaudit: report and clear leftover locks, then count keys.
// Visible steps:
//   1. report and clear the manager's allocation lock (bt->mgr->lock);
//   2. for latch sets 1..latchdeployed, report held read/write, access and
//      parent locks and zero them, and report pinned latch sets;
//   3. for every hash table entry, report and clear its latch and walk its
//      chain (via latch->next) reporting pinned latch sets;
//   4. read every page from LEAF_page up to pagezero->alloc->right directly
//      from the index file, add up `act` for pages that are neither free
//      nor above level 0, subtract one for the stopper key and print the
//      total on stderr.
// Returns bt->err = BTERR_map when the ReadFile path fails or reads short.
// NOTE(review): the embedded line numbers skip, so the declarations of
// `latch`, `cnt`, `page_no`, `amt`, the pin tests guarding the "pinned"
// reports, the page_no increment and the final return are not visible in
// this listing; index_file treats the return value as the key count.
// NOTE(review): page_no is printed with %.8x, which expects an unsigned
// int; confirm the type of latch->page_no.
3320 uint bt_latchaudit (BtDb *bt)
// NOTE(review): idx is a ushort compared against latchdeployed; if
// latchdeployed can exceed 65535 the loop on line 3332 cannot terminate.
3322 ushort idx, hashidx;
// allocation lock: report if non-zero, then force it to zero
3328 if( *(ushort *)(bt->mgr->lock) )
3329 fprintf(stderr, "Alloc page locked\n");
3330 *(ushort *)(bt->mgr->lock) = 0;
// per latch set: report each lock that looks held and clear it
3332 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3333 latch = bt->mgr->latchsets + idx;
3334 if( *latch->readwr->rin & MASK )
3335 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3336 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3338 if( *latch->access->rin & MASK )
3339 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3340 memset ((ushort *)latch->access, 0, sizeof(RWLock));
// NOTE(review): parent is tested via ticket/serving but cleared with
// sizeof(RWLock); confirm parent really has the size of an RWLock.
3342 if( *latch->parent->ticket != *latch->parent->serving )
3343 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3344 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3347 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// per hash bucket: report and clear the bucket latch, then walk its chain
3352 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3353 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3354 fprintf(stderr, "hash entry %d locked\n", hashidx);
3356 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
// a slot of zero marks an empty bucket; a next of zero ends the chain
3358 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3359 latch = bt->mgr->latchsets + idx;
3361 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3362 } while( idx = latch->next );
// key count: scan the file page by page starting at the first leaf page
3365 page_no = LEAF_page;
3367 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
// byte offset of the page within the index file
3368 uid off = page_no << bt->mgr->page_bits;
// NOTE(review): the pread result is discarded, so a failed or short read
// leaves bt->frame holding stale data that is then counted below.
3370 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
// Win32 path: seek using the low and high halves of off, then read
3374 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3376 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3377 return bt->err = BTERR_map;
3379 if( *amt < bt->mgr->page_size )
3380 return bt->err = BTERR_map;
// only in-use pages at level 0 contribute their active key count
3382 if( !bt->frame->free && !bt->frame->lvl )
3383 cnt += bt->frame->act;
3387 cnt--; // remove stopper key
3388 fprintf(stderr, " Total keys read %d\n", cnt);
3402 // standalone program to index file of keys
3403 // then list them onto std-out
//
// index_file: thread entry point; `arg` is a ThreadArg describing one unit
// of work (btree manager, command string, input file, txn size, index).
// It opens its own BtDb on args->mgr, selects one command character from
// args->type and runs the matching activity. The activities visible below
// are: latch audit, pennysort insert/delete (optionally batched into atomic
// transactions), plain indexing, key lookup, forward scan to stdout,
// reverse scan to stdout, and a raw page count.
// Two signatures are listed: a pthread-style one and a __stdcall one for
// _beginthreadex (see main).
// NOTE(review): the embedded line numbers skip, so the command dispatch,
// the per-character key assembly, several declarations (bt, in, set, ptr,
// val, page, nxt, ...) and the function's return are not visible in this
// listing; which command letter maps to which activity should be confirmed
// against the full file.
// NOTE(review): every error path below reports on stderr and then calls
// exit(0), so a failing run still reports success to the caller.
3406 void *index_file (void *arg)
3408 uint __stdcall index_file (void *arg)
3411 int line = 0, found = 0, cnt = 0, idx;
3412 uid next, page_no = LEAF_page; // start on first page of leaves
3413 int ch, len = 0, slot, type = 0;
// key assembly buffer and the buffer in which a transaction batch is built
3414 unsigned char key[BT_maxkey];
3415 unsigned char txn[65536];
3416 ThreadArg *args = arg;
// each worker uses its own handle on the shared manager
3425 bt = bt_open (args->mgr);
// take this worker's command character; workers beyond the end of the
// command string reuse its last character
3428 if( args->idx < strlen (args->type) )
3429 ch = args->type[args->idx];
3431 ch = args->type[strlen(args->type) - 1];
// ---- latch manager audit ----
3436 fprintf(stderr, "started latch mgr audit\n");
3437 cnt = bt_latchaudit (bt);
3438 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
// ---- pennysort insert/delete: first 10 bytes are the key, rest the value ----
3449 if( type == Delete )
3450 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3452 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3454 if( type == Delete )
3455 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3457 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
// NOTE(review): no handling of a failed fopen is visible in this listing
3459 if( in = fopen (args->infile, "rb") )
3460 while( ch = getc(in), ch != EOF )
// non-transactional path: insert the record directly
3466 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3467 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
// transactional path: copy value bytes, a value-length byte and the 10-byte
// key into txn at offset nxt, then describe the entry in a slot of `page`
3473 memcpy (txn + nxt, key + 10, len - 10);
3475 txn[nxt] = len - 10;
3477 memcpy (txn + nxt, key, 10);
3480 slotptr(page,++cnt)->off = nxt;
3481 slotptr(page,cnt)->type = type;
// keep accumulating until the batch holds args->num entries
3484 if( cnt < args->num )
// submit the accumulated batch as one atomic transaction
3491 if( bt_atomictxn (bt, page) )
3492 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3497 else if( len < BT_maxkey )
3499 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
// ---- plain indexing: each assembled key is inserted with no value ----
3503 fprintf(stderr, "started indexing for %s\n", args->infile);
3504 if( in = fopen (args->infile, "r") )
3505 while( ch = getc(in), ch != EOF )
3510 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3511 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3514 else if( len < BT_maxkey )
3516 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// ---- find: look each assembled key up without fetching its value ----
3520 fprintf(stderr, "started finding keys for %s\n", args->infile);
3521 if( in = fopen (args->infile, "rb") )
3522 while( ch = getc(in), ch != EOF )
3526 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3529 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3532 else if( len < BT_maxkey )
3534 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// ---- forward scan: follow right links from page_no, one page at a time ----
3538 fprintf(stderr, "started scanning\n");
3540 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3541 set->page = bt_mappage (bt, set->latch);
3543 fprintf(stderr, "unable to obtain latch"), exit(1);
3544 bt_lockpage (bt, BtLockRead, set->latch);
3545 next = bt_getid (set->page->right);
// visit every live slot; the last slot of the last page (no right link)
// is skipped by the test on line 3548
3547 for( slot = 0; slot++ < set->page->cnt; )
3548 if( next || slot < set->page->cnt )
3549 if( !slotptr(set->page, slot)->dead ) {
3550 ptr = keyptr(set->page, slot);
3553 if( slotptr(set->page, slot)->type == Duplicate )
// emit key bytes, value bytes and a newline
3556 fwrite (ptr->key, len, 1, stdout);
3557 val = valptr(set->page, slot);
3558 fwrite (val->value, val->len, 1, stdout);
3559 fputc ('\n', stdout);
3563 bt_unlockpage (bt, BtLockRead, set->latch);
3564 bt_unpinlatch (set->latch);
// a right link of zero ends the scan
3565 } while( page_no = next );
3567 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// ---- reverse scan: start at the last key and step backwards via the cursor ----
3571 fprintf(stderr, "started reverse scan\n");
3572 if( slot = bt_lastkey (bt) )
3573 while( slot = bt_prevkey (bt, slot) ) {
3574 if( slotptr(bt->cursor, slot)->dead )
3577 ptr = keyptr(bt->cursor, slot);
3580 if( slotptr(bt->cursor, slot)->type == Duplicate )
3583 fwrite (ptr->key, len, 1, stdout);
3584 val = valptr(bt->cursor, slot);
3585 fwrite (val->value, val->len, 1, stdout);
3586 fputc ('\n', stdout);
3590 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// ---- count: read pages sequentially and add up active keys on leaf pages ----
3595 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3597 fprintf(stderr, "started counting\n");
3598 page_no = LEAF_page;
3600 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3601 if( bt_readpage (bt->mgr, bt->frame, page_no) )
// only in-use pages at level 0 contribute their active key count
3604 if( !bt->frame->free && !bt->frame->lvl )
3605 cnt += bt->frame->act;
3611 cnt--; // remove stopper key
3612 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// short alias for struct timeval
3624 typedef struct timeval timer;
// main: command-line driver.
// Arguments, as described by the usage text below:
//   argv[1] index file name, argv[2] command string, argv[3] page bits,
//   argv[4] buffer pool size, argv[5] transaction size, argv[6] recovery
//   pages, argv[7...] one source file per worker thread.
// It opens one btree manager, starts one index_file thread per source file,
// waits for all of them, and prints real/user/sys times on stderr.
// NOTE(review): the embedded line numbers skip, so the argc tests guarding
// the usage text and each atoi, the computation of `cnt`, several
// declarations and the end of the function are not visible in this listing.
// NOTE(review): the numeric arguments are parsed with atoi (no error
// detection) and the malloc results are used without a visible NULL check.
3626 int main (int argc, char **argv)
3628 int idx, cnt, len, slot, err;
// page size defaults to 2^16 bytes unless argv[3] overrides it
3629 int segsize, bits = 16;
3647 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3648 fprintf (stderr, " where idx_file is the name of the btree file\n");
3649 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3650 fprintf (stderr, " page_bits is the page size in bits\n");
3651 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3652 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3653 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3654 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// wall-clock reading taken before any work starts
3658 start = getCpuTime(0);
// optional numeric arguments
3661 bits = atoi(argv[3]);
3664 poolsize = atoi(argv[4]);
3667 fprintf (stderr, "Warning: no mapped_pool\n");
3670 num = atoi(argv[5]);
3673 recovery = atoi(argv[6]);
// per-thread bookkeeping: pthread_t array or Win32 HANDLE array, plus one
// ThreadArg per worker
3677 threads = malloc (cnt * sizeof(pthread_t));
3679 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3681 args = malloc (cnt * sizeof(ThreadArg));
// one manager is shared by all worker threads
3683 mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3686 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// fire off one worker per source file; worker idx reads argv[idx + 7]
3692 for( idx = 0; idx < cnt; idx++ ) {
3693 args[idx].infile = argv[idx + 7];
3694 args[idx].type = argv[2];
3695 args[idx].mgr = mgr;
3696 args[idx].num = num;
3697 args[idx].idx = idx;
// pthread path reports a creation failure but the loop continues
3699 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3700 fprintf(stderr, "Error creating thread %d\n", err);
// Win32 path: thread started with a 65536-byte stack size argument
3702 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3706 // wait for termination
3709 for( idx = 0; idx < cnt; idx++ )
3710 pthread_join (threads[idx], NULL);
3712 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3714 for( idx = 0; idx < cnt; idx++ )
3715 CloseHandle(threads[idx]);
// report elapsed wall-clock time, then user and system CPU time, each
// printed as whole minutes plus remaining seconds
3721 elapsed = getCpuTime(0) - start;
3722 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3723 elapsed = getCpuTime(1);
3724 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3725 elapsed = getCpuTime(2);
3726 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);