1 // btree version threadskv3 sched_yield version
2 // with reworked bt_deletekey code
3 // and phase-fair reader writer lock
4 // and librarian page split code
7 // author: karl malbrain, malbrain@cal.berkeley.edu
10 This work, including the source code, documentation
11 and related data, is placed into the public domain.
13 The original author is Karl Malbrain.
15 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
16 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
17 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
18 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
19 RESULTING FROM THE USE, MODIFICATION, OR
20 REDISTRIBUTION OF THIS SOFTWARE.
23 // Please see the project home page for documentation
24 // code.google.com/p/high-concurrency-btree
26 #define _FILE_OFFSET_BITS 64
27 #define _LARGEFILE64_SOURCE
43 #define WIN32_LEAN_AND_MEAN
57 typedef unsigned long long uid;
60 typedef unsigned long long off64_t;
61 typedef unsigned short ushort;
62 typedef unsigned int uint;
65 #define BT_latchtable 128 // number of latch manager slots
67 #define BT_ro 0x6f72 // ro
68 #define BT_rw 0x7772 // rw
70 #define BT_maxbits 24 // maximum page size in bits
71 #define BT_minbits 9 // minimum page size in bits
72 #define BT_minpage (1 << BT_minbits) // minimum page size
73 #define BT_maxpage (1 << BT_maxbits) // maximum page size
76 There are five lock types for each node in three independent sets:
77 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
78 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
79 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
80 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
81 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
92 // definition for phase-fair reader/writer lock implementation
106 // definition for spin latch implementation
108 // exclusive is set for write access
109 // share is count of read accessors
110 // grant write lock when share == 0
112 volatile typedef struct {
123 // hash table entries
126 BtSpinLatch latch[1];
127 volatile ushort slot; // Latch table entry at head of chain
130 // latch manager table structure
133 RWLock readwr[1]; // read/write page lock
134 RWLock access[1]; // Access Intent/Page delete
135 RWLock parent[1]; // Posting of fence key in parent
136 BtSpinLatch busy[1]; // slot is being moved between chains
137 volatile ushort next; // next entry in hash table chain
138 volatile ushort prev; // prev entry in hash table chain
139 volatile ushort pin; // number of outstanding locks
140 volatile ushort hash; // hash slot entry is under
141 volatile uid page_no; // latch set page number
144 // Define the length of the page and key pointers
148 // Page key slot definition.
150 // If BT_maxbits is 15 or less, you can save 2 bytes
151 // for each key stored by making the two uints
154 // Keys are marked dead, but remain on the page until
155 // cleanup is called. The fence key (highest key) for
156 // the page is always present, even after cleanup.
159 uint off:BT_maxbits; // page offset for key start
160 uint librarian:1; // set for librarian slot
161 uint dead:1; // set for deleted key
164 // The key structure occupies space at the upper end of
165 // each page. It's a length byte followed by the key
170 unsigned char key[1];
173 // the value structure also occupies space at the upper
178 unsigned char value[1];
181 // The first part of an index page.
182 // It is immediately followed
183 // by the BtSlot array of keys.
185 typedef struct BtPage_ {
186 uint cnt; // count of keys in page
187 uint act; // count of active keys
188 uint min; // next key offset
189 uint garbage; // page garbage in bytes
190 unsigned char bits:7; // page size in bits
191 unsigned char free:1; // page is on free chain
192 unsigned char lvl:7; // level of page
193 unsigned char kill:1; // page is being deleted
194 unsigned char right[BtId]; // page number to right
197 // The memory mapping pool table buffer manager entry
200 uid basepage; // mapped base page number
201 char *map; // mapped memory pointer
202 ushort slot; // slot index in this array
203 ushort pin; // mapped page pin counter
204 void *hashprev; // previous pool entry for the same hash idx
205 void *hashnext; // next pool entry for the same hash idx
207 HANDLE hmap; // Windows memory mapping handle
211 #define CLOCK_bit 0x8000 // bit in pool->pin
213 // The loadpage interface object
216 uid page_no; // current page number
217 BtPage page; // current page pointer
218 BtPool *pool; // current page pool
219 BtLatchSet *latch; // current page latch set
222 // structure for latch manager on ALLOC_page
225 struct BtPage_ alloc[1]; // next page_no in right ptr
226 unsigned char chain[BtId]; // head of free page_nos chain
227 BtSpinLatch lock[1]; // allocation area lite latch
228 ushort latchdeployed; // highest number of latch entries deployed
229 ushort nlatchpage; // number of latch pages at BT_latch
230 ushort latchtotal; // number of page latch entries
231 ushort latchhash; // number of latch hash table slots
232 ushort latchvictim; // next latch entry to examine
233 BtHashEntry table[0]; // the hash table
236 // The object structure for Btree access
239 uint page_size; // page size
240 uint page_bits; // page size in bits
241 uint seg_bits; // seg size in pages in bits
242 uint mode; // read-write mode
248 ushort poolcnt; // highest page pool node in use
249 ushort poolmax; // highest page pool node allocated
250 ushort poolmask; // total number of pages in mmap segment - 1
251 ushort hashsize; // size of Hash Table for pool entries
252 volatile uint evicted; // last evicted hash table slot
253 ushort *hash; // pool index for hash entries
254 BtSpinLatch *latch; // latches for hash table slots
255 BtLatchMgr *latchmgr; // mapped latch page from allocation page
256 BtLatchSet *latchsets; // mapped latch set from latch pages
257 BtPool *pool; // memory pool page segments
259 HANDLE halloc; // allocation and latch table handle
264 BtMgr *mgr; // buffer manager for thread
265 BtPage cursor; // cached frame for start/next (never mapped)
266 BtPage frame; // spare frame for the page split (never mapped)
267 uid cursor_page; // current cursor page number
268 unsigned char *mem; // frame, cursor, page memory buffer
269 int found; // last delete or insert was found
270 int err; // last error
284 extern void bt_close (BtDb *bt);
285 extern BtDb *bt_open (BtMgr *mgr);
286 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
287 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
288 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
289 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
290 extern uint bt_nextkey (BtDb *bt, uint slot);
293 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
294 void bt_mgrclose (BtMgr *mgr);
296 // Helper functions to return slot values
297 // from the cursor page.
299 extern BtKey bt_key (BtDb *bt, uint slot);
300 extern BtVal bt_val (BtDb *bt, uint slot);
302 // BTree page number constants
303 #define ALLOC_page 0 // allocation & latch manager hash table
304 #define ROOT_page 1 // root of the btree
305 #define LEAF_page 2 // first page of leaves
306 #define LATCH_page 3 // pages for latch manager
308 // Number of levels to create in a new BTree
312 // The page is allocated from low and hi ends.
313 // The key slots are allocated from the bottom,
314 // while the text and value of the key
315 // are allocated from the top. When the two
316 // areas meet, the page is split into two.
318 // A key consists of a length byte, two bytes of
319 // index number (0 - 65534), and up to 253 bytes
320 // of key value. Duplicate keys are discarded.
321 // Associated with each key is a value byte string
322 // containing any value desired.
324 // The b-tree root is always located at page 1.
325 // The first leaf page of level zero is always
326 // located on page 2.
328 // The b-tree pages are linked with next
329 // pointers to facilitate enumerators,
330 // and provide for concurrency.
332 // When the root page fills, it is split in two and
333 // the tree height is raised by a new root at page
334 // one with two keys.
336 // Deleted keys are marked with a dead bit until
337 // page cleanup. The fence key for a leaf node is
340 // Groups of pages called segments from the btree are optionally
341 // cached with a memory mapped pool. A hash table is used to keep
342 // track of the cached segments. This behaviour is controlled
343 // by the cache block size parameter to bt_open.
345 // To achieve maximum concurrency one page is locked at a time
346 // as the tree is traversed to find leaf key in question. The right
347 // page numbers are used in cases where the page is being split,
350 // Page 0 is dedicated to lock for new page extensions,
351 // and chains empty pages together for reuse. It also
352 // contains the latch manager hash table.
354 // The ParentModification lock on a node is obtained to serialize posting
355 // or changing the fence key for a node.
357 // Empty pages are chained together through the ALLOC page and reused.
359 // Access macros to address slot and key values from the page
360 // Page slots use 1 based indexing.
362 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
363 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
364 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
366 void bt_putid(unsigned char *dest, uid id)
// Store a page number (uid) into a BtId-wide byte array field.
// NOTE(review): extraction gap — the loop header/braces around the next
// line are not visible here; the visible step emits the current low byte
// of id and then shifts id right by 8 bits.
371 	dest[i] = (unsigned char)id, id >>= 8;
374 uid bt_getid(unsigned char *src)
// Read a page number back out of a byte array: fold BtId bytes into the
// accumulator in big-endian order (shift left 8, OR in the next byte).
// NOTE(review): declaration of id and the return are outside this fragment.
379 	for( i = 0; i < BtId; i++ )
380 		id <<= 8, id |= *src++;
385 // Phase-Fair reader/writer lock implementation
387 void WriteLock (RWLock *lock)
// Phase-fair writer acquire: writers queue FIFO on a ticket counter, then
// announce presence in the reader-in word and wait for in-flight readers
// to drain. NOTE(review): the #if/#else lines separating the GNU-builtin
// and Win32-interlocked variants are missing from this extraction.
392 	tix = __sync_fetch_and_add (lock->ticket, 1);
394 	tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
396 	// wait for our ticket to come up
398 	while( tix != lock->serving[0] )
// announce writer presence plus the phase id of our ticket in rin
405 	w = PRES | (tix & PHID);
407 	r = __sync_fetch_and_add (lock->rin, w);
409 	r = _InterlockedExchangeAdd16 (lock->rin, w);
// spin until reader-out catches up with the reader count seen at entry
411 	while( r != *lock->rout )
419 void WriteRelease (RWLock *lock)
// Release the writer: clear the writer presence/phase bits (MASK) from the
// reader-in word so blocked readers may proceed. NOTE(review): any bump of
// lock->serving for the next queued writer is not visible in this fragment.
422 	__sync_fetch_and_and (lock->rin, ~MASK);
424 	_InterlockedAnd16 (lock->rin, ~MASK);
429 void ReadLock (RWLock *lock)
// Phase-fair reader acquire: bump reader-in by RINC and capture the writer
// bits (MASK) that were set at entry.
433 	w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
435 	w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
// if a writer was present, spin until its phase bits change
438 	while( w == (*lock->rin & MASK) )
446 void ReadRelease (RWLock *lock)
// Release a reader: bump reader-out by RINC so a waiting writer in
// WriteLock can observe the drain.
449 	__sync_fetch_and_add (lock->rout, RINC);
451 	_InterlockedExchangeAdd16 (lock->rout, RINC);
455 // Spin Latch Manager
457 // wait until write lock mode is clear
458 // and add 1 to the share count
460 void bt_spinreadlock(BtSpinLatch *latch)
// Shared acquire of a spin latch: optimistically add SHARE to the word,
// then check for an exclusive holder or pending writer; if one was seen,
// back the increment out and retry after yielding the processor.
// NOTE(review): the success-return branch between the add and the
// decrement is not visible in this extraction.
466 	prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
468 	prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
470 	// see if exclusive request is granted or pending
475 	prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
477 	prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
480 	} while( sched_yield(), 1 );
482 	} while( SwitchToThread(), 1 );
486 // wait for other read and write latches to relinquish
488 void bt_spinwritelock(BtSpinLatch *latch)
// Exclusive acquire of a spin latch: set PEND|XCL atomically; success when
// no bits outside BOTH (i.e. no readers) were already set. Otherwise drop
// the XCL bit (keeping PEND to stall new readers) and retry after a yield.
494 	prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
496 	prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
499 	  if( !(prev & ~BOTH) )
503 		__sync_fetch_and_and ((ushort *)latch, ~XCL);
505 		_InterlockedAnd16((ushort *)latch, ~XCL);
508 	} while( sched_yield(), 1 );
510 	} while( SwitchToThread(), 1 );
514 // try to obtain write lock
516 // return 1 if obtained,
519 int bt_spinwritetry(BtSpinLatch *latch)
// Non-blocking attempt at the exclusive latch: set XCL; succeed only when
// all other bits were clear, else undo the XCL bit.
// NOTE(review): the return statements are outside this fragment.
524 	prev = __sync_fetch_and_or((ushort *)latch, XCL);
526 	prev = _InterlockedOr16((ushort *)latch, XCL);
528 	// take write access if all bits are clear
531 	  if( !(prev & ~BOTH) )
535 		__sync_fetch_and_and ((ushort *)latch, ~XCL);
537 		_InterlockedAnd16((ushort *)latch, ~XCL);
544 void bt_spinreleasewrite(BtSpinLatch *latch)
// Release an exclusive latch: clear both the XCL and PEND bits at once.
547 	__sync_fetch_and_and((ushort *)latch, ~BOTH);
549 	_InterlockedAnd16((ushort *)latch, ~BOTH);
553 // decrement reader count
555 void bt_spinreleaseread(BtSpinLatch *latch)
// Release a shared latch: subtract one SHARE unit from the count.
558 	__sync_fetch_and_add((ushort *)latch, -SHARE);
560 	_InterlockedExchangeAdd16((ushort *)latch, -SHARE);
564 // link latch table entry into latch hash table
566 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
// Push latch-set entry `victim` onto the head of hash chain `hashidx` and
// bind it to page_no. NOTE(review): callers (bt_pinlatch) appear to hold
// the chain's spin latch around this — confirm against the full source.
568 	BtLatchSet *set = bt->mgr->latchsets + victim;
// intentional assignment-in-condition: fix up old head's back link only
// when the chain was non-empty
570 	if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
571 		bt->mgr->latchsets[set->next].prev = victim;
573 	bt->mgr->latchmgr->table[hashidx].slot = victim;
574 	set->page_no = page_no;
581 void bt_unpinlatch (BtLatchSet *set)
// Drop one pin (reference count) on a latch-set entry; a zero pin makes
// the entry eligible for victimization in bt_pinlatch.
584 	__sync_fetch_and_add(&set->pin, -1);
586 	_InterlockedDecrement16 (&set->pin);
590 // find existing latchset or inspire new one
591 // return with latchset pinned
593 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
// Find the latch-set entry for page_no (or create/steal one) and return it
// pinned. Phases: (1) optimistic lookup under chain read-lock; (2) retry
// under write-lock, remembering an unpinned entry as `avail`; (3) deploy a
// brand-new entry if any remain; (4) otherwise clock-scan for a victim,
// unlink it from its old chain, and relink it for page_no.
// NOTE(review): extraction gaps — returns, loop braces and #if/#else lines
// between GNU and Win32 variants are not visible here.
595 	ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
596 	ushort slot, avail = 0, victim, idx;
599 	// obtain read lock on hash table entry
601 	bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
603 	if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
605 		set = bt->mgr->latchsets + slot;
606 		if( page_no == set->page_no )
608 	} while( slot = set->next );
611 	// pin the matching entry before releasing the chain
612 		__sync_fetch_and_add(&set->pin, 1);
614 		_InterlockedIncrement16 (&set->pin);
618 	bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
623 	// try again, this time with write lock
625 	bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
627 	if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
629 		set = bt->mgr->latchsets + slot;
630 		if( page_no == set->page_no )
631 	// remember first unpinned entry on the chain as a takeover candidate
632 		if( !set->pin && !avail )
634 	} while( slot = set->next );
636 	// found our entry, or take over an unpinned one
638 	if( slot || (slot = avail) ) {
639 		set = bt->mgr->latchsets + slot;
641 		__sync_fetch_and_add(&set->pin, 1);
643 		_InterlockedIncrement16 (&set->pin);
645 		set->page_no = page_no;
646 		bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
650 	// see if there are any unused entries
652 	victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
654 	victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
657 	if( victim < bt->mgr->latchmgr->latchtotal ) {
658 		set = bt->mgr->latchsets + victim;
660 		__sync_fetch_and_add(&set->pin, 1);
662 		_InterlockedIncrement16 (&set->pin);
664 		bt_latchlink (bt, hashidx, victim, page_no);
665 		bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
669 	// undo the speculative deploy — table is fully deployed
670 	victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
672 	victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
674 	// find and reuse previous lock entry
678 	victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
680 	victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
682 	// we don't use slot zero
684 	if( victim %= bt->mgr->latchmgr->latchtotal )
685 		set = bt->mgr->latchsets + victim;
689 	// take control of our slot
690 	// from other threads
692 	if( set->pin || !bt_spinwritetry (set->busy) )
697 	// try to get write lock on hash chain
698 	// skip entry if not obtained
699 	// or has outstanding locks
701 	if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
702 		bt_spinreleasewrite (set->busy);
// entry re-pinned while we waited: give it back and keep scanning
707 		bt_spinreleasewrite (set->busy);
708 		bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
712 	// unlink our available victim from its hash chain
715 		bt->mgr->latchsets[set->prev].next = set->next;
717 		bt->mgr->latchmgr->table[idx].slot = set->next;
720 		bt->mgr->latchsets[set->next].prev = set->prev;
722 	bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
724 	__sync_fetch_and_add(&set->pin, 1);
726 	_InterlockedIncrement16 (&set->pin);
728 	bt_latchlink (bt, hashidx, victim, page_no);
729 	bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
730 	bt_spinreleasewrite (set->busy);
735 void bt_mgrclose (BtMgr *mgr)
// Tear down a buffer manager: unmap every pool segment, unmap the latch
// manager and latch-set pages, close the index file and free the tables.
// NOTE(review): the unix/#else/Windows separators and the frees of
// mgr->pool / mgr->hash / mgr itself on the unix path are not visible in
// this extraction.
740 	// release mapped pages
741 	// note that slot zero is never used
743 	for( slot = 1; slot < mgr->poolmax; slot++ ) {
744 		pool = mgr->pool + slot;
747 			munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
750 			FlushViewOfFile(pool->map, 0);
751 			UnmapViewOfFile(pool->map);
752 			CloseHandle(pool->hmap);
// unmap latch-set pages first, then the 2-page latchmgr/root mapping
758 	munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
759 	munmap (mgr->latchmgr, 2 * mgr->page_size);
761 	FlushViewOfFile(mgr->latchmgr, 0);
762 	UnmapViewOfFile(mgr->latchmgr);
763 	CloseHandle(mgr->halloc);
769 	free ((void *)mgr->latch);
772 	FlushFileBuffers(mgr->idx);
773 	CloseHandle(mgr->idx);
774 	GlobalFree (mgr->pool);
775 	GlobalFree (mgr->hash);
776 	GlobalFree ((void *)mgr->latch);
781 // close and release memory
783 void bt_close (BtDb *bt)
// Release a thread's BtDb handle; frees the 2-page frame/cursor buffer.
// NOTE(review): the unix free() path and the free of bt itself are not
// visible in this extraction — only the Windows VirtualFree remains.
790 		VirtualFree (bt->mem, 0, MEM_RELEASE);
795 // open/create new btree buffer manager
797 // call with file_name, BT_openmode, bits in page size (e.g. 16),
798 // size of mapped page pool (e.g. 8192)
800 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
// Open or create the b-tree file and build its buffer manager: validate
// page-size bits, open/map the file, size the mmap segments and pool hash
// table, and (for a new file) write the alloc page, MIN_lvl empty levels
// with stopper keys, the latch pages, then map the latch manager.
// Returns NULL on any failure. NOTE(review): this extraction drops the
// unix/#else/WIN32 separators and many intervening statements (error
// branches, `last` initialization, final return of mgr).
802 	uint lvl, attr, cacheblk, last, slot, idx;
803 	uint nlatchpage, latchhash;
804 	unsigned char value[BtId];
805 	BtLatchMgr *latchmgr;
814 	SYSTEM_INFO sysinfo[1];
817 	// determine sanity of page size and buffer pool
819 	if( bits > BT_maxbits )
821 	else if( bits < BT_minbits )
825 		return NULL;	// must have buffer pool
// --- open/create the index file (unix path, then Windows path) ---
828 	mgr = calloc (1, sizeof(BtMgr));
830 	mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
833 		return free(mgr), NULL;
835 	cacheblk = 4096;	// minimum mmap segment size for unix
838 	mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
839 	attr = FILE_ATTRIBUTE_NORMAL;
840 	mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
842 	if( mgr->idx == INVALID_HANDLE_VALUE )
843 		return GlobalFree(mgr), NULL;
845 	// normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
846 	GetSystemInfo(sysinfo);
847 	cacheblk = sysinfo->dwAllocationGranularity;
851 	latchmgr = malloc (BT_maxpage);
854 	// read minimum page size to get root info
// an existing (non-empty) file overrides the caller's `bits`
856 	if( size = lseek (mgr->idx, 0L, 2) ) {
857 		if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
858 			bits = latchmgr->alloc->bits;
860 			return free(mgr), free(latchmgr), NULL;
861 	} else if( mode == BT_ro )
862 		return free(latchmgr), bt_mgrclose (mgr), NULL;
864 	latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
865 	size = GetFileSize(mgr->idx, amt);
868 		if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
869 			return bt_mgrclose (mgr), NULL;
870 		bits = latchmgr->alloc->bits;
871 	} else if( mode == BT_ro )
872 		return bt_mgrclose (mgr), NULL;
// --- compute derived sizes: page size, segment mask, hash table ---
875 	mgr->page_size = 1 << bits;
876 	mgr->page_bits = bits;
878 	mgr->poolmax = poolmax;
881 	if( cacheblk < mgr->page_size )
882 		cacheblk = mgr->page_size;
884 	//  mask for partial memmaps
886 	mgr->poolmask = (cacheblk >> bits) - 1;
888 	//	see if requested size of pages per memmap is greater
890 	if( (1 << segsize) > mgr->poolmask )
891 		mgr->poolmask = (1 << segsize) - 1;
895 	while( (1 << mgr->seg_bits) <= mgr->poolmask )
898 	mgr->hashsize = hashsize;
901 	mgr->pool = calloc (poolmax, sizeof(BtPool));
902 	mgr->hash = calloc (hashsize, sizeof(ushort));
903 	mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
905 	mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
906 	mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
907 	mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
913 	// initialize an empty b-tree with latch page, root page, page of leaves
914 	// and page(s) of latches
916 	memset (latchmgr, 0, 1 << bits);
917 	nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
918 	bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
919 	latchmgr->alloc->bits = mgr->page_bits;
921 	latchmgr->nlatchpage = nlatchpage;
922 	latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
924 	// initialize latch manager
926 	latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
928 	//	size of hash table = total number of latchsets
930 	if( latchhash > latchmgr->latchtotal )
931 		latchhash = latchmgr->latchtotal;
933 	latchmgr->latchhash = latchhash;
936 	if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
937 		return bt_mgrclose (mgr), NULL;
939 	if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
940 		return bt_mgrclose (mgr), NULL;
942 	if( *amt < mgr->page_size )
943 		return bt_mgrclose (mgr), NULL;
// --- write one empty page per level, each holding only a stopper key ---
946 	memset (latchmgr, 0, 1 << bits);
947 	latchmgr->alloc->bits = mgr->page_bits;
949 	for( lvl=MIN_lvl; lvl--; ) {
950 		slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
951 		key = keyptr(latchmgr->alloc, 1);
952 		key->len = 2;		// create stopper key
// non-leaf stopper values point at the next level's page number
956 		bt_putid(value, MIN_lvl - lvl + 1);
957 		val = valptr(latchmgr->alloc, 1);
958 		val->len = lvl ? BtId : 0;
959 		memcpy (val->value, value, val->len);
961 		latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
962 		latchmgr->alloc->lvl = lvl;
963 		latchmgr->alloc->cnt = 1;
964 		latchmgr->alloc->act = 1;
966 		if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
967 			return bt_mgrclose (mgr), NULL;
969 		if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
970 			return bt_mgrclose (mgr), NULL;
972 		if( *amt < mgr->page_size )
973 			return bt_mgrclose (mgr), NULL;
977 	// clear out latch manager locks
978 	//	and rest of pages to round out segment
980 	memset(latchmgr, 0, mgr->page_size);
983 	while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
985 		pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
987 		SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
988 		if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
989 			return bt_mgrclose (mgr), NULL;
990 		if( *amt < mgr->page_size )
991 			return bt_mgrclose (mgr), NULL;
998 	// mlock the root page and the latchmgr page
1000 	flag = PROT_READ | PROT_WRITE;
1001 	mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1002 	if( mgr->latchmgr == MAP_FAILED )
1003 		return bt_mgrclose (mgr), NULL;
1004 	mlock (mgr->latchmgr, 2 * mgr->page_size);
1006 	mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1007 	if( mgr->latchsets == MAP_FAILED )
1008 		return bt_mgrclose (mgr), NULL;
1009 	mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1011 	flag = PAGE_READWRITE;
1012 	mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1014 		return bt_mgrclose (mgr), NULL;
1016 	flag = FILE_MAP_WRITE;
1017 	mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1018 	if( !mgr->latchmgr )
1019 		return GetLastError(), bt_mgrclose (mgr), NULL;
1021 	mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1027 	VirtualFree (latchmgr, 0, MEM_RELEASE);
1032 // open BTree access method
1033 // based on buffer manager
1035 BtDb *bt_open (BtMgr *mgr)
// Create a per-thread BtDb handle over a shared BtMgr: zero the struct and
// carve a 2-page private buffer into `frame` (split work area) and
// `cursor` (cached page for start/next). NOTE(review): assignment of
// bt->mgr and the return are outside this fragment.
1037 	BtDb *bt = malloc (sizeof(*bt));
1039 	memset (bt, 0, sizeof(*bt));
1042 	bt->mem = malloc (2 *mgr->page_size);
1044 	bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1046 	bt->frame = (BtPage)bt->mem;
1047 	bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1051 // compare two keys, returning > 0, = 0, or < 0
1052 // as the comparison value
1054 int keycmp (BtKey key1, unsigned char *key2, uint len2)
// Compare a stored key against a caller key: memcmp over the shorter
// length decides when nonzero. NOTE(review): the length tie-break and
// return statements are not visible in this extraction.
1056 	uint len1 = key1->len;
1059 	if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1072 // find segment in pool
1073 // must be called with hashslot idx locked
1074 // return NULL if not there
1075 // otherwise return node
1077 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
// Walk hash chain idx looking for the pool segment whose basepage covers
// page_no; caller must hold the chain's spin latch (per comment at 1073).
// NOTE(review): NULL-return branches are not visible in this extraction.
1082 	// compute start of hash chain in pool
1084 	if( slot = bt->mgr->hash[idx] )
1085 		pool = bt->mgr->pool + slot;
// mask off the in-segment page bits to get the segment's base page
1089 	page_no &= ~bt->mgr->poolmask;
1091 	while( pool->basepage != page_no )
1092 		if( pool = pool->hashnext )
1100 // add segment to hash table
1102 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
// Initialize a pool segment for page_no's segment and push it onto the
// head of hash chain idx. Pin starts at 1 with the CLOCK_bit set so the
// fresh segment survives the first eviction sweep.
1107 	pool->hashprev = pool->hashnext = NULL;
1108 	pool->basepage = page_no & ~bt->mgr->poolmask;
1109 	pool->pin = CLOCK_bit + 1;
1111 	if( slot = bt->mgr->hash[idx] ) {
1112 		node = bt->mgr->pool + slot;
1113 		pool->hashnext = node;
1114 		node->hashprev = pool;
1117 	bt->mgr->hash[idx] = pool->slot;
1120 // map new buffer pool segment to virtual memory
1122 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
// Memory-map the (poolmask+1)-page segment containing page_no into
// pool->map; read-only when the manager was opened BT_ro. Returns
// BTERR_map via bt->err on failure. NOTE(review): the unix/WIN32
// separators and success return are not visible in this extraction.
1124 	off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1125 	off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1129 	flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1130 	pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1131 	if( pool->map == MAP_FAILED )
1132 		return bt->err = BTERR_map;
1135 	flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1136 	pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1138 		return bt->err = BTERR_map;
1140 	flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1141 	pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1143 		return bt->err = BTERR_map;
1148 // calculate page within pool
1150 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
// Translate page_no into a pointer within an already-mapped pool segment:
// offset = (page_no within segment) << page_bits.
1152 	uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1155 	page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1156 //	madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1162 void bt_unpinpool (BtPool *pool)
// Drop one pin on a mapped pool segment (CLOCK_bit in pool->pin is left
// untouched; only the reference count is decremented).
1165 	__sync_fetch_and_add(&pool->pin, -1);
1167 	_InterlockedDecrement16 (&pool->pin);
1171 // find or place requested page in segment-pool
1172 // return pool table entry, incrementing pin
1174 BtPool *bt_pinpool(BtDb *bt, uid page_no)
// Return the pool segment for page_no with its pin count incremented.
// Phases: (1) hash lookup under the chain's write latch; (2) claim a fresh
// pool slot and map the segment; (3) when the table is full, run a clock
// sweep over pool entries, skip pinned/recently-used ones, unlink a victim
// from its chain, unmap it, and remap it for page_no.
// NOTE(review): extraction gaps — returns, loop headers and #if/#else
// separators between GNU and Win32 variants are not visible here.
1176 	uint slot, hashidx, idx, victim;
1177 	BtPool *pool, *node, *next;
1179 	//	lock hash table chain
1181 	hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1182 	bt_spinwritelock (&bt->mgr->latch[hashidx]);
1184 	//	look up in hash table
1186 	if( pool = bt_findpool(bt, page_no, hashidx) ) {
// hit: mark recently used (CLOCK_bit) and take a pin
1188 		__sync_fetch_and_or(&pool->pin, CLOCK_bit);
1189 		__sync_fetch_and_add(&pool->pin, 1);
1191 		_InterlockedOr16 (&pool->pin, CLOCK_bit);
1192 		_InterlockedIncrement16 (&pool->pin);
1194 		bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1198 	// allocate a new pool node
1199 	// and add to hash table
1202 	slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1204 	slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1207 	if( ++slot < bt->mgr->poolmax ) {
1208 		pool = bt->mgr->pool + slot;
1211 		if( bt_mapsegment(bt, pool, page_no) )
1214 		bt_linkhash(bt, pool, page_no, hashidx);
1215 		bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1219 	// pool table is full
1220 	//	find best pool entry to evict
// undo the speculative slot claim before entering the clock sweep
1223 	__sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1225 	_InterlockedDecrement16 (&bt->mgr->poolcnt);
1230 	victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1232 	victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1234 	victim %= bt->mgr->poolmax;
1235 	pool = bt->mgr->pool + victim;
1236 	idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1241 	// try to get write lock
1242 	//	skip entry if not obtained
1244 	if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1247 	//	skip this entry if
1249 	//	or clock bit is set
// second-chance: clear CLOCK_bit and move on to the next candidate
1253 	__sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1255 	_InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1257 		bt_spinreleasewrite (&bt->mgr->latch[idx]);
1261 	//	unlink victim pool node from hash table
1263 	if( node = pool->hashprev )
1264 		node->hashnext = pool->hashnext;
1265 	else if( node = pool->hashnext )
1266 		bt->mgr->hash[idx] = node->slot;
1268 		bt->mgr->hash[idx] = 0;
1270 	if( node = pool->hashnext )
1271 		node->hashprev = pool->hashprev;
1273 	bt_spinreleasewrite (&bt->mgr->latch[idx]);
1275 	//	remove old file mapping
1277 	munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1279 //	FlushViewOfFile(pool->map, 0);
1280 	UnmapViewOfFile(pool->map);
1281 	CloseHandle(pool->hmap);
1285 	//  create new pool mapping
1286 	//  and link into hash table
1288 	if( bt_mapsegment(bt, pool, page_no) )
1291 	bt_linkhash(bt, pool, page_no, hashidx);
1292 	bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1297 // place write, read, or parent lock on requested page_no.
1299 void bt_lockpage(BtLock mode, BtLatchSet *set)
// Acquire one of the page lock modes on a latch set: readwr carries
// Read/Write, access carries AccessIntent/Delete, parent carries
// ParentModification. NOTE(review): the switch/case labels selecting
// among these calls are not visible in this extraction.
1303 		ReadLock (set->readwr);
1306 		WriteLock (set->readwr);
1309 		ReadLock (set->access);
1312 		WriteLock (set->access);
1315 		WriteLock (set->parent);
1320 // remove write, read, or parent lock on requested page
1322 void bt_unlockpage(BtLock mode, BtLatchSet *set)
// Release the page lock previously taken by bt_lockpage for the same mode;
// mirrors its readwr/access/parent mapping. NOTE(review): the switch/case
// labels selecting among these calls are not visible in this extraction.
1326 		ReadRelease (set->readwr);
1329 		WriteRelease (set->readwr);
1332 		ReadRelease (set->access);
1335 		WriteRelease (set->access);
1338 		WriteRelease (set->parent);
1343 // allocate a new page and write page into it
1345 uid bt_newpage(BtDb *bt, BtPage page)
// Allocate a page number under the allocation latch — reusing the head of
// the free chain when possible, else extending the file — then copy the
// caller's page image into it via the pool. NOTE(review): returns, error
// branches and the unix/WIN32 separators are not visible here; both
// spin-release sites below appear to come from alternate exit paths.
1351 	//	lock allocation page
1353 	bt_spinwritelock(bt->mgr->latchmgr->lock);
1355 	// use empty chain first
1356 	// else allocate empty page
1358 	if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1359 		if( set->pool = bt_pinpool (bt, new_page) )
1360 			set->page = bt_page (bt, set->pool, new_page);
// pop the freed page: chain head becomes the page's right link
1364 		bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1365 		bt_unpinpool (set->pool);
1367 		new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1368 		bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1370 		// if writing first page of pool block, set file length thru last page
1372 		if( (new_page & bt->mgr->poolmask) == 0 )
1373 			ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1377 	// unlock allocation latch
1379 	bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1382 	//	bring new page into pool and copy page.
1383 	//	this will extend the file into the new pages on WIN32.
1385 	if( set->pool = bt_pinpool (bt, new_page) )
1386 		set->page = bt_page (bt, set->pool, new_page);
1390 	memcpy(set->page, page, bt->mgr->page_size);
1391 	bt_unpinpool (set->pool);
1394 	// unlock allocation latch
1396 	bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1401 // find slot in page for given key at a given level
1403 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
// Binary search the page for the first slot whose key is >= the target.
// Returns 0 when the key belongs on the right-link page (no slot here
// tested >= key); the stopper key on the rightmost page acts as +infinity.
// NOTE(review): initialization of `good` and the low-adjust branch of the
// search are not visible in this extraction.
1405 	uint diff, higher = set->page->cnt, low = 1, slot;
1408 	//	  make stopper key an infinite fence value
1410 	if( bt_getid (set->page->right) )
1415 	//	low is the lowest candidate.
1416 	//  loop ends when they meet
1418 	//  higher is already
1419 	//	tested as .ge. the passed key.
1421 	while( diff = higher - low ) {
1422 		slot = low + ( diff >> 1 );
1423 		if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1426 			higher = slot, good++;
1429 	//	return zero if key is on right link page
1431 	return good ? higher : 0;
1434 // find and load page at given level for given key
1435 // leave page rd or wr locked as requested
1437 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
// Descend from the root to the page at `lvl` that should contain `key`,
// leaving it locked in `lock` mode in *set. Uses lock chaining: take the
// child's Access lock before releasing the parent, so a concurrent delete
// cannot free the child underfoot. Slides right across split pages; on a
// killed page or dead slots it follows right links. Returns the found slot
// on success, 0 with bt->err set on structure errors.
// NOTE(review): the enclosing do/while loop, goto labels and several
// branch bodies are not visible in this extraction.
1439 	uid page_no = ROOT_page, prevpage = 0;
1440 	uint drill = 0xff, slot;
1441 	BtLatchSet *prevlatch;
1442 	uint mode, prevmode;
1445 	//  start at root of btree and drill down
1448 	// determine lock mode of drill level
1449 	mode = (drill == lvl) ? lock : BtLockRead;
1451 	set->latch = bt_pinlatch (bt, page_no);
1452 	set->page_no = page_no;
1454 	// pin page contents
1456 	if( set->pool = bt_pinpool (bt, page_no) )
1457 		set->page = bt_page (bt, set->pool, page_no);
1461 	//  obtain access lock using lock chaining with Access mode
1463 	if( page_no > ROOT_page )
1464 		bt_lockpage(BtLockAccess, set->latch);
1466 	//	release & unpin parent page
1469 	bt_unlockpage(prevmode, prevlatch);
1470 	bt_unpinlatch (prevlatch);
1471 	bt_unpinpool (prevpool);
1475 	// obtain read lock using lock chaining
1477 	bt_lockpage(mode, set->latch);
1479 	if( set->page->free )
1480 		return bt->err = BTERR_struct, 0;
1482 	if( page_no > ROOT_page )
1483 		bt_unlockpage(BtLockAccess, set->latch);
1485 	// re-read and re-lock root after determining actual level of root
1487 	if( set->page->lvl != drill) {
1488 		if( set->page_no != ROOT_page )
1489 			return bt->err = BTERR_struct, 0;
1491 		drill = set->page->lvl;
// root was read-locked but caller wants a stronger lock at this level:
// drop everything and restart the descent with the correct mode
1493 		if( lock != BtLockRead && drill == lvl ) {
1494 			bt_unlockpage(mode, set->latch);
1495 			bt_unpinlatch (set->latch);
1496 			bt_unpinpool (set->pool);
1501 	prevpage = set->page_no;
1502 	prevlatch = set->latch;
1503 	prevpool = set->pool;
1506 	//  find key on page at this level
1507 	//  and descend to requested level
1509 	if( set->page->kill )
1512 	if( slot = bt_findslot (set, key, len) ) {
// skip over librarian/dead slots to the next live key
1516 		while( slotptr(set->page, slot)->dead )
1517 			if( slot++ < set->page->cnt )
1522 		page_no = bt_getid(valptr(set->page, slot)->value);
1527 	//  or slide right into next page
1530 	page_no = bt_getid(set->page->right);
1534 	// return error on end of right chain
1536 	bt->err = BTERR_struct;
1537 	return 0;	// return error
1540 // return page to free list
1541 // page must be delete & write locked
// Pushes the page onto the allocator's free chain (LIFO, linked through
// each free page's right pointer), then releases the caller's Delete and
// Write locks and unpins the latch/pool entries. The allocation latch
// serializes all free-chain updates.
1543 void bt_freepage (BtDb *bt, BtPageSet *set)
1545 // lock allocation page
1547 bt_spinwritelock (bt->mgr->latchmgr->lock);
// Link this page in front of the current chain head, then make it the head.
1550 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1551 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1552 set->page->free = 1;
1554 // unlock released page
1556 bt_unlockpage (BtLockDelete, set->latch);
1557 bt_unlockpage (BtLockWrite, set->latch);
1558 bt_unpinlatch (set->latch);
1559 bt_unpinpool (set->pool);
1561 // unlock allocation page
1563 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1566 // a fence key was deleted from a page
1567 // push new fence value upwards
// Called with the page write-locked after its fence (highest) key was
// deleted: removes the old fence slot, posts the new smaller fence into
// the parent level, then deletes the old fence from the parent.
// Keys are stored length-prefixed: buf[0] = len, buf[1..] = bytes.
1569 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1571 unsigned char leftkey[256], rightkey[256];
1572 unsigned char value[BtId];
1576 // remove the old fence value
// NOTE(review): ptr/val declarations are in elided lines.
1578 ptr = keyptr(set->page, set->page->cnt);
1579 val = valptr(set->page, set->page->cnt);
1580 memcpy (rightkey, ptr, ptr->len + 1);
// +2 accounts for the two length prefix bytes of key and value.
1581 set->page->garbage += ptr->len + val->len + 2;
1582 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
// The slot below the removed fence becomes the new fence key.
1584 ptr = keyptr(set->page, set->page->cnt);
1585 memcpy (leftkey, ptr, ptr->len + 1);
// Hold Parent lock across the upper-level updates; Write lock can be
// released now that this page's contents are final.
1587 bt_lockpage (BtLockParent, set->latch);
1588 bt_unlockpage (BtLockWrite, set->latch);
1590 // insert new (now smaller) fence key
1592 bt_putid (value, set->page_no);
// Elided error-return bodies presumably propagate bt->err.
1594 if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
1597 // now delete old fence key
1599 if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1602 bt_unlockpage (BtLockParent, set->latch);
1603 bt_unpinlatch(set->latch);
1604 bt_unpinpool (set->pool);
1608 // root has a single child
1609 // collapse a level from the tree
// Repeatedly copies the root's only live child into the root page and
// frees the child, shrinking tree height, until the root is level 1 or
// has more than one active entry. Caller holds the root write-locked.
1611 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1616 // find the child entry and promote as new root contents
// Scan for the single non-dead slot (root->page->act == 1 guarantees
// exactly one exists).
1619 for( idx = 0; idx++ < root->page->cnt; )
1620 if( !slotptr(root->page, idx)->dead )
1623 child->page_no = bt_getid (valptr(root->page, idx)->value);
1625 child->latch = bt_pinlatch (bt, child->page_no);
// Delete + Write locks make the child exclusively ours before it is
// copied into the root and released to the free list.
1626 bt_lockpage (BtLockDelete, child->latch);
1627 bt_lockpage (BtLockWrite, child->latch);
1629 if( child->pool = bt_pinpool (bt, child->page_no) )
1630 child->page = bt_page (bt, child->pool, child->page_no);
1634 memcpy (root->page, child->page, bt->mgr->page_size);
// bt_freepage also drops the child's locks and pins.
1635 bt_freepage (bt, child);
1637 } while( root->page->lvl > 1 && root->page->act == 1 );
1639 bt_unlockpage (BtLockWrite, root->latch);
1640 bt_unpinlatch (root->latch);
1641 bt_unpinpool (root->pool);
1645 // find and delete key on page by marking delete flag bit
1646 // if page becomes empty, delete it from the btree
// Deletion is lazy (dead bit set, space credited to page->garbage). If the
// page's fence key was deleted, the new fence is pushed up via
// bt_fixfence; if the page becomes empty, the right sibling's contents
// are pulled left and the sibling is freed, with parent keys adjusted.
1648 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1650 unsigned char lowerfence[256], higherfence[256];
1651 uint slot, idx, dirty = 0, fence, found;
1652 BtPageSet set[1], right[1];
1653 unsigned char value[BtId];
1657 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1658 ptr = keyptr(set->page, slot);
1662 // if librarian slot, advance to real slot
// Librarian slots are dead placeholders inserted between live keys to
// make future inserts cheap; the real key is in the next slot.
1664 if( slotptr(set->page, slot)->librarian )
1665 ptr = keyptr(set->page, ++slot);
1667 // are we deleting a fence slot?
1669 fence = slot == set->page->cnt;
1671 // if key is found delete it, otherwise ignore request
1673 if( found = !keycmp (ptr, key, len) )
1674 if( found = slotptr(set->page, slot)->dead == 0 ) {
1675 val = valptr(set->page,slot);
1676 dirty = slotptr(set->page, slot)->dead = 1;
// +2 for the length prefix bytes of key and value.
1677 set->page->garbage += ptr->len + val->len + 2;
1680 // collapse empty slots beneath our slot
// Shift trailing dead slots out so cnt shrinks; elided lines presumably
// bound this loop to the dirtied region.
1682 while( idx = set->page->cnt - 1 )
1683 if( slotptr(set->page, idx)->dead ) {
1684 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1685 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1690 // did we delete a fence key in an upper level?
1692 if( dirty && lvl && set->page->act && fence )
1693 if( bt_fixfence (bt, set, lvl) )
1696 return bt->found = found, 0;
1698 // is this a collapsed root?
1700 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1701 if( bt_collapseroot (bt, set) )
1704 return bt->found = found, 0;
1706 // return if page is not empty
1708 if( set->page->act ) {
1709 bt_unlockpage(BtLockWrite, set->latch);
1710 bt_unpinlatch (set->latch);
1711 bt_unpinpool (set->pool);
1712 return bt->found = found, 0;
1715 // cache copy of fence key
1716 // to post in parent
1718 ptr = keyptr(set->page, set->page->cnt);
1719 memcpy (lowerfence, ptr, ptr->len + 1);
1721 // obtain lock on right page
1723 right->page_no = bt_getid(set->page->right);
1724 right->latch = bt_pinlatch (bt, right->page_no);
1725 bt_lockpage (BtLockWrite, right->latch);
1727 // pin page contents
1729 if( right->pool = bt_pinpool (bt, right->page_no) )
1730 right->page = bt_page (bt, right->pool, right->page_no);
// NOTE(review): this error return appears to leave both pages write-locked
// and pinned — looks like a lock leak on the corruption path; confirm
// against the full source before changing.
1734 if( right->page->kill )
1735 return bt->err = BTERR_struct;
1737 // pull contents of right peer into our empty page
1739 memcpy (set->page, right->page, bt->mgr->page_size);
1741 // cache copy of key to update
1743 ptr = keyptr(right->page, right->page->cnt);
1744 memcpy (higherfence, ptr, ptr->len + 1);
1746 // mark right page deleted and point it to left page
1747 // until we can post parent updates
// Readers that slide into the killed page follow its right pointer back
// to the surviving left page.
1749 bt_putid (right->page->right, set->page_no);
1750 right->page->kill = 1;
// Swap Write locks for Parent locks on both pages while the upper level
// is updated.
1752 bt_lockpage (BtLockParent, right->latch);
1753 bt_unlockpage (BtLockWrite, right->latch);
1755 bt_lockpage (BtLockParent, set->latch);
1756 bt_unlockpage (BtLockWrite, set->latch);
1758 // redirect higher key directly to our new node contents
1760 bt_putid (value, set->page_no);
1762 if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId) )
1765 // delete old lower key to our node
1767 if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1770 // obtain delete and write locks to right node
1772 bt_unlockpage (BtLockParent, right->latch);
1773 bt_lockpage (BtLockDelete, right->latch);
1774 bt_lockpage (BtLockWrite, right->latch);
// bt_freepage releases right's locks and pins.
1775 bt_freepage (bt, right);
1777 bt_unlockpage (BtLockParent, set->latch);
1778 bt_unpinlatch (set->latch);
1779 bt_unpinpool (set->pool);
1784 // find key in leaf level and return number of value bytes
1785 // or (-1) if not found
// Read-locks the leaf containing `key`; on an exact live match copies up
// to valmax value bytes into `value`. Elided lines presumably set the
// return length and handle the not-found path.
1787 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1795 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1796 ptr = keyptr(set->page, slot);
1800 // skip librarian slot place holder
1802 if( slotptr(set->page, slot)->librarian )
1803 ptr = keyptr(set->page, ++slot);
1805 // if key exists, return >= 0 value bytes copied
1806 // otherwise return (-1)
1808 if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
1809 val = valptr (set->page,slot);
// Clamp the copy to the smaller of caller's buffer and stored value.
1810 if( valmax > val->len )
1812 memcpy (value, val->value, valmax);
1817 bt_unlockpage (BtLockRead, set->latch);
1818 bt_unpinlatch (set->latch);
1819 bt_unpinpool (set->pool);
1823 // check page for space available,
1824 // clean if necessary and return
1825 // 0 - page needs splitting
1826 // >0 new slot value
// Garbage-collects dead keys by rebuilding the page from a scratch copy in
// bt->frame, re-inserting a dead "librarian" placeholder slot before each
// live key (part of the librarian split scheme). Caller holds the page
// write-locked.
1828 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1830 uint nxt = bt->mgr->page_size;
1831 uint cnt = 0, idx = 0;
1832 uint max = page->cnt;
// Fast path: page->min (lowest used key/value offset) already leaves room
// for the slot array plus the new key and value with length prefixes.
1837 if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1840 // skip cleanup and proceed to split
1841 // if there's not enough garbage
1843 if( page->garbage + page->min < 2 * page->act * sizeof(BtSlot) + sizeof(*page) + nxt / 3 )
1846 memcpy (bt->frame, page, bt->mgr->page_size);
1848 // skip page info and set rest of page to zero
1850 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1854 // clean up page first by
1855 // removing deleted keys
1857 while( cnt++ < max ) {
// Always keep the fence key (cnt == max) even if marked dead.
1860 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1863 // copy the value across
// Values grow downward from the top of the page: 1 length byte + bytes.
1865 val = valptr(bt->frame, cnt);
1866 nxt -= val->len + 1;
1867 ((unsigned char *)page)[nxt] = val->len;
1868 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1870 // copy the key across
1872 key = keyptr(bt->frame, cnt);
1873 nxt -= key->len + 1;
1874 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1876 // make a librarian slot
// Placeholder dead slot pointing at the same key, reserved for a future
// insert; elided lines presumably gate this on available space.
1878 slotptr(page, ++idx)->off = nxt;
1879 slotptr(page, idx)->librarian = 1;
1880 slotptr(page, idx)->dead = 1;
1884 slotptr(page, ++idx)->off = nxt;
1886 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1893 // see if page has enough space now, or does it need splitting?
1895 if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1901 // split the root and raise the height of the btree
// Moves the current root contents to a fresh page (the left half; the
// right half is already at page_no2), then rebuilds the root with exactly
// two entries: the left fence key and the infinite stopper key.
// `leftkey` is length-prefixed: leftkey[0] = len.
1903 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1905 uint nxt = bt->mgr->page_size;
1906 unsigned char value[BtId];
1909 // Obtain an empty page to use, and copy the current
1910 // root contents into it, e.g. lower keys
1912 if( !(left = bt_newpage(bt, root->page)) )
1915 // preserve the page info at the bottom
1916 // of higher keys and set rest to zero
1918 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1920 // insert lower keys page fence key on newroot page as first key
// NOTE(review): the nxt decrement for this value appears in elided lines
// before the store below; layout is 1 length byte + BtId value bytes,
// then 1 length byte + key bytes, growing downward.
1923 bt_putid (value, left);
1924 ((unsigned char *)root->page)[nxt] = BtId;
1925 memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
1927 nxt -= *leftkey + 1;
1928 memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1929 slotptr(root->page, 1)->off = nxt;
1931 // insert stopper key on newroot page
1932 // and increase the root height
// Stopper key is 2 bytes of 0xff (sorts after every real key), followed
// by the BtId value pointing at the right half.
1934 nxt -= 3 + BtId + 1;
1935 ((unsigned char *)root->page)[nxt] = 2;
1936 ((unsigned char *)root->page)[nxt+1] = 0xff;
1937 ((unsigned char *)root->page)[nxt+2] = 0xff;
1939 bt_putid (value, page_no2);
1940 ((unsigned char *)root->page)[nxt+3] = BtId;
1941 memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
1942 slotptr(root->page, 2)->off = nxt;
// New root has no right sibling; elided lines presumably bump root level.
1944 bt_putid(root->page->right, 0);
1945 root->page->min = nxt; // reset lowest used offset and key count
1946 root->page->cnt = 2;
1947 root->page->act = 2;
1950 // release and unpin root
1952 bt_unlockpage(BtLockWrite, root->latch);
1953 bt_unpinlatch (root->latch);
1954 bt_unpinpool (root->pool);
1958 // split already locked full node
// Splits a write-locked full page: upper half of the keys goes to a new
// right page (built in bt->frame), lower half is compacted back into the
// original page, then fence keys for both halves are posted to the parent
// level. Librarian placeholder slots are rebuilt on both halves.
1961 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1963 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1964 unsigned char fencekey[256], rightkey[256];
1965 unsigned char value[BtId];
1966 uint lvl = set->page->lvl;
1972 // split higher half of keys to bt->frame
// NOTE(review): the elided lines presumably start cnt at the midpoint;
// only the loop body survives in this view.
1974 memset (bt->frame, 0, bt->mgr->page_size);
1975 max = set->page->cnt;
1979 while( cnt++ < max ) {
// Keep the fence slot (cnt == max) even if dead.
1980 if( slotptr(set->page, cnt)->dead && cnt < max )
1982 val = valptr(set->page, cnt);
1983 nxt -= val->len + 1;
1984 ((unsigned char *)bt->frame)[nxt] = val->len;
1985 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
1987 key = keyptr(set->page, cnt);
1988 nxt -= key->len + 1;
1989 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1991 // add librarian slot
1993 slotptr(bt->frame, ++idx)->off = nxt;
1994 slotptr(bt->frame, idx)->librarian = 1;
1995 slotptr(bt->frame, idx)->dead = 1;
1999 slotptr(bt->frame, ++idx)->off = nxt;
2001 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2005 // remember existing fence key for new page to the right
2007 memcpy (rightkey, key, key->len + 1);
2009 bt->frame->bits = bt->mgr->page_bits;
2010 bt->frame->min = nxt;
2011 bt->frame->cnt = idx;
2012 bt->frame->lvl = lvl;
// The root has no right sibling to inherit.
2016 if( set->page_no > ROOT_page )
2017 memcpy (bt->frame->right, set->page->right, BtId);
2019 // get new free page and write higher keys to it.
2021 if( !(right->page_no = bt_newpage(bt, bt->frame)) )
2024 // update lower keys to continue in old page
2026 memcpy (bt->frame, set->page, bt->mgr->page_size);
2027 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2028 nxt = bt->mgr->page_size;
2029 set->page->garbage = 0;
// If the slot at the split point is a librarian placeholder, back up so
// the lower half ends on a real key.
2035 if( slotptr(bt->frame, max)->librarian )
2038 // assemble page of smaller keys
2040 while( cnt++ < max ) {
2041 if( slotptr(bt->frame, cnt)->dead )
2043 val = valptr(bt->frame, cnt);
2044 nxt -= val->len + 1;
2045 ((unsigned char *)set->page)[nxt] = val->len;
2046 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2048 key = keyptr(bt->frame, cnt);
2049 nxt -= key->len + 1;
2050 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2052 // add librarian slot
2054 slotptr(set->page, ++idx)->off = nxt;
2055 slotptr(set->page, idx)->librarian = 1;
2056 slotptr(set->page, idx)->dead = 1;
2060 slotptr(set->page, ++idx)->off = nxt;
2064 // remember fence key for smaller page
2066 memcpy(fencekey, key, key->len + 1);
2068 bt_putid(set->page->right, right->page_no);
2069 set->page->min = nxt;
2070 set->page->cnt = idx;
2072 // if current page is the root page, split it
2074 if( set->page_no == ROOT_page )
2075 return bt_splitroot (bt, set, fencekey, right->page_no);
2077 // insert new fences in their parent pages
// Hold Parent locks on both halves while the upper level is updated;
// the Write lock on the left half can be released now.
2079 right->latch = bt_pinlatch (bt, right->page_no);
2080 bt_lockpage (BtLockParent, right->latch);
2082 bt_lockpage (BtLockParent, set->latch);
2083 bt_unlockpage (BtLockWrite, set->latch);
2085 // insert new fence for reformulated left block of smaller keys
2087 bt_putid (value, set->page_no);
2089 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId) )
2092 // switch fence for right block of larger keys to new right page
2094 bt_putid (value, right->page_no);
2096 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId) )
2099 bt_unlockpage (BtLockParent, set->latch);
2100 bt_unpinlatch (set->latch);
2101 bt_unpinpool (set->pool);
2103 bt_unlockpage (BtLockParent, right->latch);
2104 bt_unpinlatch (right->latch);
2107 // Insert new key into the btree at given level.
// Updates in place when the key exists and the old value is large enough;
// otherwise marks the old entry dead and inserts fresh key/value bytes,
// splitting the page (and retrying via the elided outer loop) when
// bt_cleanpage cannot make room. Reuses an adjacent librarian slot when
// one precedes the insertion point.
2109 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
2119 if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
2120 ptr = keyptr(set->page, slot);
2123 bt->err = BTERR_ovflw;
2127 // if librarian slot == found slot, advance to real slot
2129 if( slotptr(set->page, slot)->librarian )
2130 if( !keycmp (ptr, key, keylen) )
2131 ptr = keyptr(set->page, ++slot);
2133 // if key already exists, update value and return
2135 if( reuse = !keycmp (ptr, key, keylen) )
// In-place update path: the stored value area is big enough.
2136 if( val = valptr(set->page, slot), val->len >= vallen ) {
2137 if( slotptr(set->page, slot)->dead )
2139 set->page->garbage += val->len - vallen;
2140 slotptr(set->page, slot)->dead = 0;
2142 memcpy (val->value, value, vallen);
2143 bt_unlockpage(BtLockWrite, set->latch);
2144 bt_unpinlatch (set->latch);
2145 bt_unpinpool (set->pool);
// Old value too small: retire the existing entry; its bytes become
// garbage (+2 for the two length prefix bytes).
2148 if( !slotptr(set->page, slot)->dead ) {
2149 set->page->garbage += val->len + ptr->len + 2;
2152 slotptr(set->page, slot)->dead = 1;
2155 // if found slot > desired slot and previous slot
2156 // is a librarian slot, use it
2158 if( !reuse && slot > 1 )
2159 if( slotptr(set->page, slot-1)->librarian )
2162 // check if page has enough space
2164 if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
2167 if( bt_splitpage (bt, set) )
2171 // calculate next available slot and copy key into page
// Value then key are appended below page->min, each with a length byte.
2173 set->page->min -= vallen + 1; // reset lowest used offset
2174 ((unsigned char *)set->page)[set->page->min] = vallen;
2175 memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2177 set->page->min -= keylen + 1; // reset lowest used offset
2178 ((unsigned char *)set->page)[set->page->min] = keylen;
2179 memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
// Look for a dead slot at or above the insert point to absorb the shift.
2181 for( idx = slot; idx < set->page->cnt; idx++ )
2182 if( slotptr(set->page, idx)->dead )
2185 // now insert key into array before slot
2187 if( !reuse && idx == set->page->cnt )
2188 idx++, set->page->cnt++;
// Shift slots up to open a hole at `slot` (loop header elided).
2193 *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2195 slotptr(set->page, slot)->off = set->page->min;
2196 slotptr(set->page, slot)->librarian = 0;
2197 slotptr(set->page, slot)->dead = 0;
2199 bt_unlockpage (BtLockWrite, set->latch);
2200 bt_unpinlatch (set->latch);
2201 bt_unpinpool (set->pool);
2205 // cache page of keys into cursor and return starting slot for given key
// Copies the whole leaf containing `key` into bt->cursor so iteration can
// proceed lock-free; returns the starting slot (0 on error, per loadpage).
2207 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2212 // cache page for retrieval
2214 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2215 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2219 bt->cursor_page = set->page_no;
2221 bt_unlockpage(BtLockRead, set->latch);
2222 bt_unpinlatch (set->latch);
2223 bt_unpinpool (set->pool);
2227 // return next slot for cursor page
2228 // or slide cursor right into next page
// Advances past dead slots on the cached cursor page; when exhausted,
// read-locks the right sibling, copies it into bt->cursor, and continues.
// The last slot of the rightmost page (no right link) is the infinite
// stopper and is not returned.
2230 uint bt_nextkey (BtDb *bt, uint slot)
2236 right = bt_getid(bt->cursor->right);
2238 while( slot++ < bt->cursor->cnt )
2239 if( slotptr(bt->cursor,slot)->dead )
2241 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2249 bt->cursor_page = right;
2251 if( set->pool = bt_pinpool (bt, right) )
2252 set->page = bt_page (bt, set->pool, right);
2256 set->latch = bt_pinlatch (bt, right);
2257 bt_lockpage(BtLockRead, set->latch);
2259 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2261 bt_unlockpage(BtLockRead, set->latch);
2262 bt_unpinlatch (set->latch);
2263 bt_unpinpool (set->pool);
// Return the key stored at `slot` of the cached cursor page.
2271 BtKey bt_key(BtDb *bt, uint slot)
2273 return keyptr(bt->cursor, slot);
// Return the value stored at `slot` of the cached cursor page.
2276 BtVal bt_val(BtDb *bt, uint slot)
2278 return valptr(bt->cursor,slot);
// WIN32 variant: return seconds for type 0 = wall clock, 1 = user CPU,
// 2 = system CPU, via GetProcessTimes/FileTimeToSystemTime. The switch
// structure around the three cases is elided in this view.
2284 double getCpuTime(int type)
2287 FILETIME xittime[1];
2288 FILETIME systime[1];
2289 FILETIME usrtime[1];
2290 SYSTEMTIME timeconv[1];
2293 memset (timeconv, 0, sizeof(SYSTEMTIME));
2297 GetSystemTimeAsFileTime (xittime);
2298 FileTimeToSystemTime (xittime, timeconv);
// NOTE(review): folding wDayOfWeek into seconds makes wall time wrap
// weekly — adequate only for elapsed-time deltas; confirm intent.
2299 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2302 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2303 FileTimeToSystemTime (usrtime, timeconv);
2306 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2307 FileTimeToSystemTime (systime, timeconv);
2311 ans += (double)timeconv->wHour * 3600;
2312 ans += (double)timeconv->wMinute * 60;
2313 ans += (double)timeconv->wSecond;
2314 ans += (double)timeconv->wMilliseconds / 1000;
2319 #include <sys/resource.h>
// Unix variant: type 0 = wall clock (gettimeofday), 1 = user CPU,
// 2 = system CPU (getrusage). Switch structure elided in this view.
2321 double getCpuTime(int type)
2323 struct rusage used[1];
2324 struct timeval tv[1];
2328 gettimeofday(tv, NULL);
2329 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2332 getrusage(RUSAGE_SELF, used);
2333 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2336 getrusage(RUSAGE_SELF, used);
2337 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Debug/audit tool: reports any latch still locked or pinned, force-clears
// lock words, then scans every page on disk checking key order and
// counting leaf keys. Intended for single-threaded use after a run.
2344 uint bt_latchaudit (BtDb *bt)
2346 ushort idx, hashidx;
2353 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2355 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2356 fprintf(stderr, "Alloc page locked\n");
2357 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
// Check the three phase-fair rwlocks (readwr/access/parent) of every
// deployed latch set; a set rin counter (& MASK) means still locked.
2359 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2360 latch = bt->mgr->latchsets + idx;
2361 if( *latch->readwr->rin & MASK )
2362 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2363 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2365 if( *latch->access->rin & MASK )
2366 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2367 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2369 if( *latch->parent->rin & MASK )
2370 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2371 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2374 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// Walk each hash bucket's chain looking for locked buckets and busy or
// still-pinned latch sets.
2379 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2380 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2381 fprintf(stderr, "hash entry %d locked\n", hashidx);
2383 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2385 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2386 latch = bt->mgr->latchsets + idx;
2387 if( *(ushort *)latch->busy )
2388 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2389 *(ushort *)latch->busy = 0;
2391 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2392 } while( idx = latch->next );
2395 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2396 page_no = LEAF_page;
// Scan every allocated page; alloc->right holds the next-unallocated
// page number (file high-water mark).
2398 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2399 off64_t off = page_no << bt->mgr->page_bits;
2401 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2405 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2407 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2408 fprintf(stderr, "page %.8x unable to read\n", page_no);
2410 if( *amt < bt->mgr->page_size )
2411 fprintf(stderr, "page %.8x unable to read\n", page_no);
// Verify keys are in strictly ascending order on each live page.
2413 if( !bt->frame->free ) {
2414 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2415 ptr = keyptr(bt->frame, idx+1);
2416 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2417 fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2419 if( !bt->frame->lvl )
2420 cnt += bt->frame->act;
2422 if( page_no > LEAF_page )
2437 // standalone program to index file of keys
2438 // then list them onto std-out
// Thread entry point for the test driver. args->type[0] selects the mode:
// 'a' audit, 'p' pennysort insert, 'w' write/index, 'd' delete, 'f' find,
// 's' scan leaves in order, 'c' count keys. Pthread and Win32 signatures
// are selected by elided #ifdef lines.
2441 void *index_file (void *arg)
2443 uint __stdcall index_file (void *arg)
2446 int line = 0, found = 0, cnt = 0;
2447 uid next, page_no = LEAF_page; // start on first page of leaves
2448 unsigned char key[256];
2449 ThreadArg *args = arg;
2450 int ch, len = 0, slot;
2456 bt = bt_open (args->mgr);
// | 0x20 lower-cases an ASCII letter.
2458 switch(args->type[0] | 0x20)
2461 fprintf(stderr, "started latch mgr audit\n");
2462 cnt = bt_latchaudit (bt);
2463 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2467 fprintf(stderr, "started pennysort for %s\n", args->infile);
// Pennysort records: first 10 bytes are the key, rest of line the value.
2468 if( in = fopen (args->infile, "rb") )
2469 while( ch = getc(in), ch != EOF )
2474 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10) )
2475 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2478 else if( len < 255 )
2480 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2484 fprintf(stderr, "started indexing for %s\n", args->infile);
2485 if( in = fopen (args->infile, "rb") )
2486 while( ch = getc(in), ch != EOF )
// num == 1: append a globally descending line number; num > 1: append a
// per-thread-disjoint number so threads never collide on keys.
2491 if( args->num == 1 )
2492 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2494 else if( args->num )
2495 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2497 if( bt_insertkey (bt, key, len, 0, NULL, 0) )
2498 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2501 else if( len < 255 )
2503 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2507 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2508 if( in = fopen (args->infile, "rb") )
2509 while( ch = getc(in), ch != EOF )
2513 if( args->num == 1 )
2514 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2516 else if( args->num )
2517 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2519 if( bt_deletekey (bt, key, len, 0) )
2520 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2523 else if( len < 255 )
2525 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2529 fprintf(stderr, "started finding keys for %s\n", args->infile);
2530 if( in = fopen (args->infile, "rb") )
2531 while( ch = getc(in), ch != EOF )
2535 if( args->num == 1 )
2536 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2538 else if( args->num )
2539 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2541 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2544 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2547 else if( len < 255 )
2549 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2553 fprintf(stderr, "started scanning\n");
// Scan: walk the leaf chain left to right under short read locks,
// dumping every live key to stdout.
2555 if( set->pool = bt_pinpool (bt, page_no) )
2556 set->page = bt_page (bt, set->pool, page_no);
2559 set->latch = bt_pinlatch (bt, page_no);
2560 bt_lockpage (BtLockRead, set->latch);
2561 next = bt_getid (set->page->right);
2562 cnt += set->page->act;
2564 for( slot = 0; slot++ < set->page->cnt; )
// Skip the infinite stopper on the last (rightmost) page.
2565 if( next || slot < set->page->cnt )
2566 if( !slotptr(set->page, slot)->dead ) {
2567 ptr = keyptr(set->page, slot);
2568 fwrite (ptr->key, ptr->len, 1, stdout);
2569 fputc ('\n', stdout);
2572 bt_unlockpage (BtLockRead, set->latch);
2573 bt_unpinlatch (set->latch);
2574 bt_unpinpool (set->pool);
2575 } while( page_no = next );
2577 cnt--; // remove stopper key
2578 fprintf(stderr, " Total keys read %d\n", cnt);
// Count: raw sequential read of every page from disk, summing active
// leaf entries; no locks, so only valid when the index is quiescent.
2583 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2585 fprintf(stderr, "started counting\n");
2586 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2587 page_no = LEAF_page;
2589 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2590 uid off = page_no << bt->mgr->page_bits;
2592 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2596 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2598 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2599 return bt->err = BTERR_map;
2601 if( *amt < bt->mgr->page_size )
2602 return bt->err = BTERR_map;
2604 if( !bt->frame->free && !bt->frame->lvl )
2605 cnt += bt->frame->act;
2606 if( page_no > LEAF_page )
2611 cnt--; // remove stopper key
2612 fprintf(stderr, " Total keys read %d\n", cnt);
2624 typedef struct timeval timer;
2626 int main (int argc, char **argv)
2628 int idx, cnt, len, slot, err;
2629 int segsize, bits = 16;
2646 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2647 fprintf (stderr, " where page_bits is the page size in bits\n");
2648 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
2649 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2650 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
2651 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
2655 start = getCpuTime(0);
2658 bits = atoi(argv[3]);
2661 poolsize = atoi(argv[4]);
2664 fprintf (stderr, "Warning: no mapped_pool\n");
2666 if( poolsize > 65535 )
2667 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2670 segsize = atoi(argv[5]);
2672 segsize = 4; // 16 pages per mmap segment
2675 num = atoi(argv[6]);
2679 threads = malloc (cnt * sizeof(pthread_t));
2681 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2683 args = malloc (cnt * sizeof(ThreadArg));
2685 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2688 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2694 for( idx = 0; idx < cnt; idx++ ) {
2695 args[idx].infile = argv[idx + 7];
2696 args[idx].type = argv[2];
2697 args[idx].mgr = mgr;
2698 args[idx].num = num;
2699 args[idx].idx = idx;
2701 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2702 fprintf(stderr, "Error creating thread %d\n", err);
2704 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2708 // wait for termination
2711 for( idx = 0; idx < cnt; idx++ )
2712 pthread_join (threads[idx], NULL);
2714 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2716 for( idx = 0; idx < cnt; idx++ )
2717 CloseHandle(threads[idx]);
2720 elapsed = getCpuTime(0) - start;
2721 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2722 elapsed = getCpuTime(1);
2723 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2724 elapsed = getCpuTime(2);
2725 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);