1 // btree version threadskv6 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // bi-directional cursors
7 // and traditional buffer pool manager
11 // author: karl malbrain, malbrain@cal.berkeley.edu
14 This work, including the source code, documentation
15 and related data, is placed into the public domain.
17 The original author is Karl Malbrain.
19 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
20 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
21 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
22 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
23 RESULTING FROM THE USE, MODIFICATION, OR
24 REDISTRIBUTION OF THIS SOFTWARE.
27 // Please see the project home page for documentation
28 // code.google.com/p/high-concurrency-btree
30 #define _FILE_OFFSET_BITS 64
31 #define _LARGEFILE64_SOURCE
47 #define WIN32_LEAN_AND_MEAN
61 typedef unsigned long long uid;
64 typedef unsigned long long off64_t;
65 typedef unsigned short ushort;
66 typedef unsigned int uint;
69 #define BT_ro 0x6f72 // ro
70 #define BT_rw 0x7772 // rw
72 #define BT_maxbits 24 // maximum page size in bits
73 #define BT_minbits 9 // minimum page size in bits
74 #define BT_minpage (1 << BT_minbits) // minimum page size
75 #define BT_maxpage (1 << BT_maxbits) // maximum page size
77 // BTree page number constants
78 #define ALLOC_page 0 // allocation page
79 #define ROOT_page 1 // root of the btree
80 #define LEAF_page 2 // first page of leaves
82 // Number of levels to create in a new BTree
87 There are five lock types for each node in three independent sets:
88 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
89 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
90 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
91 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
92 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
103 // definition for phase-fair reader/writer lock implementation
117 // definition for spin latch implementation
119 // exclusive is set for write access
120 // share is count of read accessors
121 // grant write lock when share == 0
123 volatile typedef struct {
134 // hash table entries
137 volatile uint slot; // Latch table entry at head of chain
138 BtSpinLatch latch[1];
141 // latch manager table structure
144 volatile uid page_no; // latch set page number
145 RWLock readwr[1]; // read/write page lock
146 RWLock access[1]; // Access Intent/Page delete
147 RWLock parent[1]; // Posting of fence key in parent
148 uint slot; // entry slot in latch table
149 uint next; // next entry in hash table chain
150 uint prev; // prev entry in hash table chain
151 volatile ushort pin; // number of outstanding threads
152 ushort dirty:1; // page in cache is dirty
155 // Define the length of the page record numbers
159 // Page key slot definition.
161 // Keys are marked dead, but remain on the page until
162 // cleanup is called. The fence key (highest key) for
163 // a leaf page is always present, even after cleanup.
167 // In addition to the Unique keys that occupy slots
168 // there are Librarian and Duplicate key
169 // slots occupying the key slot array.
171 // The Librarian slots are dead keys that
172 // serve as filler, available to add new Unique
173 // or Dup slots that are inserted into the B-tree.
175 // The Duplicate slots have had their key bytes extended
176 // by 6 bytes to contain a binary duplicate key uniqueifier.
185 uint off:BT_maxbits; // page offset for key start
186 uint type:3; // type of slot
187 uint dead:1; // set for deleted slot
190 // The key structure occupies space at the upper end of
191 // each page. It's a length byte followed by the key
195 unsigned char len; // this can be changed to a ushort or uint
196 unsigned char key[0];
199 // the value structure also occupies space at the upper
200 // end of the page. Each key is immediately followed by a value.
203 unsigned char len; // this can be changed to a ushort or uint
204 unsigned char value[0];
207 #define BT_maxkey 255 // maximum number of bytes in a key
208 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
210 // The first part of an index page.
211 // It is immediately followed
212 // by the BtSlot array of keys.
214 // note that this structure size
215 // must be a multiple of 8 bytes
216 // in order to place dups correctly.
218 typedef struct BtPage_ {
219 uint cnt; // count of keys in page
220 uint act; // count of active keys
221 uint min; // next key offset
222 uint garbage; // page garbage in bytes
223 unsigned char bits:7; // page size in bits
224 unsigned char free:1; // page is on free chain
225 unsigned char lvl:7; // level of page
226 unsigned char kill:1; // page is being deleted
227 unsigned char left[BtId]; // page number to left
228 unsigned char filler[2]; // padding to multiple of 8
229 unsigned char right[BtId]; // page number to right
232 // The loadpage interface object
235 uid page_no; // current page number
236 BtPage page; // current page pointer
237 BtLatchSet *latch; // current page latch set
240 // structure for latch manager on ALLOC_page
243 struct BtPage_ alloc[1]; // next page_no in right ptr
244 unsigned long long dups[1]; // global duplicate key uniqueifier
245 unsigned char chain[BtId]; // head of free page_nos chain
248 // The object structure for Btree access
251 uint page_size; // page size
252 uint page_bits; // page size in bits
258 BtPageZero *pagezero; // mapped allocation page
259 BtSpinLatch lock[1]; // allocation area lite latch
260 uint latchdeployed; // highest number of latch entries deployed
261 uint nlatchpage; // number of latch pages at BT_latch
262 uint latchtotal; // number of page latch entries
263 uint latchhash; // number of latch hash table slots
264 uint latchvictim; // next latch entry to examine
265 BtHashEntry *hashtable; // the anonymous mapping buffer pool
266 BtLatchSet *latchsets; // mapped latch set from latch pages
267 unsigned char *pagepool; // mapped to the buffer pool pages
269 HANDLE halloc; // allocation handle
270 HANDLE hpool; // buffer pool handle
275 BtMgr *mgr; // buffer manager for thread
276 BtPage cursor; // cached frame for start/next (never mapped)
277 BtPage frame; // spare frame for the page split (never mapped)
278 uid cursor_page; // current cursor page number
279 unsigned char *mem; // frame, cursor, page memory buffer
280 unsigned char key[BT_keyarray]; // last found complete key
281 int found; // last delete or insert was found
282 int err; // last error
283 int reads, writes; // number of reads and writes from the btree
297 #define CLOCK_bit 0x8000
300 extern void bt_close (BtDb *bt);
301 extern BtDb *bt_open (BtMgr *mgr);
302 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
303 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
304 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
305 extern BtKey *bt_foundkey (BtDb *bt);
306 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
307 extern uint bt_nextkey (BtDb *bt, uint slot);
310 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
311 void bt_mgrclose (BtMgr *mgr);
313 // Helper functions to return slot values
314 // from the cursor page.
316 extern BtKey *bt_key (BtDb *bt, uint slot);
317 extern BtVal *bt_val (BtDb *bt, uint slot);
319 // The page is allocated from low and hi ends.
320 // The key slots are allocated from the bottom,
321 // while the text and value of the key
322 // are allocated from the top. When the two
323 // areas meet, the page is split into two.
325 // A key consists of a length byte, two bytes of
326 // index number (0 - 65535), and up to 253 bytes
329 // Associated with each key is a value byte string
330 // containing any value desired.
332 // The b-tree root is always located at page 1.
333 // The first leaf page of level zero is always
334 // located on page 2.
336 // The b-tree pages are linked with next
337 // pointers to facilitate enumerators,
338 // and provide for concurrency.
340 // When the root page fills, it is split in two and
341 // the tree height is raised by a new root at page
342 // one with two keys.
344 // Deleted keys are marked with a dead bit until
345 // page cleanup. The fence key for a leaf node is
348 // To achieve maximum concurrency one page is locked at a time
349 // as the tree is traversed to find leaf key in question. The right
350 // page numbers are used in cases where the page is being split,
353 // Page 0 is dedicated to lock for new page extensions,
354 // and chains empty pages together for reuse. It also
355 // contains the latch manager hash table.
357 // The ParentModification lock on a node is obtained to serialize posting
358 // or changing the fence key for a node.
360 // Empty pages are chained together through the ALLOC page and reused.
362 // Access macros to address slot and key values from the page
363 // Page slots use 1 based indexing.
// Page accessor macros.  Slot numbers are 1-based.
//   slotptr: address of slot number `slot` in the BtSlot array that
//            starts immediately after the page header (page + 1).
//   keyptr:  key record found slotptr(...)->off bytes from the start
//            of the page.
//   valptr:  value record that begins right after the key's bytes.
// slotptr parenthesizes both arguments so that expression arguments
// (e.g. `cond ? a : b`) group the way the caller wrote them; keyptr
// and valptr inherit this because they go through slotptr.
// Caution: keyptr and valptr expand their arguments more than once,
// so avoid passing expressions with side effects where that matters.
#define slotptr(page, slot) (((BtSlot *)((page) + 1)) + ((slot) - 1))
#define keyptr(page, slot) ((BtKey *)((unsigned char *)(page) + slotptr(page, slot)->off))
#define valptr(page, slot) ((BtVal *)(keyptr(page, slot)->key + keyptr(page, slot)->len))
369 void bt_putid(unsigned char *dest, uid id)
374 dest[i] = (unsigned char)id, id >>= 8;
377 uid bt_getid(unsigned char *src)
382 for( i = 0; i < BtId; i++ )
383 id <<= 8, id |= *src++;
// Atomically advance the global duplicate-key uniqueifier stored in
// pagezero->dups and return a value derived from the counter.
388 uid bt_newdup (BtDb *bt)
// fetch-and-add yields the counter as it was before the add, so the
// trailing +1 turns the result into the post-increment value
391 return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
// alternative implementation using the Interlocked intrinsic.
// NOTE(review): this call passes two arguments; the intrinsic is
// documented as taking only the address of the 64-bit target --
// confirm and drop the extra `1` if so
393 return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
397 // Phase-Fair reader/writer lock implementation
399 void WriteLock (RWLock *lock)
404 tix = __sync_fetch_and_add (lock->ticket, 1);
406 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
408 // wait for our ticket to come up
410 while( tix != lock->serving[0] )
417 w = PRES | (tix & PHID);
419 r = __sync_fetch_and_add (lock->rin, w);
421 r = _InterlockedExchangeAdd16 (lock->rin, w);
423 while( r != *lock->rout )
431 void WriteRelease (RWLock *lock)
434 __sync_fetch_and_and (lock->rin, ~MASK);
436 _InterlockedAnd16 (lock->rin, ~MASK);
441 void ReadLock (RWLock *lock)
445 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
447 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
450 while( w == (*lock->rin & MASK) )
458 void ReadRelease (RWLock *lock)
461 __sync_fetch_and_add (lock->rout, RINC);
463 _InterlockedExchangeAdd16 (lock->rout, RINC);
467 // Spin Latch Manager
469 // wait until write lock mode is clear
470 // and add 1 to the share count
472 void bt_spinreadlock(BtSpinLatch *latch)
478 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
480 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
482 // see if exclusive request is granted or pending
487 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
489 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
492 } while( sched_yield(), 1 );
494 } while( SwitchToThread(), 1 );
498 // wait for other read and write latches to relinquish
500 void bt_spinwritelock(BtSpinLatch *latch)
506 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
508 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
511 if( !(prev & ~BOTH) )
515 __sync_fetch_and_and ((ushort *)latch, ~XCL);
517 _InterlockedAnd16((ushort *)latch, ~XCL);
520 } while( sched_yield(), 1 );
522 } while( SwitchToThread(), 1 );
526 // try to obtain write lock
528 // return 1 if obtained,
531 int bt_spinwritetry(BtSpinLatch *latch)
536 prev = __sync_fetch_and_or((ushort *)latch, XCL);
538 prev = _InterlockedOr16((ushort *)latch, XCL);
540 // take write access if all bits are clear
543 if( !(prev & ~BOTH) )
547 __sync_fetch_and_and ((ushort *)latch, ~XCL);
549 _InterlockedAnd16((ushort *)latch, ~XCL);
556 void bt_spinreleasewrite(BtSpinLatch *latch)
559 __sync_fetch_and_and((ushort *)latch, ~BOTH);
561 _InterlockedAnd16((ushort *)latch, ~BOTH);
565 // decrement reader count
567 void bt_spinreleaseread(BtSpinLatch *latch)
570 __sync_fetch_and_add((ushort *)latch, -SHARE);
572 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
576 // read page from permanent location in Btree file
// Read one page of mgr->page_size bytes from the btree file into `page`.
//   mgr     - buffer manager holding the file handle and page geometry
//   page    - destination buffer for the page image
//   page_no - page number; the byte offset is page_no << mgr->page_bits
// A short or failed read is reported on stderr.
578 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
// byte offset of the page within the file
580 off64_t off = page_no << mgr->page_bits;
// NOTE(review): the offset argument repeats the shift already stored in
// `off`; bt_writepage passes `off` directly
583 if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
// NOTE(review): page_no is a uid (unsigned long long) but %.8x consumes
// an unsigned int, so the argument types do not match the format; the
// same applies to the two messages further down
584 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
// alternative implementation: positioned read through an OVERLAPPED record
591 memset (ovl, 0, sizeof(OVERLAPPED));
// upper 32 bits of the file offset
593 ovl->OffsetHigh = off >> 32;
595 if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
596 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
// fewer bytes than a full page were transferred
599 if( *amt < mgr->page_size ) {
600 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
607 // write page to permanent location in Btree file
608 // clear the dirty bit
610 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
612 off64_t off = page_no << mgr->page_bits;
615 if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
621 memset (ovl, 0, sizeof(OVERLAPPED));
623 ovl->OffsetHigh = off >> 32;
625 if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
628 if( *amt < mgr->page_size )
634 // link latch table entry into head of latch hash table
636 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
638 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
639 BtLatchSet *latch = bt->mgr->latchsets + slot;
641 if( latch->next = bt->mgr->hashtable[hashidx].slot )
642 bt->mgr->latchsets[latch->next].prev = slot;
644 bt->mgr->hashtable[hashidx].slot = slot;
645 latch->page_no = page_no;
651 if( bt->err = bt_readpage (bt->mgr, page, page_no) )
659 // set CLOCK bit in latch
660 // decrement pin count
662 void bt_unpinlatch (BtLatchSet *latch)
665 if( ~latch->pin & CLOCK_bit )
666 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
667 __sync_fetch_and_add(&latch->pin, -1);
669 if( ~latch->pin & CLOCK_bit )
670 _InterlockedOr16 (&latch->pin, CLOCK_bit);
671 _InterlockedDecrement16 (&latch->pin);
675 // return the btree cached page address
677 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
679 BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
684 // find existing latchset or inspire new one
685 // return with latchset pinned
687 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
689 uint hashidx = page_no % bt->mgr->latchhash;
697 // try to find our entry
699 bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
701 if( slot = bt->mgr->hashtable[hashidx].slot ) do
703 latch = bt->mgr->latchsets + slot;
704 if( page_no == latch->page_no )
706 } while( slot = latch->next );
712 latch = bt->mgr->latchsets + slot;
714 __sync_fetch_and_add(&latch->pin, 1);
716 _InterlockedIncrement16 (&latch->pin);
718 bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
722 // see if there are any unused pool entries
724 slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
726 slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
729 if( slot < bt->mgr->latchtotal ) {
730 latch = bt->mgr->latchsets + slot;
731 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
733 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
738 __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
740 _InterlockedDecrement (&bt->mgr->latchdeployed);
742 // find and reuse previous entry on victim
746 slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
748 slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
750 // try to get write lock on hash chain
751 // skip entry if not obtained
752 // or has outstanding pins
754 slot %= bt->mgr->latchtotal;
759 latch = bt->mgr->latchsets + slot;
760 idx = latch->page_no % bt->mgr->latchhash;
762 // see if we are on the same chain as hashidx
767 if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
770 // skip this slot if it is pinned
771 // or the CLOCK bit is set
774 if( latch->pin & CLOCK_bit ) {
776 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
778 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
781 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
785 // update permanent page area in btree from buffer pool
787 page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
790 if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
793 latch->dirty = 0, bt->writes++;
795 // unlink our available slot from its hash chain
798 bt->mgr->latchsets[latch->prev].next = latch->next;
800 bt->mgr->hashtable[idx].slot = latch->next;
803 bt->mgr->latchsets[latch->next].prev = latch->prev;
805 bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
807 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
810 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
815 void bt_mgrclose (BtMgr *mgr)
822 // flush dirty pool pages to the btree
824 for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
825 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
826 latch = mgr->latchsets + slot;
829 bt_writepage(mgr, page, latch->page_no);
830 latch->dirty = 0, num++;
832 // madvise (page, mgr->page_size, MADV_DONTNEED);
835 fprintf(stderr, "%d buffer pool pages flushed\n", num);
838 munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
839 munmap (mgr->pagezero, mgr->page_size);
841 FlushViewOfFile(mgr->pagezero, 0);
842 UnmapViewOfFile(mgr->pagezero);
843 UnmapViewOfFile(mgr->hashtable);
844 CloseHandle(mgr->halloc);
845 CloseHandle(mgr->hpool);
851 FlushFileBuffers(mgr->idx);
852 CloseHandle(mgr->idx);
857 // close and release memory
859 void bt_close (BtDb *bt)
866 VirtualFree (bt->mem, 0, MEM_RELEASE);
871 // open/create new btree buffer manager
873 // call with file_name, bits in page size (e.g. 16),
874 // size of page pool (e.g. 262144)
// Open (creating if necessary) the btree file `name` and build the
// buffer manager for it.
//   name    - path of the btree file
//   bits    - requested page size in bits; replaced by the value stored
//             in page zero when the file already records one
//   nodemax - number of buffer pool pages / latch sets (stored in latchtotal)
// The failure paths visible below return NULL.
876 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
878 uint lvl, attr, last, slot, idx;
879 uint nlatchpage, latchhash;
880 unsigned char value[BtId];
881 int flag, initit = 0;
882 BtPageZero *pagezero;
889 // determine sanity of page size and buffer pool
891 if( bits > BT_maxbits )
893 else if( bits < BT_minbits )
897 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
// zero-filled manager object
902 mgr = calloc (1, sizeof(BtMgr));
904 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
906 if( mgr->idx == -1 ) {
907 fprintf (stderr, "Unable to open btree file\n");
908 return free(mgr), NULL;
// alternative implementation using the Win32 allocation and file APIs
911 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
912 attr = FILE_ATTRIBUTE_NORMAL;
913 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
915 if( mgr->idx == INVALID_HANDLE_VALUE )
916 return GlobalFree(mgr), NULL;
// scratch buffer large enough for the biggest supported page
920 pagezero = valloc (BT_maxpage);
923 // read minimum page size to get root info
924 // to support raw disk partition files
925 // check if bits == 0 on the disk.
// whence value 2 is presumably SEEK_END, making `size` the file length
927 if( size = lseek (mgr->idx, 0L, 2) )
928 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
// a non-zero page size recorded on disk overrides the caller's `bits`
929 if( pagezero->alloc->bits )
930 bits = pagezero->alloc->bits;
934 return free(mgr), free(pagezero), NULL;
938 pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
939 size = GetFileSize(mgr->idx, amt);
942 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
943 return bt_mgrclose (mgr), NULL;
944 bits = pagezero->alloc->bits;
949 mgr->page_size = 1 << bits;
950 mgr->page_bits = bits;
952 // calculate number of latch hash table entries
// pages needed for nodemax/16 hash entries, rounded up to whole pages
954 mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
// number of hash entries that fit in those whole pages
955 mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
957 mgr->nlatchpage += nodemax; // size of the buffer pool in pages
// plus the pages needed to hold nodemax BtLatchSet records, rounded up
958 mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
959 mgr->latchtotal = nodemax;
964 // initialize an empty b-tree with latch page, root page, page of leaves
965 // and page(s) of latches and page pool cache
967 memset (pagezero, 0, 1 << bits);
968 pagezero->alloc->bits = mgr->page_bits;
// the right pointer of the allocation page holds the next page number
// to hand out; start it just past the initial pages
969 bt_putid(pagezero->alloc->right, MIN_lvl+1);
971 // initialize left-most LEAF page in
974 bt_putid (pagezero->alloc->left, LEAF_page);
// write the allocation page as page 0
976 if( bt_writepage (mgr, pagezero->alloc, 0) ) {
977 fprintf (stderr, "Unable to create btree page zero\n");
978 return bt_mgrclose (mgr), NULL;
981 memset (pagezero, 0, 1 << bits);
982 pagezero->alloc->bits = mgr->page_bits;
// build one page per level, lvl running from MIN_lvl-1 down to 0;
// each is written to page number MIN_lvl - lvl
984 for( lvl=MIN_lvl; lvl--; ) {
// place the single key/value pair at the top end of the page; levels
// above the leaves reserve BtId extra bytes for a child page number
985 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
986 key = keyptr(pagezero->alloc, 1);
987 key->len = 2; // create stopper key
// child page number for this level's stopper key
991 bt_putid(value, MIN_lvl - lvl + 1);
992 val = valptr(pagezero->alloc, 1);
// leaf level (lvl == 0) stores an empty value
993 val->len = lvl ? BtId : 0;
994 memcpy (val->value, value, val->len);
996 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
997 pagezero->alloc->lvl = lvl;
998 pagezero->alloc->cnt = 1;
999 pagezero->alloc->act = 1;
1001 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1002 fprintf (stderr, "Unable to create btree page zero\n");
1003 return bt_mgrclose (mgr), NULL;
1011 VirtualFree (pagezero, 0, MEM_RELEASE);
1014 // mlock the pagezero page
1016 flag = PROT_READ | PROT_WRITE;
// shared file mapping of the allocation page
1017 mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1018 if( mgr->pagezero == MAP_FAILED ) {
1019 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1020 return bt_mgrclose (mgr), NULL;
1022 mlock (mgr->pagezero, mgr->page_size);
// one anonymous region of nlatchpage pages; hash table, latch sets and
// page pool are all carved out of it (see the pointer setup at the end)
1024 mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1025 if( mgr->hashtable == MAP_FAILED ) {
1026 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1027 return bt_mgrclose (mgr), NULL;
// alternative implementation using Win32 file mappings
1030 flag = PAGE_READWRITE;
1031 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1032 if( !mgr->halloc ) {
1033 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1034 return bt_mgrclose (mgr), NULL;
1037 flag = FILE_MAP_WRITE;
1038 mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1039 if( !mgr->pagezero ) {
1040 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1041 return bt_mgrclose (mgr), NULL;
1044 flag = PAGE_READWRITE;
1045 size = (uid)mgr->nlatchpage << mgr->page_bits;
1046 mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1048 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1049 return bt_mgrclose (mgr), NULL;
1052 flag = FILE_MAP_WRITE;
// NOTE(review): the mapping handle was stored in mgr->hpool just above,
// and hpool is the buffer pool handle member declared for BtMgr;
// mgr->pool looks like a typo for mgr->hpool -- confirm
1053 mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1054 if( !mgr->hashtable ) {
1055 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1056 return bt_mgrclose (mgr), NULL;
// the page pool occupies the last latchtotal pages of the region
1060 mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
// the latch set array ends exactly where the page pool begins
1061 mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1066 // open BTree access method
1067 // based on buffer manager
1069 BtDb *bt_open (BtMgr *mgr)
1071 BtDb *bt = malloc (sizeof(*bt));
1073 memset (bt, 0, sizeof(*bt));
1076 bt->mem = valloc (2 *mgr->page_size);
1078 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1080 bt->frame = (BtPage)bt->mem;
1081 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1085 // compare two keys, returning > 0, = 0, or < 0
1086 // as the comparison value
1088 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1090 uint len1 = key1->len;
1093 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1104 // place write, read, or parent lock on requested page_no.
1106 void bt_lockpage(BtLock mode, BtLatchSet *set)
1110 ReadLock (set->readwr);
1113 WriteLock (set->readwr);
1116 ReadLock (set->access);
1119 WriteLock (set->access);
1122 WriteLock (set->parent);
1127 // remove write, read, or parent lock on requested page
1129 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1133 ReadRelease (set->readwr);
1136 WriteRelease (set->readwr);
1139 ReadRelease (set->access);
1142 WriteRelease (set->access);
1145 WriteRelease (set->parent);
1150 // allocate a new page
1151 // return with page latched.
// Obtain a page for `contents`: reuse the head of the free chain kept
// in page zero when there is one, otherwise take the next page number
// from the allocation page's right pointer.
//   bt       - btree handle; bt->err is set on failure
//   set      - receives page_no, latch and page of the allocated page
//   contents - page image copied into the pool page (page_size bytes)
// The pool page is marked dirty after the copy.
// NOTE(review): the two failure returns below yield different values
// (-1 versus the BTERR_struct code); both are non-zero.
1153 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1157 // lock allocation page
1159 bt_spinwritelock(bt->mgr->lock);
1161 // use empty chain first
1162 // else allocate empty page
// a non-zero chain head names a previously freed page
1164 if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
// loadit = 1: the page contents are needed for the chain pointer below
1165 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1166 set->page = bt_mappage (bt, set->latch);
// NOTE(review): no release of bt->mgr->lock is visible before this
// return -- confirm the allocation latch is not left held on failure
1168 return bt->err = BTERR_struct, -1;
// the freed page's right pointer becomes the new chain head
1170 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1171 bt_spinreleasewrite(bt->mgr->lock);
1173 memcpy (set->page, contents, bt->mgr->page_size);
1174 set->latch->dirty = 1;
// no free page: hand out the next page number and advance the counter
1178 set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1179 bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1181 // unlock allocation latch
1183 bt_spinreleasewrite(bt->mgr->lock);
1185 // don't load cache from btree page
1187 if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1188 set->page = bt_mappage (bt, set->latch);
1190 return bt->err = BTERR_struct;
1192 memcpy (set->page, contents, bt->mgr->page_size);
1193 set->latch->dirty = 1;
1197 // find slot in page for given key at a given level
1199 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1201 uint diff, higher = set->page->cnt, low = 1, slot;
1204 // make stopper key an infinite fence value
1206 if( bt_getid (set->page->right) )
1211 // low is the lowest candidate.
1212 // loop ends when they meet
1214 // higher is already
1215 // tested as .ge. the passed key.
1217 while( diff = higher - low ) {
1218 slot = low + ( diff >> 1 );
1219 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1222 higher = slot, good++;
1225 // return zero if key is on right link page
1227 return good ? higher : 0;
1230 // find and load page at given level for given key
1231 // leave page rd or wr locked as requested
1233 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1235 uid page_no = ROOT_page, prevpage = 0;
1236 uint drill = 0xff, slot;
1237 BtLatchSet *prevlatch;
1238 uint mode, prevmode;
1240 // start at root of btree and drill down
1243 // determine lock mode of drill level
1244 mode = (drill == lvl) ? lock : BtLockRead;
1246 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1247 set->page_no = page_no;
1251 // obtain access lock using lock chaining with Access mode
1253 if( page_no > ROOT_page )
1254 bt_lockpage(BtLockAccess, set->latch);
1256 set->page = bt_mappage (bt, set->latch);
1258 // release & unpin parent page
1261 bt_unlockpage(prevmode, prevlatch);
1262 bt_unpinlatch (prevlatch);
1266 // obtain read lock using lock chaining
1268 bt_lockpage(mode, set->latch);
1270 if( set->page->free )
1271 return bt->err = BTERR_struct, 0;
1273 if( page_no > ROOT_page )
1274 bt_unlockpage(BtLockAccess, set->latch);
1276 // re-read and re-lock root after determining actual level of root
1278 if( set->page->lvl != drill) {
1279 if( set->page_no != ROOT_page )
1280 return bt->err = BTERR_struct, 0;
1282 drill = set->page->lvl;
1284 if( lock != BtLockRead && drill == lvl ) {
1285 bt_unlockpage(mode, set->latch);
1286 bt_unpinlatch (set->latch);
1291 prevpage = set->page_no;
1292 prevlatch = set->latch;
1295 // find key on page at this level
1296 // and descend to requested level
1298 if( set->page->kill )
1301 if( slot = bt_findslot (set, key, len) ) {
1305 while( slotptr(set->page, slot)->dead )
1306 if( slot++ < set->page->cnt )
1311 page_no = bt_getid(valptr(set->page, slot)->value);
1316 // or slide right into next page
1319 page_no = bt_getid(set->page->right);
1323 // return error on end of right chain
1325 bt->err = BTERR_struct;
1326 return 0; // return error
1329 // return page to free list
1330 // page must be delete & write locked
1332 void bt_freepage (BtDb *bt, BtPageSet *set)
1334 // lock allocation page
1336 bt_spinwritelock (bt->mgr->lock);
1339 memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1340 bt_putid(bt->mgr->pagezero->chain, set->page_no);
1341 set->latch->dirty = 1;
1342 set->page->free = 1;
1344 // unlock released page
1346 bt_unlockpage (BtLockDelete, set->latch);
1347 bt_unlockpage (BtLockWrite, set->latch);
1348 bt_unpinlatch (set->latch);
1350 // unlock allocation page
1352 bt_spinreleasewrite (bt->mgr->lock);
1355 // a fence key was deleted from a page
1356 // push new fence value upwards
1358 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1360 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1361 unsigned char value[BtId];
1365 // remove the old fence value
1367 ptr = keyptr(set->page, set->page->cnt);
1368 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1369 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1370 set->latch->dirty = 1;
1372 // cache new fence value
1374 ptr = keyptr(set->page, set->page->cnt);
1375 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1377 bt_lockpage (BtLockParent, set->latch);
1378 bt_unlockpage (BtLockWrite, set->latch);
1380 // insert new (now smaller) fence key
1382 bt_putid (value, set->page_no);
1383 ptr = (BtKey*)leftkey;
1385 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1388 // now delete old fence key
1390 ptr = (BtKey*)rightkey;
1392 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1395 bt_unlockpage (BtLockParent, set->latch);
1396 bt_unpinlatch(set->latch);
1400 // root has a single child
1401 // collapse a level from the tree
1403 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1408 // find the child entry and promote as new root contents
1411 for( idx = 0; idx++ < root->page->cnt; )
1412 if( !slotptr(root->page, idx)->dead )
1415 child->page_no = bt_getid (valptr(root->page, idx)->value);
1417 if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1418 child->page = bt_mappage (bt, child->latch);
1422 bt_lockpage (BtLockDelete, child->latch);
1423 bt_lockpage (BtLockWrite, child->latch);
1425 memcpy (root->page, child->page, bt->mgr->page_size);
1426 root->latch->dirty = 1;
1428 bt_freepage (bt, child);
1430 } while( root->page->lvl > 1 && root->page->act == 1 );
1432 bt_unlockpage (BtLockWrite, root->latch);
1433 bt_unpinlatch (root->latch);
1437 // maintain reverse scan pointers by
1438 // linking left pointer of far right node
1440 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1442 BtPageSet right2[1];
1444 // keep track of rightmost leaf page
1446 if( !right_page_no ) {
1447 bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1451 // link right page to left page
1453 if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1454 right2->page = bt_mappage (bt, right2->latch);
1458 bt_lockpage (BtLockWrite, right2->latch);
1460 bt_putid(right2->page->left, left_page_no);
1461 bt_unlockpage (BtLockWrite, right2->latch);
1462 bt_unpinlatch (right2->latch);
1466 // find and delete key on page by marking delete flag bit
1467 // if page becomes empty, delete it from the btree
// bt_deletekey: mark the slot holding key dead on the page found at level lvl.
// When the page ends up with no active keys, the right sibling's contents are
// copied into it, the sibling is marked killed and later freed, and the parent
// level (lvl+1) is updated through bt_insertkey and a recursive bt_deletekey.
//   bt  - btree handle; bt->found is set to the found flag on the visible returns
//   key - key bytes to delete
//   len - number of key bytes
//   lvl - tree level to operate on
// Returns a BTERR code: 0 on the visible success paths, BTERR_struct when the
// right sibling is already marked killed. Other error returns are not visible
// in this listing.
1469 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1471 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1472 uint slot, idx, found, fence;
1473 BtPageSet set[1], right[1];
1474 unsigned char value[BtId];
// locate the page and slot for key at level lvl, passing BtLockWrite;
// the branch taken for a zero slot is not shown here
1478 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1479 ptr = keyptr(set->page, slot);
1483 // if librarian slot, advance to real slot
1485 if( slotptr(set->page, slot)->type == Librarian )
1486 ptr = keyptr(set->page, ++slot);
// fence is non-zero when the slot is the last slot on the page
1488 fence = slot == set->page->cnt;
1490 // if key is found delete it, otherwise ignore request
// found ends up non-zero only when the key compares equal and the slot was live
1492 if( found = !keycmp (ptr, key, len) )
1493 if( found = slotptr(set->page, slot)->dead == 0 ) {
1494 val = valptr(set->page,slot);
1495 slotptr(set->page, slot)->dead = 1;
// account the key and value records (headers included) as garbage
1496 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1499 // collapse empty slots beneath the fence
// when the slot just below the last one is dead, overwrite it with the last
// slot, clear the vacated last slot and shrink cnt by one
1501 while( idx = set->page->cnt - 1 )
1502 if( slotptr(set->page, idx)->dead ) {
1503 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1504 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1509 // did we delete a fence key in an upper level?
// only for lvl > 0 pages that still have active keys
1511 if( found && lvl && set->page->act && fence )
1512 if( bt_fixfence (bt, set, lvl) )
1515 return bt->found = found, 0;
1517 // do we need to collapse root?
1519 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1520 if( bt_collapseroot (bt, set) )
1523 return bt->found = found, 0;
1525 // return if page is not empty
1527 if( set->page->act ) {
1528 set->latch->dirty = 1;
1529 bt_unlockpage(BtLockWrite, set->latch);
1530 bt_unpinlatch (set->latch);
1531 return bt->found = found, 0;
1534 // cache copy of fence key
1535 // to post in parent
// the last slot's key is saved; it is deleted from level lvl+1 further down
1537 ptr = keyptr(set->page, set->page->cnt);
1538 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1540 // obtain lock on right page
1542 right->page_no = bt_getid(set->page->right);
1544 if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1545 right->page = bt_mappage (bt, right->latch);
1549 bt_lockpage (BtLockWrite, right->latch);
// NOTE(review): no unlock/unpin of set or right is visible before this
// return -- confirm whether the latches are left held on this error path
1551 if( right->page->kill )
1552 return bt->err = BTERR_struct;
1554 // transfer left link
1556 memcpy (right->page->left, set->page->left, BtId);
1558 // pull contents of right peer into our empty page
1560 memcpy (set->page, right->page, bt->mgr->page_size);
1561 set->latch->dirty = 1;
// make the page now to the right of the merged contents point its left link
// at this page (any guarding condition is not shown in this listing)
1566 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1569 // cache copy of key to update
// the right page's last key is re-inserted at level lvl+1 further down
1571 ptr = keyptr(right->page, right->page->cnt);
1572 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1574 // mark right page deleted and point it to left page
1575 // until we can post parent updates
1577 bt_putid (right->page->right, set->page_no);
1578 right->latch->dirty = 1;
1579 right->page->kill = 1;
// on both pages, take the Parent lock before dropping the Write lock
1581 bt_lockpage (BtLockParent, right->latch);
1582 bt_unlockpage (BtLockWrite, right->latch);
1584 bt_lockpage (BtLockParent, set->latch);
1585 bt_unlockpage (BtLockWrite, set->latch);
1587 // redirect higher key directly to our new node contents
// value carries set->page_no as the child id stored with the higher fence key
1589 bt_putid (value, set->page_no);
1590 ptr = (BtKey*)higherfence;
1592 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1595 // delete old lower key to our node
1597 ptr = (BtKey*)lowerfence;
1599 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1602 // obtain delete and write locks to right node
// the Parent lock is exchanged for Delete + Write before the page is freed
1604 bt_unlockpage (BtLockParent, right->latch);
1605 bt_lockpage (BtLockDelete, right->latch);
1606 bt_lockpage (BtLockWrite, right->latch);
1607 bt_freepage (bt, right);
1609 bt_unlockpage (BtLockParent, set->latch);
1610 bt_unpinlatch (set->latch);
// bt_foundkey: return the bt->key buffer viewed as a BtKey.
//   bt - btree handle
// Returns a pointer into bt itself; nothing is copied.
1615 BtKey *bt_foundkey (BtDb *bt)
1617 return (BtKey*)(bt->key);
1620 // advance to next slot
// bt_findnext: step from slot to the following slot, moving set onto the
// right sibling page when needed.
//   bt   - btree handle; bt->err is set to BTERR_struct on the visible failure path
//   set  - current page set; its latch, page and page_no are replaced when
//          the function moves to the right sibling
//   slot - current slot number on set->page
// Returns 0 on the visible failure path. The successful return values are not
// visible in this listing (presumably slot+1 on the same page and 1 on the new
// page -- TODO confirm).
1622 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1624 BtLatchSet *prevlatch;
// more slots remain on this page (the statement taken is not shown here)
1627 if( slot < set->page->cnt )
// remember the current latch so it can be released after the sibling is locked
1630 prevlatch = set->latch;
// follow the right link and pin/map the sibling page
1632 if( page_no = bt_getid(set->page->right) )
1633 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1634 set->page = bt_mappage (bt, set->latch);
1638 return bt->err = BTERR_struct, 0;
1640 // obtain access lock using lock chaining with Access mode
// order: Access on the new page, release Read on the old page, then take
// Read on the new page and release Access
1642 bt_lockpage(BtLockAccess, set->latch);
1644 bt_unlockpage(BtLockRead, prevlatch);
1645 bt_unpinlatch (prevlatch);
1647 bt_lockpage(BtLockRead, set->latch);
1648 bt_unlockpage(BtLockAccess, set->latch);
1650 set->page_no = page_no;
1654 // find unique key or first duplicate key in
1655 // leaf level and return number of value bytes
1656 // or (-1) if not found. Setup key for bt_foundkey
// bt_findkey: look up key at leaf level (level 0), copy the key found into
// bt->key and copy up to valmax bytes of its value into value.
//   bt     - btree handle; bt->key receives the key record that was examined
//   key    - key bytes to search for
//   keylen - number of key bytes
//   value  - destination for value bytes (index_file in this file passes NULL
//            together with valmax 0)
//   valmax - capacity of value in bytes
// Return statements are not visible in this listing; per the comments below
// the result is the number of value bytes copied (>= 0) or -1 when not found.
1658 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
// load the leaf page for key, passing BtLockRead
1666 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1668 ptr = keyptr(set->page, slot);
1670 // skip librarian slot place holder
1672 if( slotptr(set->page, slot)->type == Librarian )
1673 ptr = keyptr(set->page, ++slot);
1675 // return actual key found
1677 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
// Duplicate-type slots get special handling on a line not shown here
1680 if( slotptr(set->page, slot)->type == Duplicate )
1683 // not there if we reach the stopper key
// i.e. the last slot of a page that has no right sibling
1685 if( slot == set->page->cnt )
1686 if( !bt_getid (set->page->right) )
1689 // if key exists, return >= 0 value bytes copied
1690 // otherwise return (-1)
1692 if( slotptr(set->page, slot)->dead )
// len is a local assigned on lines not shown in this listing
1696 if( !memcmp (ptr->key, key, len) ) {
1697 val = valptr (set->page,slot);
// valmax is compared against the stored value length before the copy
// (the statement guarded by this test is not shown)
1698 if( valmax > val->len )
1700 memcpy (value, val->value, valmax);
// continue with the next slot, possibly on the right sibling page
1706 } while( slot = bt_findnext (bt, set, slot) );
1708 bt_unlockpage (BtLockRead, set->latch);
1709 bt_unpinlatch (set->latch);
1713 // check page for space available,
1714 // clean if necessary and return
1715 // 0 - page needs splitting
1716 // >0 new slot value
// bt_cleanpage: check whether set->page can take a new key of keylen bytes
// and a value of vallen bytes; if not, and enough garbage has accumulated,
// rebuild the page from a copy in bt->frame, inserting librarian slots.
//   bt     - btree handle; bt->frame is used as scratch space for the old page image
//   set    - page set being cleaned; its latch is marked dirty when rebuilt
//   keylen - length of the key about to be inserted
//   slot   - caller's insertion slot
//   vallen - length of the value about to be inserted
// Per the comment preceding this function the result is 0 when the page needs
// splitting and otherwise the (new) slot value; the return statements
// themselves are not visible in this listing.
1718 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1720 uint nxt = bt->mgr->page_size;
1721 BtPage page = set->page;
1722 uint cnt = 0, idx = 0;
1723 uint max = page->cnt;
// room test: header + (max+2) slots + new key record + new value record
// must not exceed page->min
1728 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1731 // skip cleanup and proceed to split
1732 // if there's not enough garbage
// threshold is one fifth of the page size (nxt still equals page_size here)
1735 if( page->garbage < nxt / 5 )
// keep the old image in bt->frame to copy records from
1738 memcpy (bt->frame, page, bt->mgr->page_size);
1740 // skip page info and set rest of page to zero
1742 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1743 set->latch->dirty = 1;
1747 // clean up page first by
1748 // removing deleted keys
// cnt walks the old slots 1..max; idx counts slots written to the rebuilt page
1750 while( cnt++ < max ) {
// dead slots are tested here, except the last slot of a level-0 page
// (the statement taken for a dead slot is not shown)
1754 if( cnt < max || page->lvl )
1755 if( slotptr(bt->frame,cnt)->dead )
1758 // copy the value across
// records are packed downward from the end of the page
1760 val = valptr(bt->frame, cnt);
1761 nxt -= val->len + sizeof(BtVal);
1762 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1764 // copy the key across
1766 key = keyptr(bt->frame, cnt);
1767 nxt -= key->len + sizeof(BtKey);
1768 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1770 // make a librarian slot
// a dead placeholder slot pointing at the same record offset
// (any condition guarding it is not shown in this listing)
1772 slotptr(page, ++idx)->off = nxt;
1773 slotptr(page, idx)->type = Librarian;
1774 slotptr(page, idx)->dead = 1;
// the real slot: same offset, type and dead flag carried over from the old slot
1778 slotptr(page, ++idx)->off = nxt;
1779 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1781 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1788 // see if page has enough space now, or does it need splitting?
// same room test as above, using the rebuilt slot count idx
1790 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1796 // split the root and raise the height of the btree
// bt_splitroot: turn the root into a two-entry page above two children.
// The current root contents are copied to a freshly allocated page (left),
// and the root is rebuilt with slot 1 = leftkey -> left page and
// slot 2 = a two-byte key -> right page.
//   bt      - btree handle
//   root    - page set for the root; write-unlocked and unpinned before return
//   leftkey - fence key for the left (lower keys) child
//   right   - page set for the already-created right child; unpinned here
// Returns a BTERR code; the return statements are not visible in this listing.
1798 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1800 uint nxt = bt->mgr->page_size;
1801 unsigned char value[BtId];
1806 // Obtain an empty page to use, and copy the current
1807 // root contents into it, e.g. lower keys
1809 if( bt_newpage(bt, left, root->page) )
1812 bt_unpinlatch (left->latch);
1814 // set left from higher to lower page
1816 bt_putid (right->page->left, left->page_no);
1817 bt_unpinlatch (right->latch);
1819 // preserve the page info at the bottom
1820 // of higher keys and set rest to zero
1822 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1824 // insert stopper key at top of newroot page
1825 // and increase the root height
// value record for slot 2 holds the right child's page number
1827 nxt -= BtId + sizeof(BtVal);
1828 bt_putid (value, right->page_no);
1829 val = (BtVal *)((unsigned char *)root->page + nxt);
1830 memcpy (val->value, value, BtId);
// reserve a key record with two key bytes for slot 2; the bytes written
// into it are not shown in this listing
1833 nxt -= 2 + sizeof(BtKey);
1834 slotptr(root->page, 2)->off = nxt;
1835 ptr = (BtKey *)((unsigned char *)root->page + nxt);
1840 // insert lower keys page fence key on newroot page as first key
// value record for slot 1 holds the left child's page number
1842 nxt -= BtId + sizeof(BtVal);
1843 bt_putid (value, left->page_no);
1844 val = (BtVal *)((unsigned char *)root->page + nxt);
1845 memcpy (val->value, value, BtId);
1848 nxt -= leftkey->len + sizeof(BtKey);
1849 slotptr(root->page, 1)->off = nxt;
1850 memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
// the root has no right sibling
1852 bt_putid(root->page->right, 0);
1853 root->page->min = nxt; // reset lowest used offset and key count
1854 root->page->cnt = 2;
1855 root->page->act = 2;
1858 // release and unpin root pages
1860 bt_unlockpage(BtLockWrite, root->latch);
1861 bt_unpinlatch (root->latch);
1865 // split already locked full node
// bt_splitpage: split the page in set into two pages.
// The upper part of the slots is assembled in bt->frame and written to a new
// page (right); the lower part is then re-packed into set->page. For the root
// page the work is finished by bt_splitroot; otherwise both fence keys are
// inserted at level lvl+1 while the two pages are held with Parent locks.
//   bt  - btree handle; bt->frame is used as scratch space
//   set - page set to split (described above as already locked)
// Returns a BTERR code; apart from the bt_splitroot hand-off the return
// statements are not visible in this listing.
1868 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1870 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1871 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1872 unsigned char value[BtId];
1873 uint lvl = set->page->lvl;
1880 // split higher half of keys to bt->frame
1882 memset (bt->frame, 0, bt->mgr->page_size);
1883 max = set->page->cnt;
// the starting value of cnt for this loop is set on a line not shown here
1887 while( cnt++ < max ) {
// dead slots are tested here, except the last slot of a level-0 page
1888 if( cnt < max || set->page->lvl )
1889 if( slotptr(set->page, cnt)->dead )
// copy value then key into the frame, packing downward from the page end
1892 src = valptr(set->page, cnt);
1893 nxt -= src->len + sizeof(BtVal);
1894 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1896 key = keyptr(set->page, cnt);
1897 nxt -= key->len + sizeof(BtKey);
1898 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1899 memcpy (ptr, key, key->len + sizeof(BtKey));
1901 // add librarian slot
1903 slotptr(bt->frame, ++idx)->off = nxt;
1904 slotptr(bt->frame, idx)->type = Librarian;
1905 slotptr(bt->frame, idx)->dead = 1;
// the real slot: same offset, type and dead flag carried over
1909 slotptr(bt->frame, ++idx)->off = nxt;
1910 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1912 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1916 // remember existing fence key for new page to the right
// key still points at the last key copied by the loop above
1918 memcpy (rightkey, key, key->len + sizeof(BtKey));
1920 bt->frame->bits = bt->mgr->page_bits;
1921 bt->frame->min = nxt;
1922 bt->frame->cnt = idx;
1923 bt->frame->lvl = lvl;
// non-root pages: the new right page inherits the old right link and
// points left at the page being split
1927 if( set->page_no > ROOT_page ) {
1928 bt_putid (bt->frame->right, bt_getid (set->page->right));
1929 bt_putid(bt->frame->left, set->page_no);
1932 // get new free page and write higher keys to it.
1934 if( bt_newpage(bt, right, bt->frame) )
// leaf level only: the old right neighbour's left link is pointed at the new page
1939 if( set->page_no > ROOT_page && !lvl )
1940 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1943 // update lower keys to continue in old page
// snapshot the old page into the frame, then clear everything after its header
1945 memcpy (bt->frame, set->page, bt->mgr->page_size);
1946 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1947 set->latch->dirty = 1;
1949 nxt = bt->mgr->page_size;
1950 set->page->garbage = 0;
// a Librarian slot at position max is special-cased (statement not shown);
// max is presumably reassigned to the split point on lines not shown -- TODO confirm
1956 if( slotptr(bt->frame, max)->type == Librarian )
1959 // assemble page of smaller keys
1961 while( cnt++ < max ) {
1962 if( slotptr(bt->frame, cnt)->dead )
1964 val = valptr(bt->frame, cnt);
1965 nxt -= val->len + sizeof(BtVal);
1966 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1968 key = keyptr(bt->frame, cnt);
1969 nxt -= key->len + sizeof(BtKey);
1970 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1972 // add librarian slot
1974 slotptr(set->page, ++idx)->off = nxt;
1975 slotptr(set->page, idx)->type = Librarian;
1976 slotptr(set->page, idx)->dead = 1;
1980 slotptr(set->page, ++idx)->off = nxt;
1981 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1985 // remember fence key for smaller page
// key still points at the last key copied by the loop above
1987 memcpy(fencekey, key, key->len + sizeof(BtKey));
// chain the old page to the new right page
1989 bt_putid(set->page->right, right->page_no);
1990 set->page->min = nxt;
1991 set->page->cnt = idx;
1993 // if current page is the root page, split it
1995 if( set->page_no == ROOT_page )
1996 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
1998 // insert new fences in their parent pages
// take Parent locks on both halves, then drop the Write lock on the old page
2000 bt_lockpage (BtLockParent, right->latch);
2002 bt_lockpage (BtLockParent, set->latch);
2003 bt_unlockpage (BtLockWrite, set->latch);
2005 // insert new fence for reformulated left block of smaller keys
2007 ptr = (BtKey*)fencekey;
2008 bt_putid (value, set->page_no);
2010 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2013 // switch fence for right block of larger keys to new right page
2015 ptr = (BtKey*)rightkey;
2016 bt_putid (value, right->page_no);
2018 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
// parent entries are in place: release both pages
2021 bt_unlockpage (BtLockParent, set->latch);
2022 bt_unpinlatch (set->latch);
2024 bt_unlockpage (BtLockParent, right->latch);
2025 bt_unpinlatch (right->latch);
2029 // install new key and value onto page
2030 // page must already be checked for
// bt_insertslot: write a new key/value record onto set->page and open a slot
// for it at position slot, then write-unlock and unpin the page.
//   bt     - btree handle
//   set    - page set to insert into
//   slot   - slot position the new key goes before
//   key    - key bytes
//   keylen - number of key bytes
//   value  - value bytes
//   vallen - number of value bytes
//   type   - slot type for the new entry (the line storing it is not shown here)
// The page must already have room (see the comment preceding this function).
// Returns a BTERR code; the return statement is not visible in this listing.
2033 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2035 uint idx, librarian;
2040 // if found slot > desired slot and previous slot
2041 // is a librarian slot, use it
2044 if( slotptr(set->page, slot-1)->type == Librarian )
2047 // copy value onto page
// records are allocated downward from page->min
2049 set->page->min -= vallen + sizeof(BtVal);
2050 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2051 memcpy (val->value, value, vallen);
2054 // copy key onto page
2056 set->page->min -= keylen + sizeof(BtKey);
2057 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2058 memcpy (ptr->key, key, keylen);
2061 // find first empty slot
// scan forward from slot for a dead slot (the loop exit is not shown here)
2063 for( idx = slot; idx < set->page->cnt; idx++ )
2064 if( slotptr(set->page, idx)->dead )
2067 // now insert key into array before slot
// no dead slot found: grow the slot array by two and shift by two
// (the alternative branch is not shown in this listing)
2069 if( idx == set->page->cnt )
2070 idx += 2, set->page->cnt += 2, librarian = 2;
2074 set->latch->dirty = 1;
// shift slots up by `librarian` positions to open the gap at slot
2077 while( idx > slot + librarian - 1 )
2078 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2080 // add librarian slot
// the librarian placeholder takes position slot and the real entry follows it
2082 if( librarian > 1 ) {
2083 node = slotptr(set->page, slot++);
2084 node->off = set->page->min;
2085 node->type = Librarian;
// the real slot points at the key record just written at page->min
2091 node = slotptr(set->page, slot);
2092 node->off = set->page->min;
2096 bt_unlockpage (BtLockWrite, set->latch);
2097 bt_unpinlatch (set->latch);
2101 // Insert new key into the btree at given level.
2102 // either add a new key or update/add an existing one
// bt_insertkey: insert key/value at tree level lvl, or update the value when
// a unique key is already present.
//   bt     - btree handle; bt->err is set to BTERR_ovflw on one visible failure path
//   key    - key bytes
//   keylen - number of key bytes
//   lvl    - tree level to insert at (0 from index_file; lvl+1 from the
//            split/delete code in this file)
//   value  - value bytes
//   vallen - number of value bytes
//   unique - non-zero: an equal key is updated in place; zero: a new entry is
//            always added and a sequence number from bt_newdup is stored after the key
// Returns a BTERR code; only the bt_insertslot hand-off is visible as a return
// in this listing.
2104 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2106 unsigned char newkey[BT_keyarray];
2107 uint slot, idx, len;
2114 // set up the key we're working on
// a working copy of the key is built in newkey
2116 ins = (BtKey*)newkey;
2117 memcpy (ins->key, key, keylen);
2120 // is this a non-unique index value?
// (the guarding test and length adjustments are not shown in this listing)
2126 sequence = bt_newdup (bt);
2127 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2131 while( 1 ) { // find the page and slot for the current key
2132 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2133 ptr = keyptr(set->page, slot);
2136 bt->err = BTERR_ovflw;
2140 // if librarian slot == found slot, advance to real slot
2142 if( slotptr(set->page, slot)->type == Librarian )
2143 if( !keycmp (ptr, key, keylen) )
2144 ptr = keyptr(set->page, ++slot);
// Duplicate-type slots get special handling on a line not shown here
2148 if( slotptr(set->page, slot)->type == Duplicate )
2151 // if inserting a duplicate key or unique key
2152 // check for adequate space on the page
2153 // and insert the new key before slot.
// taken when the key differs from the one at slot, or always when !unique;
// len is assigned on lines not shown in this listing
2155 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
// no room even after cleaning: split the page (the follow-up is not shown)
2156 if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2157 if( bt_splitpage (bt, set) )
2162 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2165 // if key already exists, update value and return
2167 val = valptr(set->page, slot);
// the new value fits in the existing value area: overwrite in place
2169 if( val->len >= vallen ) {
2170 if( slotptr(set->page, slot)->dead )
// the unused tail of the old value area is counted as garbage
2172 set->page->garbage += val->len - vallen;
2173 set->latch->dirty = 1;
2174 slotptr(set->page, slot)->dead = 0;
2176 memcpy (val->value, value, vallen);
2177 bt_unlockpage(BtLockWrite, set->latch);
2178 bt_unpinlatch (set->latch);
2182 // new update value doesn't fit in existing value area
// a live slot's old key and value records become garbage
2184 if( !slotptr(set->page, slot)->dead )
2185 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2187 slotptr(set->page, slot)->dead = 0;
2191 if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2192 if( bt_splitpage (bt, set) )
// write fresh value and key records at page->min and repoint the slot
2197 set->page->min -= vallen + sizeof(BtVal);
2198 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2199 memcpy (val->value, value, vallen);
2202 set->latch->dirty = 1;
2203 set->page->min -= keylen + sizeof(BtKey);
2204 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2205 memcpy (ptr->key, key, keylen);
2208 slotptr(set->page, slot)->off = set->page->min;
2209 bt_unlockpage(BtLockWrite, set->latch);
2210 bt_unpinlatch (set->latch);
2216 // set cursor to highest slot on highest page
// bt_lastkey: copy the page named by pagezero->alloc->left into bt->cursor
// and take that page's slot count as the cursor slot.
//   bt - btree handle; bt->cursor receives the page image
// Returns a slot number; the return statements are not visible in this
// listing (presumably the slot captured below, 0 on failure -- TODO confirm).
2218 uint bt_lastkey (BtDb *bt)
// bt_linkleft stores a page number in this field when it is given no right page
2220 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2224 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2225 set->page = bt_mappage (bt, set->latch);
// copy the page into the cursor under a read lock
2229 bt_lockpage(BtLockRead, set->latch);
2231 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2232 slot = set->page->cnt;
2234 bt_unlockpage(BtLockRead, set->latch);
2235 bt_unpinlatch (set->latch);
2239 // return previous slot on cursor page
// bt_prevkey: step the cursor back one slot, loading the left neighbour page
// into bt->cursor when the move leaves the current page.
//   bt   - btree handle; bt->cursor and bt->cursor_page are updated on a page change
//   slot - current slot on the cursor page
// Returns the slot count of the newly loaded page after a page change; the
// other return statements are not visible in this listing.
2241 uint bt_prevkey (BtDb *bt, uint slot)
// follow the cursor page's left link
2249 if( left = bt_getid(bt->cursor->left) )
2250 bt->cursor_page = left;
2254 if( set->latch = bt_pinlatch (bt, left, 1) )
2255 set->page = bt_mappage (bt, set->latch);
// copy the left page into the cursor under a read lock
2259 bt_lockpage(BtLockRead, set->latch);
2260 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2261 bt_unlockpage(BtLockRead, set->latch);
2262 bt_unpinlatch (set->latch);
2263 return bt->cursor->cnt;
2266 // return next slot on cursor page
2267 // or slide cursor right into next page
// bt_nextkey: advance the cursor past dead slots to the next slot, loading
// the right neighbour page into bt->cursor when the current page is exhausted.
//   bt   - btree handle; bt->cursor and bt->cursor_page are updated on a page change
//   slot - current slot on the cursor page
// Returns a slot number; the return statements are not visible in this listing.
2269 uint bt_nextkey (BtDb *bt, uint slot)
2275 right = bt_getid(bt->cursor->right);
// scan the remaining slots of the cached page
2277 while( slot++ < bt->cursor->cnt )
2278 if( slotptr(bt->cursor,slot)->dead )
// the last slot is only accepted when a right sibling exists
2280 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
// move the cursor to the right sibling
2288 bt->cursor_page = right;
2290 if( set->latch = bt_pinlatch (bt, right, 1) )
2291 set->page = bt_mappage (bt, set->latch);
// copy the sibling into the cursor under a read lock
2295 bt_lockpage(BtLockRead, set->latch);
2297 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2299 bt_unlockpage(BtLockRead, set->latch);
2300 bt_unpinlatch (set->latch);
2308 // cache page of keys into cursor and return starting slot for given key
// bt_startkey: load the leaf page for key, copy it into bt->cursor and
// record its page number in bt->cursor_page.
//   bt  - btree handle
//   key - key bytes to position on
//   len - number of key bytes
// Returns a slot number; the return statements are not visible in this
// listing (presumably the slot from bt_loadpage -- TODO confirm).
2310 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2315 // cache page for retrieval
// level 0, passing BtLockRead
2317 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2318 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2322 bt->cursor_page = set->page_no;
2324 bt_unlockpage(BtLockRead, set->latch);
2325 bt_unpinlatch (set->latch);
// bt_key: return the key record for slot on the cached cursor page.
//   bt   - btree handle whose bt->cursor holds the page copy
//   slot - slot number on that page
2329 BtKey *bt_key(BtDb *bt, uint slot)
2331 return keyptr(bt->cursor, slot);
// bt_val: return the value record for slot on the cached cursor page.
//   bt   - btree handle whose bt->cursor holds the page copy
//   slot - slot number on that page
2334 BtVal *bt_val(BtDb *bt, uint slot)
2336 return valptr(bt->cursor,slot);
// getCpuTime (variant using the Win32 FILETIME/SYSTEMTIME calls): return a
// time in seconds selected by type.
//   type - selector; main in this file prints 0 as "real", 1 as "user" and
//          2 as "sys". The dispatch on type is not visible in this listing.
// The value is assembled from SYSTEMTIME fields (hour, minute, second,
// millisecond), plus the day of week for the wall-clock case.
2342 double getCpuTime(int type)
2345 FILETIME xittime[1];
2346 FILETIME systime[1];
2347 FILETIME usrtime[1];
2348 SYSTEMTIME timeconv[1];
2351 memset (timeconv, 0, sizeof(SYSTEMTIME));
// wall clock: current system time, starting from day-of-week in seconds
2355 GetSystemTimeAsFileTime (xittime);
2356 FileTimeToSystemTime (xittime, timeconv);
2357 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// user time of the current process
2360 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2361 FileTimeToSystemTime (usrtime, timeconv);
// system (kernel) time of the current process
2364 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2365 FileTimeToSystemTime (systime, timeconv);
// fold the broken-down time into seconds
2369 ans += (double)timeconv->wHour * 3600;
2370 ans += (double)timeconv->wMinute * 60;
2371 ans += (double)timeconv->wSecond;
2372 ans += (double)timeconv->wMilliseconds / 1000;
2377 #include <sys/resource.h>
// getCpuTime (variant using gettimeofday/getrusage): return a time in
// seconds selected by type.
//   type - selector; main in this file prints 0 as "real", 1 as "user" and
//          2 as "sys". The dispatch on type is not visible in this listing.
2379 double getCpuTime(int type)
2381 struct rusage used[1];
2382 struct timeval tv[1];
// wall-clock time of day, microseconds folded into the fraction
2386 gettimeofday(tv, NULL);
2387 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time consumed by this process
2390 getrusage(RUSAGE_SELF, used);
2391 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time consumed by this process
2394 getrusage(RUSAGE_SELF, used);
2395 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// bt_poolaudit: walk the deployed latch sets of mgr, report on stderr any
// whose readwr/access/parent lock word has MASK bits set or whose pin count
// has bits besides CLOCK_bit, and zero the three lock structures.
//   mgr - buffer pool manager to audit
2402 void bt_poolaudit (BtMgr *mgr)
// slot is incremented before use, so the first entry examined is latchsets[1]
// (assuming slot starts at 0 on a line not shown here)
2407 while( slot++ < mgr->latchdeployed ) {
2408 latch = mgr->latchsets + slot;
2410 if( *latch->readwr->rin & MASK )
2411 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
// the lock is cleared regardless of whether it was reported
2412 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2414 if( *latch->access->rin & MASK )
2415 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2416 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2418 if( *latch->parent->rin & MASK )
2419 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2420 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// any pin bits other than CLOCK_bit mean the latch is still pinned
2422 if( latch->pin & ~CLOCK_bit ) {
2423 fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
// bt_latchaudit: report and clear leftover locks, then count keys on disk.
// Checks the manager lock, every deployed latch set and every hash-table
// entry, writing a message to stderr for anything found locked or pinned and
// zeroing the lock words. Afterwards it reads every page from LEAF_page up to
// pagezero->alloc->right into bt->frame and sums the act counts of non-free
// level-0 pages.
//   bt - btree handle; bt->frame is used as the read buffer
// Returns bt->err = BTERR_map when a page read fails or is short on the
// ReadFile path; the final return is not visible in this listing (index_file
// prints the result as the number of keys found).
2429 uint bt_latchaudit (BtDb *bt)
2431 ushort idx, hashidx;
// manager-level lock word
2437 if( *(ushort *)(bt->mgr->lock) )
2438 fprintf(stderr, "Alloc page locked\n");
2439 *(ushort *)(bt->mgr->lock) = 0;
// every deployed latch set, starting at index 1
2441 for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2442 latch = bt->mgr->latchsets + idx;
2443 if( *latch->readwr->rin & MASK )
2444 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2445 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2447 if( *latch->access->rin & MASK )
2448 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2449 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2451 if( *latch->parent->rin & MASK )
2452 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2453 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
// (the pin test guarding this message is not shown in this listing)
2456 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// hash table: clear each bucket latch and walk its chain of latch sets
2461 for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2462 if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
2463 fprintf(stderr, "hash entry %d locked\n", hashidx);
2465 *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
2467 if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2468 latch = bt->mgr->latchsets + idx;
2470 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2471 } while( idx = latch->next );
// sequential pass over the index file
2474 page_no = LEAF_page;
2476 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
// byte offset of the page within the file
2477 uid off = page_no << bt->mgr->page_bits;
// NOTE(review): the pread result is not checked here, unlike the ReadFile path below
2479 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2483 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2485 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2486 return bt->err = BTERR_map;
2488 if( *amt < bt->mgr->page_size )
2489 return bt->err = BTERR_map;
// only non-free level-0 pages contribute to the key count
2491 if( !bt->frame->free && !bt->frame->lvl )
2492 cnt += bt->frame->act;
2496 cnt--; // remove stopper key
2497 fprintf(stderr, " Total keys read %d\n", cnt);
2511 // standalone program to index file of keys
2512 // then list them onto std-out
// index_file: thread entry point of the standalone driver. It opens a btree
// handle on args->mgr and runs one operation chosen by the first character of
// args->type (compared case-insensitively via | 0x20). The case labels are
// not visible in this listing; the operations, in order, announce themselves
// as: latch mgr audit, pennysort, indexing, deleting keys, finding keys,
// scanning, reverse scan and counting.
//   arg - pointer to a ThreadArg (mgr, type, infile, num, idx)
// Two signatures appear because the pthread and _beginthreadex entry points
// differ; the preprocessor lines selecting between them are not shown.
// Progress and errors go to stderr; scanned keys and values go to stdout.
// NOTE(review): the error paths call exit(0), so failures exit with a
// success status.
2515 void *index_file (void *arg)
2517 uint __stdcall index_file (void *arg)
2520 int line = 0, found = 0, cnt = 0, unique;
2521 uid next, page_no = LEAF_page; // start on first page of leaves
2522 unsigned char key[BT_maxkey];
2523 ThreadArg *args = arg;
2524 int ch, len = 0, slot;
2531 bt = bt_open (args->mgr);
// unique unless the second character of the type string is 'd' or 'D'
2533 unique = (args->type[1] | 0x20) != 'd';
2535 switch(args->type[0] | 0x20)
// --- latch manager audit ---
2538 fprintf(stderr, "started latch mgr audit\n");
2539 cnt = bt_latchaudit (bt);
2540 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
// --- pennysort: first 10 bytes of each line are the key, the rest the value ---
2544 fprintf(stderr, "started pennysort for %s\n", args->infile);
2545 if( in = fopen (args->infile, "rb") )
2546 while( ch = getc(in), ch != EOF )
2551 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2552 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
// characters are accumulated while there is room in key[]
2555 else if( len < BT_maxkey )
2557 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// --- indexing: each input line becomes a key with an empty value ---
2561 fprintf(stderr, "started indexing for %s\n", args->infile);
2562 if( in = fopen (args->infile, "rb") )
2563 while( ch = getc(in), ch != EOF )
// num == 1: append a 9-digit descending counter to the key
2568 if( args->num == 1 )
2569 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
// other non-zero num: append a 9-digit counter offset by the thread index
2571 else if( args->num )
2572 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2574 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2575 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2578 else if( len < BT_maxkey )
2580 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
// --- deleting: look each key up, then delete the key that was found ---
2584 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2585 if( in = fopen (args->infile, "rb") )
2586 while( ch = getc(in), ch != EOF )
2590 if( args->num == 1 )
2591 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2593 else if( args->num )
2594 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2596 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2597 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
// delete using the key record that bt_findkey left in bt->key
2598 ptr = (BtKey*)(bt->key);
2601 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2602 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2605 else if( len < BT_maxkey )
2607 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// --- finding: look each key up without retrieving a value ---
2611 fprintf(stderr, "started finding keys for %s\n", args->infile);
2612 if( in = fopen (args->infile, "rb") )
2613 while( ch = getc(in), ch != EOF )
2617 if( args->num == 1 )
2618 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2620 else if( args->num )
2621 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
// a return of 0 (zero value bytes copied) is the success case tested here
2623 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2626 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2629 else if( len < BT_maxkey )
2631 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
// --- forward scan: walk the leaf chain via right links, printing live entries ---
2635 fprintf(stderr, "started scanning\n");
2637 if( set->latch = bt_pinlatch (bt, page_no, 1) )
2638 set->page = bt_mappage (bt, set->latch);
2640 fprintf(stderr, "unable to obtain latch"), exit(1);
2641 bt_lockpage (BtLockRead, set->latch);
2642 next = bt_getid (set->page->right);
// the last slot is skipped on the page that has no right sibling
2644 for( slot = 0; slot++ < set->page->cnt; )
2645 if( next || slot < set->page->cnt )
2646 if( !slotptr(set->page, slot)->dead ) {
2647 ptr = keyptr(set->page, slot);
// Duplicate slots get a length adjustment on a line not shown here
2650 if( slotptr(set->page, slot)->type == Duplicate )
2653 fwrite (ptr->key, len, 1, stdout);
2654 val = valptr(set->page, slot);
2655 fwrite (val->value, val->len, 1, stdout);
2656 fputc ('\n', stdout);
2660 bt_unlockpage (BtLockRead, set->latch);
2661 bt_unpinlatch (set->latch);
2662 } while( page_no = next );
2664 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// --- reverse scan: start at the last key and step backwards with the cursor ---
2668 fprintf(stderr, "started reverse scan\n");
2669 if( slot = bt_lastkey (bt) )
2670 while( slot = bt_prevkey (bt, slot) ) {
2671 if( slotptr(bt->cursor, slot)->dead )
2674 ptr = keyptr(bt->cursor, slot);
2677 if( slotptr(bt->cursor, slot)->type == Duplicate )
2680 fwrite (ptr->key, len, 1, stdout);
2681 val = valptr(bt->cursor, slot);
2682 fwrite (val->value, val->len, 1, stdout);
2683 fputc ('\n', stdout);
2687 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
// --- counting: read every page and sum act on non-free level-0 pages ---
// hint to the OS that the index file will be read sequentially
2692 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2694 fprintf(stderr, "started counting\n");
2695 page_no = LEAF_page;
2697 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2698 if( bt_readpage (bt->mgr, bt->frame, page_no) )
2701 if( !bt->frame->free && !bt->frame->lvl )
2702 cnt += bt->frame->act;
2708 cnt--; // remove stopper key
2709 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2721 typedef struct timeval timer;
// main: command-line driver. Arguments as used below:
//   argv[1]   index file name, passed to bt_mgr
//   argv[2]   operation string handed to every thread as args->type
//   argv[3]   page size in bits (bits defaults to 16)
//   argv[4]   buffer pool size in pages
//   argv[5]   line-number mode handed to every thread as args->num
//   argv[6..] one source file per worker thread
// One index_file thread is started per source file; after they all finish,
// elapsed real, user and sys times are printed to stderr.
// The argument-count checks guarding the atoi calls are not visible in this
// listing.
2723 int main (int argc, char **argv)
2725 int idx, cnt, len, slot, err;
2726 int segsize, bits = 16;
2743 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2744 fprintf (stderr, " where page_bits is the page size in bits\n");
2745 fprintf (stderr, " buffer_pool_size is the number of pages in buffer pool\n");
2746 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
2747 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
// wall-clock start for the "real" figure printed at the end
2751 start = getCpuTime(0);
// NOTE(review): atoi gives no error indication for non-numeric arguments
2754 bits = atoi(argv[3]);
2757 poolsize = atoi(argv[4]);
2760 fprintf (stderr, "Warning: no mapped_pool\n");
2763 num = atoi(argv[5]);
// thread handle array: pthread_t on the pthread path, HANDLE on the Win32 path
// NOTE(review): no check of the malloc results is visible in this listing
2767 threads = malloc (cnt * sizeof(pthread_t));
2769 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2771 args = malloc (cnt * sizeof(ThreadArg));
// one buffer pool manager is shared by all threads
2773 mgr = bt_mgr ((argv[1]), bits, poolsize);
2776 fprintf(stderr, "Index Open Error %s\n", argv[1]);
// start one worker per source file
2782 for( idx = 0; idx < cnt; idx++ ) {
2783 args[idx].infile = argv[idx + 6];
2784 args[idx].type = argv[2];
2785 args[idx].mgr = mgr;
2786 args[idx].num = num;
2787 args[idx].idx = idx;
2789 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2790 fprintf(stderr, "Error creating thread %d\n", err);
2792 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2796 // wait for termination
2799 for( idx = 0; idx < cnt; idx++ )
2800 pthread_join (threads[idx], NULL);
2802 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2804 for( idx = 0; idx < cnt; idx++ )
2805 CloseHandle(threads[idx]);
// report timings as minutes and fractional seconds
2811 elapsed = getCpuTime(0) - start;
2812 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2813 elapsed = getCpuTime(1);
2814 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2815 elapsed = getCpuTime(2);
2816 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);