1 // btree version threadskv9a sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // traditional buffer pool manager
8 // ACID batched key-value updates
9 // and redo log for failure recovery
13 // author: karl malbrain, malbrain@cal.berkeley.edu
16 This work, including the source code, documentation
17 and related data, is placed into the public domain.
19 The original author is Karl Malbrain.
21 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
22 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
23 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
24 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
25 RESULTING FROM THE USE, MODIFICATION, OR
26 REDISTRIBUTION OF THIS SOFTWARE.
29 // Please see the project home page for documentation
30 // code.google.com/p/high-concurrency-btree
32 #define _FILE_OFFSET_BITS 64
33 #define _LARGEFILE64_SOURCE
37 #include <linux/futex.h>
52 #define WIN32_LEAN_AND_MEAN
66 typedef unsigned long long uid;
67 typedef unsigned long long logseqno;
70 typedef unsigned long long off64_t;
71 typedef unsigned short ushort;
72 typedef unsigned int uint;
75 #define BT_ro 0x6f72 // ro
76 #define BT_rw 0x7772 // rw
78 #define BT_maxbits 24 // maximum page size in bits
79 #define BT_minbits 9 // minimum page size in bits
80 #define BT_minpage (1 << BT_minbits) // minimum page size
81 #define BT_maxpage (1 << BT_maxbits) // maximum page size
83 // BTree page number constants
84 #define ALLOC_page 0 // allocation page
85 #define ROOT_page 1 // root of the btree
86 #define LEAF_page 2 // first page of leaves
87 #define REDO_page 3 // first page of redo buffer
89 // Number of levels to create in a new BTree
94 There are six lock types for each node in four independent sets:
95 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
96 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
97 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
98 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
99 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
100 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification.
112 // definition for phase-fair reader/writer lock implementation
115 volatile ushort rin[1];
116 volatile ushort rout[1];
117 volatile ushort ticket[1];
118 volatile ushort serving[1];
123 // write only queue lock
126 volatile ushort ticket[1];
127 volatile ushort serving[1];
139 // exclusive is set for write access
141 volatile typedef struct {
142 unsigned char exclusive[1];
143 unsigned char filler;
152 uint xlock:1; // owner has exclusive lock
154 uint wrt:15; // count of writers waiting
161 // lightweight spin latch
166 uint xlock:1; // writer has exclusive lock
167 uint share:15; // count of readers with lock
168 uint read:1; // readers are waiting
169 uint wrt:15; // count of writers waiting
176 // mode & definition for lite latch implementation
179 QueRd = 1, // reader queue
180 QueWr = 2 // writer queue
185 #define READ (SHARE * 32768)
186 #define WRT (READ * 2)
188 // hash table entries
191 volatile uint slot; // Latch table entry at head of chain
192 BtMutexLatch latch[1];
195 // latch manager table structure
198 uid page_no; // latch set page number
199 RWLock readwr[1]; // read/write page lock
200 RWLock access[1]; // Access Intent/Page delete
201 WOLock parent[1]; // Posting of fence key in parent
202 WOLock atomic[1]; // Atomic update in progress
203 uint split; // right split page atomic insert
204 uint entry; // entry slot in latch table
205 uint next; // next entry in hash table chain
206 uint prev; // prev entry in hash table chain
207 ushort pin[1]; // number of outstanding threads
208 ushort dirty:1; // page in cache is dirty
211 // Define the length of the page record numbers
215 // Page key slot definition.
217 // Keys are marked dead, but remain on the page until
218 // cleanup is called. The fence key (highest key) for
219 // a leaf page is always present, even after cleanup.
223 // In addition to the Unique keys that occupy slots
224 // there are Librarian and Duplicate key
225 // slots occupying the key slot array.
227 // The Librarian slots are dead keys that
228 // serve as filler, available to add new Unique
229 // or Dup slots that are inserted into the B-tree.
231 // The Duplicate slots have had their key bytes extended
232 // by 6 bytes to contain a binary duplicate key uniqueifier.
243 uint off:BT_maxbits; // page offset for key start
244 uint type:3; // type of slot
245 uint dead:1; // set for deleted slot
248 // The key structure occupies space at the upper end of
249 // each page. It's a length byte followed by the key
253 unsigned char len; // this can be changed to a ushort or uint
254 unsigned char key[0];
257 // the value structure also occupies space at the upper
258 // end of the page. Each key is immediately followed by a value.
261 unsigned char len; // this can be changed to a ushort or uint
262 unsigned char value[0];
265 #define BT_maxkey 255 // maximum number of bytes in a key
266 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
268 // The first part of an index page.
269 // It is immediately followed
270 // by the BtSlot array of keys.
272 // note that this structure size
273 // must be a multiple of 8 bytes
274 // in order to place dups correctly.
276 typedef struct BtPage_ {
277 uint cnt; // count of keys in page
278 uint act; // count of active keys
279 uint min; // next key offset
280 uint garbage; // page garbage in bytes
281 unsigned char bits:7; // page size in bits
282 unsigned char free:1; // page is on free chain
283 unsigned char lvl:7; // level of page
284 unsigned char kill:1; // page is being deleted
285 unsigned char right[BtId]; // page number to right
286 unsigned char left[BtId]; // page number to left
287 unsigned char filler[2]; // padding to multiple of 8
288 logseqno lsn; // log sequence number applied
291 // The loadpage interface object
294 BtPage page; // current page pointer
295 BtLatchSet *latch; // current page latch set
298 // structure for latch manager on ALLOC_page
301 struct BtPage_ alloc[1]; // next page_no in right ptr
302 unsigned long long dups[1]; // global duplicate key uniqueifier
303 unsigned char chain[BtId]; // head of free page_nos chain
306 // The object structure for Btree access
309 uint page_size; // page size
310 uint page_bits; // page size in bits
316 BtPageZero *pagezero; // mapped allocation page
317 BtHashEntry *hashtable; // the buffer pool hash table entries
318 BtLatchSet *latchsets; // mapped latch set from buffer pool
319 unsigned char *pagepool; // mapped to the buffer pool pages
320 unsigned char *redobuff; // mapped recovery buffer pointer
321 logseqno lsn, flushlsn; // current & first lsn flushed
322 BtMutexLatch redo[1]; // redo area lite latch
323 BtMutexLatch lock[1]; // allocation area lite latch
324 BtMutexLatch maps[1]; // mapping segments lite latch
325 ushort thread_no[1]; // next thread number
326 uint latchdeployed; // highest number of latch entries deployed
327 uint nlatchpage; // number of latch pages at BT_latch
328 uint latchtotal; // number of page latch entries
329 uint latchhash; // number of latch hash table slots
330 uint latchvictim; // next latch entry to examine
331 uint redopages; // size of recovery buff in pages
332 uint redolast; // last msync size of recovery buff
333 uint redoend; // eof/end element in recovery buff
335 HANDLE halloc; // allocation handle
336 HANDLE hpool; // buffer pool handle
338 uint segments; // number of memory mapped segments
339 unsigned char *pages[64000];// memory mapped segments of b-tree
343 BtMgr *mgr; // buffer manager for thread
344 BtPage cursor; // cached frame for start/next (never mapped)
345 BtPage frame; // spare frame for the page split (never mapped)
346 uid cursor_page; // current cursor page number
347 unsigned char *mem; // frame, cursor, page memory buffer
348 unsigned char key[BT_keyarray]; // last found complete key
349 int found; // last delete or insert was found
350 int err; // last error
351 int line; // last error line no
352 int reads, writes; // number of reads and writes from the btree
353 ushort thread_no; // thread number
356 // Catastrophic errors
370 #define CLOCK_bit 0x8000
372 // recovery manager entry types
// Entry types stored in the `type` field of a recovery log header.
375 BTRM_eof = 0, // rest of buffer is empty
376 BTRM_add, // add a unique key-value to btree
377 BTRM_dup, // add a duplicate key-value to btree
378 BTRM_del, // delete a key-value from btree
379 BTRM_upd, // update a key with a new value
380 BTRM_new, // allocate a new empty page
381 BTRM_old // reuse an old empty page
384 // recovery manager entry
385 // structure followed by BtKey & BtVal
388 logseqno reqlsn; // log sequence number required
389 logseqno lsn; // log sequence number for entry
390 uint len; // length of entry
391 unsigned char type; // type of entry
392 unsigned char lvl; // level of btree entry pertains to
397 extern void bt_close (BtDb *bt);
398 extern BtDb *bt_open (BtMgr *mgr);
399 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
400 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
401 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
402 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
403 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
404 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
405 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
406 extern BtKey *bt_foundkey (BtDb *bt);
407 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
408 extern uint bt_nextkey (BtDb *bt, uint slot);
411 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
412 extern void bt_mgrclose (BtMgr *mgr);
413 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
414 extern logseqno bt_txnredo (BtDb *bt, BtPage page);
416 // Helper functions to return slot values
417 // from the cursor page.
419 extern BtKey *bt_key (BtDb *bt, uint slot);
420 extern BtVal *bt_val (BtDb *bt, uint slot);
422 // The page is allocated from low and hi ends.
423 // The key slots are allocated from the bottom,
424 // while the text and value of the key
425 // are allocated from the top. When the two
426 // areas meet, the page is split into two.
428 // A key consists of a length byte, two bytes of
429 // index number (0 - 65535), and up to 253 bytes
432 // Associated with each key is a value byte string
433 // containing any value desired.
435 // The b-tree root is always located at page 1.
436 // The first leaf page of level zero is always
437 // located on page 2.
439 // The b-tree pages are linked with next
440 // pointers to facilitate enumerators,
441 // and provide for concurrency.
443 // When the root page fills, it is split in two and
444 // the tree height is raised by a new root at page
445 // one with two keys.
447 // Deleted keys are marked with a dead bit until
448 // page cleanup. The fence key for a leaf node is
451 // To achieve maximum concurrency one page is locked at a time
452 // as the tree is traversed to find leaf key in question. The right
453 // page numbers are used in cases where the page is being split,
456 // Page 0 is dedicated to lock for new page extensions,
457 // and chains empty pages together for reuse. It also
458 // contains the latch manager hash table.
460 // The ParentModification lock on a node is obtained to serialize posting
461 // or changing the fence key for a node.
463 // Empty pages are chained together through the ALLOC page and reused.
465 // Access macros to address slot and key values from the page
466 // Page slots use 1 based indexing.
// slotptr: address of the 1-based key slot `slot`; the slot array
//          starts immediately after the page header.
// keyptr:  address of the key stored at the slot's byte offset
//          from the start of the page.
// valptr:  address of the value, which directly follows the key bytes.
//
// Every macro argument is parenthesized so that compound expressions
// (e.g. `cond ? a : b`) expand with the intended precedence.
// Arguments are evaluated more than once: pass side-effect-free
// expressions only.

#define slotptr(page, slot) (((BtSlot *)((page) + 1)) + ((slot) - 1))
#define keyptr(page, slot) ((BtKey *)((unsigned char *)(page) + slotptr(page, slot)->off))
#define valptr(page, slot) ((BtVal *)(keyptr(page, slot)->key + keyptr(page, slot)->len))
// Store the page id `id` into the byte array `dest`.
// Only the store statement is visible here; the loop that drives `i`
// is on lines not shown.
472 void bt_putid(unsigned char *dest, uid id)
// write the low-order byte of id at dest[i], then shift id down one byte
477 dest[i] = (unsigned char)id, id >>= 8;
// Decode a page id from the BtId bytes at `src`.
// Bytes are consumed in order and each one is shifted in below the
// previous ones, so src[0] ends up as the most significant byte.
480 uid bt_getid(unsigned char *src)
485 for( i = 0; i < BtId; i++ )
486 id <<= 8, id |= *src++;
// Atomically advance the global duplicate-key uniqueifier stored in
// pagezero->dups and return the incremented value.
// The two return statements are alternate implementations (GCC builtin
// and MSVC intrinsic), presumably selected by preprocessor lines that
// are not visible here.
491 uid bt_newdup (BtDb *bt)
// fetch-and-add returns the old value, hence the "+ 1"
494 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
// NOTE(review): the MSVC _InterlockedIncrement64 intrinsic takes a single
// pointer argument; the extra ", 1" looks like it cannot compile on that
// platform -- confirm and drop the second argument.
496 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
// Thin wrapper that forwards its six arguments unchanged to the
// SYS_futex system call and returns the syscall result.
500 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
502 return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
505 // Write-Only Queue Lock
// Acquire the write-only queue (ticket) lock on behalf of thread `tid`.
507 void WriteOLock (WOLock *lock, ushort tid)
// the lock records an owning thread id; what happens when the caller
// already owns it is on lines not visible here
511 if( lock->tid == tid ) {
// atomically draw the next ticket (GCC builtin, then the MSVC equivalent)
516 tix = __sync_fetch_and_add (lock->ticket, 1);
518 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
520 // wait for our ticket to come up
522 while( tix != lock->serving[0] )
531 void WriteORelease (WOLock *lock)
542 // Phase-Fair reader/writer lock implementation
// Acquire the reader/writer lock for writing on behalf of thread `tid`.
544 void WriteLock (RWLock *lock, ushort tid)
// owner check against the recorded thread id; the body of this branch
// is not visible here
548 if( lock->tid == tid ) {
// draw a ticket to order this writer among the other writers
553 tix = __sync_fetch_and_add (lock->ticket, 1);
555 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
557 // wait for our ticket to come up
559 while( tix != lock->serving[0] )
// add PRES plus the ticket bits selected by PHID into rin;
// r receives the value rin held before the addition
566 w = PRES | (tix & PHID);
568 r = __sync_fetch_and_add (lock->rin, w);
570 r = _InterlockedExchangeAdd16 (lock->rin, w);
// wait until rout equals the rin value captured above
572 while( r != *lock->rout )
// Release the write side of the reader/writer lock.
581 void WriteRelease (RWLock *lock)
// atomically clear the MASK bits in rin (GCC builtin, then MSVC equivalent)
590 __sync_fetch_and_and (lock->rin, ~MASK);
592 _InterlockedAnd16 (lock->rin, ~MASK);
597 // try to obtain read lock
598 // return 1 if successful
// Non-blocking attempt to take the read side of the lock for thread `tid`.
600 int ReadTry (RWLock *lock, ushort tid)
604 // OK if write lock already held by same thread
606 if( lock->tid == tid ) {
// register as a reader by adding RINC to rin; w keeps the MASK bits
// that rin held before the addition
611 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
613 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
// the MASK bits still equal the snapshot: withdraw the reader
// registration again (any outer condition on w is not visible here)
616 if( w == (*lock->rin & MASK) ) {
618 __sync_fetch_and_add (lock->rin, -RINC);
620 _InterlockedExchangeAdd16 (lock->rin, -RINC);
// Acquire the read side of the reader/writer lock for thread `tid`,
// waiting as long as necessary.
628 void ReadLock (RWLock *lock, ushort tid)
// owner check against the recorded thread id; branch body not visible here
631 if( lock->tid == tid ) {
// register as a reader by adding RINC to rin; w keeps the MASK bits
// that rin held before the addition
636 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
638 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
// keep waiting while the MASK bits of rin are unchanged from the snapshot
641 while( w == (*lock->rin & MASK) )
// Release the read side of the reader/writer lock.
649 void ReadRelease (RWLock *lock)
// count this reader as departed by adding RINC to rout
657 __sync_fetch_and_add (lock->rout, RINC);
659 _InterlockedExchangeAdd16 (lock->rout, RINC);
663 // lightweight spin lock Latch Manager
// Acquire the spin mutex `latch`, retrying until it is obtained.
665 void bt_spinmutexlock(BtMutexLatch *latch)
// atomically set the XCL bit; prev holds the byte's previous value
671 prev = __sync_fetch_and_or(latch->exclusive, XCL);
673 prev = _InterlockedOr8(latch->exclusive, XCL);
// the loop condition is always true: the thread yields the CPU and retries,
// so the loop is left from inside its body (on lines not visible here)
678 } while( sched_yield(), 1 );
680 } while( SwitchToThread(), 1 );
684 // try to obtain write lock
686 // return 1 if obtained,
// Single, non-blocking attempt to acquire the spin mutex `latch`.
// Returns non-zero when the XCL bit was clear before this call
// (the caller now owns the latch), zero when it was already set.
689 int bt_spinmutextry(BtMutexLatch *latch)
// atomically set the XCL bit; prev holds the byte's previous value
694 prev = __sync_fetch_and_or(latch->exclusive, XCL);
696 prev = _InterlockedOr8(latch->exclusive, XCL);
698 // take write access if all bits are clear
700 return !(prev & XCL);
// Release the spin mutex by storing zero into its exclusive byte
// (a plain store; the byte is declared inside a volatile struct).
705 void bt_spinreleasemutex(BtMutexLatch *latch)
707 *latch->exclusive = 0;
710 // recovery manager -- flush dirty pages
// Walk the buffer pool, copy pool pages back to the btree file under a
// read lock, then wait for the file to be synced.  Progress is reported
// on stderr.
712 void bt_flushlsn (BtDb *bt)
// cnt counts pages written, cnt2 is reported as the number of pinned entries
714 uint cnt2 = 0, cnt = 0;
719 // flush dirty pool pages to the btree
721 fprintf(stderr, "Start flushlsn\n");
// NOTE(review): this visits every entry below latchtotal, while bt_mgrclose
// stops at latchdeployed -- confirm entries that were never deployed are
// harmless here.
722 for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
// pool page and latch set share the same index
723 page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
724 latch = bt->mgr->latchsets + entry;
725 bt_lockpage(bt, BtLockRead, latch);
// NOTE(review): the result of bt_writepage is not examined on this line and
// the dirty flag is cleared on the next one; a failed write would be lost.
728 bt_writepage(bt->mgr, page, latch->page_no);
729 latch->dirty = 0, cnt++;
// any pin bits other than the CLOCK bit mean the entry is still pinned
731 if( *latch->pin & ~CLOCK_bit )
733 bt_unlockpage(BtLockRead, latch);
// NOTE(review): cnt and cnt2 are uint but printed with %d.
735 fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
736 fprintf(stderr, "begin sync");
// NOTE(review): SYNC_FILE_RANGE_WAIT_AFTER on its own only waits for
// write-out; it may not start write-out of pages dirtied through the
// shared mapping -- confirm against the sync_file_range(2) flag semantics.
737 sync_file_range (bt->mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
738 fprintf(stderr, " end sync\n");
741 // recovery manager -- process current recovery buff on startup
742 // this won't do much if previous session was properly closed.
// Walk the entries of the recovery buffer and dispatch on each entry's type.
// The per-type actions and the loop that advances `offset` are on lines
// not visible here.
744 BTERR bt_recoveryredo (BtMgr *mgr)
// the first header in the buffer supplies the starting flushlsn
751 hdr = (BtLogHdr *)mgr->redobuff;
752 mgr->flushlsn = hdr->lsn;
// header of the entry at the current offset
755 hdr = (BtLogHdr *)(mgr->redobuff + offset);
756 switch( hdr->type ) {
760 case BTRM_add: // add a unique key-value to btree
762 case BTRM_dup: // add a duplicate key-value to btree
763 case BTRM_del: // delete a key-value from btree
764 case BTRM_upd: // update a key with a new value
765 case BTRM_new: // allocate a new empty page
766 case BTRM_old: // reuse an old empty page
772 // recovery manager -- append new entry to recovery log
773 // flush to disk when it overflows.
// Append one entry (header, key, value) to the recovery log buffer under
// the redo latch, write a zeroed end-of-log header after it, and msync
// the affected part of the buffer after the latch is released.
775 logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val)
// usable bytes: the whole redo area minus room for the trailing eof header
777 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
778 uint amt = sizeof(BtLogHdr);
782 bt_spinmutexlock (bt->mgr->redo);
// entry size = header + key (length byte + bytes) + value (length byte + bytes)
785 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
787 // see if new entry fits in buffer
788 // flush and reset if it doesn't
789 // fix this for circular buffer
791 if( amt > size - bt->mgr->redoend ) {
792 bt->mgr->flushlsn = bt->mgr->lsn;
// NOTE(review): "redolast & 0xfff" keeps only the low 12 bits, i.e. the
// offset inside a 4K page, not the page-aligned start of the unsynced
// region.  msync expects a page-aligned address; "& ~0xfff" may have been
// intended -- confirm.  The msync result is not checked.
793 msync (bt->mgr->redobuff + (bt->mgr->redolast & 0xfff), bt->mgr->redoend - bt->mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
// restart filling the buffer from the beginning
794 bt->mgr->redolast = 0;
795 bt->mgr->redoend = 0;
799 // fill in new entry & either eof or end block
801 hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
// the entry receives the next log sequence number
806 hdr->lsn = ++bt->mgr->lsn;
808 bt->mgr->redoend += amt;
// zeroed header after the entry; a zero type field equals BTRM_eof
810 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
811 memset (eof, 0, sizeof(BtLogHdr));
812 eof->lsn = bt->mgr->lsn;
814 // fill in key and value
817 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
818 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
// capture the range to sync while still holding the latch
// NOTE(review): same "& 0xfff" masking concern as in the overflow branch above.
821 last = bt->mgr->redolast & 0xfff;
822 end = bt->mgr->redoend;
823 bt->mgr->redolast = end;
825 bt_spinreleasemutex(bt->mgr->redo);
// the sync is issued outside the latch; its result is not checked
827 msync (bt->mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
831 // recovery manager -- append transaction to recovery log
832 // flush to disk when it overflows.
// Append every key/value slot of the page `source` to the recovery log
// buffer under a single new log sequence number, write a zeroed
// end-of-log header, and msync the affected part of the buffer.
834 logseqno bt_txnredo (BtDb *bt, BtPage source)
// usable bytes: the whole redo area minus room for the trailing eof header
836 uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
837 uint amt = 0, src, type;
844 // determine amount of redo recovery log space required
// slots are 1-based: src runs from 1 to source->cnt inside the loop body
846 for( src = 0; src++ < source->cnt; ) {
847 key = keyptr(source,src);
848 val = valptr(source,src);
849 amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
850 amt += sizeof(BtLogHdr);
853 bt_spinmutexlock (bt->mgr->redo);
855 // see if new entry fits in buffer
856 // flush and reset if it doesn't
857 // fix this for circular buffer
859 if( amt > size - bt->mgr->redoend ) {
860 bt->mgr->flushlsn = bt->mgr->lsn;
// NOTE(review): "redolast & 0xfff" keeps only the low 12 bits, i.e. the
// offset inside a 4K page, not the page-aligned start of the unsynced
// region.  msync expects a page-aligned address; "& ~0xfff" may have been
// intended -- confirm.  The msync result is not checked.
861 msync (bt->mgr->redobuff + (bt->mgr->redolast & 0xfff), bt->mgr->redoend - bt->mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
862 bt->mgr->redolast = 0;
864 bt->mgr->redoend = 0;
867 // assign new lsn to transaction
869 lsn = ++bt->mgr->lsn;
871 // fill in new entries
873 for( src = 0; src++ < source->cnt; ) {
874 key = keyptr(source, src);
875 val = valptr(source, src);
// the slot type selects the log entry type (cases not visible here)
877 switch( slotptr(source, src)->type ) {
// size of this one entry: header + key + value
892 amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
893 amt += sizeof(BtLogHdr);
895 hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
901 // fill in key and value
903 memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
904 memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
906 bt->mgr->redoend += amt;
// zeroed header after the last entry; a zero type field equals BTRM_eof
909 eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
910 memset (eof, 0, sizeof(BtLogHdr));
// NOTE(review): `end` is assigned the same value twice (here and two lines
// below); one of the assignments is redundant.
913 end = bt->mgr->redoend;
// NOTE(review): same "& 0xfff" masking concern as in the overflow branch above.
914 last = bt->mgr->redolast & 0xfff;
915 end = bt->mgr->redoend;
916 bt->mgr->redolast = end;
917 bt_spinreleasemutex(bt->mgr->redo);
// the sync is issued outside the latch; its result is not checked
919 msync (bt->mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
923 // read page into buffer pool from permanent location in Btree file
// Copy page `page_no` from the memory-mapped btree file into `page`.
// If the segment holding the page is not mapped yet, another segment is
// mapped under the maps latch; the surrounding retry loop and the
// increment of mgr->segments are on lines not visible here.
925 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
927 int flag = PROT_READ | PROT_WRITE;
// NOTE(review): the segment index is page_no >> 32 and the in-segment page is
// page_no & 0xffffffff, yet each mapping below covers only 65536 pages.
// For page_no >= 65536 the copy would address past the mapping; ">> 16" and
// "& 0xffff" may have been intended -- confirm.
928 uint segment = page_no >> 32;
// fast path: segment already mapped
932 if( segment < mgr->segments ) {
933 perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
934 memcpy (page, perm, mgr->page_size);
938 bt_spinmutexlock (mgr->maps);
// re-check under the latch: another thread may have mapped the segment meanwhile
940 if( segment < mgr->segments ) {
941 bt_spinreleasemutex (mgr->maps);
// map the next 65536-page segment of the file
// NOTE(review): the mmap result is not compared with MAP_FAILED on this line,
// and the file offset is shifted in 32-bit uint arithmetic, which can
// overflow for large files -- confirm.
945 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
948 bt_spinreleasemutex (mgr->maps);
952 // copy page from buffer pool to permanent location in Btree file
// Copy `page` into the memory-mapped btree file at page `page_no`.
// If the segment holding the page is not mapped yet, another segment is
// mapped under the maps latch; the surrounding retry loop and the
// increment of mgr->segments are on lines not visible here.
954 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
956 int flag = PROT_READ | PROT_WRITE;
// NOTE(review): the segment index is page_no >> 32 and the in-segment page is
// page_no & 0xffffffff, yet each mapping below covers only 65536 pages.
// For page_no >= 65536 the copy would address past the mapping; ">> 16" and
// "& 0xffff" may have been intended -- confirm.
957 uint segment = page_no >> 32;
// fast path: segment already mapped
961 if( segment < mgr->segments ) {
962 perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
963 memcpy (perm, page, mgr->page_size);
967 bt_spinmutexlock (mgr->maps);
// re-check under the latch: another thread may have mapped the segment meanwhile
969 if( segment < mgr->segments ) {
970 bt_spinreleasemutex (mgr->maps);
// map the next 65536-page segment of the file
// NOTE(review): the mmap result is not compared with MAP_FAILED on this line,
// and the file offset is shifted in 32-bit uint arithmetic, which can
// overflow for large files -- confirm.
974 mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
975 bt_spinreleasemutex (mgr->maps);
980 // link latch table entry into head of latch hash table
// Put latch table entry `slot` at the head of hash chain `hashidx`,
// bind it to `page_no`, and read the page into the entry's pool frame.
// On a read error, bt->err and bt->line are set and the error is returned.
// The condition involving `loadit` is on lines not visible here.
982 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
// pool frame and latch set share the same index
984 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
985 BtLatchSet *latch = bt->mgr->latchsets + slot;
// the old chain head (if any) becomes our successor and points back to us
987 if( latch->next = bt->mgr->hashtable[hashidx].slot )
988 bt->mgr->latchsets[latch->next].prev = slot;
990 bt->mgr->hashtable[hashidx].slot = slot;
991 latch->page_no = page_no;
998 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
999 return bt->line = __LINE__, bt->err;
1006 // set CLOCK bit in latch
1007 // decrement pin count
// Unpin a latch set: make sure its CLOCK bit is set, then decrement
// the pin count.  The first three statements are the GCC builtins,
// the last three the MSVC intrinsics.
1009 void bt_unpinlatch (BtLatchSet *latch)
// set the CLOCK bit only if it is currently clear
1012 if( ~*latch->pin & CLOCK_bit )
1013 __sync_fetch_and_or(latch->pin, CLOCK_bit);
1014 __sync_fetch_and_add(latch->pin, -1);
1016 if( ~*latch->pin & CLOCK_bit )
1017 _InterlockedOr16 (latch->pin, CLOCK_bit);
1018 _InterlockedDecrement16 (latch->pin);
1022 // return the btree cached page address
// Compute the buffer pool address of the page cached for `latch`:
// the latch's entry number selects the frame inside pagepool.
1024 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
1026 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
1031 // find existing latchset or inspire new one
1032 // return with latchset pinned
// Return the pinned latch set for `page_no`.
// Three stages are visible:
//   1. search the page's hash chain for an existing entry and pin it;
//   2. otherwise claim a never-used pool entry, if any remain;
//   3. otherwise sweep the pool from latchvictim, clearing CLOCK bits,
//      until an entry can be evicted, written back, and relinked.
// The hash chain latch for `hashidx` is taken at the start; the visible
// releases are after a successful lookup and after each bt_latchlink.
1034 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
1036 uint hashidx = page_no % bt->mgr->latchhash;
1037 uint cnt, slot, idx;
1043 // try to find our entry
1045 bt_spinmutexlock(bt->mgr->hashtable[hashidx].latch);
// walk the chain; slot 0 terminates it
1047 if( slot = bt->mgr->hashtable[hashidx].slot ) do
1049 latch = bt->mgr->latchsets + slot;
1050 if( page_no == latch->page_no )
1052 } while( slot = latch->next );
// found: pin the entry before dropping the chain latch
1058 latch = bt->mgr->latchsets + slot;
1060 __sync_fetch_and_add(latch->pin, 1);
1062 _InterlockedIncrement16 (latch->pin);
1064 bt_spinreleasemutex(bt->mgr->hashtable[hashidx].latch);
1068 // see if there are any unused pool entries
// both forms yield the incremented value of latchdeployed
1070 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
1072 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
1075 if( slot < bt->mgr->latchtotal ) {
1076 latch = bt->mgr->latchsets + slot;
1077 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
1079 bt_spinreleasemutex (bt->mgr->hashtable[hashidx].latch);
// pool exhausted: undo the increment taken above
1084 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
1086 _InterlockedDecrement (&bt->mgr->latchdeployed);
1088 // find and reuse previous entry on victim
// both forms yield the value of latchvictim before the increment
1092 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
1094 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
1096 // try to get write lock on hash chain
1097 // skip entry if not obtained
1098 // or has outstanding pins
1100 slot %= bt->mgr->latchtotal;
1105 latch = bt->mgr->latchsets + slot;
1106 idx = latch->page_no % bt->mgr->latchhash;
1108 // see if we are on same chain as hashidx
1110 if( idx == hashidx )
1113 if( !bt_spinmutextry (bt->mgr->hashtable[idx].latch) )
1116 // skip this slot if it is pinned
1117 // or the CLOCK bit is set
// clear the CLOCK bit so a later pass may evict this entry
1120 if( *latch->pin & CLOCK_bit ) {
1122 __sync_fetch_and_and(latch->pin, ~CLOCK_bit);
1124 _InterlockedAnd16 (latch->pin, ~CLOCK_bit);
1128 bt_spinreleasemutex (bt->mgr->hashtable[idx].latch);
1132 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
1134 // update permanent page area in btree from buffer pool
1135 // no read-lock is required since page is not pinned.
// NOTE(review): on a write error this returns immediately; no release of
// hashtable[idx].latch or hashtable[hashidx].latch appears before the
// return, so both spin latches look to be left held -- confirm.
1138 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
1139 return bt->line = __LINE__, NULL;
1141 latch->dirty = 0, bt->writes++;
1143 // unlink our available slot from its hash chain
// bypass the victim via its predecessor, or move the chain head when it has none
1146 bt->mgr->latchsets[latch->prev].next = latch->next;
1148 bt->mgr->hashtable[idx].slot = latch->next;
1151 bt->mgr->latchsets[latch->next].prev = latch->prev;
1153 bt_spinreleasemutex (bt->mgr->hashtable[idx].latch);
// reuse the evicted entry for the requested page
1155 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
1158 bt_spinreleasemutex (bt->mgr->hashtable[hashidx].latch);
// Shut down the buffer manager: write dirty pool pages back to the btree
// file, sync it, and release the mappings and handles.
// The statements from fdatasync through the munmap calls use POSIX APIs;
// the ones from FlushViewOfFile onward use Win32 APIs.
// NOTE(review): bt_mgr calls this on its error paths with a partially
// initialised manager; no guards for unset or failed mappings are visible
// in the lines below -- confirm that is harmless.
1163 void bt_mgrclose (BtMgr *mgr)
1171 // write remaining dirty pool pages to the btree
// entries 1..latchdeployed are the ones handed out so far
1173 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
1174 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1175 latch = mgr->latchsets + slot;
1177 if( latch->dirty ) {
// NOTE(review): the bt_writepage result is not checked here
1178 bt_writepage(mgr, page, latch->page_no);
1179 latch->dirty = 0, num++;
1183 fdatasync (mgr->idx);
1184 fprintf(stderr, "%d buffer pool pages flushed\n", num);
// the hash table mapping spans nlatchpage pages
1187 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
1188 munmap (mgr->pagezero, mgr->page_size);
1190 if( mgr->redopages )
1191 munmap (mgr->redobuff, mgr->redopages << mgr->page_bits);
1193 FlushViewOfFile(mgr->pagezero, 0);
1194 UnmapViewOfFile(mgr->pagezero);
1195 UnmapViewOfFile(mgr->hashtable);
1196 CloseHandle(mgr->halloc);
1197 CloseHandle(mgr->hpool);
1203 FlushFileBuffers(mgr->idx);
1204 CloseHandle(mgr->idx);
1209 // close and release memory
// Close a btree access handle and release its memory.
// Only the Win32 release of bt->mem is visible here; the remaining
// statements are on lines not shown.
1211 void bt_close (BtDb *bt)
1218 VirtualFree (bt->mem, 0, MEM_RELEASE);
1223 // open/create new btree buffer manager
1225 // call with file_name, BT_openmode, bits in page size (e.g. 16),
1226 // size of page pool (e.g. 262144)
// Open or create the btree file `name` and build its buffer manager.
//   bits      - page size in bits, checked against BT_minbits/BT_maxbits;
//               replaced by the value stored on disk when the file
//               already carries one
//   nodemax   - number of buffer pool pages / latch sets (at least 16)
//   redopages - size of the recovery buffer in pages (0 for none)
// Returns the new manager, or NULL on the error paths visible below.
// POSIX and Win32 variants of most steps appear one after the other;
// the preprocessor lines selecting between them are not visible here.
1228 BtMgr *bt_mgr (char *name, uint bits, uint nodemax, uint redopages)
1230 uint lvl, attr, last, slot, idx;
1231 uint nlatchpage, latchhash;
1232 unsigned char value[BtId];
1233 int flag, initit = 0;
1234 BtPageZero *pagezero;
1241 // determine sanity of page size and buffer pool
1243 if( bits > BT_maxbits )
1245 else if( bits < BT_minbits )
1248 if( nodemax < 16 ) {
// NOTE(review): nodemax is uint but printed with %d
1249 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
// NOTE(review): no NULL check of the calloc result is visible before
// mgr is dereferenced on the next line -- confirm.
1254 mgr = calloc (1, sizeof(BtMgr));
1256 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1258 if( mgr->idx == -1 ) {
1259 fprintf (stderr, "Unable to open btree file\n");
1260 return free(mgr), NULL;
1263 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1264 attr = FILE_ATTRIBUTE_NORMAL;
1265 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1267 if( mgr->idx == INVALID_HANDLE_VALUE )
1268 return GlobalFree(mgr), NULL;
// scratch buffer large enough for the biggest page size
// NOTE(review): the valloc result is not checked on this line
1272 pagezero = valloc (BT_maxpage);
1275 // read minimum page size to get root info
1276 // to support raw disk partition files
1277 // check if bits == 0 on the disk.
// lseek with whence 2 returns the file size; non-zero means an existing file
1279 if( size = lseek (mgr->idx, 0L, 2) )
1280 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1281 if( pagezero->alloc->bits )
1282 bits = pagezero->alloc->bits;
// NOTE(review): this return frees mgr and pagezero but does not close
// mgr->idx on this line -- confirm the descriptor is not leaked.
1286 return free(mgr), free(pagezero), NULL;
1290 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1291 size = GetFileSize(mgr->idx, amt);
1293 if( size || *amt ) {
1294 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1295 return bt_mgrclose (mgr), NULL;
1296 bits = pagezero->alloc->bits;
1301 mgr->page_size = 1 << bits;
1302 mgr->page_bits = bits;
1304 // calculate number of latch hash table entries
// one hash entry per 16 pool pages, rounded up to whole pages;
// latchhash is then the number of entries that fit in those pages
1306 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
1307 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
// nlatchpage grows to cover hash table + latch sets + pool pages
1309 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
1310 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1311 mgr->latchtotal = nodemax;
1316 // initialize an empty b-tree with latch page, root page, page of leaves
1317 // and page(s) of latches and page pool cache
1319 memset (pagezero, 0, 1 << bits);
1320 pagezero->alloc->bits = mgr->page_bits;
// the allocation page's right pointer holds the next page number to allocate
1321 bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
1323 // initialize left-most LEAF page in
1326 bt_putid (pagezero->alloc->left, LEAF_page);
// NOTE(review): the ftruncate result is ignored
1328 ftruncate (mgr->idx, REDO_page << mgr->page_bits);
1330 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1331 fprintf (stderr, "Unable to create btree page zero\n");
1332 return bt_mgrclose (mgr), NULL;
1335 memset (pagezero, 0, 1 << bits);
1336 pagezero->alloc->bits = mgr->page_bits;
// build one page per level, counting lvl down from MIN_lvl-1 to 0
1338 for( lvl=MIN_lvl; lvl--; ) {
// place the stopper key and its value at the very end of the page;
// non-leaf levels carry a BtId-sized value, the leaf level an empty one
1339 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1340 key = keyptr(pagezero->alloc, 1);
1341 key->len = 2; // create stopper key
// value for non-leaf levels: page number of the page one level down
1345 bt_putid(value, MIN_lvl - lvl + 1);
1346 val = valptr(pagezero->alloc, 1);
1347 val->len = lvl ? BtId : 0;
1348 memcpy (val->value, value, val->len);
1350 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1351 pagezero->alloc->lvl = lvl;
1352 pagezero->alloc->cnt = 1;
1353 pagezero->alloc->act = 1;
// NOTE(review): the message below says "page zero" although this writes
// page MIN_lvl - lvl.
1355 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1356 fprintf (stderr, "Unable to create btree page zero\n");
1357 return bt_mgrclose (mgr), NULL;
1365 VirtualFree (pagezero, 0, MEM_RELEASE);
1368 // mlock the pagezero page
1370 flag = PROT_READ | PROT_WRITE;
1371 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1372 if( mgr->pagezero == MAP_FAILED ) {
1373 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1374 return bt_mgrclose (mgr), NULL;
1376 mlock (mgr->pagezero, mgr->page_size);
// one anonymous shared mapping holds hash table, latch sets and pool pages
1378 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1379 if( mgr->hashtable == MAP_FAILED ) {
1380 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1381 return bt_mgrclose (mgr), NULL;
// the recovery buffer is mapped from the file, starting at REDO_page
// NOTE(review): the ftruncate, mmap and mlock results are not checked here
// (unlike the two mappings above), and the sizes are shifted in 32-bit
// uint arithmetic -- confirm.
1383 if( mgr->redopages = redopages ) {
1384 ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
1385 mgr->redobuff = mmap (0, redopages << mgr->page_bits, flag, MAP_SHARED, mgr->idx, REDO_page << mgr->page_bits);
1386 mlock (mgr->redobuff, redopages << mgr->page_bits);
1389 flag = PAGE_READWRITE;
1390 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1391 if( !mgr->halloc ) {
1392 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1393 return bt_mgrclose (mgr), NULL;
1396 flag = FILE_MAP_WRITE;
1397 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1398 if( !mgr->pagezero ) {
1399 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1400 return bt_mgrclose (mgr), NULL;
1403 flag = PAGE_READWRITE;
1404 size = (uid)mgr->nlatchpage << mgr->page_bits;
1405 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1407 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1408 return bt_mgrclose (mgr), NULL;
1411 flag = FILE_MAP_WRITE;
// NOTE(review): the mapping handle was stored in mgr->hpool just above and
// the visible BtMgr fields are halloc/hpool; "mgr->pool" looks like a typo
// for "mgr->hpool" -- confirm.
1412 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1413 if( !mgr->hashtable ) {
1414 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1415 return bt_mgrclose (mgr), NULL;
// NOTE(review): VirtualAlloc takes four parameters (address, size,
// allocation type, protection).  This call passes three and ORs MEM_COMMIT
// into the size; "redopages * mgr->page_size, MEM_COMMIT" was probably
// intended -- confirm.
1417 if( mgr->redopages = redopages )
1418 mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
// layout of the pool mapping: the last latchtotal pages are the page pool,
// and the latch set array ends exactly where the page pool begins
1421 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1422 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1427 // open BTree access method
1428 // based on buffer manager
1430 BtDb *bt_open (BtMgr *mgr)
1432 BtDb *bt = malloc (sizeof(*bt));
1434 memset (bt, 0, sizeof(*bt));
1437 bt->mem = valloc (2 *mgr->page_size);
1439 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1441 bt->frame = (BtPage)bt->mem;
1442 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1444 bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1446 bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1451 // compare two keys, return > 0, = 0, or < 0
1452 // =0: keys are same
1455 // as the comparison value
//
// key1 is a length-prefixed BtKey (key1->len bytes at key1->key);
// key2/len2 is a raw byte string and its length.
1457 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1459 uint len1 = key1->len;
// compare only the common prefix (the shorter of the two lengths);
// the memcmp result is kept in ans and tested for non-zero
1462 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1473 // place write, read, or parent lock on requested page_no.
//
// bt     supplies the thread number (bt->thread_no) handed to every lock call
// mode   selects which of the latch's locks is taken
// latch  latch set holding the readwr, access, parent and atomic locks
//
// NOTE(review): apart from the combined BtLockAtomic | BtLockRead label,
// the lines that pair each mode with its call are not visible in this
// listing -- confirm the mapping against the BtLock enum.
1475 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
// reader side of the page read/write lock
1479 ReadLock (latch->readwr, bt->thread_no);
// writer side of the page read/write lock
1482 WriteLock (latch->readwr, bt->thread_no);
// reader side of the access lock
1485 ReadLock (latch->access, bt->thread_no);
// writer side of the access lock
1488 WriteLock (latch->access, bt->thread_no);
// the parent lock is taken through WriteOLock
1491 WriteOLock (latch->parent, bt->thread_no);
// the atomic lock is taken through WriteOLock
1494 WriteOLock (latch->atomic, bt->thread_no);
// combined mode: take the atomic lock first, then the read lock
1496 case BtLockAtomic | BtLockRead:
1497 WriteOLock (latch->atomic, bt->thread_no);
1498 ReadLock (latch->readwr, bt->thread_no);
1503 // remove write, read, or parent lock on requested page
//
// mode   selects which of the latch's locks is released
// latch  latch set holding the readwr, access, parent and atomic locks
//
// Each release call below is the counterpart of the acquire call made on
// the same latch member in bt_lockpage.  Unlike bt_lockpage, no thread
// handle is needed.
//
// NOTE(review): apart from the combined BtLockAtomic | BtLockRead label,
// the lines that pair each mode with its call are not visible in this
// listing.
1505 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
// reader side of the page read/write lock
1509 ReadRelease (latch->readwr);
// writer side of the page read/write lock
1512 WriteRelease (latch->readwr);
// reader side of the access lock
1515 ReadRelease (latch->access);
// writer side of the access lock
1518 WriteRelease (latch->access);
// parent lock
1521 WriteORelease (latch->parent);
// atomic lock
1524 WriteORelease (latch->atomic);
// combined mode: release the atomic lock, then the read lock
1526 case BtLockAtomic | BtLockRead:
1527 WriteORelease (latch->atomic);
1528 ReadRelease (latch->readwr);
1533 // allocate a new page
1534 // return with page latched, but unlocked.
1536 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1541 // lock allocation page
1543 bt_spinmutexlock(bt->mgr->lock);
1545 // use empty chain first
1546 // else allocate empty page
1548 if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1549 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1550 set->page = bt_mappage (bt, set->latch);
1552 return bt->err = BTERR_struct, bt->line = __LINE__, -1;
1554 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1555 bt_spinreleasemutex(bt->mgr->lock);
1557 memcpy (set->page, contents, bt->mgr->page_size);
1558 set->latch->dirty = 1;
1562 // otherwise extend btree file
1564 page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1565 bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1567 // don't load cache from btree page
1569 if( set->latch = bt_pinlatch (bt, page_no, 0) )
1570 set->page = bt_mappage (bt, set->latch);
1572 return bt->line = __LINE__, bt->err = BTERR_struct;
1574 ftruncate (bt->mgr->idx, (uid)(page_no + 1) << bt->mgr->page_bits);
1576 // unlock allocation latch
1578 bt_spinreleasemutex(bt->mgr->lock);
1580 memcpy (set->page, contents, bt->mgr->page_size);
1581 set->latch->dirty = 1;
1585 // find slot in page for given key at a given level
//
// Binary search over the page's slots, which are numbered from 1 to
// page->cnt.
//
// page  page to search
// key   key bytes to look for
// len   length of key in bytes
//
// returns the slot held in higher when the search recorded a usable
// candidate (good is non-zero), otherwise 0.
1587 int bt_findslot (BtPage page, unsigned char *key, uint len)
1589 uint diff, higher = page->cnt, low = 1, slot;
1592 // make stopper key an infinite fence value
// the test is whether this page has a right sibling link
1594 if( bt_getid (page->right) )
1599 // low is the lowest candidate.
1600 // loop ends when they meet
1602 // higher is already
1603 // tested as .ge. the passed key.
// the loop runs until higher and low are equal (diff == 0)
1605 while( diff = higher - low ) {
// probe the midpoint of the remaining range
1606 slot = low + ( diff >> 1 );
// is the key stored at slot less than the search key?
1607 if( keycmp (keyptr(page, slot), key, len) < 0 )
// otherwise slot becomes the new upper bound and counts as a candidate
1610 higher = slot, good++;
1613 // return zero if key is on right link page
1615 return good ? higher : 0;
1618 // find and load page at given level for given key
1619 // leave page rd or wr locked as requested
//
// bt    thread handle; bt->err and bt->line are set on failure
// set   receives the pinned latch and mapped page of each page visited
// key   search key bytes
// len   length of key
// lvl   tree level at which the search stops
// lock  lock mode applied to the page at level lvl; pages at other
//       levels are locked with BtLockRead
//
// Every error return visible below yields 0.  Callers in this file treat
// a non-zero result as the slot found.
// NOTE(review): the successful return statement is not visible in this
// listing.
1621 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1623 uid page_no = ROOT_page, prevpage = 0;
// drill starts at 0xff and is replaced by the root's actual level below
1624 uint drill = 0xff, slot;
1625 BtLatchSet *prevlatch;
1626 uint mode, prevmode;
1629 // start at root of btree and drill down
1632 // determine lock mode of drill level
1633 mode = (drill == lvl) ? lock : BtLockRead;
1635 if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1638 // obtain access lock using lock chaining with Access mode
// the root page is never access-locked
1640 if( page_no > ROOT_page )
1641 bt_lockpage(bt, BtLockAccess, set->latch);
1643 set->page = bt_mappage (bt, set->latch);
1645 // release & unpin parent or left sibling page
1648 bt_unlockpage(prevmode, prevlatch);
1649 bt_unpinlatch (prevlatch);
1653 // obtain mode lock using lock chaining through AccessLock
1655 bt_lockpage(bt, mode, set->latch);
// a page flagged free must not be reachable from the tree
1657 if( set->page->free )
1658 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1660 if( page_no > ROOT_page )
1661 bt_unlockpage(BtLockAccess, set->latch);
1663 // re-read and re-lock root after determining actual level of root
1665 if( set->page->lvl != drill) {
// a level mismatch is only tolerated on the root page
1666 if( set->latch->page_no != ROOT_page )
1667 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1669 drill = set->page->lvl;
// the root itself is the target level but was only read locked:
// drop it so it can be taken again with the requested mode
1671 if( lock != BtLockRead && drill == lvl ) {
1672 bt_unlockpage(mode, set->latch);
1673 bt_unpinlatch (set->latch);
// remember this page so it can be released after the next one is latched
1678 prevpage = set->latch->page_no;
1679 prevlatch = set->latch;
1682 // find key on page at this level
1683 // and descend to requested level
// pages marked kill are not searched
1685 if( !set->page->kill )
1686 if( slot = bt_findslot (set->page, key, len) ) {
1690 // find next non-dead slot -- the fence key if nothing else
1692 while( slotptr(set->page, slot)->dead )
1693 if( slot++ < set->page->cnt )
1696 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
1698 val = valptr(set->page, slot);
// the value at this slot must be exactly one page id: the child page number
1700 if( val->len == BtId )
1701 page_no = bt_getid(valptr(set->page, slot)->value);
1703 return bt->line = __LINE__, bt->err = BTERR_struct, 0;
1709 // or slide right into next page
1711 page_no = bt_getid(set->page->right);
1714 // return error on end of right chain
1716 bt->line = __LINE__, bt->err = BTERR_struct;
1717 return 0; // return error
1720 // return page to free list
1721 // page must be delete & write locked
//
// bt   thread handle giving access to the manager's page zero
// set  latch and mapped page being freed
//
// Pushes the page on the head of the free chain in page zero, marks it
// free, then drops the caller's delete and write locks and the pin.  The
// whole sequence runs under the allocation mutex.
1723 void bt_freepage (BtDb *bt, BtPageSet *set)
1725 // lock allocation page
1727 bt_spinmutexlock (bt->mgr->lock);
// the freed page's right link takes the current chain head ...
1731 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
// ... and the chain head becomes the freed page
1732 bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1733 set->latch->dirty = 1;
1734 set->page->free = 1;
1736 // unlock released page
1738 bt_unlockpage (BtLockDelete, set->latch);
1739 bt_unlockpage (BtLockWrite, set->latch);
1740 bt_unpinlatch (set->latch);
1742 // unlock allocation page
1744 bt_spinreleasemutex (bt->mgr->lock);
1747 // a fence key was deleted from a page
1748 // push new fence value upwards
//
// bt   thread handle
// set  page whose last slot (the old fence) is being dropped; the visible
//      code releases its write lock, so it is presumably entered write
//      locked
// lvl  level of set; the parent entries live at lvl+1
//
// The page's write lock is exchanged for a parent lock before the parent
// level is updated; the parent lock and the pin are released at the end.
// NOTE(review): the statements taken when bt_insertkey or bt_deletekey
// fail, and the final return, are not visible in this listing.
1750 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1752 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1753 unsigned char value[BtId];
1757 // remove the old fence value
// save the old fence key (length header plus bytes) in rightkey
1759 ptr = keyptr(set->page, set->page->cnt);
1760 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
// clear the last slot, then shrink the slot count
1761 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1762 set->latch->dirty = 1;
1764 // cache new fence value
// the key in what is now the last slot becomes the new fence
1766 ptr = keyptr(set->page, set->page->cnt);
1767 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// take the parent lock before giving up the write lock
1769 bt_lockpage (bt, BtLockParent, set->latch);
1770 bt_unlockpage (BtLockWrite, set->latch);
1772 // insert new (now smaller) fence key
// the parent entry's value is this page's number
1774 bt_putid (value, set->latch->page_no);
1775 ptr = (BtKey*)leftkey;
1777 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1780 // now delete old fence key
1782 ptr = (BtKey*)rightkey;
1784 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1787 bt_unlockpage (BtLockParent, set->latch);
1788 bt_unpinlatch(set->latch);
1792 // root has a single child
1793 // collapse a level from the tree
//
// bt    thread handle
// root  latch and mapped page of the root; the visible code releases its
//       write lock, so it is presumably entered write locked
//
// Repeats while the root is above level 1 and has exactly one active
// entry: the only live child is copied over the root and the child page
// is freed.  On completion the root's write lock and pin are released.
// NOTE(review): the return statements for the pin-failure and success
// paths are not visible in this listing.
1795 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1802 // find the child entry and promote as new root contents
// scan slots 1..cnt for the first one not marked dead
1805 for( idx = 0; idx++ < root->page->cnt; )
1806 if( !slotptr(root->page, idx)->dead )
1809 val = valptr(root->page, idx);
// the slot's value must be exactly one page id
1811 if( val->len == BtId )
1812 page_no = bt_getid (valptr(root->page, idx)->value);
1814 return bt->line = __LINE__, bt->err = BTERR_struct;
1816 if( child->latch = bt_pinlatch (bt, page_no, 1) )
1817 child->page = bt_mappage (bt, child->latch);
// bt_freepage releases both of these locks
1821 bt_lockpage (bt, BtLockDelete, child->latch);
1822 bt_lockpage (bt, BtLockWrite, child->latch);
// the child's whole page image replaces the root's
1824 memcpy (root->page, child->page, bt->mgr->page_size);
1825 root->latch->dirty = 1;
1827 bt_freepage (bt, child);
1829 } while( root->page->lvl > 1 && root->page->act == 1 );
1831 bt_unlockpage (BtLockWrite, root->latch);
1832 bt_unpinlatch (root->latch);
1836 // delete a page and manage keys
1837 // call with page writelocked
1838 // returns with page unpinned
//
// bt   thread handle
// set  the page being removed
//
// The right sibling's contents are pulled into set, the sibling is marked
// kill and pointed back at set, the parent level (lvl+1) is updated so
// the sibling's fence key refers to set, and the sibling is then freed.
// NOTE(review): the statements taken when the pin, bt_insertkey or
// bt_deletekey fail, and the final return, are not visible in this
// listing.
1840 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1842 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1843 unsigned char value[BtId];
1844 uint lvl = set->page->lvl;
1849 // cache copy of fence key
1850 // to post in parent
1852 ptr = keyptr(set->page, set->page->cnt);
1853 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1855 // obtain lock on right page
1857 page_no = bt_getid(set->page->right);
1859 if( right->latch = bt_pinlatch (bt, page_no, 1) )
1860 right->page = bt_mappage (bt, right->latch);
1864 bt_lockpage (bt, BtLockWrite, right->latch);
1866 // cache copy of key to update
1868 ptr = keyptr(right->page, right->page->cnt);
1869 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
// NOTE(review): as far as the visible lines show, this error return
// happens with the right page still write locked and pinned -- confirm
1871 if( right->page->kill )
1872 return bt->line = __LINE__, bt->err = BTERR_struct;
1874 // pull contents of right peer into our empty page
1876 memcpy (set->page, right->page, bt->mgr->page_size);
1877 set->latch->dirty = 1;
1879 // mark right page deleted and point it to left page
1880 // until we can post parent updates that remove access
1881 // to the deleted page.
1883 bt_putid (right->page->right, set->latch->page_no);
1884 right->latch->dirty = 1;
1885 right->page->kill = 1;
// on both pages, take the parent lock before dropping the write lock
1887 bt_lockpage (bt, BtLockParent, right->latch);
1888 bt_unlockpage (BtLockWrite, right->latch);
1890 bt_lockpage (bt, BtLockParent, set->latch);
1891 bt_unlockpage (BtLockWrite, set->latch);
1893 // redirect higher key directly to our new node contents
1895 bt_putid (value, set->latch->page_no);
1896 ptr = (BtKey*)higherfence;
1898 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1901 // delete old lower key to our node
1903 ptr = (BtKey*)lowerfence;
1905 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1908 // obtain delete and write locks to right node
// bt_freepage releases the delete and write locks and the pin
1910 bt_unlockpage (BtLockParent, right->latch);
1911 bt_lockpage (bt, BtLockDelete, right->latch);
1912 bt_lockpage (bt, BtLockWrite, right->latch);
1913 bt_freepage (bt, right);
1915 bt_unlockpage (BtLockParent, set->latch);
1916 bt_unpinlatch (set->latch);
1920 // find and delete key on page by marking delete flag bit
1921 // if page becomes empty, delete it from the btree
//
// bt   thread handle
// key  key bytes to delete
// len  length of key
// lvl  tree level on which the key lives
//
// The target page is loaded write locked.  A matching live slot is
// marked dead and its space is added to the page's garbage count.  The
// function then hands off to bt_fixfence, bt_collapseroot or
// bt_deletepage when the corresponding condition below holds; otherwise
// it marks the page dirty and releases the write lock and pin itself.
// NOTE(review): the error and success return statements are not visible
// in this listing.
1923 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1925 uint slot, idx, found, fence;
1930 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1931 ptr = keyptr(set->page, slot);
1935 // if librarian slot, advance to real slot
1937 if( slotptr(set->page, slot)->type == Librarian )
1938 ptr = keyptr(set->page, ++slot);
// remember whether the slot is the last on the page (the fence)
1940 fence = slot == set->page->cnt;
1942 // if key is found delete it, otherwise ignore request
// found stays set only for an exact match on a slot that is not dead
1944 if( found = !keycmp (ptr, key, len) )
1945 if( found = slotptr(set->page, slot)->dead == 0 ) {
1946 val = valptr(set->page,slot);
1947 slotptr(set->page, slot)->dead = 1;
1948 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1951 // collapse empty slots beneath the fence
// while the slot just below the fence is dead, move the fence slot down
// over it and clear the vacated last slot
1953 while( idx = set->page->cnt - 1 )
1954 if( slotptr(set->page, idx)->dead ) {
1955 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1956 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1961 // did we delete a fence key in an upper level?
1963 if( found && lvl && set->page->act && fence )
1964 if( bt_fixfence (bt, set, lvl) )
1969 // do we need to collapse root?
1971 if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1972 if( bt_collapseroot (bt, set) )
1977 // delete empty page
1979 if( !set->page->act )
1980 return bt_deletepage (bt, set);
1982 set->latch->dirty = 1;
1983 bt_unlockpage(BtLockWrite, set->latch);
1984 bt_unpinlatch (set->latch);
// return the handle's key buffer (bt->key) viewed as a BtKey.
// bt_findkey copies the key it located into that buffer.
1988 BtKey *bt_foundkey (BtDb *bt)
1990 return (BtKey*)(bt->key);
1993 // advance to next slot
//
// bt    thread handle; bt->err and bt->line are set on failure
// set   current page; replaced by the right sibling when the current
//       page is exhausted
// slot  current slot number
//
// When slot is the last on the page, the right sibling is pinned and
// read locked, and the previous page is released.
// NOTE(review): the return statements for the in-page, pin-failure and
// moved-right cases are not visible in this listing; the one visible
// return yields 0 with BTERR_struct.
1995 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1997 BtLatchSet *prevlatch;
// more slots remain on the current page
2000 if( slot < set->page->cnt )
2003 prevlatch = set->latch;
2005 if( page_no = bt_getid(set->page->right) )
2006 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2007 set->page = bt_mappage (bt, set->latch);
2011 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
2013 // obtain access lock using lock chaining with Access mode
2015 bt_lockpage(bt, BtLockAccess, set->latch);
// the previous page is released only after the new page is access locked
2017 bt_unlockpage(BtLockRead, prevlatch);
2018 bt_unpinlatch (prevlatch);
// swap the access lock for the read lock on the new page
2020 bt_lockpage(bt, BtLockRead, set->latch);
2021 bt_unlockpage(BtLockAccess, set->latch);
2025 // find unique key or first duplicate key in
2026 // leaf level and return number of value bytes
2027 // or (-1) if not found. Setup key for bt_foundkey
//
// bt      thread handle; bt->key receives the key actually found
// key     search key bytes
// keylen  length of key
// value   caller's buffer receiving the value bytes
// valmax  capacity of value
//
// The leaf page is loaded read locked and scanned forward with
// bt_findnext; the read lock and pin are released before returning.
2029 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2037 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
2039 ptr = keyptr(set->page, slot);
2041 // skip librarian slot place holder
2043 if( slotptr(set->page, slot)->type == Librarian )
2044 ptr = keyptr(set->page, ++slot);
2046 // return actual key found
// copy the key header plus bytes into the handle's key buffer
2048 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
// NOTE(review): the statement run for Duplicate slots is not visible;
// presumably it shortens len by the appended sequence id -- confirm
2051 if( slotptr(set->page, slot)->type == Duplicate )
2054 // not there if we reach the stopper key
// the stopper is the last slot of a page with no right sibling
2056 if( slot == set->page->cnt )
2057 if( !bt_getid (set->page->right) )
2060 // if key exists, return >= 0 value bytes copied
2061 // otherwise return (-1)
2063 if( slotptr(set->page, slot)->dead )
// NOTE(review): the comparison uses len, which is assigned on lines not
// shown here, rather than keylen -- confirm the two agree
2067 if( !memcmp (ptr->key, key, len) ) {
2068 val = valptr (set->page,slot);
// guard for a stored value shorter than the caller's buffer
2069 if( valmax > val->len )
2071 memcpy (value, val->value, valmax);
2077 } while( slot = bt_findnext (bt, set, slot) );
2079 bt_unlockpage (BtLockRead, set->latch);
2080 bt_unpinlatch (set->latch);
2084 // check page for space available,
2085 // clean if necessary and return
2086 // 0 - page needs splitting
2087 // >0 new slot value
//
// bt      thread handle; bt->frame is used as a scratch copy of the page
// set     page being checked, and rebuilt in place if cleaned
// keylen  length of the key about to be inserted
// slot    slot where the caller intends to insert
// vallen  length of the value about to be inserted
//
// Rebuilding copies each retained key/value pair from the scratch copy
// to the top of the page, working downward from the page end, and
// re-creates the slot array with librarian slots.
// NOTE(review): the return statements, the slot renumbering and the
// conditions guarding the librarian slot are on lines not visible here.
2089 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
// nxt is the next free offset, counted down from the end of the page
2091 uint nxt = bt->mgr->page_size;
2092 BtPage page = set->page;
2093 uint cnt = 0, idx = 0;
2094 uint max = page->cnt;
// room check: two more slots plus the new key and value must fit below
// the lowest used offset (page->min)
2099 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2102 // skip cleanup and proceed to split
2103 // if there's not enough garbage
// threshold: one fifth of the page size
2106 if( page->garbage < nxt / 5 )
// keep the old image in bt->frame while the page is rebuilt
2109 memcpy (bt->frame, page, bt->mgr->page_size);
2111 // skip page info and set rest of page to zero
2113 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2114 set->latch->dirty = 1;
2118 // clean up page first by
2119 // removing deleted keys
2121 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level-0 page
2125 if( cnt < max || bt->frame->lvl )
2126 if( slotptr(bt->frame,cnt)->dead )
2129 // copy the value across
2131 val = valptr(bt->frame, cnt);
2132 nxt -= val->len + sizeof(BtVal);
2133 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
2135 // copy the key across
2137 key = keyptr(bt->frame, cnt);
2138 nxt -= key->len + sizeof(BtKey);
2139 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2141 // make a librarian slot
// the librarian slot is dead and shares the offset of the real slot
// that follows it
2143 slotptr(page, ++idx)->off = nxt;
2144 slotptr(page, idx)->type = Librarian;
2145 slotptr(page, idx)->dead = 1;
// the real slot keeps the type and dead flag of the source slot
2149 slotptr(page, ++idx)->off = nxt;
2150 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2152 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2159 // see if page has enough space now, or does it need splitting?
2161 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2167 // split the root and raise the height of the btree
//
// bt     thread handle
// root   the root page, which currently holds the lower half of the keys;
//        the visible code releases its write lock, so it is presumably
//        entered write locked
// right  latch of the page holding the upper half of the keys
//
// The root's contents move to a newly allocated page.  The root is then
// rebuilt with two entries: slot 1 carries the old fence key and points
// at the new left page, slot 2 is the stopper and points at right.
// NOTE(review): the key/value length assignments, the level increment
// and the return statements are on lines not visible in this listing.
2169 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
2171 unsigned char leftkey[BT_keyarray];
// nxt is the next free offset, counted down from the end of the page
2172 uint nxt = bt->mgr->page_size;
2173 unsigned char value[BtId];
2179 // save left page fence key for new root
2181 ptr = keyptr(root->page, root->page->cnt);
2182 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2184 // Obtain an empty page to use, and copy the current
2185 // root contents into it, e.g. lower keys
2187 if( bt_newpage(bt, left, root->page) )
// only the page number is needed from here on, so drop the pin
2190 left_page_no = left->latch->page_no;
2191 bt_unpinlatch (left->latch);
2193 // preserve the page info at the bottom
2194 // of higher keys and set rest to zero
2196 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2198 // insert stopper key at top of newroot page
2199 // and increase the root height
// value of the stopper entry: page number of the right page
2201 nxt -= BtId + sizeof(BtVal);
2202 bt_putid (value, right->page_no);
2203 val = (BtVal *)((unsigned char *)root->page + nxt);
2204 memcpy (val->value, value, BtId);
// reserve a two-byte key for the stopper in slot 2
2207 nxt -= 2 + sizeof(BtKey);
2208 slotptr(root->page, 2)->off = nxt;
2209 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2214 // insert lower keys page fence key on newroot page as first key
2216 nxt -= BtId + sizeof(BtVal);
2217 bt_putid (value, left_page_no);
2218 val = (BtVal *)((unsigned char *)root->page + nxt);
2219 memcpy (val->value, value, BtId);
2222 ptr = (BtKey *)leftkey;
2223 nxt -= ptr->len + sizeof(BtKey);
2224 slotptr(root->page, 1)->off = nxt;
2225 memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
// the root never has a right sibling
2227 bt_putid(root->page->right, 0);
2228 root->page->min = nxt; // reset lowest used offset and key count
2229 root->page->cnt = 2;
2230 root->page->act = 2;
2233 // release and unpin root pages
2235 bt_unlockpage(BtLockWrite, root->latch);
2236 bt_unpinlatch (root->latch);
2238 bt_unpinlatch (right);
2242 // split already locked full node
2244 // return pool entry for new right
//
// bt   thread handle; bt->frame is used as scratch space
// set  the full page, which the caller has locked
//
// Two passes: the first builds the upper keys in bt->frame and writes
// them to a newly allocated right page; the second saves the old image in
// bt->frame and rebuilds set->page from the lower keys.  set->page's
// right link is then pointed at the new page.
//
// returns the latch table entry of the new right page.  Callers in this
// file treat 0 as failure.
// NOTE(review): the loop start positions, the act counters and the
// failure return are on lines not visible in this listing.
2247 uint bt_splitpage (BtDb *bt, BtPageSet *set)
// nxt is the next free offset, counted down from the end of the page
2249 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2250 uint lvl = set->page->lvl;
2257 // split higher half of keys to bt->frame
2259 memset (bt->frame, 0, bt->mgr->page_size);
2260 max = set->page->cnt;
2264 while( cnt++ < max ) {
// the dead test is skipped for the last slot of a level-0 page
2265 if( cnt < max || set->page->lvl )
2266 if( slotptr(set->page, cnt)->dead )
// copy the value, then the key, downward from the end of the frame
2269 src = valptr(set->page, cnt);
2270 nxt -= src->len + sizeof(BtVal);
2271 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
2273 key = keyptr(set->page, cnt);
2274 nxt -= key->len + sizeof(BtKey);
2275 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2276 memcpy (ptr, key, key->len + sizeof(BtKey));
2278 // add librarian slot
// the librarian slot is dead and shares the offset of the real slot
// that follows it
2280 slotptr(bt->frame, ++idx)->off = nxt;
2281 slotptr(bt->frame, idx)->type = Librarian;
2282 slotptr(bt->frame, idx)->dead = 1;
// the real slot keeps the type and dead flag of the source slot
2286 slotptr(bt->frame, ++idx)->off = nxt;
2287 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2289 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
// fill in the header of the new right page image
2293 bt->frame->bits = bt->mgr->page_bits;
2294 bt->frame->min = nxt;
2295 bt->frame->cnt = idx;
2296 bt->frame->lvl = lvl;
// the new page inherits the old right link, except when splitting the root
2300 if( set->latch->page_no > ROOT_page )
2301 bt_putid (bt->frame->right, bt_getid (set->page->right));
2303 // get new free page and write higher keys to it.
2305 if( bt_newpage(bt, right, bt->frame) )
// second pass: keep the old image in bt->frame, zero everything after
// the page header and rebuild the lower keys in place
2308 memcpy (bt->frame, set->page, bt->mgr->page_size);
2309 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2310 set->latch->dirty = 1;
2312 nxt = bt->mgr->page_size;
2313 set->page->garbage = 0;
2319 if( slotptr(bt->frame, max)->type == Librarian )
2322 // assemble page of smaller keys
2324 while( cnt++ < max ) {
2325 if( slotptr(bt->frame, cnt)->dead )
2327 val = valptr(bt->frame, cnt);
2328 nxt -= val->len + sizeof(BtVal);
2329 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
2331 key = keyptr(bt->frame, cnt);
2332 nxt -= key->len + sizeof(BtKey);
2333 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2335 // add librarian slot
2337 slotptr(set->page, ++idx)->off = nxt;
2338 slotptr(set->page, idx)->type = Librarian;
2339 slotptr(set->page, idx)->dead = 1;
2343 slotptr(set->page, ++idx)->off = nxt;
2344 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
// link the rebuilt left page to the new right page
2348 bt_putid(set->page->right, right->latch->page_no);
2349 set->page->min = nxt;
2350 set->page->cnt = idx;
2352 return right->latch->entry;
2355 // fix keys for newly split page
2356 // call with page locked, return
//
// bt     thread handle
// set    left half of a split; the visible code releases its write
//        lock, so it is presumably entered write locked
// right  latch of the new right page produced by the split
//
// A split of the root is delegated to bt_splitroot.  Otherwise the fence
// keys of both halves are posted to the parent level (lvl+1) while both
// pages are held under parent locks; both pages are then unlocked and
// unpinned.
// NOTE(review): the statements taken when bt_insertkey fails, and the
// final return, are not visible in this listing.
2359 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2361 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2362 unsigned char value[BtId];
2363 uint lvl = set->page->lvl;
2367 // if current page is the root page, split it
2369 if( set->latch->page_no == ROOT_page )
2370 return bt_splitroot (bt, set, right);
// fence key of the left half: its last slot
2372 ptr = keyptr(set->page, set->page->cnt);
2373 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2375 page = bt_mappage (bt, right);
// fence key of the right half: its last slot
2377 ptr = keyptr(page, page->cnt);
2378 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2380 // insert new fences in their parent pages
2382 bt_lockpage (bt, BtLockParent, right);
// take the parent lock on the left page before dropping its write lock
2384 bt_lockpage (bt, BtLockParent, set->latch);
2385 bt_unlockpage (BtLockWrite, set->latch);
2387 // insert new fence for reformulated left block of smaller keys
2389 bt_putid (value, set->latch->page_no);
2390 ptr = (BtKey *)leftkey;
2392 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2395 // switch fence for right block of larger keys to new right page
2397 bt_putid (value, right->page_no);
2398 ptr = (BtKey *)rightkey;
2400 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2403 bt_unlockpage (BtLockParent, set->latch);
2404 bt_unpinlatch (set->latch);
2406 bt_unlockpage (BtLockParent, right);
2407 bt_unpinlatch (right);
2411 // install new key and value onto page
2412 // page must already be checked for
//
// bt       thread handle
// set      target page
// slot     slot before which the new entry goes
// key      key bytes, keylen long
// value    value bytes, vallen long
// type     slot type recorded for the new entry
// release  the final unlock/unpin is presumably conditional on this
//          flag; the test itself is not visible in this listing
//
// The value and then the key are written just below page->min, and the
// slot array is shifted up to open a gap at slot.
// NOTE(review): the length assignments, the act counter and the return
// statement are on lines not visible in this listing.
2415 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2417 uint idx, librarian;
2422 // if found slot > desired slot and previous slot
2423 // is a librarian slot, use it
2426 if( slotptr(set->page, slot-1)->type == Librarian )
2429 // copy value onto page
2431 set->page->min -= vallen + sizeof(BtVal);
2432 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2433 memcpy (val->value, value, vallen);
2436 // copy key onto page
2438 set->page->min -= keylen + sizeof(BtKey);
2439 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2440 memcpy (ptr->key, key, keylen);
2443 // find first empty slot
// scan forward from slot for a dead slot
2445 for( idx = slot; idx < set->page->cnt; idx++ )
2446 if( slotptr(set->page, idx)->dead )
2449 // now insert key into array before slot
// the scan reached the end: grow the array by two entries
// (a librarian slot plus the real slot)
2451 if( idx == set->page->cnt )
2452 idx += 2, set->page->cnt += 2, librarian = 2;
2456 set->latch->dirty = 1;
// shift slots up by librarian positions to open the gap
2459 while( idx > slot + librarian - 1 )
2460 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2462 // add librarian slot
2464 if( librarian > 1 ) {
2465 node = slotptr(set->page, slot++);
2466 node->off = set->page->min;
2467 node->type = Librarian;
// the real slot points at the key just written at page->min
2473 node = slotptr(set->page, slot);
2474 node->off = set->page->min;
2479 bt_unlockpage (BtLockWrite, set->latch);
2480 bt_unpinlatch (set->latch);
2486 // Insert new key into the btree at given level.
2487 // either add a new key or update/add an existing one
//
// bt      thread handle; bt->err and bt->line are set on failure
// key     key bytes, keylen long
// lvl     tree level to insert at
// value   value bytes, vallen long
// unique  non-zero: an existing equal key has its value updated;
//         zero: a sequence number from bt_newdup is appended to the key
//
// The target page is loaded write locked.  If the page lacks room it is
// cleaned or split and the search restarts.
// NOTE(review): several assignments, continue/return statements and the
// derivation of type and len are on lines not visible in this listing.
2489 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2491 unsigned char newkey[BT_keyarray];
2492 uint slot, idx, len, entry;
2499 // set up the key we're working on
// work on a private copy of the key held in newkey
2501 ins = (BtKey*)newkey;
2502 memcpy (ins->key, key, keylen);
2505 // is this a non-unique index value?
2511 sequence = bt_newdup (bt);
2512 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2516 while( 1 ) { // find the page and slot for the current key
2517 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2518 ptr = keyptr(set->page, slot);
2521 bt->line = __LINE__, bt->err = BTERR_ovflw;
2525 // if librarian slot == found slot, advance to real slot
2527 if( slotptr(set->page, slot)->type == Librarian )
2528 if( !keycmp (ptr, key, keylen) )
2529 ptr = keyptr(set->page, ++slot);
2533 if( slotptr(set->page, slot)->type == Duplicate )
2536 // if inserting a duplicate key or unique key
2537 // check for adequate space on the page
2538 // and insert the new key before slot.
// true for every non-unique insert, and for a unique insert whose key
// differs from the key found at slot
2540 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2541 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2542 if( !(entry = bt_splitpage (bt, set)) )
2544 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2549 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2552 // if key already exists, update value and return
2554 val = valptr(set->page, slot);
// the new value fits in the existing value area: overwrite in place
2556 if( val->len >= vallen ) {
2557 if( slotptr(set->page, slot)->dead )
// the bytes no longer used by the shorter value become garbage
2559 set->page->garbage += val->len - vallen;
2560 set->latch->dirty = 1;
2561 slotptr(set->page, slot)->dead = 0;
2563 memcpy (val->value, value, vallen);
2564 bt_unlockpage(BtLockWrite, set->latch);
2565 bt_unpinlatch (set->latch);
2569 // new update value doesn't fit in existing value area
// the old key/value pair is counted as garbage when the slot was live
2571 if( !slotptr(set->page, slot)->dead )
2572 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2574 slotptr(set->page, slot)->dead = 0;
2578 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2579 if( !(entry = bt_splitpage (bt, set)) )
2581 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
// write a fresh value and key below page->min and repoint the slot
2586 set->page->min -= vallen + sizeof(BtVal);
2587 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2588 memcpy (val->value, value, vallen);
2591 set->latch->dirty = 1;
2592 set->page->min -= keylen + sizeof(BtKey);
2593 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2594 memcpy (ptr->key, key, keylen);
2597 slotptr(set->page, slot)->off = set->page->min;
2598 bt_unlockpage(BtLockWrite, set->latch);
2599 bt_unpinlatch (set->latch);
// per-key bookkeeping for an atomic transaction: bt_atomictxn allocates
// one entry per source slot (plus one), indexed by source slot number.
// NOTE(review): the enclosing typedef lines are not visible in this
// listing; the name AtomicTxn is taken from how the locks array is
// declared and used elsewhere in this file.
2606 logseqno reqlsn; // redo log seq no required
2607 logseqno lsn; // redo log sequence number
2608 uint entry; // latch table entry number
2609 uint slot:31; // page slot number
2610 uint reuse:1; // reused previous page
// work-list node; next chains the nodes together.
// NOTE(review): the enclosing typedef lines are not visible in this
// listing; the name AtomicKey is taken from the head/tail/leaf
// declarations in bt_atomictxn -- confirm.
2614 uid page_no; // page number for split leaf
2615 void *next; // next key to insert
2616 uint entry:29; // latch table entry number
2617 uint type:2; // 0 == insert, 1 == delete, 2 == free
2618 uint nounlock:1; // don't unlock ParentModification
// buffer for the leaf's key (length header plus bytes)
2619 unsigned char leafkey[BT_keyarray];
2622 // determine actual page where key is located
2623 // return slot number
//
// bt      thread handle; bt->err and bt->line are set when the key's
//         page cannot be found
// source  page holding the transaction's keys
// locks   per-key bookkeeping array, indexed by source slot
// src     source slot number of the key being located
// set     receives the latch and mapped page where the key belongs
//
// NOTE(review): the return statements and some branch lines are not
// visible in this listing.
2625 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2627 BtKey *key = keyptr(source,src);
2628 uint slot = locks[src].slot;
// a key flagged reuse shares the previous key's page: start from that
// entry and force a fresh slot search
2631 if( src > 1 && locks[src].reuse )
2632 entry = locks[src-1].entry, slot = 0;
2634 entry = locks[src].entry;
2637 set->latch = bt->mgr->latchsets + entry;
2638 set->page = bt_mappage (bt, set->latch);
2642 // is locks->reuse set? or was slot zeroed?
2643 // if so, find where our key is located
2644 // on current page or pages split on
2645 // same page txn operations.
2648 set->latch = bt->mgr->latchsets + entry;
2649 set->page = bt_mappage (bt, set->latch);
2651 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2652 if( slotptr(set->page, slot)->type == Librarian )
// remember which latch entry the reused key actually landed on
2654 if( locks[src].reuse )
2655 locks[src].entry = entry;
// follow the chain of pages split off from this one
2658 } while( entry = set->latch->split );
2660 bt->line = __LINE__, bt->err = BTERR_atomic;
// insert one key of an atomic transaction onto its leaf page.
//
// bt      thread handle; bt->err and bt->line are set on failure
// source  page holding the transaction's keys
// locks   per-key bookkeeping array, indexed by source slot
// src     source slot number of the key to insert
//
// The key's page is located with bt_atomicpage.  If the page has room
// (after cleaning when needed) the key is inserted without releasing the
// page (release argument 0) and the page's lsn is set from locks[src].
// Otherwise the page is split, the new right page is write locked and
// spliced into the split chain, and the search is retried.
// NOTE(review): the return statements on the success and failure
// branches are not visible, apart from the final BTERR_atomic return.
2664 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2666 BtKey *key = keyptr(source, src);
2667 BtVal *val = valptr(source, src);
2672 while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2673 if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) {
2674 if( bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
2676 set->page->lsn = locks[src].lsn;
2680 if( entry = bt_splitpage (bt, set) )
2681 latch = bt->mgr->latchsets + entry;
2685 // splice right page into split chain
2686 // and WriteLock it.
2688 bt_lockpage(bt, BtLockWrite, latch);
2689 latch->split = set->latch->split;
2690 set->latch->split = entry;
// zero the cached slot so the retry searches again
2691 locks[src].slot = 0;
2694 return bt->line = __LINE__, bt->err = BTERR_atomic;
// delete one key of an atomic transaction from its leaf page.
//
// bt      thread handle; bt->err and bt->line are set on failure
// source  page holding the transaction's keys
// locks   per-key bookkeeping array, indexed by source slot
// src     source slot number of the key to delete
//
// The key's page is located with bt_atomicpage.  A matching live slot is
// marked dead, its space is added to the page's garbage count, the page
// lsn is set from locks[src] and the latch is marked dirty.
// NOTE(review): the returns taken for a missing or already-dead key, the
// act counter and the final return are on lines not visible here.
2697 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2699 BtKey *key = keyptr(source, src);
2705 if( slot = bt_atomicpage (bt, source, locks, src, set) )
2706 ptr = keyptr(set->page, slot);
2708 return bt->line = __LINE__, bt->err = BTERR_struct;
2710 if( !keycmp (ptr, key->key, key->len) )
2711 if( !slotptr(set->page, slot)->dead )
2712 slotptr(set->page, slot)->dead = 1;
2718 val = valptr(set->page, slot);
2719 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2720 set->page->lsn = locks[src].lsn;
2721 set->latch->dirty = 1;
2727 // delete an empty master page for a transaction
2729 // note that the far right page never empties because
2730 // it always contains (at least) the infinite stopper key
2731 // and that all pages that don't contain any keys are
2732 // deleted, or are being held under Atomic lock.
//
// bt    thread handle
// prev  the empty master page; the visible code releases its atomic
//       lock, so it is presumably entered holding that lock
//
// The right sibling's contents are pulled into prev, the sibling is
// marked kill and pointed back at prev, and the sibling's fence key in
// level 1 is redirected to prev.  The left link of the page that now
// follows prev is then fixed (or page zero's alloc->left when prev has
// become the far right page), and the old sibling is freed.
// NOTE(review): the statements taken when a pin or bt_insertkey fails,
// and the final return, are not visible in this listing.
2734 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2736 BtPageSet right[1], temp[1];
2737 unsigned char value[BtId];
2741 bt_lockpage(bt, BtLockWrite, prev->latch);
2743 // grab the right sibling
2745 if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2746 right->page = bt_mappage (bt, right->latch);
2750 bt_lockpage(bt, BtLockAtomic, right->latch);
2751 bt_lockpage(bt, BtLockWrite, right->latch);
2753 // and pull contents over empty page
2754 // while preserving master's left link
// the left link is first copied into the sibling so that it survives
// the whole-page copy that follows
2756 memcpy (right->page->left, prev->page->left, BtId);
2757 memcpy (prev->page, right->page, bt->mgr->page_size);
2759 // forward seekers to old right sibling
2760 // to new page location in set
2762 bt_putid (right->page->right, prev->latch->page_no);
2763 right->latch->dirty = 1;
2764 right->page->kill = 1;
2766 // remove pointer to right page for searchers by
2767 // changing right fence key to point to the master page
2769 ptr = keyptr(right->page,right->page->cnt);
2770 bt_putid (value, prev->latch->page_no);
2772 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2775 // now that master page is in good shape we can
2776 // remove its locks.
2778 bt_unlockpage (BtLockAtomic, prev->latch);
2779 bt_unlockpage (BtLockWrite, prev->latch);
2781 // fix master's right sibling's left pointer
2782 // to remove scanner's pointer to the right page
2784 if( right_page_no = bt_getid (prev->page->right) ) {
2785 if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2786 temp->page = bt_mappage (bt, temp->latch);
2788 bt_lockpage (bt, BtLockWrite, temp->latch);
2789 bt_putid (temp->page->left, prev->latch->page_no);
2790 temp->latch->dirty = 1;
2792 bt_unlockpage (BtLockWrite, temp->latch);
2793 bt_unpinlatch (temp->latch);
2794 } else { // master is now the far right page
// page zero's alloc->left is updated under the allocation mutex
2795 bt_spinmutexlock (bt->mgr->lock);
2796 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2797 bt_spinreleasemutex(bt->mgr->lock);
2800 // now that there are no pointers to the right page
2801 // we can delete it after the last read access occurs
// bt_freepage releases the delete and write locks and the pin
2803 bt_unlockpage (BtLockWrite, right->latch);
2804 bt_unlockpage (BtLockAtomic, right->latch);
2805 bt_lockpage (bt, BtLockDelete, right->latch);
2806 bt_lockpage (bt, BtLockWrite, right->latch);
2807 bt_freepage (bt, right);
2811 // atomic modification of a batch of keys.
// `source` is a page describing the batch: slotptr(source, n)->type selects
// the operation for entry n and keyptr(source, n) supplies its key.
// Entries are numbered from 1 to source->cnt.
2813 // return -1 if BTERR is set
2814 // otherwise return slot number
2815 // causing the key constraint violation
2816 // or zero on successful completion.
2818 int bt_atomictxn (BtDb *bt, BtPage source)
2820 uint src, idx, slot, samepage, entry;
2821 AtomicKey *head, *tail, *leaf;
2822 BtPageSet set[1], prev[1];
2823 unsigned char value[BtId];
2824 BtKey *key, *ptr, *key2;
// one zeroed AtomicTxn per source entry; index 0 is unused, hence cnt + 1
2835 locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2839 // stable sort the list of keys into order to
2840 // prevent deadlocks between threads.
// (insertion sort over the slot array: src is post-incremented in the loop
// test, so the first slot examined is 2; idx then walks from src - 1 to 1)
2842 for( src = 1; src++ < source->cnt; ) {
2843 *temp = *slotptr(source,src);
2844 key = keyptr (source,src);
2846 for( idx = src; --idx; ) {
2847 key2 = keyptr (source,idx);
2848 if( keycmp (key, key2->key, key2->len) < 0 ) {
// key sorts before slot idx: move slot idx up one position and
// place the saved slot at idx
2849 *slotptr(source,idx+1) = *slotptr(source,idx);
2850 *slotptr(source,idx) = *temp;
2856 // Load the leaf page for each key
2857 // group same page references with reuse bit
2858 // and determine any constraint violations
2860 for( src = 0; src++ < source->cnt; ) {
2861 key = keyptr(source, src);
2864 // first determine if this modification falls
2865 // on the same page as the previous modification
2866 // note that the far right leaf page is a special case
// (a page with no right link is accepted without comparing its last key;
// otherwise the page's last key must be >= the new key)
2868 if( samepage = src > 1 )
2869 if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2870 slot = bt_findslot(set->page, key->key, key->len);
2872 bt_unlockpage(BtLockRead, set->latch);
// find the leaf for this key, requesting the Read and Atomic locks together;
// the latch's split link is cleared when a slot is returned
2875 if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
2876 set->latch->split = 0;
// a Librarian slot is stepped over: the key is read from the following slot
2880 if( slotptr(set->page, slot)->type == Librarian )
2881 ptr = keyptr(set->page, ++slot);
2883 ptr = keyptr(set->page, slot);
// record the latch entry and slot for this key with reuse = 0, or mark the
// entry reuse = 1 with no latch entry or slot of its own
2886 locks[src].entry = set->latch->entry;
2887 locks[src].slot = slot;
2888 locks[src].reuse = 0;
2890 locks[src].entry = 0;
2891 locks[src].slot = 0;
2892 locks[src].reuse = 1;
2895 // capture current lsn for master page
2897 locks[src].reqlsn = set->page->lsn;
2899 // perform constraint checks
2901 switch( slotptr(source, src)->type ) {
// the slot must be live, and must not be the last slot of a page that has
// no right link, before its key is compared with the new key
2904 if( !slotptr(set->page, slot)->dead )
2905 if( slot < set->page->cnt || bt_getid (set->page->right) )
2906 if( !keycmp (ptr, key->key, key->len) ) {
2908 // return constraint violation if key already exists
2910 bt_unlockpage(BtLockRead, set->latch);
// release the Atomic lock and the pin of an entry that recorded a latch
2914 if( locks[src].entry ) {
2915 set->latch = bt->mgr->latchsets + locks[src].entry;
2916 bt_unlockpage(BtLockAtomic, set->latch);
2917 bt_unpinlatch (set->latch);
2928 // unlock last loadpage lock
2931 bt_unlockpage(BtLockRead, set->latch);
2933 // and add entries to redo log
// when redo pages are configured, every entry of the batch receives the
// same non-zero lsn returned by bt_txnredo
2935 if( bt->mgr->redopages )
2936 if( lsn = bt_txnredo (bt, source) )
2937 for( src = 0; src++ < source->cnt; )
2938 locks[src].lsn = lsn;
2942 // obtain write lock for each master page
2943 // sync flushed pages to disk
2945 for( src = 0; src++ < source->cnt; ) {
2946 if( locks[src].reuse )
2949 set->latch = bt->mgr->latchsets + locks[src].entry;
2950 bt_lockpage(bt, BtLockWrite, set->latch);
2953 // insert or delete each key
2954 // process any splits or merges
2955 // release Write & Atomic latches
2956 // set ParentModifications and build
2957 // queue of keys to insert for split pages
2958 // or delete for deleted pages.
2960 // run through txn list backwards
// samepage is the exclusive upper bound of the entries handled together
// by the inner idx loop below
2962 samepage = source->cnt + 1;
2964 for( src = source->cnt; src; src-- ) {
2965 if( locks[src].reuse )
2968 // perform the txn operations
2969 // from smaller to larger on
2972 for( idx = src; idx < samepage; idx++ )
2973 switch( slotptr(source,idx)->type ) {
2975 if( bt_atomicdelete (bt, source, locks, idx) )
2981 if( bt_atomicinsert (bt, source, locks, idx) )
2986 // after the same page operations have finished,
2987 // process master page for splits or deletion.
// `latch` keeps the master page's latch; it is consulted again after the
// split chain has been walked (split test and Atomic unlock below)
2989 latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2990 prev->page = bt_mappage (bt, prev->latch);
2993 // pick-up all splits from master page
2994 // each one is already WriteLocked.
// the chain is linked through latch->split, which holds a latch-set index
2996 entry = prev->latch->split;
2999 set->latch = bt->mgr->latchsets + entry;
3000 set->page = bt_mappage (bt, set->latch);
3001 entry = set->latch->split;
3003 // delete empty master page by undoing its split
3004 // (this is potentially another empty page)
3005 // note that there are no new left pointers yet
// the split page inherits the master's left link and is then copied over
// the master; the split page itself is freed
3007 if( !prev->page->act ) {
3008 memcpy (set->page->left, prev->page->left, BtId);
3009 memcpy (prev->page, set->page, bt->mgr->page_size);
3010 bt_lockpage (bt, BtLockDelete, set->latch);
3011 bt_freepage (bt, set);
3013 prev->latch->split = set->latch->split;
3014 prev->latch->dirty = 1;
3018 // remove empty page from the split chain
3019 // and return it to the free list.
3021 if( !set->page->act ) {
3022 memcpy (prev->page->right, set->page->right, BtId);
3023 prev->latch->split = set->latch->split;
3024 bt_lockpage (bt, BtLockDelete, set->latch);
3025 bt_freepage (bt, set);
3029 // schedule prev fence key update
// the queued AtomicKey carries a copy of prev's last key together with
// prev's page number and latch entry
3031 ptr = keyptr(prev->page,prev->page->cnt);
3032 leaf = calloc (sizeof(AtomicKey), 1);
3034 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3035 leaf->page_no = prev->latch->page_no;
3036 leaf->entry = prev->latch->entry;
3046 // splice in the left link into the split page
// the Parent lock on prev is taken before its Write lock is released
3048 bt_putid (set->page->left, prev->latch->page_no);
3049 bt_lockpage(bt, BtLockParent, prev->latch);
3050 bt_unlockpage(BtLockWrite, prev->latch);
3054 // update left pointer in next right page from last split page
3055 // (if all splits were reversed, latch->split == 0)
3057 if( latch->split ) {
3058 // fix left pointer in master's original (now split)
3059 // far right sibling or set rightmost page in page zero
3061 if( right = bt_getid (prev->page->right) ) {
3062 if( set->latch = bt_pinlatch (bt, right, 1) )
3063 set->page = bt_mappage (bt, set->latch);
3067 bt_lockpage (bt, BtLockWrite, set->latch);
3068 bt_putid (set->page->left, prev->latch->page_no);
3069 set->latch->dirty = 1;
3070 bt_unlockpage (BtLockWrite, set->latch);
3071 bt_unpinlatch (set->latch);
3072 } else { // prev is rightmost page
// page zero's alloc->left is updated while holding the manager's spin lock
3073 bt_spinmutexlock (bt->mgr->lock);
3074 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
3075 bt_spinreleasemutex(bt->mgr->lock);
3078 // Process last page split in chain
3080 ptr = keyptr(prev->page,prev->page->cnt);
3081 leaf = calloc (sizeof(AtomicKey), 1);
3083 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3084 leaf->page_no = prev->latch->page_no;
3085 leaf->entry = prev->latch->entry;
3095 bt_lockpage(bt, BtLockParent, prev->latch);
3096 bt_unlockpage(BtLockWrite, prev->latch);
3098 // remove atomic lock on master page
3100 bt_unlockpage(BtLockAtomic, latch);
3104 // finished if prev page occupied (either master or final split)
3106 if( prev->page->act ) {
3107 bt_unlockpage(BtLockWrite, latch);
3108 bt_unlockpage(BtLockAtomic, latch);
3109 bt_unpinlatch(latch);
3113 // any and all splits were reversed, and the
3114 // master page located in prev is empty, delete it
3115 // by pulling over master's right sibling.
3117 // Remove empty master's fence key
3119 ptr = keyptr(prev->page,prev->page->cnt);
3121 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3124 // perform the remainder of the delete
3125 // from the FIFO queue
3127 leaf = calloc (sizeof(AtomicKey), 1);
3129 memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
3130 leaf->page_no = prev->latch->page_no;
3131 leaf->entry = prev->latch->entry;
3142 // leave atomic lock in place until
3143 // deletion completes in next phase.
3145 bt_unlockpage(BtLockWrite, prev->latch);
3148 // add & delete keys for any pages split or merged during transaction
// each queued AtomicKey names a latch entry and page number; the page number
// is encoded into `value` and leaf->type selects the action to perform
3152 set->latch = bt->mgr->latchsets + leaf->entry;
3153 set->page = bt_mappage (bt, set->latch);
3155 bt_putid (value, leaf->page_no);
3156 ptr = (BtKey *)leaf->leafkey;
3158 switch( leaf->type ) {
3159 case 0: // insert key
3160 if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
3165 case 1: // delete key
3166 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
3171 case 2: // free page
3172 if( bt_atomicfree (bt, set) )
// the Parent lock is released unless the entry is flagged nounlock
3178 if( !leaf->nounlock )
3179 bt_unlockpage (BtLockParent, set->latch);
3181 bt_unpinlatch (set->latch);
3184 } while( leaf = tail );
3194 // set cursor to highest slot on highest page
// The page number is read from page zero's alloc->left field.  That page is
// copied into bt->cursor under a read lock and its number is stored in
// bt->cursor_page.  Returns the slot count (cnt) of the copied page.
3196 uint bt_lastkey (BtDb *bt)
3198 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
// pin the latch for that page and map the page itself
3201 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3202 set->page = bt_mappage (bt, set->latch);
// snapshot the whole page into the cursor buffer while read-locked
3206 bt_lockpage(bt, BtLockRead, set->latch);
3207 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3208 bt_unlockpage(BtLockRead, set->latch);
3209 bt_unpinlatch (set->latch);
3211 bt->cursor_page = page_no;
3212 return bt->cursor->cnt;
3215 // return previous slot on cursor page
// When the cursor has to move, the code below follows the cursor page's
// left link, copies that page into bt->cursor under a read lock and
// returns its slot count (cnt).
3217 uint bt_prevkey (BtDb *bt, uint slot)
3219 uid ourright, next, us = bt->cursor_page;
// remember the right link of the page we are leaving
3225 ourright = bt_getid(bt->cursor->right);
// a zero left link means there is no page to the left
3228 if( !(next = bt_getid(bt->cursor->left)) )
3232 bt->cursor_page = next;
3234 if( set->latch = bt_pinlatch (bt, next, 1) )
3235 set->page = bt_mappage (bt, set->latch);
// snapshot the left neighbour into the cursor buffer while read-locked
3239 bt_lockpage(bt, BtLockRead, set->latch);
3240 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3241 bt_unlockpage(BtLockRead, set->latch);
3242 bt_unpinlatch (set->latch);
// examine the page just loaded: its right link, its kill flag, and whether
// its right link equals the right link saved before the move
3244 next = bt_getid (bt->cursor->right);
3246 if( bt->cursor->kill )
3250 if( next == ourright )
3255 return bt->cursor->cnt;
3258 // return next slot on cursor page
3259 // or slide cursor right into next page
3261 uint bt_nextkey (BtDb *bt, uint slot)
// right link of the page currently held in the cursor (zero if none)
3267 right = bt_getid(bt->cursor->right);
// step through the remaining slots; each is tested for the dead flag, and
// the last slot of a page with no right link is the stopper key
3269 while( slot++ < bt->cursor->cnt )
3270 if( slotptr(bt->cursor,slot)->dead )
3272 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
// move to the right sibling: record its page number, then pin and map it
3280 bt->cursor_page = right;
3282 if( set->latch = bt_pinlatch (bt, right, 1) )
3283 set->page = bt_mappage (bt, set->latch);
// snapshot the sibling into the cursor buffer while read-locked
3287 bt_lockpage(bt, BtLockRead, set->latch);
3289 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3291 bt_unlockpage(BtLockRead, set->latch);
3292 bt_unpinlatch (set->latch);
3300 // cache page of keys into cursor and return starting slot for given key
// key/len: the key to position on.
3302 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3307 // cache page for retrieval
// bt_loadpage is asked for the page with a Read lock; a non-zero slot means
// the page was found and is copied into the cursor buffer
3309 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3310 memcpy (bt->cursor, set->page, bt->mgr->page_size);
3314 bt->cursor_page = set->latch->page_no;
// release the read lock and the pin on the page's latch
3316 bt_unlockpage(BtLockRead, set->latch);
3317 bt_unpinlatch (set->latch);
// return a pointer to the key stored at `slot` of the page cached in the
// cursor; the pointer refers into bt->cursor itself
3321 BtKey *bt_key(BtDb *bt, uint slot)
3323 return keyptr(bt->cursor, slot);
// return a pointer to the value stored at `slot` of the page cached in the
// cursor; the pointer refers into bt->cursor itself
3326 BtVal *bt_val(BtDb *bt, uint slot)
3328 return valptr(bt->cursor,slot);
// Windows implementation.  Builds a time in seconds in `ans` from the
// hour, minute, second and millisecond fields of a SYSTEMTIME.
// One path starts from the current system time and first adds the day of
// the week converted to seconds; the other two convert the user time or the
// kernel time that GetProcessTimes reports for the current process.
// NOTE(review): main() prints getCpuTime(0) as "real", (1) as "user" and
// (2) as "sys" - presumably `type` selects the three paths in that order.
3334 double getCpuTime(int type)
3337 FILETIME xittime[1];
3338 FILETIME systime[1];
3339 FILETIME usrtime[1];
3340 SYSTEMTIME timeconv[1];
3343 memset (timeconv, 0, sizeof(SYSTEMTIME));
// wall clock: current system time, with the day of week as whole days
3347 GetSystemTimeAsFileTime (xittime);
3348 FileTimeToSystemTime (xittime, timeconv);
3349 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// user-mode CPU time of this process
3352 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3353 FileTimeToSystemTime (usrtime, timeconv);
// kernel-mode CPU time of this process
3356 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3357 FileTimeToSystemTime (systime, timeconv);
// fold the broken-down time into seconds
3361 ans += (double)timeconv->wHour * 3600;
3362 ans += (double)timeconv->wMinute * 60;
3363 ans += (double)timeconv->wSecond;
3364 ans += (double)timeconv->wMilliseconds / 1000;
3369 #include <sys/resource.h>
// Unix implementation.  Returns a time in seconds as a double: the wall
// clock from gettimeofday, or the user (ru_utime) or system (ru_stime) CPU
// time that getrusage reports for RUSAGE_SELF.
// NOTE(review): main() prints getCpuTime(0) as "real", (1) as "user" and
// (2) as "sys" - presumably `type` selects the three paths in that order.
3371 double getCpuTime(int type)
3373 struct rusage used[1];
3374 struct timeval tv[1];
// wall clock: seconds plus microseconds scaled to seconds
3378 gettimeofday(tv, NULL);
3379 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time
3382 getrusage(RUSAGE_SELF, used);
3383 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time
3386 getrusage(RUSAGE_SELF, used);
3387 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Audit the latch sets of a buffer pool manager.  Walks the deployed latch
// sets and reports on stderr any whose readwr or access lock has bits set
// under MASK, whose parent lock has ticket != serving, or whose pin word has
// bits set other than CLOCK_bit.  The three lock structures are zeroed with
// memset as the walk proceeds.
// NOTE(review): page_no is printed with %.8x - confirm its type matches an
// unsigned int argument.
3394 void bt_poolaudit (BtMgr *mgr)
3399 while( slot++ < mgr->latchdeployed ) {
3400 latch = mgr->latchsets + slot;
3402 if( *latch->readwr->rin & MASK )
3403 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3404 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3406 if( *latch->access->rin & MASK )
3407 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3408 memset ((ushort *)latch->access, 0, sizeof(RWLock));
// the parent lock is treated as held when ticket and serving differ
3410 if( *latch->parent->ticket != *latch->parent->serving )
3411 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
// NOTE(review): clears sizeof(RWLock) bytes of latch->parent - confirm that
// parent really is an RWLock and not a smaller lock type
3412 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// any bit other than CLOCK_bit in the pin word is reported as a pin
3414 if( *latch->pin & ~CLOCK_bit ) {
3415 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
// Audit the latch manager through an open BtDb handle.  Reports and clears
// the manager lock, the locks of every deployed latch set and each hash
// table latch, walks every hash chain, then reads the index file page by
// page from LEAF_page onward and totals the active keys (act) of pages that
// are neither free nor above level 0.  The total is printed on stderr.
// On the ReadFile path a failed or short read returns bt->err = BTERR_map.
// NOTE(review): idx and hashidx are ushort - confirm latchdeployed and
// latchhash cannot exceed 65535, otherwise these loops cannot terminate.
// NOTE(review): page_no is printed with %.8x - confirm its type matches an
// unsigned int argument.
3421 uint bt_latchaudit (BtDb *bt)
3423 ushort idx, hashidx;
// the manager lock is inspected through a ushort view and forced to zero
3429 if( *(ushort *)(bt->mgr->lock) )
3430 fprintf(stderr, "Alloc page locked\n");
3431 *(ushort *)(bt->mgr->lock) = 0;
// latch sets are numbered from 1 through latchdeployed
3433 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3434 latch = bt->mgr->latchsets + idx;
3435 if( *latch->readwr->rin & MASK )
3436 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3437 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3439 if( *latch->access->rin & MASK )
3440 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3441 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3443 if( *latch->parent->ticket != *latch->parent->serving )
3444 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
// NOTE(review): clears sizeof(RWLock) bytes of latch->parent - confirm that
// parent really is an RWLock and not a smaller lock type
3445 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3448 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// each hash bucket: report and clear its latch, then follow the chain of
// latch sets that starts at the bucket's slot and is linked through next
3453 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3454 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3455 fprintf(stderr, "hash entry %d locked\n", hashidx);
3457 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3459 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3460 latch = bt->mgr->latchsets + idx;
3462 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3463 } while( idx = latch->next );
// count keys by reading pages straight from the index file; the upper bound
// is the page number stored in page zero's alloc->right
3466 page_no = LEAF_page;
3468 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3469 uid off = page_no << bt->mgr->page_bits;
// NOTE(review): the result of pread is not checked on this line
3471 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
// the 64-bit offset is passed as a low part and a pointer to its high part
3475 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3477 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3478 return bt->line = __LINE__, bt->err = BTERR_map;
3480 if( *amt < bt->mgr->page_size )
3481 return bt->line = __LINE__, bt->err = BTERR_map;
// only in-use pages at level 0 contribute their active key count
3483 if( !bt->frame->free && !bt->frame->lvl )
3484 cnt += bt->frame->act;
3488 cnt--; // remove stopper key
3489 fprintf(stderr, " Total keys read %d\n", cnt);
3503 // standalone program to index file of keys
3504 // then list them onto std-out
// Thread entry point; `arg` is a ThreadArg.  The thread opens its own BtDb
// on args->mgr, picks one command character from args->type and runs the
// matching operation: latch audit, pennysort insert/delete (optionally as
// batched transactions), plain key indexing, key lookup, forward scan,
// reverse scan or page counting.  Progress and totals go to stderr; scanned
// keys and values go to stdout.
// NOTE(review): the error paths below call exit(0), so a failure still ends
// the process with a zero exit status - confirm this is intended.
3507 void *index_file (void *arg)
3509 uint __stdcall index_file (void *arg)
3512 int line = 0, found = 0, cnt = 0, idx;
3513 uid next, page_no = LEAF_page; // start on first page of leaves
3514 int ch, len = 0, slot, type = 0;
3515 unsigned char key[BT_maxkey];
3516 unsigned char txn[65536];
3517 ThreadArg *args = arg;
3526 bt = bt_open (args->mgr);
// command character: the one at this thread's index in the command string,
// or the string's last character when the index is past its end
3529 if( args->idx < strlen (args->type) )
3530 ch = args->type[args->idx];
3532 ch = args->type[strlen(args->type) - 1];
3537 fprintf(stderr, "started latch mgr audit\n");
3538 cnt = bt_latchaudit (bt);
3539 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3550 if( type == Delete )
3551 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3553 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3555 if( type == Delete )
3556 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3558 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3560 if( in = fopen (args->infile, "rb") )
3561 while( ch = getc(in), ch != EOF )
// pennysort record: the first 10 bytes are the key, the rest is the value
3567 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3568 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
// stage the record in the txn buffer: the value bytes, a byte holding
// len - 10, and the 10-byte key; the new slot records the offset and the
// operation type
3574 memcpy (txn + nxt, key + 10, len - 10);
3576 txn[nxt] = len - 10;
3578 memcpy (txn + nxt, key, 10);
3581 slotptr(page,++cnt)->off = nxt;
3582 slotptr(page,cnt)->type = type;
// the staged entry count is compared with the configured batch size
3585 if( cnt < args->num )
3592 if( bt_atomictxn (bt, page) )
3593 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3598 else if( len < BT_maxkey )
3600 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3604 fprintf(stderr, "started indexing for %s\n", args->infile);
3605 if( in = fopen (args->infile, "r") )
3606 while( ch = getc(in), ch != EOF )
// the key is inserted with no value (NULL, length 0)
3611 if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3612 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
3615 else if( len < BT_maxkey )
3617 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3621 fprintf(stderr, "started finding keys for %s\n", args->infile);
3622 if( in = fopen (args->infile, "rb") )
3623 while( ch = getc(in), ch != EOF )
// lookup only: no value buffer is supplied
3627 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3630 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
3633 else if( len < BT_maxkey )
3635 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3639 fprintf(stderr, "started scanning\n");
// forward scan: starting at page_no, read-lock each leaf and follow its
// right link until the link is zero
3641 if( set->latch = bt_pinlatch (bt, page_no, 1) )
3642 set->page = bt_mappage (bt, set->latch);
3644 fprintf(stderr, "unable to obtain latch"), exit(1);
3645 bt_lockpage (bt, BtLockRead, set->latch);
3646 next = bt_getid (set->page->right);
// the last slot of the page with no right link is excluded, as are dead slots
3648 for( slot = 0; slot++ < set->page->cnt; )
3649 if( next || slot < set->page->cnt )
3650 if( !slotptr(set->page, slot)->dead ) {
3651 ptr = keyptr(set->page, slot);
3654 if( slotptr(set->page, slot)->type == Duplicate )
// emit key bytes, then value bytes, then a newline
3657 fwrite (ptr->key, len, 1, stdout);
3658 val = valptr(set->page, slot);
3659 fwrite (val->value, val->len, 1, stdout);
3660 fputc ('\n', stdout);
3664 bt_unlockpage (BtLockRead, set->latch);
3665 bt_unpinlatch (set->latch);
3666 } while( page_no = next );
3668 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3672 fprintf(stderr, "started reverse scan\n");
// reverse scan: position the cursor with bt_lastkey, then step backwards
// with bt_prevkey until it returns zero
3673 if( slot = bt_lastkey (bt) )
3674 while( slot = bt_prevkey (bt, slot) ) {
3675 if( slotptr(bt->cursor, slot)->dead )
3678 ptr = keyptr(bt->cursor, slot);
3681 if( slotptr(bt->cursor, slot)->type == Duplicate )
3684 fwrite (ptr->key, len, 1, stdout);
3685 val = valptr(bt->cursor, slot);
3686 fwrite (val->value, val->len, 1, stdout);
3687 fputc ('\n', stdout);
3691 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// counting: advise sequential access, then read every page from LEAF_page
// up to the page number in page zero's alloc->right
3696 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3698 fprintf(stderr, "started counting\n");
3699 page_no = LEAF_page;
3701 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3702 if( bt_readpage (bt->mgr, bt->frame, page_no) )
// only in-use pages at level 0 contribute their active key count
3705 if( !bt->frame->free && !bt->frame->lvl )
3706 cnt += bt->frame->act;
3712 cnt--; // remove stopper key
3713 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// timer: alias for struct timeval
3725 typedef struct timeval timer;
// Program entry point.  Command line (see the usage text below):
//   argv[1] index file name, argv[2] command string, argv[3] page bits,
//   argv[4] buffer pool size, argv[5] transaction size, argv[6] recovery
//   pages, argv[7...] one source file per worker thread.
// Opens the buffer manager, starts one index_file thread per source file,
// waits for all of them and prints real, user and sys times on stderr.
3727 int main (int argc, char **argv)
3729 int idx, cnt, len, slot, err;
3730 int segsize, bits = 16;
3748 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size recovery_pages src_file1 src_file2 ... ]\n", argv[0]);
3749 fprintf (stderr, " where idx_file is the name of the btree file\n");
3750 fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3751 fprintf (stderr, " page_bits is the page size in bits\n");
3752 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
3753 fprintf (stderr, " txn_size = n to block transactions into n units, or zero for no transactions\n");
3754 fprintf (stderr, " recovery_pages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3755 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// starting timestamp for the elapsed ("real") time printed at the end
3759 start = getCpuTime(0);
// numeric options are converted with atoi, which reports no parse errors
3762 bits = atoi(argv[3]);
3765 poolsize = atoi(argv[4]);
3768 fprintf (stderr, "Warning: no mapped_pool\n");
3771 num = atoi(argv[5]);
3774 recovery = atoi(argv[6]);
// one thread handle and one ThreadArg per source file
3778 threads = malloc (cnt * sizeof(pthread_t));
3780 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3782 args = malloc (cnt * sizeof(ThreadArg));
3784 mgr = bt_mgr ((argv[1]), bits, poolsize, recovery);
3787 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// every thread shares the manager and the command string; idx tells the
// thread which character of the command string is its own
3793 for( idx = 0; idx < cnt; idx++ ) {
3794 args[idx].infile = argv[idx + 7];
3795 args[idx].type = argv[2];
3796 args[idx].mgr = mgr;
3797 args[idx].num = num;
3798 args[idx].idx = idx;
3800 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3801 fprintf(stderr, "Error creating thread %d\n", err);
3803 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3807 // wait for termination
3810 for( idx = 0; idx < cnt; idx++ )
3811 pthread_join (threads[idx], NULL);
3813 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3815 for( idx = 0; idx < cnt; idx++ )
3816 CloseHandle(threads[idx]);
// report elapsed, user and system time as whole minutes plus seconds
3822 elapsed = getCpuTime(0) - start;
3823 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3824 elapsed = getCpuTime(1);
3825 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3826 elapsed = getCpuTime(2);
3827 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);