1 // btree version threadskv2 sched_yield version
2 // with reworked bt_deletekey code
3 // phase-fair reader writer lock
4 // generalized key-value interface
6 // reworked btree node as red/black binomial tree
9 // author: karl malbrain, malbrain@cal.berkeley.edu
12 This work, including the source code, documentation
13 and related data, is placed into the public domain.
15 The original author is Karl Malbrain.
17 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
18 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
19 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
20 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
21 RESULTING FROM THE USE, MODIFICATION, OR
22 REDISTRIBUTION OF THIS SOFTWARE.
25 // Please see the project home page for documentation
26 // code.google.com/p/high-concurrency-btree
28 #define _FILE_OFFSET_BITS 64
29 #define _LARGEFILE64_SOURCE
44 #define WIN32_LEAN_AND_MEAN
57 typedef unsigned long long uid;
60 typedef unsigned long long off64_t;
61 typedef unsigned short ushort;
62 typedef unsigned int uint;
65 #define BT_latchtable 128 // number of latch manager slots
67 #define BT_ro 0x6f72 // ro
68 #define BT_rw 0x7772 // rw
70 #define BT_maxbits 24 // maximum page size in bits
71 #define BT_minbits 9 // minimum page size in bits
72 #define BT_minpage (1 << BT_minbits) // minimum page size
73 #define BT_maxpage (1 << BT_maxbits) // maximum page size
75 #define BT_binomial 5 // number of levels to emit together
77 There are five lock types for each node in three independent sets:
78 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
79 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
80 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
81 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
82 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
93 // definition for phase-fair reader/writer lock implementation
107 // definition for spin latch implementation
109 // exclusive is set for write access
110 // share is count of read accessors
111 // grant write lock when share == 0
113 volatile typedef struct {
124 // hash table entries
127 BtSpinLatch latch[1];
128 volatile ushort slot; // Latch table entry at head of chain
131 // latch manager table structure
134 RWLock readwr[1]; // read/write page lock
135 RWLock access[1]; // Access Intent/Page delete
136 RWLock parent[1]; // Posting of fence key in parent
137 BtSpinLatch busy[1]; // slot is being moved between chains
138 volatile ushort next; // next entry in hash table chain
139 volatile ushort prev; // prev entry in hash table chain
140 volatile ushort pin; // number of outstanding locks
141 volatile ushort hash; // hash slot entry is under
142 volatile uid page_no; // latch set page number
145 // Define the length of the page and key pointers
149 // Page key slot definition.
151 // Keys are marked dead, but remain on the page until
152 // cleanup is called. The fence key (highest key) for
153 // a leaf page is always present, even after cleanup.
156 uint off:BT_maxbits; // page offset for key start
157 uint fence:1; // is tree node the fence key?
158 uint red:1; // is tree node red?
159 uint dead:1; // set for deleted key
160 uint left, right; // next nodes down
163 // The key structure occupies space at the upper end of
164 // each page. It's a length byte followed by the key
169 unsigned char key[1];
172 // the value structure also occupies space at the upper
177 unsigned char value[1];
180 // The first part of an index page.
181 // It is immediately followed
182 // by the BtSlot array of keys.
184 typedef struct BtPage_ {
185 uint cnt; // count of keys in page
186 uint act; // count of active keys
187 uint min; // next key offset
188 uint root; // slot of root node
189 unsigned char bits:7; // page size in bits
190 unsigned char free:1; // page is on free chain
191 unsigned char lvl:6; // level of page
192 unsigned char kill:1; // page is being deleted
193 unsigned char dirty:1; // page has deleted keys
194 unsigned char right[BtId]; // page number to right
197 // The memory mapping pool table buffer manager entry
200 uid basepage; // mapped base page number
201 char *map; // mapped memory pointer
202 ushort slot; // slot index in this array
203 ushort pin; // mapped page pin counter
204 void *hashprev; // previous pool entry for the same hash idx
205 void *hashnext; // next pool entry for the same hash idx
207 HANDLE hmap; // Windows memory mapping handle
211 #define CLOCK_bit 0x8000 // bit in pool->pin
213 // The loadpage interface object
216 uid page_no; // current page number
217 BtPage page; // current page pointer
218 BtPool *pool; // current page pool
219 BtLatchSet *latch; // current page latch set
222 // structure for latch manager on ALLOC_page
225 struct BtPage_ alloc[1]; // next page_no in right ptr
226 unsigned char chain[BtId]; // head of free page_nos chain
227 BtSpinLatch lock[1]; // allocation area lite latch
228 ushort latchdeployed; // highest number of latch entries deployed
229 ushort nlatchpage; // number of latch pages at BT_latch
230 ushort latchtotal; // number of page latch entries
231 ushort latchhash; // number of latch hash table slots
232 ushort latchvictim; // next latch entry to examine
233 BtHashEntry table[0]; // the hash table
236 // The object structure for Btree access
239 uint page_size; // page size
240 uint page_bits; // page size in bits
241 uint seg_bits; // seg size in pages in bits
242 uint mode; // read-write mode
248 ushort poolcnt; // highest page pool node in use
249 ushort poolmax; // highest page pool node allocated
250 ushort poolmask; // total number of pages in mmap segment - 1
251 ushort hashsize; // size of Hash Table for pool entries
252 volatile uint evicted; // last evicted hash table slot
253 ushort *hash; // pool index for hash entries
254 BtSpinLatch *latch; // latches for hash table slots
255 BtLatchMgr *latchmgr; // mapped latch page from allocation page
256 BtLatchSet *latchsets; // mapped latch set from latch pages
257 BtPool *pool; // memory pool page segments
259 HANDLE halloc; // allocation and latch table handle
263 // red-black tree descent stack
266 uint slot:BT_maxbits;
267 int cmp:2; // comparison result
271 int lvl; // height of the stack
272 int ge; // last node that is >= given node
273 BtPathEntry entry[BT_maxbits+2]; // stacked tree descent
277 BtMgr *mgr; // buffer manager for thread
278 unsigned char *mem; // frame, cursor, page memory buffer
279 BtPathStk path[1]; // cached frame path stack for begin/next
280 BtPage cursor; // cached frame for start/next (never mapped)
281 BtPage frame; // spare frame for the page split (never mapped)
282 uint *que; // binomial key distribution buffer
283 int found; // last delete or insert was found
284 int base; // maximum binomial assignment
285 int err; // last error
299 extern void bt_close (BtDb *bt);
300 extern BtDb *bt_open (BtMgr *mgr);
301 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint stopper);
302 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl, uint stopper);
303 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
304 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
305 extern uint bt_nextkey (BtDb *bt);
308 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
309 void bt_mgrclose (BtMgr *mgr);
311 // forward definitions
312 uint bt_rbremovefence (BtPage page, uint slot, BtPathStk *path);
314 // Helper functions to return slot values
315 // from the cursor page.
317 extern BtKey bt_key (BtDb *bt, uint slot);
318 extern BtVal bt_val (BtDb *bt, uint slot);
320 // BTree page number constants
321 #define ALLOC_page 0 // allocation & latch manager hash table
322 #define ROOT_page 1 // root of the btree
323 #define LEAF_page 2 // first page of leaves
324 #define LATCH_page 3 // pages for latch manager
326 // Number of levels to create in a new BTree
330 // The page is allocated from both its low and high ends.
331 // The key slots are allocated from the bottom,
332 // and are organized into a balanced binary tree.
333 // The text and value of the key
334 // are allocated from the top. When the two
335 // areas meet, the page is split into two.
337 // A key consists of a length byte, two bytes of
338 // index number (0 - 65534), and up to 253 bytes
339 // of key value. Duplicate keys are discarded.
340 // Associated with each key is a value byte string
341 // containing any value desired.
343 // The b-tree root is always located at page 1.
344 // The first leaf page of level zero is always
345 // located on page 2.
347 // The b-tree pages are linked with next
348 // pointers to facilitate enumerators,
349 // and provide for concurrency.
351 // When the root page fills, it is split in two and
352 // the tree height is raised by a new root at page
353 // one with two keys.
355 // Deleted keys are marked with a dead bit until
356 // page cleanup. The fence key for a leaf node is
359 // Groups of pages called segments from the btree are optionally
360 // cached with a memory mapped pool. A hash table is used to keep
361 // track of the cached segments. This behaviour is controlled
362 // by the pool size, segment size and hash size parameters to bt_mgr.
364 // To achieve maximum concurrency one page is locked at a time
365 // as the tree is traversed to find the leaf key in question. The right
366 // page numbers are used in cases where the page is being split,
369 // Page 0 is dedicated to the lock for new page extensions,
370 // and to chaining empty pages together for reuse. It also
371 // contains the latch manager hash table.
373 // The ParentModification lock on a node is obtained to serialize posting
374 // or changing the fence key for a node.
376 // Empty pages are chained together through the ALLOC page and reused.
378 // Access macros to address slot and key values from the page
379 // Page slots use 1 based indexing.
381 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
382 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
383 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
385 void bt_putid(unsigned char *dest, uid id)
390 dest[i] = (unsigned char)id, id >>= 8;
393 uid bt_getid(unsigned char *src)
398 for( i = 0; i < BtId; i++ )
399 id <<= 8, id |= *src++;
404 // Phase-Fair reader/writer lock implementation
406 void WriteLock (RWLock *lock)
411 tix = __sync_fetch_and_add (lock->ticket, 1);
413 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
415 // wait for our ticket to come up
417 while( tix != lock->serving[0] )
424 w = PRES | (tix & PHID);
426 r = __sync_fetch_and_add (lock->rin, w);
428 r = _InterlockedExchangeAdd16 (lock->rin, w);
430 while( r != *lock->rout )
438 void WriteRelease (RWLock *lock)
441 __sync_fetch_and_and (lock->rin, ~MASK);
443 _InterlockedAnd16 (lock->rin, ~MASK);
448 void ReadLock (RWLock *lock)
452 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
454 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
457 while( w == (*lock->rin & MASK) )
465 void ReadRelease (RWLock *lock)
468 __sync_fetch_and_add (lock->rout, RINC);
470 _InterlockedExchangeAdd16 (lock->rout, RINC);
474 // Spin Latch Manager
476 // wait until write lock mode is clear
477 // and add 1 to the share count
479 void bt_spinreadlock(BtSpinLatch *latch)
485 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
487 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
489 // see if exclusive request is granted or pending
494 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
496 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
499 } while( sched_yield(), 1 );
501 } while( SwitchToThread(), 1 );
505 // wait for other read and write latches to relinquish
507 void bt_spinwritelock(BtSpinLatch *latch)
513 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
515 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
518 if( !(prev & ~BOTH) )
522 __sync_fetch_and_and ((ushort *)latch, ~XCL);
524 _InterlockedAnd16((ushort *)latch, ~XCL);
527 } while( sched_yield(), 1 );
529 } while( SwitchToThread(), 1 );
533 // try to obtain write lock
535 // return 1 if obtained,
538 int bt_spinwritetry(BtSpinLatch *latch)
543 prev = __sync_fetch_and_or((ushort *)latch, XCL);
545 prev = _InterlockedOr16((ushort *)latch, XCL);
547 // take write access if all bits are clear
550 if( !(prev & ~BOTH) )
554 __sync_fetch_and_and ((ushort *)latch, ~XCL);
556 _InterlockedAnd16((ushort *)latch, ~XCL);
563 void bt_spinreleasewrite(BtSpinLatch *latch)
566 __sync_fetch_and_and((ushort *)latch, ~BOTH);
568 _InterlockedAnd16((ushort *)latch, ~BOTH);
572 // decrement reader count
574 void bt_spinreleaseread(BtSpinLatch *latch)
577 __sync_fetch_and_add((ushort *)latch, -SHARE);
579 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
583 // link latch table entry into latch hash table
585 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
587 BtLatchSet *set = bt->mgr->latchsets + victim;
589 if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
590 bt->mgr->latchsets[set->next].prev = victim;
592 bt->mgr->latchmgr->table[hashidx].slot = victim;
593 set->page_no = page_no;
600 void bt_unpinlatch (BtLatchSet *set)
603 __sync_fetch_and_add(&set->pin, -1);
605 _InterlockedDecrement16 (&set->pin);
609 // find existing latchset or set up a new one
610 // return with latchset pinned
612 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
614 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
615 ushort slot, avail = 0, victim, idx;
618 // obtain read lock on hash table entry
620 bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
622 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
624 set = bt->mgr->latchsets + slot;
625 if( page_no == set->page_no )
627 } while( slot = set->next );
631 __sync_fetch_and_add(&set->pin, 1);
633 _InterlockedIncrement16 (&set->pin);
637 bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
642 // try again, this time with write lock
644 bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
646 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
648 set = bt->mgr->latchsets + slot;
649 if( page_no == set->page_no )
651 if( !set->pin && !avail )
653 } while( slot = set->next );
655 // found our entry, or take over an unpinned one
657 if( slot || (slot = avail) ) {
658 set = bt->mgr->latchsets + slot;
660 __sync_fetch_and_add(&set->pin, 1);
662 _InterlockedIncrement16 (&set->pin);
664 set->page_no = page_no;
665 bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
669 // see if there are any unused entries
671 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
673 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
676 if( victim < bt->mgr->latchmgr->latchtotal ) {
677 set = bt->mgr->latchsets + victim;
679 __sync_fetch_and_add(&set->pin, 1);
681 _InterlockedIncrement16 (&set->pin);
683 bt_latchlink (bt, hashidx, victim, page_no);
684 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
689 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
691 victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
693 // find and reuse previous lock entry
697 victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
699 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
701 // we don't use slot zero
703 if( victim %= bt->mgr->latchmgr->latchtotal )
704 set = bt->mgr->latchsets + victim;
708 // take control of our slot
709 // from other threads
711 if( set->pin || !bt_spinwritetry (set->busy) )
716 // try to get write lock on hash chain
717 // skip entry if not obtained
718 // or has outstanding locks
720 if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
721 bt_spinreleasewrite (set->busy);
726 bt_spinreleasewrite (set->busy);
727 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
731 // unlink our available victim from its hash chain
734 bt->mgr->latchsets[set->prev].next = set->next;
736 bt->mgr->latchmgr->table[idx].slot = set->next;
739 bt->mgr->latchsets[set->next].prev = set->prev;
741 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
743 __sync_fetch_and_add(&set->pin, 1);
745 _InterlockedIncrement16 (&set->pin);
747 bt_latchlink (bt, hashidx, victim, page_no);
748 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
749 bt_spinreleasewrite (set->busy);
754 void bt_mgrclose (BtMgr *mgr)
759 // release mapped pages
760 // note that slot zero is never used
762 for( slot = 1; slot < mgr->poolmax; slot++ ) {
763 pool = mgr->pool + slot;
766 munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
769 FlushViewOfFile(pool->map, 0);
770 UnmapViewOfFile(pool->map);
771 CloseHandle(pool->hmap);
777 munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
778 munmap (mgr->latchmgr, 2 * mgr->page_size);
780 FlushViewOfFile(mgr->latchmgr, 0);
781 UnmapViewOfFile(mgr->latchmgr);
782 CloseHandle(mgr->halloc);
788 free ((void *)mgr->latch);
791 FlushFileBuffers(mgr->idx);
792 CloseHandle(mgr->idx);
793 GlobalFree (mgr->pool);
794 GlobalFree (mgr->hash);
795 GlobalFree ((void *)mgr->latch);
800 // close and release memory
802 void bt_close (BtDb *bt)
809 VirtualFree (bt->mem, 0, MEM_RELEASE);
814 // open/create new btree buffer manager
816 // call with file_name, BT_openmode (BT_ro or BT_rw), bits in page size (e.g. 16),
817 // size of mapped page pool (e.g. 8192), bits in segment size (pages per mapping), and size of the pool hash table
819 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
821 uint lvl, attr, cacheblk, last, slot, idx;
822 uint nlatchpage, latchhash;
823 unsigned char value[BtId];
824 BtLatchMgr *latchmgr;
833 SYSTEM_INFO sysinfo[1];
836 // determine sanity of page size and buffer pool
838 if( bits > BT_maxbits )
840 else if( bits < BT_minbits )
844 return NULL; // must have buffer pool
847 mgr = calloc (1, sizeof(BtMgr));
849 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
852 return free(mgr), NULL;
854 cacheblk = 4096; // minimum mmap segment size for unix
857 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
858 attr = FILE_ATTRIBUTE_NORMAL;
859 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
861 if( mgr->idx == INVALID_HANDLE_VALUE )
862 return GlobalFree(mgr), NULL;
864 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
865 GetSystemInfo(sysinfo);
866 cacheblk = sysinfo->dwAllocationGranularity;
870 latchmgr = malloc (BT_maxpage);
873 // read minimum page size to get root info
875 if( size = lseek (mgr->idx, 0L, 2) ) {
876 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
877 bits = latchmgr->alloc->bits;
879 return free(mgr), free(latchmgr), NULL;
880 } else if( mode == BT_ro )
881 return free(latchmgr), bt_mgrclose (mgr), NULL;
883 latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
884 size = GetFileSize(mgr->idx, amt);
887 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
888 return bt_mgrclose (mgr), NULL;
889 bits = latchmgr->alloc->bits;
890 } else if( mode == BT_ro )
891 return bt_mgrclose (mgr), NULL;
894 mgr->page_size = 1 << bits;
895 mgr->page_bits = bits;
897 mgr->poolmax = poolmax;
900 if( cacheblk < mgr->page_size )
901 cacheblk = mgr->page_size;
903 // mask for partial memmaps
905 mgr->poolmask = (cacheblk >> bits) - 1;
907 // see if requested size of pages per memmap is greater
909 if( (1 << segsize) > mgr->poolmask )
910 mgr->poolmask = (1 << segsize) - 1;
914 while( (1 << mgr->seg_bits) <= mgr->poolmask )
917 mgr->hashsize = hashsize;
920 mgr->pool = calloc (poolmax, sizeof(BtPool));
921 mgr->hash = calloc (hashsize, sizeof(ushort));
922 mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
924 mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
925 mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
926 mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
932 // initialize an empty b-tree with latch page, root page, page of leaves
933 // and page(s) of latches
935 memset (latchmgr, 0, 1 << bits);
936 nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
937 bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
938 latchmgr->alloc->bits = mgr->page_bits;
940 latchmgr->nlatchpage = nlatchpage;
941 latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
943 // initialize latch manager
945 latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
947 // size of hash table = total number of latchsets
949 if( latchhash > latchmgr->latchtotal )
950 latchhash = latchmgr->latchtotal;
952 latchmgr->latchhash = latchhash;
955 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
956 return bt_mgrclose (mgr), NULL;
958 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
959 return bt_mgrclose (mgr), NULL;
961 if( *amt < mgr->page_size )
962 return bt_mgrclose (mgr), NULL;
965 memset (latchmgr, 0, 1 << bits);
966 latchmgr->alloc->bits = mgr->page_bits;
968 for( lvl=MIN_lvl; lvl--; ) {
969 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
970 slotptr(latchmgr->alloc, 1)->fence = 1;
972 slotptr(latchmgr->alloc, 1)->dead = 1;
973 key = keyptr(latchmgr->alloc, 1);
974 key->len = 2; // create stopper key
978 bt_putid(value, MIN_lvl - lvl + 1);
979 val = valptr(latchmgr->alloc, 1);
980 val->len = lvl ? BtId : 0;
981 memcpy (val->value, value, val->len);
983 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
984 latchmgr->alloc->root = 1;
985 latchmgr->alloc->lvl = lvl;
986 latchmgr->alloc->cnt = 1;
987 latchmgr->alloc->act = 1;
989 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
990 return bt_mgrclose (mgr), NULL;
992 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
993 return bt_mgrclose (mgr), NULL;
995 if( *amt < mgr->page_size )
996 return bt_mgrclose (mgr), NULL;
1000 // clear out latch manager locks
1001 // and rest of pages to round out segment
1003 memset(latchmgr, 0, mgr->page_size);
1006 while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1008 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1010 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1011 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1012 return bt_mgrclose (mgr), NULL;
1013 if( *amt < mgr->page_size )
1014 return bt_mgrclose (mgr), NULL;
1021 // mlock the root page and the latchmgr page
1023 flag = PROT_READ | PROT_WRITE;
1024 mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1025 if( mgr->latchmgr == MAP_FAILED )
1026 return bt_mgrclose (mgr), NULL;
1027 mlock (mgr->latchmgr, 2 * mgr->page_size);
1029 mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1030 if( mgr->latchsets == MAP_FAILED )
1031 return bt_mgrclose (mgr), NULL;
1032 mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1034 flag = PAGE_READWRITE;
1035 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1037 return bt_mgrclose (mgr), NULL;
1039 flag = FILE_MAP_WRITE;
1040 mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1041 if( !mgr->latchmgr )
1042 return GetLastError(), bt_mgrclose (mgr), NULL;
1044 mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1050 VirtualFree (latchmgr, 0, MEM_RELEASE);
1055 // open BTree access method
1056 // based on buffer manager
1058 BtDb *bt_open (BtMgr *mgr)
1060 BtDb *bt = malloc (sizeof(*bt));
1062 memset (bt, 0, sizeof(*bt));
1065 bt->mem = malloc (3 *mgr->page_size);
1067 bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1069 bt->frame = (BtPage)bt->mem;
1070 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1071 bt->que = (uint *)(bt->mem + 2 * mgr->page_size);
1075 // compare two keys, returning > 0, = 0, or < 0
1076 // as the comparison value
1080 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1082 uint len1 = key1->len;
1085 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1086 return ans > 0 ? 1 : -1;
1098 // find segment in pool
1099 // must be called with hashslot idx locked
1100 // return NULL if not there
1101 // otherwise return node
1103 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1108 // compute start of hash chain in pool
1110 if( slot = bt->mgr->hash[idx] )
1111 pool = bt->mgr->pool + slot;
1115 page_no &= ~bt->mgr->poolmask;
1117 while( pool->basepage != page_no )
1118 if( pool = pool->hashnext )
1126 // add segment to hash table
1128 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1133 pool->hashprev = pool->hashnext = NULL;
1134 pool->basepage = page_no & ~bt->mgr->poolmask;
1135 pool->pin = CLOCK_bit + 1;
1137 if( slot = bt->mgr->hash[idx] ) {
1138 node = bt->mgr->pool + slot;
1139 pool->hashnext = node;
1140 node->hashprev = pool;
1143 bt->mgr->hash[idx] = pool->slot;
1146 // map new buffer pool segment to virtual memory
1148 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1150 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1151 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1155 flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1156 pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1157 if( pool->map == MAP_FAILED )
1158 return bt->err = BTERR_map;
1161 flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1162 pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1164 return bt->err = BTERR_map;
1166 flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1167 pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1169 return bt->err = BTERR_map;
1174 // calculate page within pool
1176 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1178 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1181 page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1182 // madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1188 void bt_unpinpool (BtPool *pool)
1191 __sync_fetch_and_add(&pool->pin, -1);
1193 _InterlockedDecrement16 (&pool->pin);
1197 // find or place requested page in segment-pool
1198 // return pool table entry, incrementing pin
1200 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1202 uint slot, hashidx, idx, victim;
1203 BtPool *pool, *node, *next;
1205 // lock hash table chain
1207 hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1208 bt_spinwritelock (&bt->mgr->latch[hashidx]);
1210 // look up in hash table
1212 if( pool = bt_findpool(bt, page_no, hashidx) ) {
1214 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1215 __sync_fetch_and_add(&pool->pin, 1);
1217 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1218 _InterlockedIncrement16 (&pool->pin);
1220 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1224 // allocate a new pool node
1225 // and add to hash table
1228 slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1230 slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1233 if( ++slot < bt->mgr->poolmax ) {
1234 pool = bt->mgr->pool + slot;
1237 if( bt_mapsegment(bt, pool, page_no) )
1240 bt_linkhash(bt, pool, page_no, hashidx);
1241 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1245 // pool table is full
1246 // find best pool entry to evict
1249 __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1251 _InterlockedDecrement16 (&bt->mgr->poolcnt);
1256 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1258 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1260 victim %= bt->mgr->poolmax;
1261 pool = bt->mgr->pool + victim;
1262 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1267 // try to get write lock
1268 // skip entry if not obtained
1270 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1273 // skip this entry if
1275 // or clock bit is set
1279 __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1281 _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1283 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1287 // unlink victim pool node from hash table
1289 if( node = pool->hashprev )
1290 node->hashnext = pool->hashnext;
1291 else if( node = pool->hashnext )
1292 bt->mgr->hash[idx] = node->slot;
1294 bt->mgr->hash[idx] = 0;
1296 if( node = pool->hashnext )
1297 node->hashprev = pool->hashprev;
1299 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1301 // remove old file mapping
1303 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1305 // FlushViewOfFile(pool->map, 0);
1306 UnmapViewOfFile(pool->map);
1307 CloseHandle(pool->hmap);
1311 // create new pool mapping
1312 // and link into hash table
1314 if( bt_mapsegment(bt, pool, page_no) )
1317 bt_linkhash(bt, pool, page_no, hashidx);
1318 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1323 // place write, read, or parent lock on requested page_no.
1325 void bt_lockpage(BtLock mode, BtLatchSet *set)
1329 ReadLock (set->readwr);
1332 WriteLock (set->readwr);
1335 ReadLock (set->access);
1338 WriteLock (set->access);
1341 WriteLock (set->parent);
1346 // remove write, read, or parent lock on requested page
1348 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1352 ReadRelease (set->readwr);
1355 WriteRelease (set->readwr);
1358 ReadRelease (set->access);
1361 WriteRelease (set->access);
1364 WriteRelease (set->parent);
1369 // allocate a new page and write page into it
1371 uid bt_newpage(BtDb *bt, BtPage page)
1377 // lock allocation page
1379 bt_spinwritelock(bt->mgr->latchmgr->lock);
1381 // use empty chain first
1382 // else allocate empty page
1384 if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1385 if( set->pool = bt_pinpool (bt, new_page) )
1386 set->page = bt_page (bt, set->pool, new_page);
1390 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1391 bt_unpinpool (set->pool);
1393 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1394 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1396 // if writing first page of pool block, set file length thru last page
1398 if( (new_page & bt->mgr->poolmask) == 0 )
1399 ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1403 // unlock allocation latch
1405 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1408 // bring new page into pool and copy page.
1409 // this will extend the file into the new pages on WIN32.
1411 if( set->pool = bt_pinpool (bt, new_page) )
1412 set->page = bt_page (bt, set->pool, new_page);
1416 memcpy(set->page, page, bt->mgr->page_size);
1417 bt_unpinpool (set->pool);
1420 // unlock allocation latch
1422 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
// bt_findslot: search the in-page tree rooted at page->root for key.
//   page    - page whose slot tree (root, left/right links) is walked
//   src     - page the key bytes are read from (keyptr(src, slot))
//   key/len - search key
//   path    - receives every visited slot and its comparison result;
//             path->ge is set to the level of the last entry with cmp >= 0
//   stopper - affects how a fence slot on a page with no right sibling compares
// Returns the matching slot on an exact hit.
1427 // find slot in page for given key at a given level
1429 uint bt_findslot (BtPage page, BtPage src, unsigned char *key, uint len, BtPathStk *path, uint stopper)
1437 if( slot = page->root ) do {
// push the visited slot onto the path stack
1438 path->entry[++path->lvl].slot = slot;
1439 node = slotptr(page,slot);
// fence slot on a page without a right sibling: treated as equal when
// stopper is set, otherwise as greater than the search key
1441 if( node->fence && !bt_getid(page->right) && stopper )
1442 path->entry[path->lvl].cmp = 0;
1443 else if( node->fence && !bt_getid(page->right) )
1444 path->entry[path->lvl].cmp = 1;
1446 path->entry[path->lvl].cmp = -1;
// ordinary case: compare the stored key with the search key
1448 path->entry[path->lvl].cmp = keycmp (keyptr(src, slot), key, len);
// exact match: remember its level in path->ge and return the slot
1450 if( path->entry[path->lvl].cmp == 0 )
1451 return path->ge = path->lvl, slot;
// stored key is greater: record this level in path->ge and follow the left link
1453 if( path->entry[path->lvl].cmp > 0 )
1454 path->ge = path->lvl, slot = node->left;
// the descent stops at an empty link or once path->lvl reaches BT_maxbits
1457 } while( slot && path->lvl < BT_maxbits );
1461 return path->entry[path->lvl].slot;
// bt_nextslot: advance the path stack to the next slot of the in-page tree.
//   page - page whose slots are linked by left/right
//   path - path stack positioned on the current slot; updated in place
// Returns the slot now on top of the path stack on the paths shown here.
1464 // return next slot on the page using the path stack
1466 uint bt_nextslot (BtPage page, BtPathStk *path)
1471 slot = path->entry[path->lvl].slot;
1472 node = slotptr(page,slot);
// a right child exists: step into it, recording that the right link was taken
1474 if( slot = node->right ) {
1475 path->entry[path->lvl++].cmp = -1;
1476 path->entry[path->lvl].slot = slot;
// then follow left links as far as they go, recording each left step
1478 while( slot = slotptr(page,slot)->left ) {
1479 path->entry[path->lvl++].cmp = 1;
1480 path->entry[path->lvl].slot = slot;
1483 return path->entry[path->lvl].slot;
// no right child: pop the stack; an entry with cmp > 0 is one whose left
// link was followed, so that entry is the next slot
1487 if( path->entry[--path->lvl].cmp > 0 )
1488 return path->entry[path->lvl].slot;
// bt_loadpage: descend from ROOT_page toward level lvl looking for key.
//   set     - receives latch, pool, page and page_no of each page visited
//   key/len - search key
//   lvl     - target level; other levels are locked with BtLockRead
//   lock    - lock mode applied to the page when drill == lvl
//   path    - path stack filled by bt_findslot
//   stopper - passed through to bt_findslot
// Returns 0 with bt->err = BTERR_struct on the error paths shown below;
// callers in this file use a non-zero result as a slot number.
1493 // find and load page at given level for given key
1494 // leave page rd or wr locked as requested
1496 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, BtPathStk *path, uint stopper)
1498 uid page_no = ROOT_page, prevpage = 0;
// drill starts at 0xff and is replaced by the root's level once it is read
1499 uint drill = 0xff, slot;
1500 BtLatchSet *prevlatch;
1501 uint mode, prevmode;
1504 // start at root of btree and drill down
1507 // determine lock mode of drill level
1508 mode = (drill == lvl) ? lock : BtLockRead;
1510 set->latch = bt_pinlatch (bt, page_no);
1511 set->page_no = page_no;
1513 // pin page contents
1515 if( set->pool = bt_pinpool (bt, page_no) )
1516 set->page = bt_page (bt, set->pool, page_no);
1520 // obtain access lock using lock chaining with Access mode
// the root page is never taken through the Access lock
1522 if( page_no > ROOT_page )
1523 bt_lockpage(BtLockAccess, set->latch);
1525 // release & unpin parent page
1528 bt_unlockpage(prevmode, prevlatch);
1529 bt_unpinlatch (prevlatch);
1530 bt_unpinpool (prevpool);
1534 // obtain read lock using lock chaining
1536 bt_lockpage(mode, set->latch);
// reaching a page marked free is reported as a structure error
// NOTE(review): no unlock/unpin is visible before this return - confirm
1538 if( set->page->free )
1539 return bt->err = BTERR_struct, 0;
1541 if( page_no > ROOT_page )
1542 bt_unlockpage(BtLockAccess, set->latch);
1544 // re-read and re-lock root after determining actual level of root
// a level mismatch is only tolerated on the root page
1546 if( set->page->lvl != drill) {
1547 if( set->page_no != ROOT_page )
1548 return bt->err = BTERR_struct, 0;
1550 drill = set->page->lvl;
// the root itself is the target level and a non-read lock was requested:
// drop the lock and pins taken with the provisional mode
1552 if( lock != BtLockRead && drill == lvl ) {
1553 bt_unlockpage(mode, set->latch);
1554 bt_unpinlatch (set->latch);
1555 bt_unpinpool (set->pool);
// remember this page so it can be released after the next page is locked
1560 prevpage = set->page_no;
1561 prevlatch = set->latch;
1562 prevpool = set->pool;
1565 // find key on page at this level
1566 // and descend to requested level
1568 if( set->page->kill )
1571 slot = bt_findslot (set->page, set->page, key, len, path, stopper);
// reposition on the entry recorded in path->ge by bt_findslot
1579 slot = path->entry[path->ge].slot;
1580 path->lvl = path->ge;
// step over slots flagged dead
1582 while( slotptr(set->page, slot)->dead )
1583 if( slot = bt_nextslot (set->page, path) )
// the slot's value holds the page number of the child to descend into
1588 page_no = bt_getid(valptr(set->page, slot)->value);
1592 // or slide right into next page
1595 page_no = bt_getid(set->page->right);
1599 // return error on end of right chain
1601 bt->err = BTERR_struct;
1602 return 0; // return error
// bt_freepage: push the page described by set onto the free chain and
// release the caller's delete lock, write lock, latch pin and pool pin.
//   bt  - btree handle
//   set - page set naming the page (page, page_no, latch, pool)
1605 // return page to free list
1606 // page must be delete & write locked
1608 void bt_freepage (BtDb *bt, BtPageSet *set)
1610 // lock allocation page
1612 bt_spinwritelock (bt->mgr->latchmgr->lock);
// link the page in front of the chain: its right link takes the old chain
// head, then the chain head is set to this page's number
1615 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1616 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1617 set->page->free = 1;
1619 // unlock released page
1621 bt_unlockpage (BtLockDelete, set->latch);
1622 bt_unlockpage (BtLockWrite, set->latch);
1623 bt_unpinlatch (set->latch);
1624 bt_unpinpool (set->pool);
1626 // unlock allocation page
1628 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
// bt_fixfence: replace a page's fence key after it was deleted.
//   bt  - btree handle
//   set - page whose fence is being replaced; write locked by the caller,
//         as the visible code swaps that lock for a ParentModification lock
//   lvl - level of the page; the fence keys are posted at lvl+1
// rightkey holds the old fence and leftkey the new one; both are stored
// as a length byte followed by the key bytes.
1631 // a fence key was deleted from a page
1632 // push new fence value upwards in the btree
1634 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1636 unsigned char leftkey[256], rightkey[256];
1637 unsigned char value[BtId];
1643 // remove the old fence value
1645 fence = set->page->root;
1646 slot = set->page->root;
// follow right links from the root, pushing each slot onto the path;
// the last slot reached is the current fence
1649 do path->entry[++path->lvl].slot = fence = slot;
1650 while( slot = slotptr(set->page,slot)->right );
// save the old fence key (length byte + key bytes)
1652 ptr = keyptr(set->page, fence);
1653 memcpy (rightkey, ptr, ptr->len + 1);
// keep removing fence slots until the new fence is not a dead slot
1655 do fence = bt_rbremovefence (set->page, fence, path);
1656 while( slotptr(set->page,fence)->dead );
1658 set->page->dirty = 1;
// save the new fence key
1660 ptr = keyptr(set->page, fence);
1661 memcpy (leftkey, ptr, ptr->len + 1);
1662 page_no = set->page_no;
// take the ParentModification lock before giving up the write lock
1664 bt_lockpage (BtLockParent, set->latch);
1665 bt_unlockpage (BtLockWrite, set->latch);
1667 // insert new (now smaller) fence key
1669 bt_putid (value, page_no);
1671 if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId, 0) )
1674 // now delete old fence key
// the stopper argument is set when this page has no right sibling
1676 if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1, !bt_getid (set->page->right)) )
1679 bt_unlockpage (BtLockParent, set->latch);
1680 bt_unpinlatch(set->latch);
1681 bt_unpinpool (set->pool);
// bt_collapseroot: copy the root's only child over the root page and free
// the child, repeating while the root is above level 1 and has one active key.
//   bt   - btree handle
//   root - page set for the root page; its write lock, latch pin and pool
//          pin are released at the end
1685 // root has a single child
1686 // collapse a level from the tree
1688 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1693 // find the child entry and promote as new root contents
// the child page number is read from the value stored at the root's tree root slot
1696 slot = root->page->root;
1697 child->page_no = bt_getid (valptr(root->page, slot)->value);
// the child is locked for delete and write, as bt_freepage requires
1699 child->latch = bt_pinlatch (bt, child->page_no);
1700 bt_lockpage (BtLockDelete, child->latch);
1701 bt_lockpage (BtLockWrite, child->latch);
1703 if( child->pool = bt_pinpool (bt, child->page_no) )
1704 child->page = bt_page (bt, child->pool, child->page_no);
// the whole child image replaces the root contents
1708 memcpy (root->page, child->page, bt->mgr->page_size);
1709 bt_freepage (bt, child);
1711 } while( root->page->lvl > 1 && root->page->act == 1 );
1713 bt_unlockpage (BtLockWrite, root->latch);
1714 bt_unpinlatch (root->latch);
1715 bt_unpinpool (root->pool);
// bt_deletekey: mark a key dead on the page at level lvl and repair the
// tree when that empties the page or removes a fence.
//   bt      - btree handle; bt->found records whether a live key was found
//   key/len - key to delete
//   lvl     - level at which the key is looked up
//   stopper - passed to bt_loadpage
// Returns 0 on the success paths shown, BTERR_struct if the right sibling
// is already marked kill.
1719 // find and delete key on page by marking delete flag bit
1720 // if page becomes empty, delete it from the btree
1722 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl, uint stopper)
1724 unsigned char lowerfence[256], higherfence[256];
1725 uint slot, idx, dirty = 0, fence, found;
1726 BtPageSet set[1], right[1];
1727 unsigned char value[BtId];
1733 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite, path, stopper) )
1734 node = slotptr(set->page, slot);
1738 // if key is found delete it, otherwise ignore request
// found requires an exact compare (cmp == 0) on a slot not already dead
1740 if( found = !path->entry[path->lvl].cmp )
1741 if( found = node->dead == 0 ) {
1742 dirty = node->dead = 1;
1743 set->page->dirty = 1;
1747 // did we delete a fence key in an upper level?
1749 if( lvl && node->fence )
1750 if( dirty && set->page->act )
1751 if( bt_fixfence (bt, set, lvl) )
1754 return bt->found = found, 0;
1756 // is this a collapsed root?
1758 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1759 if( bt_collapseroot (bt, set) )
1762 return bt->found = found, 0;
1764 // return if page is not empty
1766 if( set->page->act ) {
1767 bt_unlockpage(BtLockWrite, set->latch);
1768 bt_unpinlatch (set->latch);
1769 bt_unpinpool (set->pool);
1770 return bt->found = found, 0;
1773 // cache copy of fence key
1774 // to post in parent
// from here on the page is empty (act == 0): walk right links to its fence
1776 fence = set->page->root;
1777 slot = set->page->root;
1779 while( slot = slotptr(set->page,slot)->right )
1782 ptr = keyptr(set->page, fence);
1783 memcpy (lowerfence, ptr, ptr->len + 1);
1785 // obtain lock on right page
1787 right->page_no = bt_getid(set->page->right);
1788 right->latch = bt_pinlatch (bt, right->page_no);
1789 bt_lockpage (BtLockWrite, right->latch);
1791 // pin page contents
1793 if( right->pool = bt_pinpool (bt, right->page_no) )
1794 right->page = bt_page (bt, right->pool, right->page_no);
// NOTE(review): no unlock/unpin is visible before this error return - confirm
1798 if( right->page->kill )
1799 return bt->err = BTERR_struct;
1801 // pull contents of right peer into our empty page
1803 memcpy (set->page, right->page, bt->mgr->page_size);
1805 // cache copy of key to update
1807 fence = set->page->root;
1808 slot = set->page->root;
1810 while( slot = slotptr(set->page,slot)->right )
1813 ptr = keyptr(right->page, fence);
1814 memcpy (higherfence, ptr, ptr->len + 1);
1816 // mark right page deleted and point it to left page
1817 // until we can post parent updates
1819 bt_putid (right->page->right, set->page_no);
1820 right->page->kill = 1;
// both pages swap their write lock for a ParentModification lock
1822 bt_lockpage (BtLockParent, right->latch);
1823 bt_unlockpage (BtLockWrite, right->latch);
1825 bt_lockpage (BtLockParent, set->latch);
1826 bt_unlockpage (BtLockWrite, set->latch);
1828 // redirect higher key directly to our new node contents
// stopper is recomputed: set when the merged page has no right sibling
1830 stopper = !bt_getid (set->page->right);
1831 bt_putid (value, set->page_no);
1833 if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId, stopper) )
1836 // delete old lower key to our node
1838 if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1, 0) )
1841 // obtain delete and write locks to right node
1843 bt_unlockpage (BtLockParent, right->latch);
1844 bt_lockpage (BtLockDelete, right->latch);
1845 bt_lockpage (BtLockWrite, right->latch);
1846 bt_freepage (bt, right);
1848 bt_unlockpage (BtLockParent, set->latch);
1849 bt_unpinlatch (set->latch);
1850 bt_unpinpool (set->pool);
// bt_findkey: look up a key at level 0 under a read lock and copy its value out.
//   bt         - btree handle
//   key/keylen - key to look up
//   value      - destination buffer for the value bytes
//   valmax     - capacity of value
1855 // find key in leaf level and return number of value bytes
1856 // or (-1) if not found
1858 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1867 if( !(slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead, path, 0)) )
1870 // if key exists, return TRUE
1871 // otherwise return FALSE
// a hit needs a live slot and an exact compare result
1873 if( !slotptr(set->page, slot)->dead && !path->entry[path->lvl].cmp ) {
1874 val = valptr (set->page,slot);
// NOTE(review): valmax is compared with val->len before valmax bytes are
// copied; confirm how valmax is adjusted in between
1875 if( valmax > val->len )
1877 memcpy (value, val->value, valmax);
1882 bt_unlockpage (BtLockRead, set->latch);
1883 bt_unpinlatch (set->latch);
1884 bt_unpinpool (set->pool);
// bt_leftrotate: rotate slot x left so that its right child y takes its place.
//   page   - page holding the slots
//   slot   - slot number of x
//   parent - slot that links to x, or NULL when x is the tree root
//   cmp    - presumably selects which parent link (left or right) is
//            rewritten - confirm against the branch condition
1888 // left rotate parent node
1890 void bt_leftrotate (BtPage page, uint slot, BtSlot *parent, int cmp)
1892 BtSlot *x = slotptr(page,slot);
1893 uint right = x->right;
1894 BtSlot *y = slotptr(page,right);
1898 if( !parent ) // is x the root node?
// otherwise the parent link that pointed at x is redirected to y
1901 parent->left = right;
1903 parent->right = right;
// bt_rightrotate: rotate slot x right so that its left child y takes its place.
//   page   - page holding the slots
//   slot   - slot number of x
//   parent - slot that links to x, or NULL when x is the tree root
//   cmp    - presumably selects which parent link (left or right) is
//            rewritten - confirm against the branch condition
1908 // right rotate parent node
1910 void bt_rightrotate (BtPage page, uint slot, BtSlot *parent, int cmp)
1912 BtSlot *x = slotptr(page,slot);
1913 uint left = x->left;
1914 BtSlot *y = slotptr(page,left);
// NOTE(review): the test is on x being the root, although the trailing
// comment names y
1918 if( !parent ) // is y the root node?
// otherwise the parent link that pointed at x is redirected to y
1921 parent->left = left;
1923 parent->right = left;
// bt_rbremovefence: unlink the fence slot from the in-page red/black tree,
// mark its replacement as the fence, and rebalance.
//   page - page holding the tree
//   slot - slot number of the fence being removed
//   path - path stack whose top entry is the fence slot; updated in place
// The removed slot's left child is spliced into its place.
1928 // delete fence slot from rbtree at path point
1929 // return with new fence slot
1931 uint bt_rbremovefence (BtPage page, uint slot, BtPathStk *path)
1933 BtSlot *node = slotptr (page,slot), *parent, *sibling, *grand;
1934 uint red = node->red, lvl, fence, idx, left;
// splice the removed slot's left child into the parent's right link
1936 if( lvl = path->lvl ) {
1937 parent = slotptr(page,path->entry[lvl - 1].slot);
1938 parent->right = node->left;
// the removed slot was the tree root: its left child becomes the root
1940 page->root = node->left;
1942 if( fence = node->left )
1943 node = slotptr(page,fence);
// no left child: pop the path and return the slot now on top
1946 return path->entry[--path->lvl].slot;
1949 // extend path to new fence
// NOTE(review): the next lines index and increment page->lvl, whereas the
// path depth used elsewhere in this function is path->lvl - confirm
1951 path->entry[page->lvl].slot = fence;
1953 while( slot = slotptr(page, fence)->right )
1954 path->entry[++page->lvl].slot = fence = slot;
1956 slotptr(page,fence)->fence = 1;
// rebalance loop: runs while the replacement is black and not at the root
1961 while( !node->red && lvl ) {
// the removed slot hung off parent->right, so its sibling is parent->left
1962 left = parent->left;
1963 sibling = slotptr(page,left);
// red sibling: rotate the parent right and continue with the new sibling
1964 if( sibling->red ) {
1968 grand = slotptr(page,path->entry[lvl-2].slot);
1971 bt_rightrotate(page, path->entry[lvl-1].slot, grand, -1);
1972 sibling = slotptr(page,parent->left);
// make room in the path for the old sibling, which now sits above the parent
1974 for( idx = ++path->lvl; idx > lvl - 1; idx-- )
1975 path->entry[idx].slot = path->entry[idx-1].slot;
1977 path->entry[idx].slot = left;
// sibling with no red child on either side
1980 if( !sibling->right || !slotptr(page,sibling->right)->red )
1981 if( !sibling->left || !slotptr(page,sibling->left)->red ) {
// sibling's left child is not red: blacken its right child and rotate the sibling left
1989 if( !sibling->left || !slotptr(page,sibling->left)->red ) {
1990 if( sibling->right )
1991 slotptr(page,sibling->right)->red = 0;
1994 bt_leftrotate (page, parent->left, parent, 1);
1995 sibling = slotptr(page,parent->left);
// blacken the sibling's left child, give the sibling the parent's colour,
// then rotate the parent right
1998 slotptr(page, sibling->left)->red = 0;
1999 sibling->red = parent->red;
2001 bt_rightrotate(page, path->entry[lvl-1].slot, grand, -1);
// the tree root is always left black
2005 slotptr(page,page->root)->red = 0;
// bt_rbinsert: link slot below the slot on top of the path stack, colour it
// red, and restore the red/black properties walking back up the path.
//   page - page holding the tree
//   slot - slot number being inserted
//   path - path stack from a prior search; entry[lvl].cmp == 1 means the
//          left link was followed at that level
2009 // insert slot into rbtree at path point
2011 void bt_rbinsert (BtPage page, uint slot, BtPathStk *path)
2013 BtSlot *parent = slotptr(page,path->entry[path->lvl].slot);
2014 BtSlot *uncle, *grand;
2015 int lvl = path->lvl;
// attach the new slot on the side recorded by the search
2017 if( path->entry[lvl].cmp == 1 )
2018 parent->left = slot;
2020 parent->right = slot;
2022 slotptr(page,slot)->red = 1;
// fix-up loop: runs while the parent is red and is not the top path entry
2024 while( lvl > 0 && parent->red ) {
2025 grand = slotptr(page,path->entry[lvl-1].slot);
2027 if( path->entry[lvl-1].cmp == 1 ) { // was grandparent left followed?
// parent is the grandparent's left child, so the uncle is its right child
2028 uncle = slotptr(page,grand->right);
2029 if( grand->right && uncle->red ) {
2034 // move to grandparent & its parent (if any)
2036 slot = path->entry[--lvl].slot;
2039 parent = slotptr(page,path->entry[--lvl].slot);
2043 // was the parent right link followed?
2044 // if so, left rotate parent
2046 if( path->entry[lvl].cmp == -1 ) {
2047 bt_leftrotate(page, path->entry[lvl].slot, grand, path->entry[lvl-1].cmp);
2048 parent = slotptr(page,slot); // slot was rotated to parent
2054 // get pointer to grandparent's parent
2057 grand = slotptr(page,path->entry[lvl-2].slot);
2061 // right rotate the grandparent slot
2063 slot = path->entry[lvl-1].slot;
2064 bt_rightrotate(page, slot, grand, path->entry[lvl-2].cmp);
2066 } else { // symmetrical case
// parent is the grandparent's right child, so the uncle is its left child
2067 uncle = slotptr(page,grand->left);
2068 if( grand->left && uncle->red ) {
2073 // move to grandparent & its parent (if any)
2074 slot = path->entry[--lvl].slot;
2077 parent = slotptr(page,path->entry[--lvl].slot);
2081 // was the parent left link followed?
2082 // if so, right rotate parent
2084 if( path->entry[lvl].cmp == 1 ) {
2085 bt_rightrotate(page, path->entry[lvl].slot, grand, path->entry[lvl-1].cmp);
2086 parent = slotptr(page,slot); // slot was rotated to parent
2092 // get pointer to grandparent's parent
2095 grand = slotptr(page,path->entry[lvl-2].slot);
2099 // left rotate the grandparent slot
2101 slot = path->entry[lvl-1].slot;
2102 bt_leftrotate(page, slot, grand, path->entry[lvl-2].cmp);
// the tree root is always left black
2109 slotptr(page,page->root)->red = 0;
// bt_xfrslot: append a copy of src's slot to page and link it into page's tree.
//   page    - destination page; page->cnt is incremented for the new slot
//   src     - page the slot, key and value are read from
//   slot    - slot number in src
//   path    - path stack locating the insertion point in page's tree
//   copykey - non-zero: the new slot's offset is page->min;
//             zero: the new slot keeps src's offset
2112 // transfer a slot from one page to another
2114 void bt_xfrslot (BtPage page, BtPage src, uint slot, BtPathStk *path, uint copykey)
2116 BtKey key = keyptr(src,slot);
2117 BtVal val = valptr(src,slot);
2120 // calculate next available slot and copy key into page
// the value is written first, as a length byte followed by the bytes
2123 page->min -= val->len + 1; // reset lowest used offset
2124 ((unsigned char *)page)[page->min] = val->len;
2125 memcpy ((unsigned char *)page + page->min +1, val->value, val->len );
// then the key in the same layout, directly below the value
2127 page->min -= key->len + 1; // reset lowest used offset
2128 ((unsigned char *)page)[page->min] = key->len;
2129 memcpy ((unsigned char *)page + page->min +1, key->key, key->len );
// claim the next slot and carry the fence and dead flags over from src
2132 node = slotptr(page, ++page->cnt);
2133 node->off = copykey ? page->min : slotptr(src,slot)->off;
2134 node->fence = slotptr(src,slot)->fence;
2135 node->dead = slotptr(src,slot)->dead;
// NOTE(review): this branch makes the new slot the tree root; confirm that
// path->lvl is a signed type, otherwise the comparison can never be true
2139 if( path->lvl < 0 ) {
2140 page->root = page->cnt;
2144 bt_rbinsert (page, page->cnt, path);
// bt_copykeys: copy the key and value of one slot from frame into page's
// key area, then recurse into the slot's children.
//   page  - destination page; slot links are read from it
//   slot  - slot whose key/value are copied
//   frame - page image the key/value bytes are read from, at node->off
//   que   - work queue of deferred slots; que[0] holds the entry count
//   lvl   - recursion depth within the current group
2147 // copy keys across into a binomial tree
2149 void bt_copykeys (BtPage page, uint slot, BtPage frame, uint *que, int lvl)
2151 BtSlot *node = slotptr(page,slot);
// the value is located directly after the key bytes in frame
2152 BtKey key = ((BtKey)((unsigned char*)(frame) + node->off));
2153 BtVal val = ((BtVal)(key->key + key->len));
2154 uint off, nxt = page->min;
2155 uint right = node->right;
2156 uint left = node->left;
// value first: length byte then bytes
2160 nxt -= val->len + 1;
2161 ((unsigned char *)page)[nxt] = val->len;
2162 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
// key (length byte + bytes) is copied as one unit below the value
2166 nxt -= key->len + 1;
2167 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2171 // punt if group of keys has filled a 4K VM block
2172 // which we determine by the number of red/black
2173 // levels that have been copied across.
// past BT_binomial levels the children are queued rather than recursed into
2175 if( lvl > BT_binomial ) {
2177 que[++(*que)] = left;
2179 que[++(*que)] = right;
2184 bt_copykeys (page, left, frame, que, lvl+1);
2187 bt_copykeys (page, right, frame, que, lvl+1);
// bt_cleanpage: rebuild a page from a copy held in bt->frame.
//   bt     - btree handle; bt->frame and bt->que are used as scratch space
//   page   - page to rebuild in place
//   keylen - length of the key the caller wants to insert afterwards
//   vallen - length of the value the caller wants to insert afterwards
2190 // clean page and rebuild red-black tree
2191 // return 0 - page needs splitting
2192 // >0 cleanup done, try again
2194 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint vallen)
2196 uint max = page->cnt;
2204 // skip cleanup if nothing to reclaim
// keep the original image in bt->frame as the source for the rebuild
2209 memcpy (bt->frame, page, bt->mgr->page_size);
2211 // skip page info and set rest of page to zero
2213 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2214 page->min = bt->mgr->page_size;
2220 // try cleaning up page first
2221 // by removing deleted keys
2224 right = bt_getid (bt->frame->right);
// visit every slot of the saved image in slot-number order
2227 while( ++slot <= max ) {
2228 node = slotptr(bt->frame,slot);
2233 if( page->lvl || !node->fence )
2237 // xfr the slot to the page b-heap using keys from bt->frame
2239 key = keyptr(bt->frame, slot);
// locate the insertion point; the stopper flag is set for a fence on a
// page with no right sibling
2240 bt_findslot (page, bt->frame, key->key, key->len, path, node->fence && !right);
2241 bt_xfrslot (page, bt->frame, slot, path, 0);
2244 // now copy keys & values across from bt->frame to page
2245 // in blocks of BT_binomial tree levels
2247 page->min = bt->mgr->page_size;
// seed the queue with the tree root, then drain it
2248 bt->que[1] = page->root;
2252 do bt_copykeys (page, bt->que[++cnt], bt->frame, bt->que, 0);
2253 while( cnt < *bt->que );
// not enough room for one more slot plus the length-prefixed key and value
2255 if( page->min < (page->cnt+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
// bt_splitroot: move the root's contents to a new page and rewrite the root
// with two keys.
//   bt       - btree handle
//   root     - page set for the root; lock and pins are released at the end
//   leftkey  - length-prefixed fence key for the new left page
//   page_no2 - page number stored under the second (stopper) key
2261 // split the root and raise the height of the btree
2263 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
2265 uint nxt = bt->mgr->page_size;
2266 unsigned char value[BtId];
2270 // Obtain an empty page to use, and copy the current
2271 // root contents into it, e.g. lower keys
2273 if( !(left = bt_newpage(bt, root->page)) )
2276 // preserve the page info at the bottom
2277 // of higher keys and set rest to zero
2279 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2281 // insert lower keys page fence key on newroot page as first key
// value for slot 1: length byte BtId followed by the left page number
2284 bt_putid (value, left);
2285 ((unsigned char *)root->page)[nxt] = BtId;
2286 memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
// key for slot 1: leftkey copied with its length byte
2288 nxt -= *leftkey + 1;
2289 memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
2290 node = slotptr(root->page, 1);
2294 // insert stopper key on newroot page
2295 // and increase the root height
// stopper key is two 0xff bytes: 1 length byte + 2 key bytes, then
// 1 length byte + BtId value bytes
2297 nxt -= 3 + BtId + 1;
2298 ((unsigned char *)root->page)[nxt] = 2;
2299 ((unsigned char *)root->page)[nxt+1] = 0xff;
2300 ((unsigned char *)root->page)[nxt+2] = 0xff;
2302 bt_putid (value, page_no2);
2303 ((unsigned char *)root->page)[nxt+3] = BtId;
2304 memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
2305 node = slotptr(root->page, 2);
// the root has no right sibling; slot 2 becomes the in-page tree root
2310 bt_putid(root->page->right, 0);
2311 root->page->min = nxt; // reset lowest used offset and key count
2312 root->page->root = 2;
2313 root->page->cnt = 2;
2314 root->page->act = 2;
2317 // release and unpin root
2319 bt_unlockpage(BtLockWrite, root->latch);
2320 bt_unpinlatch (root->latch);
2321 bt_unpinpool (root->pool);
// bt_copysubtree: recursively copy the slot subtree rooted at src slot idx
// into dest, rewriting the left/right links to the new slot numbers.
//   bt     - btree handle; bt->base bounds the computed slot number
//   dest   - destination page
//   src    - source page
//   idx    - slot in src at the root of the subtree
//   parent - destination slot number of the parent (0 for the top call)
//   off    - 0 or 1; added to parent * 2 to form the destination slot
2325 // copy sub-tree from one node to the root of another
2326 // return slot number in destination
2328 uint bt_copysubtree (BtDb *bt, BtPage dest, BtPage src, uint idx, uint parent, uint off)
2330 BtSlot *node = slotptr(src, idx);
// destination slots are numbered heap-style: child = parent * 2 + off
2333 slot = parent * 2 + off;
2335 if( slot > bt->base )
// copy the whole slot, then overwrite its links below
2338 *slotptr(dest, slot) = *node;
2341 if( child = node->left )
2342 child = bt_copysubtree (bt, dest, src, child, slot, 0);
2344 slotptr(dest, slot)->left = child;
2346 if( child = node->right )
2347 child = bt_copysubtree (bt, dest, src, child, slot, 1);
2349 slotptr(dest, slot)->right = child;
// bt_splitpage: split a full page around the root of its in-page tree.
//   bt  - btree handle; bt->frame, bt->que and bt->base are used as scratch
//   set - page set of the full page, write locked by the caller
// The tree root's right subtree goes to a new right page, its left subtree
// stays in set->page, and the old tree root becomes the left page's fence.
// A split of ROOT_page is completed by bt_splitroot.
2353 // split already locked full node
2356 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2358 unsigned char fencekey[256], rightkey[256];
2359 unsigned char value[BtId];
2360 uint lvl = set->page->lvl;
2361 uint prev, fence, stopper;
2369 // split higher half of keys to bt->frame
// the higher half is the right subtree of the tree root
2371 slot = slotptr(set->page, set->page->root)->right;
2372 memset (bt->frame, 0, bt->mgr->page_size);
2373 bt->base = set->page->cnt / 2;
2374 bt->frame->cnt = bt->base;
2376 bt->frame->root = bt_copysubtree (bt, bt->frame, set->page, slot, 0, 1);
2377 bt->frame->lvl = set->page->lvl;
2379 // now copy keys & values across from page to bt->frame
2380 // in blocks of BT_binomial tree levels
// the new tree root is forced black
2382 slotptr(bt->frame,bt->frame->root)->red = 0;
2383 bt->frame->min = bt->mgr->page_size;
2384 bt->que[1] = bt->frame->root;
2388 do bt_copykeys (bt->frame, bt->que[++cnt], set->page, bt->que, 0);
2389 while( cnt < *bt->que );
2391 // remember existing fence key for new page to the right
2393 fence = set->page->root;
2394 slot = set->page->root;
2396 while( slot = slotptr(set->page,slot)->right )
2399 key = keyptr(set->page,fence);
2400 memcpy (rightkey, key, key->len + 1);
2402 bt->frame->bits = bt->mgr->page_bits;
2403 bt->frame->lvl = lvl;
// a non-root page passes its right link on to the new right page
2407 if( set->page_no > ROOT_page )
2408 memcpy (bt->frame->right, set->page->right, BtId);
// stopper is set when the new right page has no right sibling
2410 stopper = !bt_getid (bt->frame->right);
2412 // get new free page and write higher keys in bt->frame to it.
2414 if( !(right->page_no = bt_newpage(bt, bt->frame)) )
2417 // update lower keys to continue in old page
// bt->frame is reused to hold the old image while set->page is rebuilt
2419 memcpy (bt->frame, set->page, bt->mgr->page_size);
2420 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2421 slot = slotptr(bt->frame, bt->frame->root)->left;
2422 set->page->cnt = bt->base;
2423 set->page->dirty = 0;
2426 // assemble page of smaller keys in set->page
2428 set->page->root = bt_copysubtree (bt, set->page, bt->frame, slot, 0, 1);
2429 set->page->min = bt->mgr->page_size;
2431 bt->que[1] = set->page->root;
2435 do bt_copykeys (set->page, bt->que[++cnt], bt->frame, bt->que, 0);
2436 while( cnt < *bt->que );
// link the left page to its new right sibling
2438 bt_putid(set->page->right, right->page_no);
2440 // translate old r/b root as new left fence
2442 key = keyptr(bt->frame,bt->frame->root);
2443 bt_findslot (set->page, set->page, key->key, key->len, path, 0);
2444 bt_xfrslot (set->page, bt->frame, bt->frame->root, path, 1);
2446 node = slotptr(set->page,set->page->cnt);
2447 memcpy(fencekey, key, key->len + 1);
2450 // if current page is the root page, split it
2452 if( set->page_no == ROOT_page )
2453 return bt_splitroot (bt, set, fencekey, right->page_no);
2455 // insert new fences in their parent pages
2457 right->latch = bt_pinlatch (bt, right->page_no);
2458 bt_lockpage (BtLockParent, right->latch);
// the left page swaps its write lock for a ParentModification lock
2460 bt_lockpage (BtLockParent, set->latch);
2461 bt_unlockpage (BtLockWrite, set->latch);
2463 // insert new fence for reformulated left block of smaller keys
2465 bt_putid (value, set->page_no);
2467 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 0) )
2470 // switch fence for right block of larger keys to new right page
2472 bt_putid (value, right->page_no);
2474 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, stopper) )
2477 bt_unlockpage (BtLockParent, set->latch);
2478 bt_unpinlatch (set->latch);
2479 bt_unpinpool (set->pool);
2481 bt_unlockpage (BtLockParent, right->latch);
2482 bt_unpinlatch (right->latch);
// bt_insertkey: insert a key/value pair at level lvl, or overwrite the value
// of an existing key.
//   bt           - btree handle; bt->err is set to BTERR_ovflw on a path below
//   key/keylen   - key bytes and length
//   lvl          - level at which the key is inserted
//   value/vallen - value bytes and length
//   stopper      - passed to bt_loadpage
2485 // Insert new key into the btree at given level.
2487 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint stopper)
2498 if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite, path, stopper) )
2499 node = slotptr(set->page, slot);
2502 bt->err = BTERR_ovflw;
2506 // if key already exists, update id and return
// an exact match whose stored value has room for vallen bytes is overwritten in place
2508 if( reuse = !path->entry[path->lvl].cmp )
2509 if( val = valptr(set->page, slot), val->len >= vallen ) {
2514 memcpy (val->value, value, vallen);
2515 bt_unlockpage(BtLockWrite, set->latch);
2516 bt_unpinlatch (set->latch);
2517 bt_unpinpool (set->pool);
2522 set->page->dirty = 1;
2526 // check if page has enough space
// room is needed for one more slot plus the length-prefixed key and value
2528 if( set->page->min >= (set->page->cnt+1) * sizeof(BtSlot) + sizeof(*set->page) + keylen + 1 + vallen + 1)
// a non-zero bt_cleanpage result means space was reclaimed: release and retry
2531 if( bt_cleanpage (bt, set->page, keylen, vallen) ) {
2532 bt_unlockpage (BtLockWrite, set->latch);
2533 bt_unpinlatch (set->latch);
2534 bt_unpinpool (set->pool);
2535 continue; // find new slot number
2538 if( bt_splitpage (bt, set) )
2542 // calculate next available slot and copy key into page
// value first (length byte + bytes), then the key directly below it
2544 set->page->min -= vallen + 1; // reset lowest used offset
2545 ((unsigned char *)set->page)[set->page->min] = vallen;
2546 memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2548 set->page->min -= keylen + 1; // reset lowest used offset
2549 ((unsigned char *)set->page)[set->page->min] = keylen;
2550 memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
// claim the next slot and point it at the key just written
2555 slot = ++set->page->cnt;
2556 node = slotptr(set->page,slot);
2559 node->off = set->page->min;
2563 bt_rbinsert (set->page, slot, path);
2565 bt_unlockpage (BtLockWrite, set->latch);
2566 bt_unpinlatch (set->latch);
2567 bt_unpinpool (set->pool);
// bt_startpage: copy page page_no into bt->cursor under a read lock and
// position bt->path on the leftmost slot of the copy.
//   bt      - btree handle; bt->cursor and bt->path are overwritten
//   page_no - page to cache
// index_file uses the result as a slot number (0 meaning nothing to read).
2571 BTERR bt_startpage (BtDb *bt, uid page_no)
2577 if( set->pool = bt_pinpool (bt, page_no) )
2578 set->page = bt_page (bt, set->pool, page_no);
2582 set->latch = bt_pinlatch (bt, page_no);
2583 bt_lockpage(BtLockRead, set->latch);
// snapshot the page so the lock can be dropped immediately
2585 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2587 bt_unlockpage(BtLockRead, set->latch);
2588 bt_unpinlatch (set->latch);
2589 bt_unpinpool (set->pool);
// walk left links from the tree root, recording each step as a left turn
2591 slot = bt->cursor->root;
2595 bt->path->entry[++bt->path->lvl].slot = slot;
2596 bt->path->entry[bt->path->lvl].cmp = 1;
2597 slot = slotptr(bt->cursor, slot)->left;
2598 } while( slot && bt->path->lvl < BT_maxbits );
2600 slot = bt->path->entry[bt->path->lvl].slot;
// skip slots flagged dead
2602 while( slot && slotptr(bt->cursor,slot)->dead )
2603 slot = bt_nextkey (bt);
// bt_startkey: locate key at level 0 and copy its page into bt->cursor.
//   bt      - btree handle; bt->cursor and bt->path are overwritten
//   key/len - key to position on
2608 // cache page of keys into cursor and return starting slot for given key
2610 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2615 // cache page for retrieval
2617 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead, bt->path, 0) )
2618 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2622 bt_unlockpage(BtLockRead, set->latch);
2623 bt_unpinlatch (set->latch);
2624 bt_unpinpool (set->pool);
// back the path up while the entry above the top has a negative cmp
2626 while( bt->path->lvl && bt->path->entry[bt->path->lvl - 1].cmp < 0 )
2627 slot = bt->path->entry[--bt->path->lvl].slot;
// bt_nextkey: advance the cursor to the next slot of the cached page, or
// load the right sibling page when the cached page is exhausted.
//   bt - btree handle; bt->cursor and bt->path hold the cursor state
2632 // return next slot for cursor page
2633 // or slide cursor right into next page
2635 uint bt_nextkey (BtDb *bt)
// step through the cached page, testing each slot's dead flag
2640 while( slot = bt_nextslot (bt->cursor, bt->path) )
2641 if( slotptr(bt->cursor,slot)->dead )
// a non-zero right link names the next page to cache
2646 if( right = bt_getid(bt->cursor->right) )
2647 return bt_startpage (bt, right);
// bt_key: return the key stored at slot in the cached cursor page.
//   bt   - btree handle; bt->cursor is the page read
//   slot - slot number within the cursor page
2652 BtKey bt_key(BtDb *bt, uint slot)
2654 return keyptr(bt->cursor, slot);
// bt_val: return the value stored at slot in the cached cursor page.
//   bt   - btree handle; bt->cursor is the page read
//   slot - slot number within the cursor page
2657 BtVal bt_val(BtDb *bt, uint slot)
2659 return valptr(bt->cursor,slot);
// getCpuTime (variant built on the Windows FILETIME/SYSTEMTIME API):
// return a time in seconds selected by type.
//   type - selects among the wall-clock, user-time and system-time sources
//          read below; the mapping of values is an assumption to confirm
2665 double getCpuTime(int type)
2668 FILETIME xittime[1];
2669 FILETIME systime[1];
2670 FILETIME usrtime[1];
2671 SYSTEMTIME timeconv[1];
2674 memset (timeconv, 0, sizeof(SYSTEMTIME));
// wall clock: current system time, with the day of week folded in as whole days
2678 GetSystemTimeAsFileTime (xittime);
2679 FileTimeToSystemTime (xittime, timeconv);
2680 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// user time of the current process
2683 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2684 FileTimeToSystemTime (usrtime, timeconv);
// system time of the current process
2687 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2688 FileTimeToSystemTime (systime, timeconv);
// convert the broken-down time to seconds
2692 ans += (double)timeconv->wHour * 3600;
2693 ans += (double)timeconv->wMinute * 60;
2694 ans += (double)timeconv->wSecond;
2695 ans += (double)timeconv->wMilliseconds / 1000;
2700 #include <sys/resource.h>
// getCpuTime (variant built on gettimeofday/getrusage): return a time in
// seconds selected by type.
//   type - selects among the wall-clock, user-time and system-time sources
//          read below; the mapping of values is an assumption to confirm
2702 double getCpuTime(int type)
2704 struct rusage used[1];
2705 struct timeval tv[1];
// wall clock, seconds plus microseconds
2709 gettimeofday(tv, NULL);
2710 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time of this process
2713 getrusage(RUSAGE_SELF, used);
2714 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time of this process
2717 getrusage(RUSAGE_SELF, used);
2718 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// bt_latchaudit: report latches that are still held, clear them, then read
// the pages from LEAF_page up to alloc->right and sum the active key counts
// of non-free level-0 pages.
//   bt - btree handle; bt->frame is used as the read buffer
// index_file prints the result as the number of keys found.
// NOTE(review): idx and hashidx are ushort printed with %d, and page numbers
// are printed with %.8x; confirm each conversion matches its argument type.
2725 uint bt_latchaudit (BtDb *bt)
2727 ushort idx, hashidx;
2734 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
// allocation latch: report if set, then clear unconditionally
2736 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2737 fprintf(stderr, "Alloc page locked\n");
2738 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
// every deployed latch set: report and zero its readwr, access and parent locks
2740 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2741 latch = bt->mgr->latchsets + idx;
2742 if( *latch->readwr->rin & MASK )
2743 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2744 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2746 if( *latch->access->rin & MASK )
2747 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2748 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2750 if( *latch->parent->rin & MASK )
2751 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2752 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2755 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// every hash bucket: clear the bucket latch, then walk its chain of latch sets
2760 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2761 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2762 fprintf(stderr, "hash entry %d locked\n", hashidx);
2764 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2766 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2767 latch = bt->mgr->latchsets + idx;
2768 if( *(ushort *)latch->busy )
2769 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2770 *(ushort *)latch->busy = 0;
2772 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2773 } while( idx = latch->next );
// page scan: read each page into bt->frame straight from the index file
2776 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2777 page_no = LEAF_page;
2779 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2780 off64_t off = page_no << bt->mgr->page_bits;
// NOTE(review): the pread result is not checked
2782 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2786 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2788 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2789 fprintf(stderr, "page %.8x unable to read\n", page_no);
2791 if( *amt < bt->mgr->page_size )
2792 fprintf(stderr, "page %.8x unable to read\n", page_no);
// only pages that are not free and are at level 0 contribute to the count
2794 if( !bt->frame->free )
2795 if( !bt->frame->lvl )
2796 cnt += bt->frame->act;
2798 if( page_no > LEAF_page )
// index_file: thread entry point that runs one command against the btree.
//   arg - pointer to a ThreadArg; the fields read here are mgr, type,
//         infile, num and idx
// The command is chosen by args->type[0] | 0x20, which folds an ASCII letter
// to lower case. The progress messages below name the commands: latch audit,
// pennysort, indexing, deleting, finding, scanning and counting.
// Two signatures are present; presumably one per platform - confirm.
2813 // standalone program to index file of keys
2814 // then list them onto std-out
2817 void *index_file (void *arg)
2819 uint __stdcall index_file (void *arg)
2822 int line = 0, found = 0, cnt = 0;
2823 uid next, page_no = LEAF_page; // start on first page of leaves
2824 unsigned char key[256];
2825 ThreadArg *args = arg;
2826 int ch, len = 0, slot;
2831 bt = bt_open (args->mgr);
2833 switch(args->type[0] | 0x20)
// latch audit
2836 fprintf(stderr, "started latch mgr audit\n");
2837 cnt = bt_latchaudit (bt);
2838 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
// pennysort: the first 10 bytes of each line are the key, the rest the value
2842 fprintf(stderr, "started pennysort for %s\n", args->infile);
2843 if( in = fopen (args->infile, "rb") )
2844 while( ch = getc(in), ch != EOF )
2849 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 0) )
2850 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2853 else if( len < 255 )
2855 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
// indexing: each input line becomes a key with no value
2859 fprintf(stderr, "started indexing for %s\n", args->infile);
2860 if( in = fopen (args->infile, "rb") )
2861 while( ch = getc(in), ch != EOF )
// optional 9-digit suffix derived from the line number
// NOTE(review): sprintf writes the digits plus a NUL at key+len into
// key[256]; confirm len is bounded so the write stays in range
2866 if( args->num == 1 )
2867 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2869 else if( args->num )
2870 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2872 if( bt_insertkey (bt, key, len, 0, NULL, 0, 0) )
2873 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2876 else if( len < 255 )
2878 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
// deleting: same key construction as indexing, passed to bt_deletekey
2882 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2883 if( in = fopen (args->infile, "rb") )
2884 while( ch = getc(in), ch != EOF )
2888 if( args->num == 1 )
2889 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2891 else if( args->num )
2892 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2894 if( bt_deletekey (bt, key, len, 0, 0) )
2895 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2898 else if( len < 255 )
2900 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
// finding: same key construction, passed to bt_findkey
2904 fprintf(stderr, "started finding keys for %s\n", args->infile);
2905 if( in = fopen (args->infile, "rb") )
2906 while( ch = getc(in), ch != EOF )
2910 if( args->num == 1 )
2911 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2913 else if( args->num )
2914 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2916 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2919 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2922 else if( len < 255 )
2924 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
// scanning: walk the cursor from LEAF_page, writing each key and a newline to stdout
2928 fprintf(stderr, "started scanning\n");
2930 if( slot = bt_startpage (bt, LEAF_page) )
2932 ptr = keyptr(bt->cursor, slot);
2933 fwrite (ptr->key, ptr->len, 1, stdout);
2934 fputc ('\n', stdout);
2936 } while( slot = bt_nextkey (bt) );
2938 fprintf(stderr, " Total keys read %d\n", cnt);
// counting: read pages straight from the index file and sum act on
// non-free level-0 pages
2943 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2945 fprintf(stderr, "started counting\n");
2946 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2947 page_no = LEAF_page;
2949 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2950 uid off = page_no << bt->mgr->page_bits;
// NOTE(review): the pread result is not checked
2952 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2956 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2958 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2959 return bt->err = BTERR_map;
2961 if( *amt < bt->mgr->page_size )
2962 return bt->err = BTERR_map;
2964 if( !bt->frame->free && !bt->frame->lvl )
2965 cnt += bt->frame->act;
2966 if( page_no > LEAF_page )
2971 fprintf(stderr, " Total keys read %d\n", cnt);
// `timer` is a shorthand alias for the system `struct timeval` type.
// NOTE(review): the visible portion of main() measures time through
// getCpuTime() and does not reference `timer` -- confirm whether this
// alias is still used elsewhere before relying on or removing it.
2983 typedef struct timeval timer;
2985 int main (int argc, char **argv)
2987 int idx, cnt, len, slot, err;
2988 int segsize, bits = 16;
3005 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3006 fprintf (stderr, " where page_bits is the page size in bits\n");
3007 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
3008 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
3009 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3010 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3014 start = getCpuTime(0);
3017 bits = atoi(argv[3]);
3020 poolsize = atoi(argv[4]);
3023 fprintf (stderr, "Warning: no mapped_pool\n");
3025 if( poolsize > 65535 )
3026 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
3029 segsize = atoi(argv[5]);
3031 segsize = 4; // 16 pages per mmap segment
3034 num = atoi(argv[6]);
3038 threads = malloc (cnt * sizeof(pthread_t));
3040 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3042 args = malloc (cnt * sizeof(ThreadArg));
3044 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3047 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3053 for( idx = 0; idx < cnt; idx++ ) {
3054 args[idx].infile = argv[idx + 7];
3055 args[idx].type = argv[2];
3056 args[idx].mgr = mgr;
3057 args[idx].num = num;
3058 args[idx].idx = idx;
3060 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3061 fprintf(stderr, "Error creating thread %d\n", err);
3063 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3067 // wait for termination
3070 for( idx = 0; idx < cnt; idx++ )
3071 pthread_join (threads[idx], NULL);
3073 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3075 for( idx = 0; idx < cnt; idx++ )
3076 CloseHandle(threads[idx]);
3079 elapsed = getCpuTime(0) - start;
3080 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3081 elapsed = getCpuTime(1);
3082 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3083 elapsed = getCpuTime(2);
3084 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);