1 // btree version threadskv3 sched_yield version
2 // with reworked bt_deletekey code
3 // and phase-fair reader writer lock
4 // and librarian page split code
7 // author: karl malbrain, malbrain@cal.berkeley.edu
10 This work, including the source code, documentation
11 and related data, is placed into the public domain.
13 The original author is Karl Malbrain.
15 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
16 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
17 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
18 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
19 RESULTING FROM THE USE, MODIFICATION, OR
20 REDISTRIBUTION OF THIS SOFTWARE.
23 // Please see the project home page for documentation
24 // code.google.com/p/high-concurrency-btree
26 #define _FILE_OFFSET_BITS 64
27 #define _LARGEFILE64_SOURCE
43 #define WIN32_LEAN_AND_MEAN
57 typedef unsigned long long uid;
60 typedef unsigned long long off64_t;
61 typedef unsigned short ushort;
62 typedef unsigned int uint;
65 #define BT_latchtable 128 // number of latch manager slots
67 #define BT_ro 0x6f72 // ro
68 #define BT_rw 0x7772 // rw
70 #define BT_maxbits 24 // maximum page size in bits
71 #define BT_minbits 9 // minimum page size in bits
72 #define BT_minpage (1 << BT_minbits) // minimum page size
73 #define BT_maxpage (1 << BT_maxbits) // maximum page size
76 There are five lock types for each node in three independent sets:
77 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
78 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
79 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
80 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
81 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
92 // definition for phase-fair reader/writer lock implementation
106 // definition for spin latch implementation
108 // exclusive is set for write access
109 // share is count of read accessors
110 // grant write lock when share == 0
112 volatile typedef struct {
123 // hash table entries
126 BtSpinLatch latch[1];
127 volatile ushort slot; // Latch table entry at head of chain
130 // latch manager table structure
133 RWLock readwr[1]; // read/write page lock
134 RWLock access[1]; // Access Intent/Page delete
135 RWLock parent[1]; // Posting of fence key in parent
136 BtSpinLatch busy[1]; // slot is being moved between chains
137 volatile ushort next; // next entry in hash table chain
138 volatile ushort prev; // prev entry in hash table chain
139 volatile ushort pin; // number of outstanding locks
140 volatile ushort hash; // hash slot entry is under
141 volatile uid page_no; // latch set page number
144 // Define the length of the page and key pointers
148 // Page key slot definition.
150 // Keys are marked dead, but remain on the page until
151 // cleanup is called. The fence key (highest key) for
152 // the page is always present, even after cleanup.
155 uint off:BT_maxbits; // page offset for key start
156 uint librarian:1; // set for librarian slot
157 uint dead:1; // set for deleted key
160 // The key structure occupies space at the upper end of
161 // each page. It's a length byte followed by the key bytes.
166 unsigned char key[1];
169 // the value structure also occupies space at the upper end of the page.
174 unsigned char value[1];
177 // The first part of an index page.
178 // It is immediately followed
179 // by the BtSlot array of keys.
181 typedef struct BtPage_ {
182 uint cnt; // count of keys in page
183 uint act; // count of active keys
184 uint min; // next key offset
185 uint garbage; // page garbage in bytes
186 unsigned char bits:7; // page size in bits
187 unsigned char free:1; // page is on free chain
188 unsigned char lvl:7; // level of page
189 unsigned char kill:1; // page is being deleted
190 unsigned char right[BtId]; // page number to right
193 // The memory mapping pool table buffer manager entry
196 uid basepage; // mapped base page number
197 char *map; // mapped memory pointer
198 ushort slot; // slot index in this array
199 ushort pin; // mapped page pin counter
200 void *hashprev; // previous pool entry for the same hash idx
201 void *hashnext; // next pool entry for the same hash idx
203 HANDLE hmap; // Windows memory mapping handle
207 #define CLOCK_bit 0x8000 // bit in pool->pin
209 // The loadpage interface object
212 uid page_no; // current page number
213 BtPage page; // current page pointer
214 BtPool *pool; // current page pool
215 BtLatchSet *latch; // current page latch set
218 // structure for latch manager on ALLOC_page
221 struct BtPage_ alloc[1]; // next page_no in right ptr
222 unsigned char chain[BtId]; // head of free page_nos chain
223 BtSpinLatch lock[1]; // allocation area lite latch
224 ushort latchdeployed; // highest number of latch entries deployed
225 ushort nlatchpage; // number of latch pages at BT_latch
226 ushort latchtotal; // number of page latch entries
227 ushort latchhash; // number of latch hash table slots
228 ushort latchvictim; // next latch entry to examine
229 BtHashEntry table[0]; // the hash table
232 // The object structure for Btree access
235 uint page_size; // page size
236 uint page_bits; // page size in bits
237 uint seg_bits; // seg size in pages in bits
238 uint mode; // read-write mode
244 ushort poolcnt; // highest page pool node in use
245 ushort poolmax; // highest page pool node allocated
246 ushort poolmask; // total number of pages in mmap segment - 1
247 ushort hashsize; // size of Hash Table for pool entries
248 volatile uint evicted; // last evicted hash table slot
249 ushort *hash; // pool index for hash entries
250 BtSpinLatch *latch; // latches for hash table slots
251 BtLatchMgr *latchmgr; // mapped latch page from allocation page
252 BtLatchSet *latchsets; // mapped latch set from latch pages
253 BtPool *pool; // memory pool page segments
255 HANDLE halloc; // allocation and latch table handle
260 BtMgr *mgr; // buffer manager for thread
261 BtPage cursor; // cached frame for start/next (never mapped)
262 BtPage frame; // spare frame for the page split (never mapped)
263 uid cursor_page; // current cursor page number
264 unsigned char *mem; // frame, cursor, page memory buffer
265 int found; // last delete or insert was found
266 int err; // last error
280 extern void bt_close (BtDb *bt);
281 extern BtDb *bt_open (BtMgr *mgr);
282 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
283 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
284 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
285 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
286 extern uint bt_nextkey (BtDb *bt, uint slot);
289 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
290 void bt_mgrclose (BtMgr *mgr);
292 // Helper functions to return slot values
293 // from the cursor page.
295 extern BtKey bt_key (BtDb *bt, uint slot);
296 extern BtVal bt_val (BtDb *bt, uint slot);
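// Illustrative usage sketch (not part of the original source): one way the
// buffer manager API declared above might be driven.  The file name
// "example.idx" and the page/pool/hash parameters are arbitrary example
// values, not defaults taken from this code.

int bt_example_usage ()
{
unsigned char found[256];
BtMgr *mgr;
BtDb *bt;
int len;

	// 2^15 byte pages, 4096 mapped segments of 2^4 pages, 512 hash slots
	if( !(mgr = bt_mgr ("example.idx", BT_rw, 15, 4096, 4, 512)) )
		return -1;

	bt = bt_open (mgr);		// one BtDb handle per thread

	if( bt_insertkey (bt, (unsigned char *)"alpha", 5, 0, "one", 3) )
		return bt->err;

	len = bt_findkey (bt, (unsigned char *)"alpha", 5, found, sizeof(found));

	bt_close (bt);
	bt_mgrclose (mgr);
	return len;			// number of value bytes, or -1 if not found
}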
298 // BTree page number constants
299 #define ALLOC_page 0 // allocation & latch manager hash table
300 #define ROOT_page 1 // root of the btree
301 #define LEAF_page 2 // first page of leaves
302 #define LATCH_page 3 // pages for latch manager
304 // Number of levels to create in a new BTree
308 // The page is allocated from both the low and high ends.
309 // The key slots are allocated from the bottom,
310 // while the text and value of the key
311 // are allocated from the top. When the two
312 // areas meet, the page is split into two.
314 // A key consists of a length byte, two bytes of
315 // index number (0 - 65534), and up to 253 bytes
316 // of key value. Duplicate keys are discarded.
317 // Associated with each key is a value byte string
318 // containing any value desired.
320 // The b-tree root is always located at page 1.
321 // The first leaf page of level zero is always
322 // located on page 2.
324 // The b-tree pages are linked with next
325 // pointers to facilitate enumerators,
326 // and provide for concurrency.
328 // When the root page fills, it is split in two and
329 // the tree height is raised by a new root at page
330 // one with two keys.
332 // Deleted keys are marked with a dead bit until
333 // page cleanup. The fence key for a leaf node is always present, even after cleanup.
336 // Groups of btree pages, called segments, are optionally
337 // cached with a memory mapped pool. A hash table is used to keep
338 // track of the cached segments. This behaviour is controlled
339 // by the buffer pool parameters passed to bt_mgr.
341 // To achieve maximum concurrency one page is locked at a time
342 // as the tree is traversed to find the leaf key in question. The right
343 // page numbers are used in cases where the page is being split,
346 // Page 0 is dedicated to the lock for new page extensions,
347 // and chains empty pages together for reuse. It also
348 // contains the latch manager hash table.
350 // The ParentModification lock on a node is obtained to serialize posting
351 // or changing the fence key for a node.
353 // Empty pages are chained together through the ALLOC page and reused.
355 // Access macros to address slot and key values from the page
356 // Page slots use 1 based indexing.
358 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
359 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
360 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
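// Illustrative sketch (not part of the original source): walking the 1-based
// slot array with the macros above.  Assumes the caller has already pinned
// and read-locked the page; the function name is hypothetical.

void bt_pagedump_example (BtPage page)
{
uint slot;
BtKey ptr;
BtVal val;

	for( slot = 1; slot <= page->cnt; slot++ ) {
		if( slotptr(page, slot)->dead )	// skips deleted keys and librarian slots
			continue;
		ptr = keyptr(page, slot);	// length byte followed by the key bytes
		val = valptr(page, slot);	// value bytes follow the key
		fwrite (ptr->key, ptr->len, 1, stdout);
		fputc ('\t', stdout);
		fwrite (val->value, val->len, 1, stdout);
		fputc ('\n', stdout);
	}
}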
362 void bt_putid(unsigned char *dest, uid id)
367 dest[i] = (unsigned char)id, id >>= 8;
370 uid bt_getid(unsigned char *src)
375 for( i = 0; i < BtId; i++ )
376 id <<= 8, id |= *src++;
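// Illustrative sketch (not part of the original source): page numbers are
// stored on-page as BtId-byte strings, most significant byte first, so
// bt_putid followed by bt_getid round-trips the value.

int bt_id_example ()
{
unsigned char buf[BtId];
uid page_no = 0x0123456789ULL;

	bt_putid (buf, page_no);		// most significant byte lands in buf[0]
	return bt_getid (buf) == page_no;	// 1 on a successful round trip
}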
381 // Phase-Fair reader/writer lock implementation
383 void WriteLock (RWLock *lock)
388 tix = __sync_fetch_and_add (lock->ticket, 1);
390 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
392 // wait for our ticket to come up
394 while( tix != lock->serving[0] )
401 w = PRES | (tix & PHID);
403 r = __sync_fetch_and_add (lock->rin, w);
405 r = _InterlockedExchangeAdd16 (lock->rin, w);
407 while( r != *lock->rout )
415 void WriteRelease (RWLock *lock)
418 __sync_fetch_and_and (lock->rin, ~MASK);
420 _InterlockedAnd16 (lock->rin, ~MASK);
425 void ReadLock (RWLock *lock)
429 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
431 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
434 while( w == (*lock->rin & MASK) )
442 void ReadRelease (RWLock *lock)
445 __sync_fetch_and_add (lock->rout, RINC);
447 _InterlockedExchangeAdd16 (lock->rout, RINC);
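// Illustrative sketch (not part of the original source): each phase-fair
// lock call above is paired with its matching release around a critical
// section.  "shared_state" is a hypothetical variable used only for show.

void rwlock_example (RWLock *lock, volatile uint *shared_state)
{
uint snapshot;

	ReadLock (lock);		// readers of one phase proceed together
	snapshot = *shared_state;	// read-only critical section
	ReadRelease (lock);

	WriteLock (lock);		// writers are exclusive and FIFO by ticket
	*shared_state = snapshot + 1;	// write critical section
	WriteRelease (lock);
}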
451 // Spin Latch Manager
453 // wait until write lock mode is clear
454 // and add 1 to the share count
456 void bt_spinreadlock(BtSpinLatch *latch)
462 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
464 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
466 // see if exclusive request is granted or pending
471 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
473 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
476 } while( sched_yield(), 1 );
478 } while( SwitchToThread(), 1 );
482 // wait for other read and write latches to relinquish
484 void bt_spinwritelock(BtSpinLatch *latch)
490 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
492 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
495 if( !(prev & ~BOTH) )
499 __sync_fetch_and_and ((ushort *)latch, ~XCL);
501 _InterlockedAnd16((ushort *)latch, ~XCL);
504 } while( sched_yield(), 1 );
506 } while( SwitchToThread(), 1 );
510 // try to obtain write lock
512 // return 1 if obtained, 0 otherwise
515 int bt_spinwritetry(BtSpinLatch *latch)
520 prev = __sync_fetch_and_or((ushort *)latch, XCL);
522 prev = _InterlockedOr16((ushort *)latch, XCL);
524 // take write access if all bits are clear
527 if( !(prev & ~BOTH) )
531 __sync_fetch_and_and ((ushort *)latch, ~XCL);
533 _InterlockedAnd16((ushort *)latch, ~XCL);
540 void bt_spinreleasewrite(BtSpinLatch *latch)
543 __sync_fetch_and_and((ushort *)latch, ~BOTH);
545 _InterlockedAnd16((ushort *)latch, ~BOTH);
549 // decrement reader count
551 void bt_spinreleaseread(BtSpinLatch *latch)
554 __sync_fetch_and_add((ushort *)latch, -SHARE);
556 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
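// Illustrative sketch (not part of the original source): the spin latch can
// be taken opportunistically with bt_spinwritetry, falling back to the
// blocking call, and is always released with bt_spinreleasewrite.

void spinlatch_example (BtSpinLatch *latch)
{
	if( !bt_spinwritetry (latch) )		// single attempt, no waiting
		bt_spinwritelock (latch);	// otherwise spin/yield until granted

	// ... exclusive critical section ...

	bt_spinreleasewrite (latch);
}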
560 // link latch table entry into latch hash table
562 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
564 BtLatchSet *set = bt->mgr->latchsets + victim;
566 if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
567 bt->mgr->latchsets[set->next].prev = victim;
569 bt->mgr->latchmgr->table[hashidx].slot = victim;
570 set->page_no = page_no;
577 void bt_unpinlatch (BtLatchSet *set)
580 __sync_fetch_and_add(&set->pin, -1);
582 _InterlockedDecrement16 (&set->pin);
586 // find existing latchset or create a new one
587 // return with latchset pinned
589 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
591 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
592 ushort slot, avail = 0, victim, idx;
595 // obtain read lock on hash table entry
597 bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
599 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
601 set = bt->mgr->latchsets + slot;
602 if( page_no == set->page_no )
604 } while( slot = set->next );
608 __sync_fetch_and_add(&set->pin, 1);
610 _InterlockedIncrement16 (&set->pin);
614 bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
619 // try again, this time with write lock
621 bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
623 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
625 set = bt->mgr->latchsets + slot;
626 if( page_no == set->page_no )
628 if( !set->pin && !avail )
630 } while( slot = set->next );
632 // found our entry, or take over an unpinned one
634 if( slot || (slot = avail) ) {
635 set = bt->mgr->latchsets + slot;
637 __sync_fetch_and_add(&set->pin, 1);
639 _InterlockedIncrement16 (&set->pin);
641 set->page_no = page_no;
642 bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
646 // see if there are any unused entries
648 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
650 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
653 if( victim < bt->mgr->latchmgr->latchtotal ) {
654 set = bt->mgr->latchsets + victim;
656 __sync_fetch_and_add(&set->pin, 1);
658 _InterlockedIncrement16 (&set->pin);
660 bt_latchlink (bt, hashidx, victim, page_no);
661 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
666 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
668 victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
670 // find and reuse previous lock entry
674 victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
676 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
678 // we don't use slot zero
680 if( victim %= bt->mgr->latchmgr->latchtotal )
681 set = bt->mgr->latchsets + victim;
685 // take control of our slot
686 // from other threads
688 if( set->pin || !bt_spinwritetry (set->busy) )
693 // try to get write lock on hash chain
694 // skip entry if not obtained
695 // or has outstanding locks
697 if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
698 bt_spinreleasewrite (set->busy);
703 bt_spinreleasewrite (set->busy);
704 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
708 // unlink our available victim from its hash chain
711 bt->mgr->latchsets[set->prev].next = set->next;
713 bt->mgr->latchmgr->table[idx].slot = set->next;
716 bt->mgr->latchsets[set->next].prev = set->prev;
718 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
720 __sync_fetch_and_add(&set->pin, 1);
722 _InterlockedIncrement16 (&set->pin);
724 bt_latchlink (bt, hashidx, victim, page_no);
725 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
726 bt_spinreleasewrite (set->busy);
731 void bt_mgrclose (BtMgr *mgr)
736 // release mapped pages
737 // note that slot zero is never used
739 for( slot = 1; slot < mgr->poolmax; slot++ ) {
740 pool = mgr->pool + slot;
743 munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
746 FlushViewOfFile(pool->map, 0);
747 UnmapViewOfFile(pool->map);
748 CloseHandle(pool->hmap);
754 munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
755 munmap (mgr->latchmgr, 2 * mgr->page_size);
757 FlushViewOfFile(mgr->latchmgr, 0);
758 UnmapViewOfFile(mgr->latchmgr);
759 CloseHandle(mgr->halloc);
765 free ((void *)mgr->latch);
768 FlushFileBuffers(mgr->idx);
769 CloseHandle(mgr->idx);
770 GlobalFree (mgr->pool);
771 GlobalFree (mgr->hash);
772 GlobalFree ((void *)mgr->latch);
777 // close and release memory
779 void bt_close (BtDb *bt)
786 VirtualFree (bt->mem, 0, MEM_RELEASE);
791 // open/create new btree buffer manager
793 // call with file_name, BT_openmode, bits in page size (e.g. 16),
794 // size of mapped page pool (e.g. 8192)
796 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
798 uint lvl, attr, cacheblk, last, slot, idx;
799 uint nlatchpage, latchhash;
800 unsigned char value[BtId];
801 BtLatchMgr *latchmgr;
810 SYSTEM_INFO sysinfo[1];
813 // determine sanity of page size and buffer pool
815 if( bits > BT_maxbits )
817 else if( bits < BT_minbits )
821 return NULL; // must have buffer pool
824 mgr = calloc (1, sizeof(BtMgr));
826 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
829 return free(mgr), NULL;
831 cacheblk = 4096; // minimum mmap segment size for unix
834 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
835 attr = FILE_ATTRIBUTE_NORMAL;
836 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
838 if( mgr->idx == INVALID_HANDLE_VALUE )
839 return GlobalFree(mgr), NULL;
841 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
842 GetSystemInfo(sysinfo);
843 cacheblk = sysinfo->dwAllocationGranularity;
847 latchmgr = malloc (BT_maxpage);
850 // read minimum page size to get root info
852 if( size = lseek (mgr->idx, 0L, 2) ) {
853 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
854 bits = latchmgr->alloc->bits;
856 return free(mgr), free(latchmgr), NULL;
857 } else if( mode == BT_ro )
858 return free(latchmgr), bt_mgrclose (mgr), NULL;
860 latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
861 size = GetFileSize(mgr->idx, amt);
864 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
865 return bt_mgrclose (mgr), NULL;
866 bits = latchmgr->alloc->bits;
867 } else if( mode == BT_ro )
868 return bt_mgrclose (mgr), NULL;
871 mgr->page_size = 1 << bits;
872 mgr->page_bits = bits;
874 mgr->poolmax = poolmax;
877 if( cacheblk < mgr->page_size )
878 cacheblk = mgr->page_size;
880 // mask for partial memmaps
882 mgr->poolmask = (cacheblk >> bits) - 1;
884 // see if requested size of pages per memmap is greater than our default
886 if( (1 << segsize) > mgr->poolmask )
887 mgr->poolmask = (1 << segsize) - 1;
891 while( (1 << mgr->seg_bits) <= mgr->poolmask )
894 mgr->hashsize = hashsize;
897 mgr->pool = calloc (poolmax, sizeof(BtPool));
898 mgr->hash = calloc (hashsize, sizeof(ushort));
899 mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
901 mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
902 mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
903 mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
909 // initialize an empty b-tree with latch page, root page, page of leaves
910 // and page(s) of latches
912 memset (latchmgr, 0, 1 << bits);
913 nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
914 bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
915 latchmgr->alloc->bits = mgr->page_bits;
917 latchmgr->nlatchpage = nlatchpage;
918 latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
920 // initialize latch manager
922 latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
924 // size of hash table = total number of latchsets
926 if( latchhash > latchmgr->latchtotal )
927 latchhash = latchmgr->latchtotal;
929 latchmgr->latchhash = latchhash;
932 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
933 return bt_mgrclose (mgr), NULL;
935 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
936 return bt_mgrclose (mgr), NULL;
938 if( *amt < mgr->page_size )
939 return bt_mgrclose (mgr), NULL;
942 memset (latchmgr, 0, 1 << bits);
943 latchmgr->alloc->bits = mgr->page_bits;
945 for( lvl=MIN_lvl; lvl--; ) {
946 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
947 key = keyptr(latchmgr->alloc, 1);
948 key->len = 2; // create stopper key
952 bt_putid(value, MIN_lvl - lvl + 1);
953 val = valptr(latchmgr->alloc, 1);
954 val->len = lvl ? BtId : 0;
955 memcpy (val->value, value, val->len);
957 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
958 latchmgr->alloc->lvl = lvl;
959 latchmgr->alloc->cnt = 1;
960 latchmgr->alloc->act = 1;
962 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
963 return bt_mgrclose (mgr), NULL;
965 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
966 return bt_mgrclose (mgr), NULL;
968 if( *amt < mgr->page_size )
969 return bt_mgrclose (mgr), NULL;
973 // clear out latch manager locks
974 // and rest of pages to round out segment
976 memset(latchmgr, 0, mgr->page_size);
979 while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
981 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
983 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
984 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
985 return bt_mgrclose (mgr), NULL;
986 if( *amt < mgr->page_size )
987 return bt_mgrclose (mgr), NULL;
994 // mlock the root page and the latchmgr page
996 flag = PROT_READ | PROT_WRITE;
997 mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
998 if( mgr->latchmgr == MAP_FAILED )
999 return bt_mgrclose (mgr), NULL;
1000 mlock (mgr->latchmgr, 2 * mgr->page_size);
1002 mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1003 if( mgr->latchsets == MAP_FAILED )
1004 return bt_mgrclose (mgr), NULL;
1005 mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1007 flag = PAGE_READWRITE;
1008 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1010 return bt_mgrclose (mgr), NULL;
1012 flag = FILE_MAP_WRITE;
1013 mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1014 if( !mgr->latchmgr )
1015 return GetLastError(), bt_mgrclose (mgr), NULL;
1017 mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1023 VirtualFree (latchmgr, 0, MEM_RELEASE);
1028 // open BTree access method
1029 // based on buffer manager
1031 BtDb *bt_open (BtMgr *mgr)
1033 BtDb *bt = malloc (sizeof(*bt));
1035 memset (bt, 0, sizeof(*bt));
1038 bt->mem = malloc (2 *mgr->page_size);
1040 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1042 bt->frame = (BtPage)bt->mem;
1043 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1047 // compare two keys, returning > 0, = 0, or < 0
1048 // as the comparison value
1050 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1052 uint len1 = key1->len;
1055 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1068 // find segment in pool
1069 // must be called with hashslot idx locked
1070 // return NULL if not there
1071 // otherwise return node
1073 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1078 // compute start of hash chain in pool
1080 if( slot = bt->mgr->hash[idx] )
1081 pool = bt->mgr->pool + slot;
1085 page_no &= ~bt->mgr->poolmask;
1087 while( pool->basepage != page_no )
1088 if( pool = pool->hashnext )
1096 // add segment to hash table
1098 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1103 pool->hashprev = pool->hashnext = NULL;
1104 pool->basepage = page_no & ~bt->mgr->poolmask;
1105 pool->pin = CLOCK_bit + 1;
1107 if( slot = bt->mgr->hash[idx] ) {
1108 node = bt->mgr->pool + slot;
1109 pool->hashnext = node;
1110 node->hashprev = pool;
1113 bt->mgr->hash[idx] = pool->slot;
1116 // map new buffer pool segment to virtual memory
1118 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1120 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1121 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1125 flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1126 pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1127 if( pool->map == MAP_FAILED )
1128 return bt->err = BTERR_map;
1131 flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1132 pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1134 return bt->err = BTERR_map;
1136 flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1137 pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1139 return bt->err = BTERR_map;
1144 // calculate page within pool
1146 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1148 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1151 page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1152 // madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1158 void bt_unpinpool (BtPool *pool)
1161 __sync_fetch_and_add(&pool->pin, -1);
1163 _InterlockedDecrement16 (&pool->pin);
1167 // find or place requested page in segment-pool
1168 // return pool table entry, incrementing pin
1170 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1172 uint slot, hashidx, idx, victim;
1173 BtPool *pool, *node, *next;
1175 // lock hash table chain
1177 hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1178 bt_spinwritelock (&bt->mgr->latch[hashidx]);
1180 // look up in hash table
1182 if( pool = bt_findpool(bt, page_no, hashidx) ) {
1184 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1185 __sync_fetch_and_add(&pool->pin, 1);
1187 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1188 _InterlockedIncrement16 (&pool->pin);
1190 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1194 // allocate a new pool node
1195 // and add to hash table
1198 slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1200 slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1203 if( ++slot < bt->mgr->poolmax ) {
1204 pool = bt->mgr->pool + slot;
1207 if( bt_mapsegment(bt, pool, page_no) )
1210 bt_linkhash(bt, pool, page_no, hashidx);
1211 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1215 // pool table is full
1216 // find best pool entry to evict
1219 __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1221 _InterlockedDecrement16 (&bt->mgr->poolcnt);
1226 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1228 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1230 victim %= bt->mgr->poolmax;
1231 pool = bt->mgr->pool + victim;
1232 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1237 // try to get write lock
1238 // skip entry if not obtained
1240 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1243 // skip this entry if the page is pinned
1245 // or clock bit is set
1249 __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1251 _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1253 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1257 // unlink victim pool node from hash table
1259 if( node = pool->hashprev )
1260 node->hashnext = pool->hashnext;
1261 else if( node = pool->hashnext )
1262 bt->mgr->hash[idx] = node->slot;
1264 bt->mgr->hash[idx] = 0;
1266 if( node = pool->hashnext )
1267 node->hashprev = pool->hashprev;
1269 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1271 // remove old file mapping
1273 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1275 // FlushViewOfFile(pool->map, 0);
1276 UnmapViewOfFile(pool->map);
1277 CloseHandle(pool->hmap);
1281 // create new pool mapping
1282 // and link into hash table
1284 if( bt_mapsegment(bt, pool, page_no) )
1287 bt_linkhash(bt, pool, page_no, hashidx);
1288 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1293 // place write, read, or parent lock on requested page_no.
1295 void bt_lockpage(BtLock mode, BtLatchSet *set)
1299 ReadLock (set->readwr);
1302 WriteLock (set->readwr);
1305 ReadLock (set->access);
1308 WriteLock (set->access);
1311 WriteLock (set->parent);
1316 // remove write, read, or parent lock on requested page
1318 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1322 ReadRelease (set->readwr);
1325 WriteRelease (set->readwr);
1328 ReadRelease (set->access);
1331 WriteRelease (set->access);
1334 WriteRelease (set->parent);
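// Illustrative sketch (not part of the original source): the lock-coupling
// order used while descending the tree (see bt_loadpage below).  AccessIntent
// on the child is taken before the parent is released, then traded for the
// requested read or write lock.  "parent" and "child" are hypothetical
// latch sets held by the caller.

void bt_lockcouple_example (BtLatchSet *parent, BtLatchSet *child, BtLock mode)
{
	bt_lockpage (BtLockAccess, child);	// blocks while the child is being deleted
	bt_unlockpage (BtLockRead, parent);	// parent page no longer needed
	bt_lockpage (mode, child);		// BtLockRead or BtLockWrite on the child
	bt_unlockpage (BtLockAccess, child);	// keep only the read/write lock
}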
1339 // allocate a new page and write page into it
1341 uid bt_newpage(BtDb *bt, BtPage page)
1347 // lock allocation page
1349 bt_spinwritelock(bt->mgr->latchmgr->lock);
1351 // use empty chain first
1352 // else allocate empty page
1354 if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1355 if( set->pool = bt_pinpool (bt, new_page) )
1356 set->page = bt_page (bt, set->pool, new_page);
1360 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1361 bt_unpinpool (set->pool);
1363 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1364 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1366 // if writing first page of pool block, set file length thru last page
1368 if( (new_page & bt->mgr->poolmask) == 0 )
1369 ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1373 // unlock allocation latch
1375 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1378 // bring new page into pool and copy page.
1379 // this will extend the file into the new pages on WIN32.
1381 if( set->pool = bt_pinpool (bt, new_page) )
1382 set->page = bt_page (bt, set->pool, new_page);
1386 memcpy(set->page, page, bt->mgr->page_size);
1387 bt_unpinpool (set->pool);
1390 // unlock allocation latch
1392 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1397 // find slot in page for given key at a given level
1399 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1401 uint diff, higher = set->page->cnt, low = 1, slot;
1404 // make stopper key an infinite fence value
1406 if( bt_getid (set->page->right) )
1411 // low is the lowest candidate.
1412 // loop ends when they meet
1414 // higher is already
1415 // tested as .ge. the passed key.
1417 while( diff = higher - low ) {
1418 slot = low + ( diff >> 1 );
1419 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1422 higher = slot, good++;
1425 // return zero if key is on right link page
1427 return good ? higher : 0;
1430 // find and load page at given level for given key
1431 // leave page rd or wr locked as requested
1433 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1435 uid page_no = ROOT_page, prevpage = 0;
1436 uint drill = 0xff, slot;
1437 BtLatchSet *prevlatch;
1438 uint mode, prevmode;
1441 // start at root of btree and drill down
1444 // determine lock mode of drill level
1445 mode = (drill == lvl) ? lock : BtLockRead;
1447 set->latch = bt_pinlatch (bt, page_no);
1448 set->page_no = page_no;
1450 // pin page contents
1452 if( set->pool = bt_pinpool (bt, page_no) )
1453 set->page = bt_page (bt, set->pool, page_no);
1457 // obtain access lock using lock chaining with Access mode
1459 if( page_no > ROOT_page )
1460 bt_lockpage(BtLockAccess, set->latch);
1462 // release & unpin parent page
1465 bt_unlockpage(prevmode, prevlatch);
1466 bt_unpinlatch (prevlatch);
1467 bt_unpinpool (prevpool);
1471 // obtain read lock using lock chaining
1473 bt_lockpage(mode, set->latch);
1475 if( set->page->free )
1476 return bt->err = BTERR_struct, 0;
1478 if( page_no > ROOT_page )
1479 bt_unlockpage(BtLockAccess, set->latch);
1481 // re-read and re-lock root after determining actual level of root
1483 if( set->page->lvl != drill) {
1484 if( set->page_no != ROOT_page )
1485 return bt->err = BTERR_struct, 0;
1487 drill = set->page->lvl;
1489 if( lock != BtLockRead && drill == lvl ) {
1490 bt_unlockpage(mode, set->latch);
1491 bt_unpinlatch (set->latch);
1492 bt_unpinpool (set->pool);
1497 prevpage = set->page_no;
1498 prevlatch = set->latch;
1499 prevpool = set->pool;
1502 // find key on page at this level
1503 // and descend to requested level
1505 if( set->page->kill )
1508 if( slot = bt_findslot (set, key, len) ) {
1512 while( slotptr(set->page, slot)->dead )
1513 if( slot++ < set->page->cnt )
1518 page_no = bt_getid(valptr(set->page, slot)->value);
1523 // or slide right into next page
1526 page_no = bt_getid(set->page->right);
1530 // return error on end of right chain
1532 bt->err = BTERR_struct;
1533 return 0; // return error
1536 // return page to free list
1537 // page must be delete & write locked
1539 void bt_freepage (BtDb *bt, BtPageSet *set)
1541 // lock allocation page
1543 bt_spinwritelock (bt->mgr->latchmgr->lock);
1546 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1547 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1548 set->page->free = 1;
1550 // unlock released page
1552 bt_unlockpage (BtLockDelete, set->latch);
1553 bt_unlockpage (BtLockWrite, set->latch);
1554 bt_unpinlatch (set->latch);
1555 bt_unpinpool (set->pool);
1557 // unlock allocation page
1559 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1562 // a fence key was deleted from a page
1563 // push new fence value upwards
1565 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1567 unsigned char leftkey[256], rightkey[256];
1568 unsigned char value[BtId];
1571 // remove the old fence value
1573 ptr = keyptr(set->page, set->page->cnt);
1574 memcpy (rightkey, ptr, ptr->len + 1);
1575 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1577 ptr = keyptr(set->page, set->page->cnt);
1578 memcpy (leftkey, ptr, ptr->len + 1);
1580 bt_lockpage (BtLockParent, set->latch);
1581 bt_unlockpage (BtLockWrite, set->latch);
1583 // insert new (now smaller) fence key
1585 bt_putid (value, set->page_no);
1587 if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
1590 // now delete old fence key
1592 if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1595 bt_unlockpage (BtLockParent, set->latch);
1596 bt_unpinlatch(set->latch);
1597 bt_unpinpool (set->pool);
1601 // root has a single child
1602 // collapse a level from the tree
1604 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1609 // find the child entry and promote as new root contents
1612 for( idx = 0; idx++ < root->page->cnt; )
1613 if( !slotptr(root->page, idx)->dead )
1616 child->page_no = bt_getid (valptr(root->page, idx)->value);
1618 child->latch = bt_pinlatch (bt, child->page_no);
1619 bt_lockpage (BtLockDelete, child->latch);
1620 bt_lockpage (BtLockWrite, child->latch);
1622 if( child->pool = bt_pinpool (bt, child->page_no) )
1623 child->page = bt_page (bt, child->pool, child->page_no);
1627 memcpy (root->page, child->page, bt->mgr->page_size);
1628 bt_freepage (bt, child);
1630 } while( root->page->lvl > 1 && root->page->act == 1 );
1632 bt_unlockpage (BtLockWrite, root->latch);
1633 bt_unpinlatch (root->latch);
1634 bt_unpinpool (root->pool);
1638 // find and delete key on page by marking delete flag bit
1639 // if page becomes empty, delete it from the btree
1641 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1643 unsigned char lowerfence[256], higherfence[256];
1644 uint slot, idx, found, fence;
1645 BtPageSet set[1], right[1];
1646 unsigned char value[BtId];
1650 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1651 ptr = keyptr(set->page, slot);
1655 // if librarian slot, advance to real slot
1657 if( slotptr(set->page, slot)->librarian )
1658 ptr = keyptr(set->page, ++slot);
1660 fence = slot == set->page->cnt;
1662 // if key is found delete it, otherwise ignore request
1664 if( found = !keycmp (ptr, key, len) )
1665 if( found = slotptr(set->page, slot)->dead == 0 ) {
1666 val = valptr(set->page,slot);
1667 slotptr(set->page, slot)->dead = 1;
1668 set->page->garbage += ptr->len + val->len + 2;
1671 // collapse empty slots beneath the fence
1673 while( idx = set->page->cnt - 1 )
1674 if( slotptr(set->page, idx)->dead ) {
1675 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1676 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1681 // did we delete a fence key in an upper level?
1683 if( found && fence && lvl && set->page->act )
1684 if( bt_fixfence (bt, set, lvl) )
1687 return bt->found = found, 0;
1689 // is this a collapsed root?
1691 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1692 if( bt_collapseroot (bt, set) )
1695 return bt->found = found, 0;
1697 // return if page is not empty
1699 if( set->page->act ) {
1700 bt_unlockpage(BtLockWrite, set->latch);
1701 bt_unpinlatch (set->latch);
1702 bt_unpinpool (set->pool);
1703 return bt->found = found, 0;
1706 // cache copy of fence key
1707 // to post in parent
1709 ptr = keyptr(set->page, set->page->cnt);
1710 memcpy (lowerfence, ptr, ptr->len + 1);
1712 // obtain lock on right page
1714 right->page_no = bt_getid(set->page->right);
1715 right->latch = bt_pinlatch (bt, right->page_no);
1716 bt_lockpage (BtLockWrite, right->latch);
1718 // pin page contents
1720 if( right->pool = bt_pinpool (bt, right->page_no) )
1721 right->page = bt_page (bt, right->pool, right->page_no);
1725 if( right->page->kill )
1726 return bt->err = BTERR_struct;
1728 // pull contents of right peer into our empty page
1730 memcpy (set->page, right->page, bt->mgr->page_size);
1732 // cache copy of key to update
1734 ptr = keyptr(right->page, right->page->cnt);
1735 memcpy (higherfence, ptr, ptr->len + 1);
1737 // mark right page deleted and point it to left page
1738 // until we can post parent updates
1740 bt_putid (right->page->right, set->page_no);
1741 right->page->kill = 1;
1743 bt_lockpage (BtLockParent, right->latch);
1744 bt_unlockpage (BtLockWrite, right->latch);
1746 bt_lockpage (BtLockParent, set->latch);
1747 bt_unlockpage (BtLockWrite, set->latch);
1749 // redirect higher key directly to our new node contents
1751 bt_putid (value, set->page_no);
1753 if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId) )
1756 // delete old lower key to our node
1758 if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1761 // obtain delete and write locks to right node
1763 bt_unlockpage (BtLockParent, right->latch);
1764 bt_lockpage (BtLockDelete, right->latch);
1765 bt_lockpage (BtLockWrite, right->latch);
1766 bt_freepage (bt, right);
1768 bt_unlockpage (BtLockParent, set->latch);
1769 bt_unpinlatch (set->latch);
1770 bt_unpinpool (set->pool);
1775 // find key in leaf level and return number of value bytes
1776 // or (-1) if not found
1778 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1786 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1787 ptr = keyptr(set->page, slot);
1791 // skip librarian slot place holder
1793 if( slotptr(set->page, slot)->librarian )
1794 ptr = keyptr(set->page, ++slot);
1796 // if key exists, return >= 0 value bytes copied
1797 // otherwise return (-1)
1799 if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
1800 val = valptr (set->page,slot);
1801 if( valmax > val->len )
1803 memcpy (value, val->value, valmax);
1808 bt_unlockpage (BtLockRead, set->latch);
1809 bt_unpinlatch (set->latch);
1810 bt_unpinpool (set->pool);
1814 // check page for space available,
1815 // clean if necessary and return
1816 // 0 - page needs splitting
1817 // >0 new slot value
1819 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1821 uint nxt = bt->mgr->page_size;
1822 uint cnt = 0, idx = 0;
1823 uint max = page->cnt;
1828 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1831 // skip cleanup and proceed to split
1832 // if there's not enough garbage
1834 if( page->garbage < nxt / 5 )
1837 memcpy (bt->frame, page, bt->mgr->page_size);
1839 // skip page info and set rest of page to zero
1841 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1845 // clean up page first by
1846 // removing deleted keys
1848 while( cnt++ < max ) {
1851 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1854 // copy the value across
1856 val = valptr(bt->frame, cnt);
1857 nxt -= val->len + 1;
1858 ((unsigned char *)page)[nxt] = val->len;
1859 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1861 // copy the key across
1863 key = keyptr(bt->frame, cnt);
1864 nxt -= key->len + 1;
1865 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1867 // make a librarian slot
1870 slotptr(page, ++idx)->off = nxt;
1871 slotptr(page, idx)->librarian = 1;
1872 slotptr(page, idx)->dead = 1;
1877 slotptr(page, ++idx)->off = nxt;
1879 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1886 // see if page has enough space now, or does it need splitting?
1888 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1894 // split the root and raise the height of the btree
1896 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1898 uint nxt = bt->mgr->page_size;
1899 unsigned char value[BtId];
1902 // Obtain an empty page to use, and copy the current
1903 // root contents into it, e.g. lower keys
1905 if( !(left = bt_newpage(bt, root->page)) )
1908 // preserve the page info at the bottom
1909 // of higher keys and set rest to zero
1911 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1913 // insert the lower-keys page's fence key on the new root page as the first key
1916 bt_putid (value, left);
1917 ((unsigned char *)root->page)[nxt] = BtId;
1918 memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
1920 nxt -= *leftkey + 1;
1921 memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1922 slotptr(root->page, 1)->off = nxt;
1924 // insert stopper key on newroot page
1925 // and increase the root height
1927 nxt -= 3 + BtId + 1;
1928 ((unsigned char *)root->page)[nxt] = 2;
1929 ((unsigned char *)root->page)[nxt+1] = 0xff;
1930 ((unsigned char *)root->page)[nxt+2] = 0xff;
1932 bt_putid (value, page_no2);
1933 ((unsigned char *)root->page)[nxt+3] = BtId;
1934 memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
1935 slotptr(root->page, 2)->off = nxt;
1937 bt_putid(root->page->right, 0);
1938 root->page->min = nxt; // reset lowest used offset and key count
1939 root->page->cnt = 2;
1940 root->page->act = 2;
1943 // release and unpin root
1945 bt_unlockpage(BtLockWrite, root->latch);
1946 bt_unpinlatch (root->latch);
1947 bt_unpinpool (root->pool);
1951 // split already locked full node
1954 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1956 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1957 unsigned char fencekey[256], rightkey[256];
1958 unsigned char value[BtId];
1959 uint lvl = set->page->lvl;
1965 // split higher half of keys to bt->frame
1967 memset (bt->frame, 0, bt->mgr->page_size);
1968 max = set->page->cnt;
1972 while( cnt++ < max ) {
1973 if( slotptr(set->page, cnt)->dead && cnt < max )
1975 val = valptr(set->page, cnt);
1976 nxt -= val->len + 1;
1977 ((unsigned char *)bt->frame)[nxt] = val->len;
1978 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
1980 key = keyptr(set->page, cnt);
1981 nxt -= key->len + 1;
1982 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1984 // add librarian slot
1987 slotptr(bt->frame, ++idx)->off = nxt;
1988 slotptr(bt->frame, idx)->librarian = 1;
1989 slotptr(bt->frame, idx)->dead = 1;
1994 slotptr(bt->frame, ++idx)->off = nxt;
1996 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2000 // remember existing fence key for new page to the right
2002 memcpy (rightkey, key, key->len + 1);
2004 bt->frame->bits = bt->mgr->page_bits;
2005 bt->frame->min = nxt;
2006 bt->frame->cnt = idx;
2007 bt->frame->lvl = lvl;
2011 if( set->page_no > ROOT_page )
2012 memcpy (bt->frame->right, set->page->right, BtId);
2014 // get new free page and write higher keys to it.
2016 if( !(right->page_no = bt_newpage(bt, bt->frame)) )
2019 // update lower keys to continue in old page
2021 memcpy (bt->frame, set->page, bt->mgr->page_size);
2022 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2023 nxt = bt->mgr->page_size;
2024 set->page->garbage = 0;
2030 if( slotptr(bt->frame, max)->librarian )
2033 // assemble page of smaller keys
2035 while( cnt++ < max ) {
2036 if( slotptr(bt->frame, cnt)->dead )
2038 val = valptr(bt->frame, cnt);
2039 nxt -= val->len + 1;
2040 ((unsigned char *)set->page)[nxt] = val->len;
2041 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2043 key = keyptr(bt->frame, cnt);
2044 nxt -= key->len + 1;
2045 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2047 // add librarian slot
2050 slotptr(set->page, ++idx)->off = nxt;
2051 slotptr(set->page, idx)->librarian = 1;
2052 slotptr(set->page, idx)->dead = 1;
2057 slotptr(set->page, ++idx)->off = nxt;
2061 // remember fence key for smaller page
2063 memcpy(fencekey, key, key->len + 1);
2065 bt_putid(set->page->right, right->page_no);
2066 set->page->min = nxt;
2067 set->page->cnt = idx;
2069 // if current page is the root page, split it
2071 if( set->page_no == ROOT_page )
2072 return bt_splitroot (bt, set, fencekey, right->page_no);
2074 // insert new fences in their parent pages
2076 right->latch = bt_pinlatch (bt, right->page_no);
2077 bt_lockpage (BtLockParent, right->latch);
2079 bt_lockpage (BtLockParent, set->latch);
2080 bt_unlockpage (BtLockWrite, set->latch);
2082 // insert new fence for reformulated left block of smaller keys
2084 bt_putid (value, set->page_no);
2086 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId) )
2089 // switch fence for right block of larger keys to new right page
2091 bt_putid (value, right->page_no);
2093 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId) )
2096 bt_unlockpage (BtLockParent, set->latch);
2097 bt_unpinlatch (set->latch);
2098 bt_unpinpool (set->pool);
2100 bt_unlockpage (BtLockParent, right->latch);
2101 bt_unpinlatch (right->latch);
2105 // install new key and value onto page
2106 // page must already be checked for adequate space
2109 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen)
2111 uint idx, librarian;
2114 // if found slot > desired slot and previous slot
2115 // is a librarian slot, use it
2118 if( slotptr(set->page, slot-1)->librarian )
2121 // copy value onto page
2123 set->page->min -= vallen + 1; // reset lowest used offset
2124 ((unsigned char *)set->page)[set->page->min] = vallen;
2125 memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2127 // copy key onto page
2129 set->page->min -= keylen + 1; // reset lowest used offset
2130 ((unsigned char *)set->page)[set->page->min] = keylen;
2131 memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2133 // find first empty slot
2135 for( idx = slot; idx < set->page->cnt; idx++ )
2136 if( slotptr(set->page, idx)->dead )
2139 // now insert key into array before slot
2141 if( idx == set->page->cnt )
2142 idx += 2, set->page->cnt += 2, librarian = 2;
2148 while( idx > slot + librarian - 1 )
2149 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2151 // add librarian slot
2153 if( librarian > 1 ) {
2154 node = slotptr(set->page, slot++);
2155 node->off = set->page->min;
2156 node->librarian = 1;
2162 node = slotptr(set->page, slot);
2163 node->off = set->page->min;
2164 node->librarian = 0;
2167 bt_unlockpage (BtLockWrite, set->latch);
2168 bt_unpinlatch (set->latch);
2169 bt_unpinpool (set->pool);
2173 // Insert new key into the btree at given level.
2174 // either add a new key or update/add an existing one
2176 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
2185 while( 1 ) { // find the page and slot for the current key
2186 if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
2187 ptr = keyptr(set->page, slot);
2190 bt->err = BTERR_ovflw;
2194 // if librarian slot == found slot, advance to real slot
2196 if( slotptr(set->page, slot)->librarian )
2197 if( !keycmp (ptr, key, keylen) )
2198 ptr = keyptr(set->page, ++slot);
2200 // check for adequate space on the page
2201 // and insert the new key before slot.
2203 if( keycmp (ptr, key, keylen) ) {
2204 if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2205 if( bt_splitpage (bt, set) )
2210 return bt_insertslot (bt, set, slot, key, keylen, value, vallen);
2213 // if key already exists, update value and return
2215 if( val = valptr(set->page, slot), val->len >= vallen ) {
2216 if( slotptr(set->page, slot)->dead )
2218 set->page->garbage += val->len - vallen;
2219 slotptr(set->page, slot)->dead = 0;
2221 memcpy (val->value, value, vallen);
2222 bt_unlockpage(BtLockWrite, set->latch);
2223 bt_unpinlatch (set->latch);
2224 bt_unpinpool (set->pool);
2228 // new update value doesn't fit in existing value area
2230 if( !slotptr(set->page, slot)->dead )
2231 set->page->garbage += val->len + ptr->len + 2;
2233 slotptr(set->page, slot)->dead = 0;
2237 if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2238 if( bt_splitpage (bt, set) )
2243 set->page->min -= vallen + 1;
2244 ((unsigned char *)set->page)[set->page->min] = vallen;
2245 memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
2247 set->page->min -= keylen + 1;
2248 ((unsigned char *)set->page)[set->page->min] = keylen;
2249 memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
2251 slotptr(set->page, slot)->off = set->page->min;
2252 bt_unlockpage(BtLockWrite, set->latch);
2253 bt_unpinlatch (set->latch);
2254 bt_unpinpool (set->pool);
2260 // cache page of keys into cursor and return starting slot for given key
2262 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2267 // cache page for retrieval
2269 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2270 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2274 bt->cursor_page = set->page_no;
2276 bt_unlockpage(BtLockRead, set->latch);
2277 bt_unpinlatch (set->latch);
2278 bt_unpinpool (set->pool);
2282 // return next slot for cursor page
2283 // or slide cursor right into next page
2285 uint bt_nextkey (BtDb *bt, uint slot)
2291 right = bt_getid(bt->cursor->right);
2293 while( slot++ < bt->cursor->cnt )
2294 if( slotptr(bt->cursor,slot)->dead )
2296 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2304 bt->cursor_page = right;
2306 if( set->pool = bt_pinpool (bt, right) )
2307 set->page = bt_page (bt, set->pool, right);
2311 set->latch = bt_pinlatch (bt, right);
2312 bt_lockpage(BtLockRead, set->latch);
2314 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2316 bt_unlockpage(BtLockRead, set->latch);
2317 bt_unpinlatch (set->latch);
2318 bt_unpinpool (set->pool);
2326 BtKey bt_key(BtDb *bt, uint slot)
2328 return keyptr(bt->cursor, slot);
2331 BtVal bt_val(BtDb *bt, uint slot)
2333 return valptr(bt->cursor,slot);
2339 double getCpuTime(int type)
2342 FILETIME xittime[1];
2343 FILETIME systime[1];
2344 FILETIME usrtime[1];
2345 SYSTEMTIME timeconv[1];
2348 memset (timeconv, 0, sizeof(SYSTEMTIME));
2352 GetSystemTimeAsFileTime (xittime);
2353 FileTimeToSystemTime (xittime, timeconv);
2354 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2357 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2358 FileTimeToSystemTime (usrtime, timeconv);
2361 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2362 FileTimeToSystemTime (systime, timeconv);
2366 ans += (double)timeconv->wHour * 3600;
2367 ans += (double)timeconv->wMinute * 60;
2368 ans += (double)timeconv->wSecond;
2369 ans += (double)timeconv->wMilliseconds / 1000;
2374 #include <sys/resource.h>
2376 double getCpuTime(int type)
2378 struct rusage used[1];
2379 struct timeval tv[1];
2383 gettimeofday(tv, NULL);
2384 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2387 getrusage(RUSAGE_SELF, used);
2388 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2391 getrusage(RUSAGE_SELF, used);
2392 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2399 uint bt_latchaudit (BtDb *bt)
2401 ushort idx, hashidx;
2408 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2410 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2411 fprintf(stderr, "Alloc page locked\n");
2412 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2414 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2415 latch = bt->mgr->latchsets + idx;
2416 if( *latch->readwr->rin & MASK )
2417 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2418 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2420 if( *latch->access->rin & MASK )
2421 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2422 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2424 if( *latch->parent->rin & MASK )
2425 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2426 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2429 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2434 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2435 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2436 fprintf(stderr, "hash entry %d locked\n", hashidx);
2438 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2440 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2441 latch = bt->mgr->latchsets + idx;
2442 if( *(ushort *)latch->busy )
2443 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2444 *(ushort *)latch->busy = 0;
2446 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2447 } while( idx = latch->next );
2450 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2451 page_no = LEAF_page;
2453 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2454 off64_t off = page_no << bt->mgr->page_bits;
2456 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2460 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2462 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2463 fprintf(stderr, "page %.8x unable to read\n", page_no);
2465 if( *amt < bt->mgr->page_size )
2466 fprintf(stderr, "page %.8x unable to read\n", page_no);
2468 if( !bt->frame->free ) {
2469 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2470 ptr = keyptr(bt->frame, idx+1);
2471 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2472 fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2474 if( !bt->frame->lvl )
2475 cnt += bt->frame->act;
2477 if( page_no > LEAF_page )
2492 // standalone program to index file of keys
2493 // then list them onto std-out
2496 void *index_file (void *arg)
2498 uint __stdcall index_file (void *arg)
2501 int line = 0, found = 0, cnt = 0;
2502 uid next, page_no = LEAF_page; // start on first page of leaves
2503 unsigned char key[256];
2504 ThreadArg *args = arg;
2505 int ch, len = 0, slot;
2511 bt = bt_open (args->mgr);
2513 switch(args->type[0] | 0x20)
2516 fprintf(stderr, "started latch mgr audit\n");
2517 cnt = bt_latchaudit (bt);
2518 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2522 fprintf(stderr, "started pennysort for %s\n", args->infile);
2523 if( in = fopen (args->infile, "rb") )
2524 while( ch = getc(in), ch != EOF )
2529 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10) )
2530 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2533 else if( len < 255 )
2535 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2539 fprintf(stderr, "started indexing for %s\n", args->infile);
2540 if( in = fopen (args->infile, "rb") )
2541 while( ch = getc(in), ch != EOF )
2546 if( args->num == 1 )
2547 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2549 else if( args->num )
2550 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2552 if( bt_insertkey (bt, key, len, 0, NULL, 0) )
2553 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2556 else if( len < 255 )
2558 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2562 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2563 if( in = fopen (args->infile, "rb") )
2564 while( ch = getc(in), ch != EOF )
2568 if( args->num == 1 )
2569 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2571 else if( args->num )
2572 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2574 if( bt_deletekey (bt, key, len, 0) )
2575 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2579 else if( len < 255 )
2581 fprintf(stderr, "finished %s for %d keys, found %d \n", args->infile, line, found);
2585 fprintf(stderr, "started finding keys for %s\n", args->infile);
2586 if( in = fopen (args->infile, "rb") )
2587 while( ch = getc(in), ch != EOF )
2591 if( args->num == 1 )
2592 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2594 else if( args->num )
2595 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2597 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2600 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2603 else if( len < 255 )
2605 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2609 fprintf(stderr, "started scanning\n");
2611 if( set->pool = bt_pinpool (bt, page_no) )
2612 set->page = bt_page (bt, set->pool, page_no);
2615 set->latch = bt_pinlatch (bt, page_no);
2616 bt_lockpage (BtLockRead, set->latch);
2617 next = bt_getid (set->page->right);
2619 for( slot = 0; slot++ < set->page->cnt; )
2620 if( next || slot < set->page->cnt )
2621 if( !slotptr(set->page, slot)->dead ) {
2622 ptr = keyptr(set->page, slot);
2623 fwrite (ptr->key, ptr->len, 1, stdout);
2624 fputc ('\n', stdout);
2628 bt_unlockpage (BtLockRead, set->latch);
2629 bt_unpinlatch (set->latch);
2630 bt_unpinpool (set->pool);
2631 } while( page_no = next );
2633 fprintf(stderr, " Total keys read %d\n", cnt);
2638 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2640 fprintf(stderr, "started counting\n");
2641 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2642 page_no = LEAF_page;
2644 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2645 uid off = page_no << bt->mgr->page_bits;
2647 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2651 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2653 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2654 return bt->err = BTERR_map;
2656 if( *amt < bt->mgr->page_size )
2657 return bt->err = BTERR_map;
2659 if( !bt->frame->free && !bt->frame->lvl )
2660 cnt += bt->frame->act;
2661 if( page_no > LEAF_page )
2666 cnt--; // remove stopper key
2667 fprintf(stderr, " Total keys read %d\n", cnt);
2679 typedef struct timeval timer;
2681 int main (int argc, char **argv)
2683 int idx, cnt, len, slot, err;
2684 int segsize, bits = 16;
2701 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2702 fprintf (stderr, " where page_bits is the page size in bits\n");
2703 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
2704 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2705 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
2706 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
2710 start = getCpuTime(0);
2713 bits = atoi(argv[3]);
2716 poolsize = atoi(argv[4]);
2719 fprintf (stderr, "Warning: no mapped_pool\n");
2721 if( poolsize > 65535 )
2722 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2725 segsize = atoi(argv[5]);
2727 segsize = 4; // 16 pages per mmap segment
2730 num = atoi(argv[6]);
2734 threads = malloc (cnt * sizeof(pthread_t));
2736 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2738 args = malloc (cnt * sizeof(ThreadArg));
2740 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2743 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2749 for( idx = 0; idx < cnt; idx++ ) {
2750 args[idx].infile = argv[idx + 7];
2751 args[idx].type = argv[2];
2752 args[idx].mgr = mgr;
2753 args[idx].num = num;
2754 args[idx].idx = idx;
2756 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2757 fprintf(stderr, "Error creating thread %d\n", err);
2759 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2763 // wait for termination
2766 for( idx = 0; idx < cnt; idx++ )
2767 pthread_join (threads[idx], NULL);
2769 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2771 for( idx = 0; idx < cnt; idx++ )
2772 CloseHandle(threads[idx]);
2775 elapsed = getCpuTime(0) - start;
2776 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2777 elapsed = getCpuTime(1);
2778 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2779 elapsed = getCpuTime(2);
2780 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);