1 // btree version threadskv3 sched_yield version
2 // with reworked bt_deletekey code
3 // and phase-fair reader writer lock
4 // and librarian page split code
7 // author: karl malbrain, malbrain@cal.berkeley.edu
10 This work, including the source code, documentation
11 and related data, is placed into the public domain.
13 The original author is Karl Malbrain.
15 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
16 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
17 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
18 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
19 RESULTING FROM THE USE, MODIFICATION, OR
20 REDISTRIBUTION OF THIS SOFTWARE.
23 // Please see the project home page for documentation
24 // code.google.com/p/high-concurrency-btree
26 #define _FILE_OFFSET_BITS 64
27 #define _LARGEFILE64_SOURCE
43 #define WIN32_LEAN_AND_MEAN
57 typedef unsigned long long uid;
60 typedef unsigned long long off64_t;
61 typedef unsigned short ushort;
62 typedef unsigned int uint;
65 #define BT_latchtable 128 // number of latch manager slots
67 #define BT_ro 0x6f72 // ro
68 #define BT_rw 0x7772 // rw
70 #define BT_maxbits 24 // maximum page size in bits
71 #define BT_minbits 9 // minimum page size in bits
72 #define BT_minpage (1 << BT_minbits) // minimum page size
73 #define BT_maxpage (1 << BT_maxbits) // maximum page size
76 There are five lock types for each node in three independent sets:
77 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
78 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
79 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
80 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
81 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
92 // definition for phase-fair reader/writer lock implementation
106 // definition for spin latch implementation
108 // exclusive is set for write access
109 // share is count of read accessors
110 // grant write lock when share == 0
112 volatile typedef struct {
123 // hash table entries
126 BtSpinLatch latch[1];
127 volatile ushort slot; // Latch table entry at head of chain
130 // latch manager table structure
133 RWLock readwr[1]; // read/write page lock
134 RWLock access[1]; // Access Intent/Page delete
135 RWLock parent[1]; // Posting of fence key in parent
136 BtSpinLatch busy[1]; // slot is being moved between chains
137 volatile ushort next; // next entry in hash table chain
138 volatile ushort prev; // prev entry in hash table chain
139 volatile ushort pin; // number of outstanding locks
140 volatile ushort hash; // hash slot entry is under
141 volatile uid page_no; // latch set page number
144 // Define the length of the page and key pointers
148 // Page key slot definition.
150 // If BT_maxbits is 15 or less, you can save 2 bytes
151 // for each key stored by making the two uints
154 // Keys are marked dead, but remain on the page until
155 // cleanup is called. The fence key (highest key) for
156 // the page is always present, even after cleanup.
159 uint off:BT_maxbits; // page offset for key start
160 uint librarian:1; // set for librarian slot
161 uint dead:1; // set for deleted key
164 // The key structure occupies space at the upper end of
165 // each page. It's a length byte followed by the key
170 unsigned char key[1];
173 // the value structure also occupies space at the upper
178 unsigned char value[1];
181 // The first part of an index page.
182 // It is immediately followed
183 // by the BtSlot array of keys.
185 typedef struct BtPage_ {
186 uint cnt; // count of keys in page
187 uint act; // count of active keys
188 uint min; // next key offset
189 uint garbage; // page garbage in bytes
190 unsigned char bits:7; // page size in bits
191 unsigned char free:1; // page is on free chain
192 unsigned char lvl:7; // level of page
193 unsigned char kill:1; // page is being deleted
194 unsigned char right[BtId]; // page number to right
197 // The memory mapping pool table buffer manager entry
200 uid basepage; // mapped base page number
201 char *map; // mapped memory pointer
202 ushort slot; // slot index in this array
203 ushort pin; // mapped page pin counter
204 void *hashprev; // previous pool entry for the same hash idx
205 void *hashnext; // next pool entry for the same hash idx
207 HANDLE hmap; // Windows memory mapping handle
211 #define CLOCK_bit 0x8000 // bit in pool->pin
213 // The loadpage interface object
216 uid page_no; // current page number
217 BtPage page; // current page pointer
218 BtPool *pool; // current page pool
219 BtLatchSet *latch; // current page latch set
222 // structure for latch manager on ALLOC_page
225 struct BtPage_ alloc[1]; // next page_no in right ptr
226 unsigned char chain[BtId]; // head of free page_nos chain
227 BtSpinLatch lock[1]; // allocation area lite latch
228 ushort latchdeployed; // highest number of latch entries deployed
229 ushort nlatchpage; // number of latch pages at BT_latch
230 ushort latchtotal; // number of page latch entries
231 ushort latchhash; // number of latch hash table slots
232 ushort latchvictim; // next latch entry to examine
233 BtHashEntry table[0]; // the hash table
236 // The object structure for Btree access
239 uint page_size; // page size
240 uint page_bits; // page size in bits
241 uint seg_bits; // seg size in pages in bits
242 uint mode; // read-write mode
248 ushort poolcnt; // highest page pool node in use
249 ushort poolmax; // highest page pool node allocated
250 ushort poolmask; // total number of pages in mmap segment - 1
251 ushort hashsize; // size of Hash Table for pool entries
252 volatile uint evicted; // last evicted hash table slot
253 ushort *hash; // pool index for hash entries
254 BtSpinLatch *latch; // latches for hash table slots
255 BtLatchMgr *latchmgr; // mapped latch page from allocation page
256 BtLatchSet *latchsets; // mapped latch set from latch pages
257 BtPool *pool; // memory pool page segments
259 HANDLE halloc; // allocation and latch table handle
264 BtMgr *mgr; // buffer manager for thread
265 BtPage cursor; // cached frame for start/next (never mapped)
266 BtPage frame; // spare frame for the page split (never mapped)
267 uid cursor_page; // current cursor page number
268 unsigned char *mem; // frame, cursor, page memory buffer
269 int found; // last delete or insert was found
270 int err; // last error
284 extern void bt_close (BtDb *bt);
285 extern BtDb *bt_open (BtMgr *mgr);
286 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
287 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
288 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
289 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
290 extern uint bt_nextkey (BtDb *bt, uint slot);
293 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
294 void bt_mgrclose (BtMgr *mgr);
296 // Helper functions to return slot values
297 // from the cursor page.
299 extern BtKey bt_key (BtDb *bt, uint slot);
300 extern BtVal bt_val (BtDb *bt, uint slot);
302 // BTree page number constants
303 #define ALLOC_page 0 // allocation & latch manager hash table
304 #define ROOT_page 1 // root of the btree
305 #define LEAF_page 2 // first page of leaves
306 #define LATCH_page 3 // pages for latch manager
308 // Number of levels to create in a new BTree
312 // The page is allocated from low and hi ends.
313 // The key slots are allocated from the bottom,
314 // while the text and value of the key
315 // are allocated from the top. When the two
316 // areas meet, the page is split into two.
318 // A key consists of a length byte, two bytes of
319 // index number (0 - 65534), and up to 253 bytes
320 // of key value. Duplicate keys are discarded.
321 // Associated with each key is a value byte string
322 // containing any value desired.
324 // The b-tree root is always located at page 1.
325 // The first leaf page of level zero is always
326 // located on page 2.
328 // The b-tree pages are linked with next
329 // pointers to facilitate enumerators,
330 // and provide for concurrency.
332 // When the root page fills, it is split in two and
333 // the tree height is raised by a new root at page
334 // one with two keys.
336 // Deleted keys are marked with a dead bit until
337 // page cleanup. The fence key for a leaf node is
340 // Groups of pages called segments from the btree are optionally
341 // cached with a memory mapped pool. A hash table is used to keep
342 // track of the cached segments. This behaviour is controlled
343 // by the cache block size parameter to bt_open.
345 // To achieve maximum concurrency one page is locked at a time
346 // as the tree is traversed to find leaf key in question. The right
347 // page numbers are used in cases where the page is being split,
350 // Page 0 is dedicated to lock for new page extensions,
351 // and chains empty pages together for reuse. It also
352 // contains the latch manager hash table.
354 // The ParentModification lock on a node is obtained to serialize posting
355 // or changing the fence key for a node.
357 // Empty pages are chained together through the ALLOC page and reused.
359 // Access macros to address slot and key values from the page
360 // Page slots use 1 based indexing.
362 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
363 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
364 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
366 void bt_putid(unsigned char *dest, uid id)
371 dest[i] = (unsigned char)id, id >>= 8;
374 uid bt_getid(unsigned char *src)
379 for( i = 0; i < BtId; i++ )
380 id <<= 8, id |= *src++;
385 // Phase-Fair reader/writer lock implementation
387 void WriteLock (RWLock *lock)
392 tix = __sync_fetch_and_add (lock->ticket, 1);
394 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
396 // wait for our ticket to come up
398 while( tix != lock->serving[0] )
405 w = PRES | (tix & PHID);
407 r = __sync_fetch_and_add (lock->rin, w);
409 r = _InterlockedExchangeAdd16 (lock->rin, w);
411 while( r != *lock->rout )
419 void WriteRelease (RWLock *lock)
422 __sync_fetch_and_and (lock->rin, ~MASK);
424 _InterlockedAnd16 (lock->rin, ~MASK);
429 void ReadLock (RWLock *lock)
433 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
435 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
438 while( w == (*lock->rin & MASK) )
446 void ReadRelease (RWLock *lock)
449 __sync_fetch_and_add (lock->rout, RINC);
451 _InterlockedExchangeAdd16 (lock->rout, RINC);
455 // Spin Latch Manager
457 // wait until write lock mode is clear
458 // and add 1 to the share count
460 void bt_spinreadlock(BtSpinLatch *latch)
466 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
468 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
470 // see if exclusive request is granted or pending
475 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
477 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
480 } while( sched_yield(), 1 );
482 } while( SwitchToThread(), 1 );
486 // wait for other read and write latches to relinquish
488 void bt_spinwritelock(BtSpinLatch *latch)
494 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
496 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
499 if( !(prev & ~BOTH) )
503 __sync_fetch_and_and ((ushort *)latch, ~XCL);
505 _InterlockedAnd16((ushort *)latch, ~XCL);
508 } while( sched_yield(), 1 );
510 } while( SwitchToThread(), 1 );
514 // try to obtain write lock
516 // return 1 if obtained,
519 int bt_spinwritetry(BtSpinLatch *latch)
524 prev = __sync_fetch_and_or((ushort *)latch, XCL);
526 prev = _InterlockedOr16((ushort *)latch, XCL);
528 // take write access if all bits are clear
531 if( !(prev & ~BOTH) )
535 __sync_fetch_and_and ((ushort *)latch, ~XCL);
537 _InterlockedAnd16((ushort *)latch, ~XCL);
544 void bt_spinreleasewrite(BtSpinLatch *latch)
547 __sync_fetch_and_and((ushort *)latch, ~BOTH);
549 _InterlockedAnd16((ushort *)latch, ~BOTH);
553 // decrement reader count
555 void bt_spinreleaseread(BtSpinLatch *latch)
558 __sync_fetch_and_add((ushort *)latch, -SHARE);
560 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
564 // link latch table entry into latch hash table
566 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
568 BtLatchSet *set = bt->mgr->latchsets + victim;
570 if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
571 bt->mgr->latchsets[set->next].prev = victim;
573 bt->mgr->latchmgr->table[hashidx].slot = victim;
574 set->page_no = page_no;
581 void bt_unpinlatch (BtLatchSet *set)
584 __sync_fetch_and_add(&set->pin, -1);
586 _InterlockedDecrement16 (&set->pin);
590 // find existing latchset or inspire new one
591 // return with latchset pinned
593 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
595 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
596 ushort slot, avail = 0, victim, idx;
599 // obtain read lock on hash table entry
601 bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
603 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
605 set = bt->mgr->latchsets + slot;
606 if( page_no == set->page_no )
608 } while( slot = set->next );
612 __sync_fetch_and_add(&set->pin, 1);
614 _InterlockedIncrement16 (&set->pin);
618 bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
623 // try again, this time with write lock
625 bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
627 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
629 set = bt->mgr->latchsets + slot;
630 if( page_no == set->page_no )
632 if( !set->pin && !avail )
634 } while( slot = set->next );
636 // found our entry, or take over an unpinned one
638 if( slot || (slot = avail) ) {
639 set = bt->mgr->latchsets + slot;
641 __sync_fetch_and_add(&set->pin, 1);
643 _InterlockedIncrement16 (&set->pin);
645 set->page_no = page_no;
646 bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
650 // see if there are any unused entries
652 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
654 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
657 if( victim < bt->mgr->latchmgr->latchtotal ) {
658 set = bt->mgr->latchsets + victim;
660 __sync_fetch_and_add(&set->pin, 1);
662 _InterlockedIncrement16 (&set->pin);
664 bt_latchlink (bt, hashidx, victim, page_no);
665 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
670 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
672 victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
674 // find and reuse previous lock entry
678 victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
680 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
682 // we don't use slot zero
684 if( victim %= bt->mgr->latchmgr->latchtotal )
685 set = bt->mgr->latchsets + victim;
689 // take control of our slot
690 // from other threads
692 if( set->pin || !bt_spinwritetry (set->busy) )
697 // try to get write lock on hash chain
698 // skip entry if not obtained
699 // or has outstanding locks
701 if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
702 bt_spinreleasewrite (set->busy);
707 bt_spinreleasewrite (set->busy);
708 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
712 // unlink our available victim from its hash chain
715 bt->mgr->latchsets[set->prev].next = set->next;
717 bt->mgr->latchmgr->table[idx].slot = set->next;
720 bt->mgr->latchsets[set->next].prev = set->prev;
722 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
724 __sync_fetch_and_add(&set->pin, 1);
726 _InterlockedIncrement16 (&set->pin);
728 bt_latchlink (bt, hashidx, victim, page_no);
729 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
730 bt_spinreleasewrite (set->busy);
735 void bt_mgrclose (BtMgr *mgr)
740 // release mapped pages
741 // note that slot zero is never used
743 for( slot = 1; slot < mgr->poolmax; slot++ ) {
744 pool = mgr->pool + slot;
747 munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
750 FlushViewOfFile(pool->map, 0);
751 UnmapViewOfFile(pool->map);
752 CloseHandle(pool->hmap);
758 munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
759 munmap (mgr->latchmgr, 2 * mgr->page_size);
761 FlushViewOfFile(mgr->latchmgr, 0);
762 UnmapViewOfFile(mgr->latchmgr);
763 CloseHandle(mgr->halloc);
769 free ((void *)mgr->latch);
772 FlushFileBuffers(mgr->idx);
773 CloseHandle(mgr->idx);
774 GlobalFree (mgr->pool);
775 GlobalFree (mgr->hash);
776 GlobalFree ((void *)mgr->latch);
781 // close and release memory
783 void bt_close (BtDb *bt)
790 VirtualFree (bt->mem, 0, MEM_RELEASE);
795 // open/create new btree buffer manager
797 // call with file_name, BT_openmode, bits in page size (e.g. 16),
798 // size of mapped page pool (e.g. 8192)
800 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
802 uint lvl, attr, cacheblk, last, slot, idx;
803 uint nlatchpage, latchhash;
804 unsigned char value[BtId];
805 BtLatchMgr *latchmgr;
814 SYSTEM_INFO sysinfo[1];
817 // determine sanity of page size and buffer pool
819 if( bits > BT_maxbits )
821 else if( bits < BT_minbits )
825 return NULL; // must have buffer pool
828 mgr = calloc (1, sizeof(BtMgr));
830 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
833 return free(mgr), NULL;
835 cacheblk = 4096; // minimum mmap segment size for unix
838 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
839 attr = FILE_ATTRIBUTE_NORMAL;
840 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
842 if( mgr->idx == INVALID_HANDLE_VALUE )
843 return GlobalFree(mgr), NULL;
845 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
846 GetSystemInfo(sysinfo);
847 cacheblk = sysinfo->dwAllocationGranularity;
851 latchmgr = malloc (BT_maxpage);
854 // read minimum page size to get root info
856 if( size = lseek (mgr->idx, 0L, 2) ) {
857 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
858 bits = latchmgr->alloc->bits;
860 return free(mgr), free(latchmgr), NULL;
861 } else if( mode == BT_ro )
862 return free(latchmgr), bt_mgrclose (mgr), NULL;
864 latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
865 size = GetFileSize(mgr->idx, amt);
868 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
869 return bt_mgrclose (mgr), NULL;
870 bits = latchmgr->alloc->bits;
871 } else if( mode == BT_ro )
872 return bt_mgrclose (mgr), NULL;
875 mgr->page_size = 1 << bits;
876 mgr->page_bits = bits;
878 mgr->poolmax = poolmax;
881 if( cacheblk < mgr->page_size )
882 cacheblk = mgr->page_size;
884 // mask for partial memmaps
886 mgr->poolmask = (cacheblk >> bits) - 1;
888 // see if requested size of pages per memmap is greater
890 if( (1 << segsize) > mgr->poolmask )
891 mgr->poolmask = (1 << segsize) - 1;
895 while( (1 << mgr->seg_bits) <= mgr->poolmask )
898 mgr->hashsize = hashsize;
901 mgr->pool = calloc (poolmax, sizeof(BtPool));
902 mgr->hash = calloc (hashsize, sizeof(ushort));
903 mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
905 mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
906 mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
907 mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
913 // initialize an empty b-tree with latch page, root page, page of leaves
914 // and page(s) of latches
916 memset (latchmgr, 0, 1 << bits);
917 nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
918 bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
919 latchmgr->alloc->bits = mgr->page_bits;
921 latchmgr->nlatchpage = nlatchpage;
922 latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
924 // initialize latch manager
926 latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
928 // size of hash table = total number of latchsets
930 if( latchhash > latchmgr->latchtotal )
931 latchhash = latchmgr->latchtotal;
933 latchmgr->latchhash = latchhash;
936 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
937 return bt_mgrclose (mgr), NULL;
939 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
940 return bt_mgrclose (mgr), NULL;
942 if( *amt < mgr->page_size )
943 return bt_mgrclose (mgr), NULL;
946 memset (latchmgr, 0, 1 << bits);
947 latchmgr->alloc->bits = mgr->page_bits;
949 for( lvl=MIN_lvl; lvl--; ) {
950 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
951 key = keyptr(latchmgr->alloc, 1);
952 key->len = 2; // create stopper key
956 bt_putid(value, MIN_lvl - lvl + 1);
957 val = valptr(latchmgr->alloc, 1);
958 val->len = lvl ? BtId : 0;
959 memcpy (val->value, value, val->len);
961 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
962 latchmgr->alloc->lvl = lvl;
963 latchmgr->alloc->cnt = 1;
964 latchmgr->alloc->act = 1;
966 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
967 return bt_mgrclose (mgr), NULL;
969 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
970 return bt_mgrclose (mgr), NULL;
972 if( *amt < mgr->page_size )
973 return bt_mgrclose (mgr), NULL;
977 // clear out latch manager locks
978 // and rest of pages to round out segment
980 memset(latchmgr, 0, mgr->page_size);
983 while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
985 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
987 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
988 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
989 return bt_mgrclose (mgr), NULL;
990 if( *amt < mgr->page_size )
991 return bt_mgrclose (mgr), NULL;
998 // mlock the root page and the latchmgr page
1000 flag = PROT_READ | PROT_WRITE;
1001 mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1002 if( mgr->latchmgr == MAP_FAILED )
1003 return bt_mgrclose (mgr), NULL;
1004 mlock (mgr->latchmgr, 2 * mgr->page_size);
1006 mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1007 if( mgr->latchsets == MAP_FAILED )
1008 return bt_mgrclose (mgr), NULL;
1009 mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1011 flag = PAGE_READWRITE;
1012 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1014 return bt_mgrclose (mgr), NULL;
1016 flag = FILE_MAP_WRITE;
1017 mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1018 if( !mgr->latchmgr )
1019 return GetLastError(), bt_mgrclose (mgr), NULL;
1021 mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1027 VirtualFree (latchmgr, 0, MEM_RELEASE);
1032 // open BTree access method
1033 // based on buffer manager
1035 BtDb *bt_open (BtMgr *mgr)
1037 BtDb *bt = malloc (sizeof(*bt));
1039 memset (bt, 0, sizeof(*bt));
1042 bt->mem = malloc (2 *mgr->page_size);
1044 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1046 bt->frame = (BtPage)bt->mem;
1047 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1051 // compare two keys, returning > 0, = 0, or < 0
1052 // as the comparison value
1054 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1056 uint len1 = key1->len;
1059 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1072 // find segment in pool
1073 // must be called with hashslot idx locked
1074 // return NULL if not there
1075 // otherwise return node
1077 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1082 // compute start of hash chain in pool
1084 if( slot = bt->mgr->hash[idx] )
1085 pool = bt->mgr->pool + slot;
1089 page_no &= ~bt->mgr->poolmask;
1091 while( pool->basepage != page_no )
1092 if( pool = pool->hashnext )
1100 // add segment to hash table
1102 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1107 pool->hashprev = pool->hashnext = NULL;
1108 pool->basepage = page_no & ~bt->mgr->poolmask;
1109 pool->pin = CLOCK_bit + 1;
1111 if( slot = bt->mgr->hash[idx] ) {
1112 node = bt->mgr->pool + slot;
1113 pool->hashnext = node;
1114 node->hashprev = pool;
1117 bt->mgr->hash[idx] = pool->slot;
1120 // map new buffer pool segment to virtual memory
1122 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1124 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1125 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1129 flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1130 pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1131 if( pool->map == MAP_FAILED )
1132 return bt->err = BTERR_map;
1135 flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1136 pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1138 return bt->err = BTERR_map;
1140 flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1141 pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1143 return bt->err = BTERR_map;
1148 // calculate page within pool
1150 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1152 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1155 page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1156 // madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1162 void bt_unpinpool (BtPool *pool)
1165 __sync_fetch_and_add(&pool->pin, -1);
1167 _InterlockedDecrement16 (&pool->pin);
1171 // find or place requested page in segment-pool
1172 // return pool table entry, incrementing pin
1174 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1176 uint slot, hashidx, idx, victim;
1177 BtPool *pool, *node, *next;
1179 // lock hash table chain
1181 hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1182 bt_spinwritelock (&bt->mgr->latch[hashidx]);
1184 // look up in hash table
1186 if( pool = bt_findpool(bt, page_no, hashidx) ) {
1188 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1189 __sync_fetch_and_add(&pool->pin, 1);
1191 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1192 _InterlockedIncrement16 (&pool->pin);
1194 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1198 // allocate a new pool node
1199 // and add to hash table
1202 slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1204 slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1207 if( ++slot < bt->mgr->poolmax ) {
1208 pool = bt->mgr->pool + slot;
1211 if( bt_mapsegment(bt, pool, page_no) )
1214 bt_linkhash(bt, pool, page_no, hashidx);
1215 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1219 // pool table is full
1220 // find best pool entry to evict
1223 __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1225 _InterlockedDecrement16 (&bt->mgr->poolcnt);
1230 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1232 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1234 victim %= bt->mgr->poolmax;
1235 pool = bt->mgr->pool + victim;
1236 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1241 // try to get write lock
1242 // skip entry if not obtained
1244 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1247 // skip this entry if
1249 // or clock bit is set
1253 __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1255 _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1257 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1261 // unlink victim pool node from hash table
1263 if( node = pool->hashprev )
1264 node->hashnext = pool->hashnext;
1265 else if( node = pool->hashnext )
1266 bt->mgr->hash[idx] = node->slot;
1268 bt->mgr->hash[idx] = 0;
1270 if( node = pool->hashnext )
1271 node->hashprev = pool->hashprev;
1273 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1275 // remove old file mapping
1277 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1279 // FlushViewOfFile(pool->map, 0);
1280 UnmapViewOfFile(pool->map);
1281 CloseHandle(pool->hmap);
1285 // create new pool mapping
1286 // and link into hash table
1288 if( bt_mapsegment(bt, pool, page_no) )
1291 bt_linkhash(bt, pool, page_no, hashidx);
1292 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1297 // place write, read, or parent lock on requested page_no.
1299 void bt_lockpage(BtLock mode, BtLatchSet *set)
1303 ReadLock (set->readwr);
1306 WriteLock (set->readwr);
1309 ReadLock (set->access);
1312 WriteLock (set->access);
1315 WriteLock (set->parent);
1320 // remove write, read, or parent lock on requested page
1322 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1326 ReadRelease (set->readwr);
1329 WriteRelease (set->readwr);
1332 ReadRelease (set->access);
1335 WriteRelease (set->access);
1338 WriteRelease (set->parent);
1343 // allocate a new page and write page into it
1345 uid bt_newpage(BtDb *bt, BtPage page)
1351 // lock allocation page
1353 bt_spinwritelock(bt->mgr->latchmgr->lock);
1355 // use empty chain first
1356 // else allocate empty page
1358 if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1359 if( set->pool = bt_pinpool (bt, new_page) )
1360 set->page = bt_page (bt, set->pool, new_page);
1364 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1365 bt_unpinpool (set->pool);
1367 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1368 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1370 // if writing first page of pool block, set file length thru last page
1372 if( (new_page & bt->mgr->poolmask) == 0 )
1373 ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1377 // unlock allocation latch
1379 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1382 // bring new page into pool and copy page.
1383 // this will extend the file into the new pages on WIN32.
1385 if( set->pool = bt_pinpool (bt, new_page) )
1386 set->page = bt_page (bt, set->pool, new_page);
1390 memcpy(set->page, page, bt->mgr->page_size);
1391 bt_unpinpool (set->pool);
1394 // unlock allocation latch
1396 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1401 // find slot in page for given key at a given level
1403 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1405 uint diff, higher = set->page->cnt, low = 1, slot;
1408 // make stopper key an infinite fence value
1410 if( bt_getid (set->page->right) )
1415 // low is the lowest candidate.
1416 // loop ends when they meet
1418 // higher is already
1419 // tested as .ge. the passed key.
1421 while( diff = higher - low ) {
1422 slot = low + ( diff >> 1 );
1423 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1426 higher = slot, good++;
1429 // return zero if key is on right link page
1431 return good ? higher : 0;
1434 // find and load page at given level for given key
1435 // leave page rd or wr locked as requested
//	descends from the root using lock chaining: take Access lock on the
//	child, release the parent, then take the Read/Write lock on the child.
//	On success the requested-level page is left pinned and locked in 'set';
//	on structural failure sets bt->err = BTERR_struct and returns 0.
//	(the success return of the found slot is on an elided line)
1437 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1439 uid page_no = ROOT_page, prevpage = 0;
1440 uint drill = 0xff, slot;
1441 BtLatchSet *prevlatch;
1442 uint mode, prevmode;
1445 // start at root of btree and drill down
1448 // determine lock mode of drill level
//	only the target level gets the caller's lock; intermediate levels are read-locked
1449 mode = (drill == lvl) ? lock : BtLockRead;
1451 set->latch = bt_pinlatch (bt, page_no);
1452 set->page_no = page_no;
1454 // pin page contents
1456 if( set->pool = bt_pinpool (bt, page_no) )
1457 set->page = bt_page (bt, set->pool, page_no);
1461 // obtain access lock using lock chaining with Access mode
//	the root is never deleted, so Access/Delete interlock is skipped for it
1463 if( page_no > ROOT_page )
1464 bt_lockpage(BtLockAccess, set->latch);
1466 // release & unpin parent page
1469 bt_unlockpage(prevmode, prevlatch);
1470 bt_unpinlatch (prevlatch);
1471 bt_unpinpool (prevpool);	// NOTE(review): 'prevpool' is declared on an elided line
1475 // obtain read lock using lock chaining
1477 bt_lockpage(mode, set->latch);
1479 if( set->page->free )
1480 return bt->err = BTERR_struct, 0;
1482 if( page_no > ROOT_page )
1483 bt_unlockpage(BtLockAccess, set->latch);
1485 // re-read and re-lock root after determining actual level of root
1487 if( set->page->lvl != drill) {
1488 if( set->page_no != ROOT_page )
1489 return bt->err = BTERR_struct, 0;
1491 drill = set->page->lvl;
//	if the root turns out to be the target level but we only hold a read
//	lock, drop everything and retry the level with the requested lock mode
1493 if( lock != BtLockRead && drill == lvl ) {
1494 bt_unlockpage(mode, set->latch);
1495 bt_unpinlatch (set->latch);
1496 bt_unpinpool (set->pool);
1501 prevpage = set->page_no;
1502 prevlatch = set->latch;
1503 prevpool = set->pool;
1506 // find key on page at this level
1507 // and descend to requested level
1509 if( !set->page->kill )
1510 if( slot = bt_findslot (set, key, len) ) {
//	skip dead slots to find a live separator to follow
1514 while( slotptr(set->page, slot)->dead )
1515 if( slot++ < set->page->cnt )
1520 page_no = bt_getid(valptr(set->page, slot)->value);
1525 // or slide right into next page
//	key was greater than our fence (or page is being killed): follow right link
1528 page_no = bt_getid(set->page->right);
1532 // return error on end of right chain
1534 bt->err = BTERR_struct;
1535 return 0; // return error
1538 // return page to free list
1539 // page must be delete & write locked
//	prepends the page onto the free-list chain kept in the latch manager
//	(chain head -> page->right), marks it free, then releases the page's
//	Delete and Write latches and unpins latch and pool.
1541 void bt_freepage (BtDb *bt, BtPageSet *set)
1543 // lock allocation page
//	the free-list chain is protected by the allocation spin latch
1545 bt_spinwritelock (bt->mgr->latchmgr->lock);
1548 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1549 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1550 set->page->free = 1;
1552 // unlock released page
1554 bt_unlockpage (BtLockDelete, set->latch);
1555 bt_unlockpage (BtLockWrite, set->latch);
1556 bt_unpinlatch (set->latch);
1557 bt_unpinpool (set->pool);
1559 // unlock allocation page
1561 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1564 // a fence key was deleted from a page
1565 // push new fence value upwards
//	caller holds the page write-locked. Removes the old fence slot, posts
//	the new (smaller) fence key into the parent level via bt_insertkey,
//	then deletes the old fence key from the parent. Keys here are stored
//	with a one-byte length prefix (hence the key+1 / *key calling pattern).
1567 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1569 unsigned char leftkey[256], rightkey[256];
1570 unsigned char value[BtId];
1575 // remove the old fence value
1577 ptr = keyptr(set->page, set->page->cnt);
1578 memcpy (rightkey, ptr, ptr->len + 1);
1579 set->page->garbage += ptr->len + val->len + 2;	// NOTE(review): 'val' is assigned on an elided line
1580 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
//	the slot below the removed fence becomes the new fence key
1582 ptr = keyptr(set->page, set->page->cnt);
1583 memcpy (leftkey, ptr, ptr->len + 1);
1584 page_no = set->page_no;	// NOTE(review): 'page_no' is declared on an elided line
//	hold Parent lock across the upper-level updates so the fence cannot
//	change underneath us; Write lock is released first to avoid deadlock
1586 bt_lockpage (BtLockParent, set->latch);
1587 bt_unlockpage (BtLockWrite, set->latch);
1589 // insert new (now smaller) fence key
1591 bt_putid (value, page_no);
1593 if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
1596 // now delete old fence key
1598 if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1601 bt_unlockpage (BtLockParent, set->latch);
1602 bt_unpinlatch(set->latch);
1603 bt_unpinpool (set->pool);
1607 // root has a single child
1608 // collapse a level from the tree
//	copies the sole live child's contents over the root page and frees the
//	child, repeating while the root remains above level 1 with exactly one
//	active entry. Caller holds the root write-locked; this releases it.
1610 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1615 // find the child entry and promote as new root contents
1618 for( idx = 0; idx++ < root->page->cnt; )
1619 if( !slotptr(root->page, idx)->dead )
1622 child->page_no = bt_getid (valptr(root->page, idx)->value);
//	lock the child for deletion before copying it into the root
1624 child->latch = bt_pinlatch (bt, child->page_no);
1625 bt_lockpage (BtLockDelete, child->latch);
1626 bt_lockpage (BtLockWrite, child->latch);
1628 if( child->pool = bt_pinpool (bt, child->page_no) )
1629 child->page = bt_page (bt, child->pool, child->page_no);
1633 memcpy (root->page, child->page, bt->mgr->page_size);
1634 bt_freepage (bt, child);
1636 } while( root->page->lvl > 1 && root->page->act == 1 );
1638 bt_unlockpage (BtLockWrite, root->latch);
1639 bt_unpinlatch (root->latch);
1640 bt_unpinpool (root->pool);
1644 // find and delete key on page by marking delete flag bit
1645 // if page becomes empty, delete it from the btree
//	sets bt->found to whether the key was present. An emptied page is
//	merged with its right sibling: the sibling's contents are pulled left,
//	the sibling is marked killed, the parent fence keys are rewired, and
//	the sibling page is returned to the free list.
1647 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1649 unsigned char lowerfence[256], higherfence[256];
1650 uint slot, idx, dirty = 0, fence, found;
1651 BtPageSet set[1], right[1];
1652 unsigned char value[BtId];
1656 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1657 ptr = keyptr(set->page, slot);
1661 // if librarian slot, advance to real slot
1663 if( slotptr(set->page, slot)->librarian )
1664 ptr = keyptr(set->page, ++slot);
1666 // are we deleting a fence slot?
1668 fence = slot == set->page->cnt;
1670 // if key is found delete it, otherwise ignore request
1672 if( found = !keycmp (ptr, key, len) )
1673 if( found = slotptr(set->page, slot)->dead == 0 ) {
1674 val = valptr(set->page,slot);
1675 dirty = slotptr(set->page, slot)->dead = 1;
1676 set->page->garbage += ptr->len + val->len + 2;	// key len byte + val len byte
1679 // collapse empty slots
//	shift live slots down over trailing dead ones (loop body partly elided)
1681 while( idx = set->page->cnt - 1 )
1682 if( slotptr(set->page, idx)->dead ) {
1683 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1684 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1689 // did we delete a fence key in an upper level?
1691 if( dirty && lvl && set->page->act && fence )
1692 if( bt_fixfence (bt, set, lvl) )
1695 return bt->found = found, 0;
1697 // is this a collapsed root?
1699 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1700 if( bt_collapseroot (bt, set) )
1703 return bt->found = found, 0;
1705 // return if page is not empty
1707 if( set->page->act ) {
1708 bt_unlockpage(BtLockWrite, set->latch);
1709 bt_unpinlatch (set->latch);
1710 bt_unpinpool (set->pool);
1711 return bt->found = found, 0;
1714 // cache copy of fence key
1715 // to post in parent
1717 ptr = keyptr(set->page, set->page->cnt);
1718 memcpy (lowerfence, ptr, ptr->len + 1);
1720 // obtain lock on right page
1722 right->page_no = bt_getid(set->page->right);
1723 right->latch = bt_pinlatch (bt, right->page_no);
1724 bt_lockpage (BtLockWrite, right->latch);
1726 // pin page contents
1728 if( right->pool = bt_pinpool (bt, right->page_no) )
1729 right->page = bt_page (bt, right->pool, right->page_no);
1733 if( right->page->kill )
1734 return bt->err = BTERR_struct;	// NOTE(review): error path returns with latches still held and pages pinned — looks leaky, confirm against upstream
1736 // pull contents of right peer into our empty page
1738 memcpy (set->page, right->page, bt->mgr->page_size);
1740 // cache copy of key to update
1742 ptr = keyptr(right->page, right->page->cnt);
1743 memcpy (higherfence, ptr, ptr->len + 1);
1745 // mark right page deleted and point it to left page
1746 // until we can post parent updates
1748 bt_putid (right->page->right, set->page_no);
1749 right->page->kill = 1;
//	swap Write locks for Parent locks before touching the level above
1751 bt_lockpage (BtLockParent, right->latch);
1752 bt_unlockpage (BtLockWrite, right->latch);
1754 bt_lockpage (BtLockParent, set->latch);
1755 bt_unlockpage (BtLockWrite, set->latch);
1757 // redirect higher key directly to our new node contents
1759 bt_putid (value, set->page_no);
1761 if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId) )
1764 // delete old lower key to our node
1766 if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1769 // obtain delete and write locks to right node
1771 bt_unlockpage (BtLockParent, right->latch);
1772 bt_lockpage (BtLockDelete, right->latch);
1773 bt_lockpage (BtLockWrite, right->latch);
1774 bt_freepage (bt, right);
1776 bt_unlockpage (BtLockParent, set->latch);
1777 bt_unpinlatch (set->latch);
1778 bt_unpinpool (set->pool);
1783 // find key in leaf level and return number of value bytes
1784 // or (-1) if not found
//	reads at the leaf level under a Read lock; copies up to valmax bytes
//	of the value into 'value' when the key matches exactly and is live.
1786 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1794 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1795 ptr = keyptr(set->page, slot);
1799 // skip librarian slot place holder
1801 if( slotptr(set->page, slot)->librarian )
1802 ptr = keyptr(set->page, ++slot);
1804 // if key exists, return >= 0 value bytes copied
1805 // otherwise return (-1)
1807 if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
1808 val = valptr (set->page,slot);
//	clamp the copy to the actual value length (assignment is on an elided line)
1809 if( valmax > val->len )
1811 memcpy (value, val->value, valmax);
1816 bt_unlockpage (BtLockRead, set->latch);
1817 bt_unpinlatch (set->latch);
1818 bt_unpinpool (set->pool);
1822 // check page for space available,
1823 // clean if necessary and return
1824 // 0 - page needs splitting
1825 // >0 new slot value
//	compacts the page in place by copying live keys/values out of a frame
//	snapshot, rebuilding the slot array with a dead "librarian" placeholder
//	slot ahead of each real slot (reserved for future in-place inserts).
1827 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1829 uint nxt = bt->mgr->page_size;
1830 uint cnt = 0, idx = 0;
1831 uint max = page->cnt;
//	fast path: key/value area and slot array don't collide even after insert
1836 if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1839 // skip cleanup and proceed to split
1840 // if there's not enough garbage
1842 if( page->garbage + page->min < 2 * page->act * sizeof(BtSlot) + sizeof(*page) + nxt / 3 )
//	snapshot the page; 'page' is rebuilt from bt->frame below
1845 memcpy (bt->frame, page, bt->mgr->page_size);
1847 // skip page info and set rest of page to zero
1849 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1853 // clean up page first by
1854 // removing deleted keys
1856 while( cnt++ < max ) {
//	always keep the fence slot (cnt == max) even if dead
1859 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1862 // copy the key across
1864 key = keyptr(bt->frame, cnt);
1865 nxt -= key->len + 1;
1866 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1868 // copy the value across
1870 val = valptr(bt->frame, cnt);
1871 nxt -= val->len + 1;
1872 ((unsigned char *)page)[nxt] = val->len;	// one-byte value length prefix
1873 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1875 // make a librarian slot
1877 slotptr(page, ++idx)->off = nxt;
1878 slotptr(page, idx)->librarian = 1;
1879 slotptr(page, idx)->dead = 1;
//	then the real slot pointing at the same offset
1883 slotptr(page, ++idx)->off = nxt;
1885 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1892 // see if page has enough space now, or does it need splitting?
1894 if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1900 // split the root and raise the height of the btree
//	moves the current root contents into a fresh 'left' page, then rebuilds
//	the root with exactly two entries: leftkey -> left page, and the
//	0xffff stopper key -> page_no2 (the new right page from the split).
1902 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1904 uint nxt = bt->mgr->page_size;
1905 unsigned char value[BtId];
1908 // Obtain an empty page to use, and copy the current
1909 // root contents into it, e.g. lower keys
1911 if( !(left = bt_newpage(bt, root->page)) )	// NOTE(review): 'left' is declared on an elided line
1914 // preserve the page info at the bottom
1915 // of higher keys and set rest to zero
1917 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1919 // insert lower keys page fence key on newroot page as first key
//	value = page id of the left page (nxt decrement for the value is elided)
1922 bt_putid (value, left);
1923 ((unsigned char *)root->page)[nxt] = BtId;
1924 memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
1926 nxt -= *leftkey + 1;	// leftkey carries a one-byte length prefix
1927 memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1928 slotptr(root->page, 1)->off = nxt;
1930 // insert stopper key on newroot page
1931 // and increase the root height
1933 nxt -= 3 + BtId + 1;
//	stopper key: length 2, bytes 0xff 0xff — sorts after every real key
1934 ((unsigned char *)root->page)[nxt] = 2;
1935 ((unsigned char *)root->page)[nxt+1] = 0xff;
1936 ((unsigned char *)root->page)[nxt+2] = 0xff;
1938 bt_putid (value, page_no2);
1939 ((unsigned char *)root->page)[nxt+3] = BtId;
1940 memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
1941 slotptr(root->page, 2)->off = nxt;
1943 bt_putid(root->page->right, 0);	// root has no right sibling
1944 root->page->min = nxt; // reset lowest used offset and key count
1945 root->page->cnt = 2;
1946 root->page->act = 2;
1949 // release and unpin root
1951 bt_unlockpage(BtLockWrite, root->latch);
1952 bt_unpinlatch (root->latch);
1953 bt_unpinpool (root->pool);
1957 // split already locked full node
1960 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1962 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1963 unsigned char fencekey[256], rightkey[256];
1964 unsigned char value[BtId];
1965 uint lvl = set->page->lvl;
1971 // split higher half of keys to bt->frame
1973 memset (bt->frame, 0, bt->mgr->page_size);
1974 max = set->page->cnt;
1978 while( cnt++ < max ) {
1979 if( slotptr(set->page, cnt)->dead && cnt < max )
1981 val = valptr(set->page, cnt);
1982 nxt -= val->len + 1;
1983 ((unsigned char *)bt->frame)[nxt] = val->len;
1984 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
1986 key = keyptr(set->page, cnt);
1987 nxt -= key->len + 1;
1988 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1990 // add librarian slot
1992 slotptr(bt->frame, ++idx)->off = nxt;
1993 slotptr(bt->frame, idx)->librarian = 1;
1994 slotptr(bt->frame, idx)->dead = 1;
1998 slotptr(bt->frame, ++idx)->off = nxt;
2000 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2004 // remember existing fence key for new page to the right
2006 memcpy (rightkey, key, key->len + 1);
2008 bt->frame->bits = bt->mgr->page_bits;
2009 bt->frame->min = nxt;
2010 bt->frame->cnt = idx;
2011 bt->frame->lvl = lvl;
2015 if( set->page_no > ROOT_page )
2016 memcpy (bt->frame->right, set->page->right, BtId);
2018 // get new free page and write higher keys to it.
2020 if( !(right->page_no = bt_newpage(bt, bt->frame)) )
2023 // update lower keys to continue in old page
2025 memcpy (bt->frame, set->page, bt->mgr->page_size);
2026 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2027 nxt = bt->mgr->page_size;
2028 set->page->garbage = 0;
2034 if( slotptr(bt->frame, max)->librarian )
2037 // assemble page of smaller keys
2039 while( cnt++ < max ) {
2040 if( slotptr(bt->frame, cnt)->dead )
2042 val = valptr(bt->frame, cnt);
2043 nxt -= val->len + 1;
2044 ((unsigned char *)set->page)[nxt] = val->len;
2045 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2047 key = keyptr(bt->frame, cnt);
2048 nxt -= key->len + 1;
2049 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2051 // add librarian slot
2053 slotptr(set->page, ++idx)->off = nxt;
2054 slotptr(set->page, idx)->librarian = 1;
2055 slotptr(set->page, idx)->dead = 1;
2059 slotptr(set->page, ++idx)->off = nxt;
2063 // remember fence key for smaller page
2065 memcpy(fencekey, key, key->len + 1);
2067 bt_putid(set->page->right, right->page_no);
2068 set->page->min = nxt;
2069 set->page->cnt = idx;
2071 // if current page is the root page, split it
2073 if( set->page_no == ROOT_page )
2074 return bt_splitroot (bt, set, fencekey, right->page_no);
2076 // insert new fences in their parent pages
2078 right->latch = bt_pinlatch (bt, right->page_no);
2079 bt_lockpage (BtLockParent, right->latch);
2081 bt_lockpage (BtLockParent, set->latch);
2082 bt_unlockpage (BtLockWrite, set->latch);
2084 // insert new fence for reformulated left block of smaller keys
2086 bt_putid (value, set->page_no);
2088 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId) )
2091 // switch fence for right block of larger keys to new right page
2093 bt_putid (value, right->page_no);
2095 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId) )
2098 bt_unlockpage (BtLockParent, set->latch);
2099 bt_unpinlatch (set->latch);
2100 bt_unpinpool (set->pool);
2102 bt_unlockpage (BtLockParent, right->latch);
2103 bt_unpinlatch (right->latch);
2106 // Insert new key into the btree at given level.
//	loads the target page write-locked; updates the value in place when
//	the key already exists and the old value is large enough, otherwise
//	makes room (cleaning/splitting as needed) and inserts a new slot,
//	reusing an adjacent librarian placeholder slot when one is available.
2108 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
2118 if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
2119 ptr = keyptr(set->page, slot);
2122 bt->err = BTERR_ovflw;
2126 // if librarian slot == found slot, advance to real slot
2128 if( slotptr(set->page, slot)->librarian )
2129 if( !keycmp (ptr, key, keylen) )
2130 ptr = keyptr(set->page, ++slot);
2132 // if key already exists, update value and return
2134 if( reuse = !keycmp (ptr, key, keylen) )	// NOTE(review): 'reuse' is declared on an elided line
//	in-place update when the existing value area is big enough
2135 if( val = valptr(set->page, slot), val->len >= vallen ) {
2136 if( slotptr(set->page, slot)->dead )
2138 set->page->garbage += val->len - vallen;
2139 slotptr(set->page, slot)->dead = 0;
2141 memcpy (val->value, value, vallen);
2142 bt_unlockpage(BtLockWrite, set->latch);
2143 bt_unpinlatch (set->latch);
2144 bt_unpinpool (set->pool);
//	otherwise kill the old slot and account its space as garbage
2147 if( !slotptr(set->page, slot)->dead ) {
2148 set->page->garbage += val->len + ptr->len + 2;
2151 slotptr(set->page, slot)->dead = 1;
2154 // if found slot > desired slot and previous slot
2155 // is a librarian slot, use it
2157 if( !reuse && slot > 1 )
2158 if( slotptr(set->page, slot-1)->librarian )
2161 // check if page has enough space
2163 if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
2166 if( bt_splitpage (bt, set) )
2170 // calculate next available slot and copy key into page
//	value then key are stacked downward from page->min, each with a
//	one-byte length prefix
2172 set->page->min -= vallen + 1; // reset lowest used offset
2173 ((unsigned char *)set->page)[set->page->min] = vallen;
2174 memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2176 set->page->min -= keylen + 1; // reset lowest used offset
2177 ((unsigned char *)set->page)[set->page->min] = keylen;
2178 memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
//	find a dead slot at or above the insertion point to reuse
2180 for( idx = slot; idx < set->page->cnt; idx++ )
2181 if( slotptr(set->page, idx)->dead )
2184 // now insert key into array before slot
2186 if( !reuse && idx == set->page->cnt )
2187 idx++, set->page->cnt++;
//	shift slots up to open the target position (loop header elided)
2192 *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2194 slotptr(set->page, slot)->off = set->page->min;
2195 slotptr(set->page, slot)->librarian = 0;
2196 slotptr(set->page, slot)->dead = 0;
2198 bt_unlockpage (BtLockWrite, set->latch);
2199 bt_unpinlatch (set->latch);
2200 bt_unpinpool (set->pool);
2204 // cache page of keys into cursor and return starting slot for given key
//	copies the leaf page containing 'key' into bt->cursor under a Read
//	lock, so iteration proceeds over a private snapshot without locks.
2206 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2211 // cache page for retrieval
2213 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2214 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2218 bt->cursor_page = set->page_no;
2220 bt_unlockpage(BtLockRead, set->latch);
2221 bt_unpinlatch (set->latch);
2222 bt_unpinpool (set->pool);
2226 // return next slot for cursor page
2227 // or slide cursor right into next page
//	advances within the cached cursor page, skipping dead slots and the
//	infinite stopper; when the page is exhausted, snapshots the right
//	sibling into the cursor and continues there.
2229 uint bt_nextkey (BtDb *bt, uint slot)
2235 right = bt_getid(bt->cursor->right);
2237 while( slot++ < bt->cursor->cnt )
2238 if( slotptr(bt->cursor,slot)->dead )
2240 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
//	no right sibling and slot == cnt means end of the index
2248 bt->cursor_page = right;
2250 if( set->pool = bt_pinpool (bt, right) )
2251 set->page = bt_page (bt, set->pool, right);
2255 set->latch = bt_pinlatch (bt, right);
2256 bt_lockpage(BtLockRead, set->latch);
2258 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2260 bt_unlockpage(BtLockRead, set->latch);
2261 bt_unpinlatch (set->latch);
2262 bt_unpinpool (set->pool);
//	return the key stored at 'slot' of the cached cursor page
2270 BtKey bt_key(BtDb *bt, uint slot)
2272 return keyptr(bt->cursor, slot);
//	return the value stored at 'slot' of the cached cursor page
2275 BtVal bt_val(BtDb *bt, uint slot)
2277 return valptr(bt->cursor,slot);
//	WIN32 variant: return elapsed seconds for the given clock.
//	'type' appears to select wall clock / user CPU / system CPU time via
//	GetSystemTimeAsFileTime and GetProcessTimes — the switch/case labels
//	are on elided lines, so confirm the exact mapping against the caller.
2283 double getCpuTime(int type)
2286 FILETIME xittime[1];
2287 FILETIME systime[1];
2288 FILETIME usrtime[1];
2289 SYSTEMTIME timeconv[1];
2292 memset (timeconv, 0, sizeof(SYSTEMTIME));
2296 GetSystemTimeAsFileTime (xittime);
2297 FileTimeToSystemTime (xittime, timeconv);
2298 ans = (double)timeconv->wDayOfWeek * 3600 * 24;	// NOTE(review): 'ans' is declared on an elided line
2301 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2302 FileTimeToSystemTime (usrtime, timeconv);
2305 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2306 FileTimeToSystemTime (systime, timeconv);
//	fold hours/minutes/seconds/milliseconds into fractional seconds
2310 ans += (double)timeconv->wHour * 3600;
2311 ans += (double)timeconv->wMinute * 60;
2312 ans += (double)timeconv->wSecond;
2313 ans += (double)timeconv->wMilliseconds / 1000;
2318 #include <sys/resource.h>
//	unix variant: returns seconds as a double. Based on the visible
//	returns: gettimeofday (wall clock), ru_utime (user CPU) and
//	ru_stime (system CPU) — the selecting case labels are on elided lines.
2320 double getCpuTime(int type)
2322 struct rusage used[1];
2323 struct timeval tv[1];
2327 gettimeofday(tv, NULL);
2328 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2331 getrusage(RUSAGE_SELF, used);
2332 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2335 getrusage(RUSAGE_SELF, used);
2336 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
//	debugging audit: reports any latches or buffer-pool pins still held,
//	force-clears them, then scans every page on disk checking key order
//	and counting active leaf keys. Returns the key count (return is on an
//	elided line). Not safe to run concurrently with other threads.
2343 uint bt_latchaudit (BtDb *bt)
2345 ushort idx, hashidx;
2352 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2354 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2355 fprintf(stderr, "Alloc page locked\n");
2356 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
//	check each deployed latch set's three RW locks, then clear them
2358 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2359 latch = bt->mgr->latchsets + idx;
2360 if( *latch->readwr->rin & MASK )
2361 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2362 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2364 if( *latch->access->rin & MASK )
2365 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2366 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2368 if( *latch->parent->rin & MASK )
2369 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2370 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2373 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
//	walk each hash chain, clearing busy latches and reporting pins
2378 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2379 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2380 fprintf(stderr, "hash entry %d locked\n", hashidx);
2382 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2384 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2385 latch = bt->mgr->latchsets + idx;
2386 if( *(ushort *)latch->busy )
2387 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2388 *(ushort *)latch->busy = 0;
2390 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2391 } while( idx = latch->next );
//	scan all pages from the first leaf to the allocation frontier
2394 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2395 page_no = LEAF_page;
2397 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2398 off64_t off = page_no << bt->mgr->page_bits;
2400 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2404 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2406 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2407 fprintf(stderr, "page %.8x unable to read\n", page_no);
2409 if( *amt < bt->mgr->page_size )
2410 fprintf(stderr, "page %.8x unable to read\n", page_no);
//	verify key ordering within each in-use page
2412 if( !bt->frame->free ) {
2413 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2414 ptr = keyptr(bt->frame, idx+1);
2415 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2416 fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2418 if( !bt->frame->lvl )
2419 cnt += bt->frame->act;
2421 if( page_no > LEAF_page )
2436 // standalone program to index file of keys
2437 // then list them onto std-out
//	per-thread worker: opens a handle on the shared BtMgr and dispatches
//	on args->type[0] (case-insensitive): Audit, Pennysort (Write), Write,
//	Delete, Find, Scan, Count. Keys are newline-separated lines of the
//	input file, optionally suffixed with a 9-digit line number.
2440 void *index_file (void *arg)
2442 uint __stdcall index_file (void *arg)
2445 int line = 0, found = 0, cnt = 0;
2446 uid next, page_no = LEAF_page; // start on first page of leaves
2447 unsigned char key[256];
2448 ThreadArg *args = arg;
2449 int ch, len = 0, slot;
2455 bt = bt_open (args->mgr);
2457 switch(args->type[0] | 0x20)	// | 0x20 lower-cases an ASCII letter
2460 fprintf(stderr, "started latch mgr audit\n");
2461 cnt = bt_latchaudit (bt);
2462 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
//	pennysort mode: first 10 bytes are the key, remainder is the value
2466 fprintf(stderr, "started pennysort for %s\n", args->infile);
2467 if( in = fopen (args->infile, "rb") )
2468 while( ch = getc(in), ch != EOF )
2473 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10) )
2474 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2477 else if( len < 255 )
2479 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
//	write mode: insert each line as a key with empty value
2483 fprintf(stderr, "started indexing for %s\n", args->infile);
2484 if( in = fopen (args->infile, "rb") )
2485 while( ch = getc(in), ch != EOF )
//	num == 1: descending line numbers; num > 1: partition by thread idx
2490 if( args->num == 1 )
2491 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2493 else if( args->num )
2494 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2496 if( bt_insertkey (bt, key, len, 0, NULL, 0) )
2497 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2500 else if( len < 255 )
2502 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
//	delete mode: same key construction as write mode
2506 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2507 if( in = fopen (args->infile, "rb") )
2508 while( ch = getc(in), ch != EOF )
2512 if( args->num == 1 )
2513 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2515 else if( args->num )
2516 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2518 if( bt_deletekey (bt, key, len, 0) )
2519 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2522 else if( len < 255 )
2524 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
//	find mode: probe each key and count hits
2528 fprintf(stderr, "started finding keys for %s\n", args->infile);
2529 if( in = fopen (args->infile, "rb") )
2530 while( ch = getc(in), ch != EOF )
2534 if( args->num == 1 )
2535 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2537 else if( args->num )
2538 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2540 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2543 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2546 else if( len < 255 )
2548 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
//	scan mode: walk the leaf chain under Read locks, printing live keys
2552 fprintf(stderr, "started scanning\n");
2554 if( set->pool = bt_pinpool (bt, page_no) )
2555 set->page = bt_page (bt, set->pool, page_no);
2558 set->latch = bt_pinlatch (bt, page_no);
2559 bt_lockpage (BtLockRead, set->latch);
2560 next = bt_getid (set->page->right);
2561 cnt += set->page->act;
2563 for( slot = 0; slot++ < set->page->cnt; )
2564 if( next || slot < set->page->cnt )	// skip the final stopper key
2565 if( !slotptr(set->page, slot)->dead ) {
2566 ptr = keyptr(set->page, slot);
2567 fwrite (ptr->key, ptr->len, 1, stdout);
2568 fputc ('\n', stdout);
2571 bt_unlockpage (BtLockRead, set->latch);
2572 bt_unpinlatch (set->latch);
2573 bt_unpinpool (set->pool);
2574 } while( page_no = next );
2576 cnt--; // remove stopper key
2577 fprintf(stderr, " Total keys read %d\n", cnt);
//	count mode: raw sequential read of the file, summing active leaf keys
2582 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2584 fprintf(stderr, "started counting\n");
2585 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2586 page_no = LEAF_page;
2588 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2589 uid off = page_no << bt->mgr->page_bits;
2591 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2595 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2597 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2598 return bt->err = BTERR_map;
2600 if( *amt < bt->mgr->page_size )
2601 return bt->err = BTERR_map;
2603 if( !bt->frame->free && !bt->frame->lvl )
2604 cnt += bt->frame->act;
2605 if( page_no > LEAF_page )
2610 cnt--; // remove stopper key
2611 fprintf(stderr, " Total keys read %d\n", cnt);
2623 typedef struct timeval timer;
2625 int main (int argc, char **argv)
2627 int idx, cnt, len, slot, err;
2628 int segsize, bits = 16;
2645 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2646 fprintf (stderr, " where page_bits is the page size in bits\n");
2647 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
2648 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2649 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
2650 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
2654 start = getCpuTime(0);
2657 bits = atoi(argv[3]);
2660 poolsize = atoi(argv[4]);
2663 fprintf (stderr, "Warning: no mapped_pool\n");
2665 if( poolsize > 65535 )
2666 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2669 segsize = atoi(argv[5]);
2671 segsize = 4; // 16 pages per mmap segment
2674 num = atoi(argv[6]);
2678 threads = malloc (cnt * sizeof(pthread_t));
2680 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2682 args = malloc (cnt * sizeof(ThreadArg));
2684 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2687 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2693 for( idx = 0; idx < cnt; idx++ ) {
2694 args[idx].infile = argv[idx + 7];
2695 args[idx].type = argv[2];
2696 args[idx].mgr = mgr;
2697 args[idx].num = num;
2698 args[idx].idx = idx;
2700 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2701 fprintf(stderr, "Error creating thread %d\n", err);
2703 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2707 // wait for termination
2710 for( idx = 0; idx < cnt; idx++ )
2711 pthread_join (threads[idx], NULL);
2713 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2715 for( idx = 0; idx < cnt; idx++ )
2716 CloseHandle(threads[idx]);
2719 elapsed = getCpuTime(0) - start;
2720 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2721 elapsed = getCpuTime(1);
2722 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2723 elapsed = getCpuTime(2);
2724 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);