1 // btree version threadskv5 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // and bi-directional cursors
10 // author: karl malbrain, malbrain@cal.berkeley.edu
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
16 The orginal author is Karl Malbrain.
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
46 #define WIN32_LEAN_AND_MEAN
60 typedef unsigned long long uid;
63 typedef unsigned long long off64_t;
64 typedef unsigned short ushort;
65 typedef unsigned int uint;
68 #define BT_latchtable 128 // number of latch manager slots
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
79 There are five lock types for each node in three independent sets:
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
95 // definition for phase-fair reader/writer lock implementation
109 // definition for spin latch implementation
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
115 volatile typedef struct {
126 // hash table entries
129 BtSpinLatch latch[1];
130 volatile ushort slot; // Latch table entry at head of chain
133 // latch manager table structure
136 RWLock readwr[1]; // read/write page lock
137 RWLock access[1]; // Access Intent/Page delete
138 RWLock parent[1]; // Posting of fence key in parent
139 BtSpinLatch busy[1]; // slot is being moved between chains
140 volatile ushort next; // next entry in hash table chain
141 volatile ushort prev; // prev entry in hash table chain
142 volatile ushort pin; // number of outstanding locks
143 volatile ushort hash; // hash slot entry is under
144 volatile uid page_no; // latch set page number
147 // Define the length of the page and key pointers
151 // Page key slot definition.
153 // Keys are marked dead, but remain on the page until
154 // it cleanup is called. The fence key (highest key) for
155 // a leaf page is always present, even after cleanup.
159 // In addition to the Unique keys that occupy slots
160 // there are Librarian and Duplicate key
161 // slots occupying the key slot array.
163 // The Librarian slots are dead keys that
164 // serve as filler, available to add new Unique
165 // or Dup slots that are inserted into the B-tree.
167 // The Duplicate slots have had their key bytes extended
168 // by 6 bytes to contain a binary duplicate key uniqueifier.
177 uint off:BT_maxbits; // page offset for key start
178 uint type:3; // type of slot
179 uint dead:1; // set for deleted slot
182 // The key structure occupies space at the upper end of
183 // each page. It's a length byte followed by the key
188 unsigned char key[1];
191 // the value structure also occupies space at the upper
192 // end of the page. Each key is immediately followed by a value.
196 unsigned char value[1];
199 // The first part of an index page.
200 // It is immediately followed
201 // by the BtSlot array of keys.
203 // note that this structure size
204 // must be a multiple of 8 bytes
205 // in order to place dups correctly.
207 typedef struct BtPage_ {
208 uint cnt; // count of keys in page
209 uint act; // count of active keys
210 uint min; // next key offset
211 uint garbage; // page garbage in bytes
212 unsigned char bits:7; // page size in bits
213 unsigned char free:1; // page is on free chain
214 unsigned char lvl:7; // level of page
215 unsigned char kill:1; // page is being deleted
216 unsigned char left[BtId]; // page number to left
217 unsigned char filler[2]; // padding to multiple of 8
218 unsigned char right[BtId]; // page number to right
221 // The memory mapping pool table buffer manager entry
224 uid basepage; // mapped base page number
225 char *map; // mapped memory pointer
226 ushort slot; // slot index in this array
227 ushort pin; // mapped page pin counter
228 void *hashprev; // previous pool entry for the same hash idx
229 void *hashnext; // next pool entry for the same hash idx
231 HANDLE hmap; // Windows memory mapping handle
235 #define CLOCK_bit 0x8000 // bit in pool->pin
237 // The loadpage interface object
240 uid page_no; // current page number
241 BtPage page; // current page pointer
242 BtPool *pool; // current page pool
243 BtLatchSet *latch; // current page latch set
246 // structure for latch manager on ALLOC_page
249 struct BtPage_ alloc[1]; // next page_no in right ptr
250 unsigned long long dups[1]; // global duplicate key uniqueifier
251 unsigned char chain[BtId]; // head of free page_nos chain
252 BtSpinLatch lock[1]; // allocation area lite latch
253 ushort latchdeployed; // highest number of latch entries deployed
254 ushort nlatchpage; // number of latch pages at BT_latch
255 ushort latchtotal; // number of page latch entries
256 ushort latchhash; // number of latch hash table slots
257 ushort latchvictim; // next latch entry to examine
258 BtHashEntry table[0]; // the hash table
261 // The object structure for Btree access
264 uint page_size; // page size
265 uint page_bits; // page size in bits
266 uint seg_bits; // seg size in pages in bits
267 uint mode; // read-write mode
273 ushort poolcnt; // highest page pool node in use
274 ushort poolmax; // highest page pool node allocated
275 ushort poolmask; // total number of pages in mmap segment - 1
276 ushort hashsize; // size of Hash Table for pool entries
277 volatile uint evicted; // last evicted hash table slot
278 ushort *hash; // pool index for hash entries
279 BtSpinLatch *latch; // latches for hash table slots
280 BtLatchMgr *latchmgr; // mapped latch page from allocation page
281 BtLatchSet *latchsets; // mapped latch set from latch pages
282 BtPool *pool; // memory pool page segments
284 HANDLE halloc; // allocation and latch table handle
289 BtMgr *mgr; // buffer manager for thread
290 BtPage cursor; // cached frame for start/next (never mapped)
291 BtPage frame; // spare frame for the page split (never mapped)
292 uid cursor_page; // current cursor page number
293 unsigned char *mem; // frame, cursor, page memory buffer
294 unsigned char key[256]; // last found complete key
295 int found; // last delete or insert was found
296 int err; // last error
310 extern void bt_close (BtDb *bt);
311 extern BtDb *bt_open (BtMgr *mgr);
312 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
313 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
314 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
315 extern BtKey bt_foundkey (BtDb *bt);
316 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
317 extern uint bt_nextkey (BtDb *bt, uint slot);
320 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
321 void bt_mgrclose (BtMgr *mgr);
323 // Helper functions to return slot values
324 // from the cursor page.
326 extern BtKey bt_key (BtDb *bt, uint slot);
327 extern BtVal bt_val (BtDb *bt, uint slot);
329 // BTree page number constants
330 #define ALLOC_page 0 // allocation & latch manager hash table
331 #define ROOT_page 1 // root of the btree
332 #define LEAF_page 2 // first page of leaves
333 #define LATCH_page 3 // pages for latch manager
335 // Number of levels to create in a new BTree
339 // The page is allocated from low and hi ends.
340 // The key slots are allocated from the bottom,
341 // while the text and value of the key
342 // are allocated from the top. When the two
343 // areas meet, the page is split into two.
345 // A key consists of a length byte, two bytes of
346 // index number (0 - 65534), and up to 253 bytes
349 // Associated with each key is a value byte string
350 // containing any value desired.
352 // The b-tree root is always located at page 1.
353 // The first leaf page of level zero is always
354 // located on page 2.
356 // The b-tree pages are linked with next
357 // pointers to facilitate enumerators,
358 // and provide for concurrency.
360 // When to root page fills, it is split in two and
361 // the tree height is raised by a new root at page
362 // one with two keys.
364 // Deleted keys are marked with a dead bit until
365 // page cleanup. The fence key for a leaf node is
368 // Groups of pages called segments from the btree are optionally
369 // cached with a memory mapped pool. A hash table is used to keep
370 // track of the cached segments. This behaviour is controlled
371 // by the cache block size parameter to bt_open.
373 // To achieve maximum concurrency one page is locked at a time
374 // as the tree is traversed to find leaf key in question. The right
375 // page numbers are used in cases where the page is being split,
378 // Page 0 is dedicated to lock for new page extensions,
379 // and chains empty pages together for reuse. It also
380 // contains the latch manager hash table.
382 // The ParentModification lock on a node is obtained to serialize posting
383 // or changing the fence key for a node.
385 // Empty pages are chained together through the ALLOC page and reused.
387 // Access macros to address slot and key values from the page
388 // Page slots use 1 based indexing.
390 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
391 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
392 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
394 void bt_putid(unsigned char *dest, uid id)
399 dest[i] = (unsigned char)id, id >>= 8;
402 uid bt_getid(unsigned char *src)
407 for( i = 0; i < BtId; i++ )
408 id <<= 8, id |= *src++;
413 uid bt_newdup (BtDb *bt)
416 return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
418 return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
422 // Phase-Fair reader/writer lock implementation
424 void WriteLock (RWLock *lock)
429 tix = __sync_fetch_and_add (lock->ticket, 1);
431 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
433 // wait for our ticket to come up
435 while( tix != lock->serving[0] )
442 w = PRES | (tix & PHID);
444 r = __sync_fetch_and_add (lock->rin, w);
446 r = _InterlockedExchangeAdd16 (lock->rin, w);
448 while( r != *lock->rout )
456 void WriteRelease (RWLock *lock)
459 __sync_fetch_and_and (lock->rin, ~MASK);
461 _InterlockedAnd16 (lock->rin, ~MASK);
466 void ReadLock (RWLock *lock)
470 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
472 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
475 while( w == (*lock->rin & MASK) )
483 void ReadRelease (RWLock *lock)
486 __sync_fetch_and_add (lock->rout, RINC);
488 _InterlockedExchangeAdd16 (lock->rout, RINC);
492 // Spin Latch Manager
494 // wait until write lock mode is clear
495 // and add 1 to the share count
497 void bt_spinreadlock(BtSpinLatch *latch)
503 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
505 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
507 // see if exclusive request is granted or pending
512 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
514 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
517 } while( sched_yield(), 1 );
519 } while( SwitchToThread(), 1 );
523 // wait for other read and write latches to relinquish
525 void bt_spinwritelock(BtSpinLatch *latch)
531 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
533 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
536 if( !(prev & ~BOTH) )
540 __sync_fetch_and_and ((ushort *)latch, ~XCL);
542 _InterlockedAnd16((ushort *)latch, ~XCL);
545 } while( sched_yield(), 1 );
547 } while( SwitchToThread(), 1 );
551 // try to obtain write lock
553 // return 1 if obtained,
556 int bt_spinwritetry(BtSpinLatch *latch)
561 prev = __sync_fetch_and_or((ushort *)latch, XCL);
563 prev = _InterlockedOr16((ushort *)latch, XCL);
565 // take write access if all bits are clear
568 if( !(prev & ~BOTH) )
572 __sync_fetch_and_and ((ushort *)latch, ~XCL);
574 _InterlockedAnd16((ushort *)latch, ~XCL);
581 void bt_spinreleasewrite(BtSpinLatch *latch)
584 __sync_fetch_and_and((ushort *)latch, ~BOTH);
586 _InterlockedAnd16((ushort *)latch, ~BOTH);
590 // decrement reader count
592 void bt_spinreleaseread(BtSpinLatch *latch)
595 __sync_fetch_and_add((ushort *)latch, -SHARE);
597 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
601 // link latch table entry into latch hash table
603 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
605 BtLatchSet *set = bt->mgr->latchsets + victim;
607 if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
608 bt->mgr->latchsets[set->next].prev = victim;
610 bt->mgr->latchmgr->table[hashidx].slot = victim;
611 set->page_no = page_no;
618 void bt_unpinlatch (BtLatchSet *set)
621 __sync_fetch_and_add(&set->pin, -1);
623 _InterlockedDecrement16 (&set->pin);
627 // find existing latchset or inspire new one
628 // return with latchset pinned
630 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
632 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
633 ushort slot, avail = 0, victim, idx;
636 // obtain read lock on hash table entry
638 bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
640 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
642 set = bt->mgr->latchsets + slot;
643 if( page_no == set->page_no )
645 } while( slot = set->next );
649 __sync_fetch_and_add(&set->pin, 1);
651 _InterlockedIncrement16 (&set->pin);
655 bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
660 // try again, this time with write lock
662 bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
664 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
666 set = bt->mgr->latchsets + slot;
667 if( page_no == set->page_no )
669 if( !set->pin && !avail )
671 } while( slot = set->next );
673 // found our entry, or take over an unpinned one
675 if( slot || (slot = avail) ) {
676 set = bt->mgr->latchsets + slot;
678 __sync_fetch_and_add(&set->pin, 1);
680 _InterlockedIncrement16 (&set->pin);
682 set->page_no = page_no;
683 bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
687 // see if there are any unused entries
689 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
691 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
694 if( victim < bt->mgr->latchmgr->latchtotal ) {
695 set = bt->mgr->latchsets + victim;
697 __sync_fetch_and_add(&set->pin, 1);
699 _InterlockedIncrement16 (&set->pin);
701 bt_latchlink (bt, hashidx, victim, page_no);
702 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
707 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
709 victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
711 // find and reuse previous lock entry
715 victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
717 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
719 // we don't use slot zero
721 if( victim %= bt->mgr->latchmgr->latchtotal )
722 set = bt->mgr->latchsets + victim;
726 // take control of our slot
727 // from other threads
729 if( set->pin || !bt_spinwritetry (set->busy) )
734 // try to get write lock on hash chain
735 // skip entry if not obtained
736 // or has outstanding locks
738 if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
739 bt_spinreleasewrite (set->busy);
744 bt_spinreleasewrite (set->busy);
745 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
749 // unlink our available victim from its hash chain
752 bt->mgr->latchsets[set->prev].next = set->next;
754 bt->mgr->latchmgr->table[idx].slot = set->next;
757 bt->mgr->latchsets[set->next].prev = set->prev;
759 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
761 __sync_fetch_and_add(&set->pin, 1);
763 _InterlockedIncrement16 (&set->pin);
765 bt_latchlink (bt, hashidx, victim, page_no);
766 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
767 bt_spinreleasewrite (set->busy);
772 void bt_mgrclose (BtMgr *mgr)
777 // release mapped pages
778 // note that slot zero is never used
780 for( slot = 1; slot < mgr->poolmax; slot++ ) {
781 pool = mgr->pool + slot;
784 munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
787 FlushViewOfFile(pool->map, 0);
788 UnmapViewOfFile(pool->map);
789 CloseHandle(pool->hmap);
795 munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
796 munmap (mgr->latchmgr, 2 * mgr->page_size);
798 FlushViewOfFile(mgr->latchmgr, 0);
799 UnmapViewOfFile(mgr->latchmgr);
800 CloseHandle(mgr->halloc);
806 free ((void *)mgr->latch);
809 FlushFileBuffers(mgr->idx);
810 CloseHandle(mgr->idx);
811 GlobalFree (mgr->pool);
812 GlobalFree (mgr->hash);
813 GlobalFree ((void *)mgr->latch);
818 // close and release memory
820 void bt_close (BtDb *bt)
827 VirtualFree (bt->mem, 0, MEM_RELEASE);
832 // open/create new btree buffer manager
834 // call with file_name, BT_openmode, bits in page size (e.g. 16),
835 // size of mapped page pool (e.g. 8192)
837 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
839 uint lvl, attr, cacheblk, last, slot, idx;
840 uint nlatchpage, latchhash;
841 unsigned char value[BtId];
842 BtLatchMgr *latchmgr;
851 SYSTEM_INFO sysinfo[1];
854 // determine sanity of page size and buffer pool
856 if( bits > BT_maxbits )
858 else if( bits < BT_minbits )
862 return NULL; // must have buffer pool
865 mgr = calloc (1, sizeof(BtMgr));
867 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
870 return free(mgr), NULL;
872 cacheblk = 4096; // minimum mmap segment size for unix
875 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
876 attr = FILE_ATTRIBUTE_NORMAL;
877 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
879 if( mgr->idx == INVALID_HANDLE_VALUE )
880 return GlobalFree(mgr), NULL;
882 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
883 GetSystemInfo(sysinfo);
884 cacheblk = sysinfo->dwAllocationGranularity;
888 latchmgr = malloc (BT_maxpage);
891 // read minimum page size to get root info
893 if( size = lseek (mgr->idx, 0L, 2) ) {
894 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
895 bits = latchmgr->alloc->bits;
897 return free(mgr), free(latchmgr), NULL;
898 } else if( mode == BT_ro )
899 return free(latchmgr), bt_mgrclose (mgr), NULL;
901 latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
902 size = GetFileSize(mgr->idx, amt);
905 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
906 return bt_mgrclose (mgr), NULL;
907 bits = latchmgr->alloc->bits;
908 } else if( mode == BT_ro )
909 return bt_mgrclose (mgr), NULL;
912 mgr->page_size = 1 << bits;
913 mgr->page_bits = bits;
915 mgr->poolmax = poolmax;
918 if( cacheblk < mgr->page_size )
919 cacheblk = mgr->page_size;
921 // mask for partial memmaps
923 mgr->poolmask = (cacheblk >> bits) - 1;
925 // see if requested size of pages per memmap is greater
927 if( (1 << segsize) > mgr->poolmask )
928 mgr->poolmask = (1 << segsize) - 1;
932 while( (1 << mgr->seg_bits) <= mgr->poolmask )
935 mgr->hashsize = hashsize;
938 mgr->pool = calloc (poolmax, sizeof(BtPool));
939 mgr->hash = calloc (hashsize, sizeof(ushort));
940 mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
942 mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
943 mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
944 mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
950 // initialize an empty b-tree with latch page, root page, page of leaves
951 // and page(s) of latches
953 memset (latchmgr, 0, 1 << bits);
954 nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
955 bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
956 latchmgr->alloc->bits = mgr->page_bits;
958 latchmgr->nlatchpage = nlatchpage;
959 latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
961 // initialize latch manager
963 latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
965 // size of hash table = total number of latchsets
967 if( latchhash > latchmgr->latchtotal )
968 latchhash = latchmgr->latchtotal;
970 latchmgr->latchhash = latchhash;
972 // initialize left-most LEAF page in
975 bt_putid (latchmgr->alloc->left, LEAF_page);
978 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
979 return bt_mgrclose (mgr), NULL;
981 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
982 return bt_mgrclose (mgr), NULL;
984 if( *amt < mgr->page_size )
985 return bt_mgrclose (mgr), NULL;
988 memset (latchmgr, 0, 1 << bits);
989 latchmgr->alloc->bits = mgr->page_bits;
991 for( lvl=MIN_lvl; lvl--; ) {
992 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
993 key = keyptr(latchmgr->alloc, 1);
994 key->len = 2; // create stopper key
998 bt_putid(value, MIN_lvl - lvl + 1);
999 val = valptr(latchmgr->alloc, 1);
1000 val->len = lvl ? BtId : 0;
1001 memcpy (val->value, value, val->len);
1003 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
1004 latchmgr->alloc->lvl = lvl;
1005 latchmgr->alloc->cnt = 1;
1006 latchmgr->alloc->act = 1;
1008 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
1009 return bt_mgrclose (mgr), NULL;
1011 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1012 return bt_mgrclose (mgr), NULL;
1014 if( *amt < mgr->page_size )
1015 return bt_mgrclose (mgr), NULL;
1019 // clear out latch manager locks
1020 // and rest of pages to round out segment
1022 memset(latchmgr, 0, mgr->page_size);
1025 while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1027 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1029 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1030 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1031 return bt_mgrclose (mgr), NULL;
1032 if( *amt < mgr->page_size )
1033 return bt_mgrclose (mgr), NULL;
1040 // mlock the root page and the latchmgr page
1042 flag = PROT_READ | PROT_WRITE;
1043 mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1044 if( mgr->latchmgr == MAP_FAILED )
1045 return bt_mgrclose (mgr), NULL;
1046 mlock (mgr->latchmgr, 2 * mgr->page_size);
1048 mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1049 if( mgr->latchsets == MAP_FAILED )
1050 return bt_mgrclose (mgr), NULL;
1051 mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1053 flag = PAGE_READWRITE;
1054 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1056 return bt_mgrclose (mgr), NULL;
1058 flag = FILE_MAP_WRITE;
1059 mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1060 if( !mgr->latchmgr )
1061 return GetLastError(), bt_mgrclose (mgr), NULL;
1063 mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1069 VirtualFree (latchmgr, 0, MEM_RELEASE);
1074 // open BTree access method
1075 // based on buffer manager
1077 BtDb *bt_open (BtMgr *mgr)
1079 BtDb *bt = malloc (sizeof(*bt));
1081 memset (bt, 0, sizeof(*bt));
1084 bt->mem = malloc (2 *mgr->page_size);
1086 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1088 bt->frame = (BtPage)bt->mem;
1089 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1093 // compare two keys, returning > 0, = 0, or < 0
1094 // as the comparison value
1096 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1098 uint len1 = key1->len;
1101 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1114 // find segment in pool
1115 // must be called with hashslot idx locked
1116 // return NULL if not there
1117 // otherwise return node
1119 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1124 // compute start of hash chain in pool
1126 if( slot = bt->mgr->hash[idx] )
1127 pool = bt->mgr->pool + slot;
1131 page_no &= ~bt->mgr->poolmask;
1133 while( pool->basepage != page_no )
1134 if( pool = pool->hashnext )
1142 // add segment to hash table
1144 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1149 pool->hashprev = pool->hashnext = NULL;
1150 pool->basepage = page_no & ~bt->mgr->poolmask;
1151 pool->pin = CLOCK_bit + 1;
1153 if( slot = bt->mgr->hash[idx] ) {
1154 node = bt->mgr->pool + slot;
1155 pool->hashnext = node;
1156 node->hashprev = pool;
1159 bt->mgr->hash[idx] = pool->slot;
1162 // map new buffer pool segment to virtual memory
1164 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1166 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1167 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1171 flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1172 pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1173 if( pool->map == MAP_FAILED )
1174 return bt->err = BTERR_map;
1177 flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1178 pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1180 return bt->err = BTERR_map;
1182 flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1183 pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1185 return bt->err = BTERR_map;
1190 // calculate page within pool
1192 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1194 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1197 page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1198 // madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1204 void bt_unpinpool (BtPool *pool)
1207 __sync_fetch_and_add(&pool->pin, -1);
1209 _InterlockedDecrement16 (&pool->pin);
1213 // find or place requested page in segment-pool
1214 // return pool table entry, incrementing pin
1216 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1218 uint slot, hashidx, idx, victim;
1219 BtPool *pool, *node, *next;
1221 // lock hash table chain
1223 hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1224 bt_spinwritelock (&bt->mgr->latch[hashidx]);
1226 // look up in hash table
1228 if( pool = bt_findpool(bt, page_no, hashidx) ) {
1230 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1231 __sync_fetch_and_add(&pool->pin, 1);
1233 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1234 _InterlockedIncrement16 (&pool->pin);
1236 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1240 // allocate a new pool node
1241 // and add to hash table
1244 slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1246 slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1249 if( ++slot < bt->mgr->poolmax ) {
1250 pool = bt->mgr->pool + slot;
1253 if( bt_mapsegment(bt, pool, page_no) )
1256 bt_linkhash(bt, pool, page_no, hashidx);
1257 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1261 // pool table is full
1262 // find best pool entry to evict
1265 __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1267 _InterlockedDecrement16 (&bt->mgr->poolcnt);
1272 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1274 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1276 victim %= bt->mgr->poolmax;
1277 pool = bt->mgr->pool + victim;
1278 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1283 // try to get write lock
1284 // skip entry if not obtained
1286 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1289 // skip this entry if
1291 // or clock bit is set
1295 __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1297 _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1299 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1303 // unlink victim pool node from hash table
1305 if( node = pool->hashprev )
1306 node->hashnext = pool->hashnext;
1307 else if( node = pool->hashnext )
1308 bt->mgr->hash[idx] = node->slot;
1310 bt->mgr->hash[idx] = 0;
1312 if( node = pool->hashnext )
1313 node->hashprev = pool->hashprev;
1315 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1317 // remove old file mapping
1319 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1321 // FlushViewOfFile(pool->map, 0);
1322 UnmapViewOfFile(pool->map);
1323 CloseHandle(pool->hmap);
1327 // create new pool mapping
1328 // and link into hash table
1330 if( bt_mapsegment(bt, pool, page_no) )
1333 bt_linkhash(bt, pool, page_no, hashidx);
1334 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1339 // place write, read, or parent lock on requested page_no.
1341 void bt_lockpage(BtLock mode, BtLatchSet *set)
1345 ReadLock (set->readwr);
1348 WriteLock (set->readwr);
1351 ReadLock (set->access);
1354 WriteLock (set->access);
1357 WriteLock (set->parent);
1362 // remove write, read, or parent lock on requested page
1364 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1368 ReadRelease (set->readwr);
1371 WriteRelease (set->readwr);
1374 ReadRelease (set->access);
1377 WriteRelease (set->access);
1380 WriteRelease (set->parent);
1385 // allocate a new page
1387 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1391 // lock allocation page
1393 bt_spinwritelock(bt->mgr->latchmgr->lock);
1395 // use empty chain first
1396 // else allocate empty page
1398 if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1399 if( set->pool = bt_pinpool (bt, set->page_no) )
1400 set->page = bt_page (bt, set->pool, set->page_no);
1402 return bt->err = BTERR_struct;
1404 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1406 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1407 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1409 // if writing first page of pool block, set file length thru last page
1411 if( (set->page_no & bt->mgr->poolmask) == 0 )
1412 ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1414 if( set->pool = bt_pinpool (bt, set->page_no) )
1415 set->page = bt_page (bt, set->pool, set->page_no);
1420 // unlock allocation latch
1422 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1426 // find slot in page for given key at a given level
1428 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1430 uint diff, higher = set->page->cnt, low = 1, slot;
1433 // make stopper key an infinite fence value
1435 if( bt_getid (set->page->right) )
1440 // low is the lowest candidate.
1441 // loop ends when they meet
1443 // higher is already
1444 // tested as .ge. the passed key.
1446 while( diff = higher - low ) {
1447 slot = low + ( diff >> 1 );
1448 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1451 higher = slot, good++;
1454 // return zero if key is on right link page
1456 return good ? higher : 0;
1459 // find and load page at given level for given key
1460 // leave page rd or wr locked as requested
1462 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1464 uid page_no = ROOT_page, prevpage = 0;
1465 uint drill = 0xff, slot;
1466 BtLatchSet *prevlatch;
1467 uint mode, prevmode;
1470 // start at root of btree and drill down
1473 // determine lock mode of drill level
1474 mode = (drill == lvl) ? lock : BtLockRead;
1476 set->latch = bt_pinlatch (bt, page_no);
1477 set->page_no = page_no;
1479 // pin page contents
1481 if( set->pool = bt_pinpool (bt, page_no) )
1482 set->page = bt_page (bt, set->pool, page_no);
1486 // obtain access lock using lock chaining with Access mode
1488 if( page_no > ROOT_page )
1489 bt_lockpage(BtLockAccess, set->latch);
1491 // release & unpin parent page
1494 bt_unlockpage(prevmode, prevlatch);
1495 bt_unpinlatch (prevlatch);
1496 bt_unpinpool (prevpool);
1500 // obtain read lock using lock chaining
1502 bt_lockpage(mode, set->latch);
1504 if( set->page->free )
1505 return bt->err = BTERR_struct, 0;
1507 if( page_no > ROOT_page )
1508 bt_unlockpage(BtLockAccess, set->latch);
1510 // re-read and re-lock root after determining actual level of root
1512 if( set->page->lvl != drill) {
1513 if( set->page_no != ROOT_page )
1514 return bt->err = BTERR_struct, 0;
1516 drill = set->page->lvl;
1518 if( lock != BtLockRead && drill == lvl ) {
1519 bt_unlockpage(mode, set->latch);
1520 bt_unpinlatch (set->latch);
1521 bt_unpinpool (set->pool);
1526 prevpage = set->page_no;
1527 prevlatch = set->latch;
1528 prevpool = set->pool;
1531 // find key on page at this level
1532 // and descend to requested level
1534 if( set->page->kill )
1537 if( slot = bt_findslot (set, key, len) ) {
1541 while( slotptr(set->page, slot)->dead )
1542 if( slot++ < set->page->cnt )
1547 page_no = bt_getid(valptr(set->page, slot)->value);
1552 // or slide right into next page
1555 page_no = bt_getid(set->page->right);
1559 // return error on end of right chain
1561 bt->err = BTERR_struct;
1562 return 0; // return error
1565 // return page to free list
1566 // page must be delete & write locked
1568 void bt_freepage (BtDb *bt, BtPageSet *set)
1570 // lock allocation page
1572 bt_spinwritelock (bt->mgr->latchmgr->lock);
1575 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1576 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1577 set->page->free = 1;
1579 // unlock released page
1581 bt_unlockpage (BtLockDelete, set->latch);
1582 bt_unlockpage (BtLockWrite, set->latch);
1583 bt_unpinlatch (set->latch);
1584 bt_unpinpool (set->pool);
1586 // unlock allocation page
1588 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1591 // a fence key was deleted from a page
1592 // push new fence value upwards
1594 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1596 unsigned char leftkey[256], rightkey[256];
1597 unsigned char value[BtId];
1601 // collapse empty slots beneath our slot
1603 while( idx = set->page->cnt - 1 )
1604 if( slotptr(set->page, idx)->dead ) {
1605 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1606 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1610 // remove the old fence value
1612 ptr = keyptr(set->page, set->page->cnt);
1613 memcpy (rightkey, ptr, ptr->len + 1);
1614 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1616 // cache new fence value
1618 ptr = keyptr(set->page, set->page->cnt);
1619 memcpy (leftkey, ptr, ptr->len + 1);
1621 bt_lockpage (BtLockParent, set->latch);
1622 bt_unlockpage (BtLockWrite, set->latch);
1624 // insert new (now smaller) fence key
1626 bt_putid (value, set->page_no);
1628 if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId, 1) )
1631 // now delete old fence key
1633 if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1636 bt_unlockpage (BtLockParent, set->latch);
1637 bt_unpinlatch(set->latch);
1638 bt_unpinpool (set->pool);
1642 // root has a single child
1643 // collapse a level from the tree
1645 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1650 // find the child entry and promote as new root contents
1653 for( idx = 0; idx++ < root->page->cnt; )
1654 if( !slotptr(root->page, idx)->dead )
1657 child->page_no = bt_getid (valptr(root->page, idx)->value);
1659 child->latch = bt_pinlatch (bt, child->page_no);
1660 bt_lockpage (BtLockDelete, child->latch);
1661 bt_lockpage (BtLockWrite, child->latch);
1663 if( child->pool = bt_pinpool (bt, child->page_no) )
1664 child->page = bt_page (bt, child->pool, child->page_no);
1668 memcpy (root->page, child->page, bt->mgr->page_size);
1669 bt_freepage (bt, child);
1671 } while( root->page->lvl > 1 && root->page->act == 1 );
1673 bt_unlockpage (BtLockWrite, root->latch);
1674 bt_unpinlatch (root->latch);
1675 bt_unpinpool (root->pool);
1679 // maintain reverse scan pointers by
1680 // linking left pointer of far right node
1682 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1684 BtPageSet right2[1];
1686 // keep track of rightmost leaf page
1688 if( !right_page_no ) {
1689 bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1693 // link right page to left page
1695 right2->latch = bt_pinlatch (bt, right_page_no);
1696 bt_lockpage (BtLockWrite, right2->latch);
1698 if( right2->pool = bt_pinpool (bt, right_page_no) )
1699 right2->page = bt_page (bt, right2->pool, right_page_no);
1703 bt_putid(right2->page->left, left_page_no);
1704 bt_unlockpage (BtLockWrite, right2->latch);
1705 bt_unpinlatch (right2->latch);
1709 // find and delete key on page by marking delete flag bit
1710 // if page becomes empty, delete it from the btree
1712 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1714 unsigned char lowerfence[256], higherfence[256];
1715 BtPageSet set[1], right[1], right2[1];
1716 unsigned char value[BtId];
1717 uint slot, idx, found;
1721 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1722 ptr = keyptr(set->page, slot);
1726 // if librarian slot, advance to real slot
1728 if( slotptr(set->page, slot)->type == Librarian )
1729 ptr = keyptr(set->page, ++slot);
1731 // if key is found delete it, otherwise ignore request
1733 if( found = !keycmp (ptr, key, len) )
1734 if( found = slotptr(set->page, slot)->dead == 0 ) {
1735 val = valptr(set->page,slot);
1736 slotptr(set->page, slot)->dead = 1;
1737 set->page->garbage += ptr->len + val->len + 2;
1741 // did we delete a fence key in an upper level?
1743 if( found && lvl && set->page->act && slot == set->page->cnt )
1744 if( bt_fixfence (bt, set, lvl) )
1747 return bt->found = found, 0;
1749 // do we need to collapse root?
1751 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1752 if( bt_collapseroot (bt, set) )
1755 return bt->found = found, 0;
1757 // return if page is not empty
1759 if( set->page->act ) {
1760 bt_unlockpage(BtLockWrite, set->latch);
1761 bt_unpinlatch (set->latch);
1762 bt_unpinpool (set->pool);
1763 return bt->found = found, 0;
1766 // cache copy of fence key
1767 // to post in parent
1769 ptr = keyptr(set->page, set->page->cnt);
1770 memcpy (lowerfence, ptr, ptr->len + 1);
1772 // obtain lock on right page
1774 right->page_no = bt_getid(set->page->right);
1775 right->latch = bt_pinlatch (bt, right->page_no);
1776 bt_lockpage (BtLockWrite, right->latch);
1778 // pin page contents
1780 if( right->pool = bt_pinpool (bt, right->page_no) )
1781 right->page = bt_page (bt, right->pool, right->page_no);
1785 if( right->page->kill )
1786 return bt->err = BTERR_struct;
1788 // transfer left link
1790 memcpy (right->page->left, set->page->left, BtId);
1792 // pull contents of right peer into our empty page
1794 memcpy (set->page, right->page, bt->mgr->page_size);
1799 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1802 // cache copy of key to update
1804 ptr = keyptr(right->page, right->page->cnt);
1805 memcpy (higherfence, ptr, ptr->len + 1);
1807 // mark right page deleted and point it to left page
1808 // until we can post parent updates
1810 bt_putid (right->page->right, set->page_no);
1811 right->page->kill = 1;
1813 bt_lockpage (BtLockParent, right->latch);
1814 bt_unlockpage (BtLockWrite, right->latch);
1816 bt_lockpage (BtLockParent, set->latch);
1817 bt_unlockpage (BtLockWrite, set->latch);
1819 // redirect higher key directly to our new node contents
1821 bt_putid (value, set->page_no);
1823 if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId, 1) )
1826 // delete old lower key to our node
1828 if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1831 // obtain delete and write locks to right node
1833 bt_unlockpage (BtLockParent, right->latch);
1834 bt_lockpage (BtLockDelete, right->latch);
1835 bt_lockpage (BtLockWrite, right->latch);
1836 bt_freepage (bt, right);
1838 bt_unlockpage (BtLockParent, set->latch);
1839 bt_unpinlatch (set->latch);
1840 bt_unpinpool (set->pool);
1845 BtKey bt_foundkey (BtDb *bt)
1847 return (BtKey)(bt->key);
1850 // advance to next slot
1852 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1854 BtLatchSet *prevlatch;
1858 if( slot < set->page->cnt )
1861 prevlatch = set->latch;
1862 prevpool = set->pool;
1864 if( page_no = bt_getid(set->page->right) )
1865 set->latch = bt_pinlatch (bt, page_no);
1867 return bt->err = BTERR_struct, 0;
1869 // pin page contents
1871 if( set->pool = bt_pinpool (bt, page_no) )
1872 set->page = bt_page (bt, set->pool, page_no);
1876 // obtain access lock using lock chaining with Access mode
1878 bt_lockpage(BtLockAccess, set->latch);
1880 bt_unlockpage(BtLockRead, prevlatch);
1881 bt_unpinlatch (prevlatch);
1882 bt_unpinpool (prevpool);
1884 bt_lockpage(BtLockRead, set->latch);
1885 bt_unlockpage(BtLockAccess, set->latch);
1887 set->page_no = page_no;
1891 // find unique key or first duplicate key in
1892 // leaf level and return number of value bytes
1893 // or (-1) if not found. Setup key for bt_foundkey
1895 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1903 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1905 ptr = keyptr(set->page, slot);
1907 // skip librarian slot place holder
1909 if( slotptr(set->page, slot)->type == Librarian )
1910 ptr = keyptr(set->page, ++slot);
1912 // return actual key found
1914 memcpy (bt->key, ptr, ptr->len + 1);
1917 if( slotptr(set->page, slot)->type == Duplicate )
1920 // if key exists, return >= 0 value bytes copied
1921 // otherwise return (-1)
1923 if( slotptr(set->page, slot)->dead )
1927 if( !memcmp (ptr->key, key, len) ) {
1928 val = valptr (set->page,slot);
1929 if( valmax > val->len )
1931 memcpy (value, val->value, valmax);
1937 } while( slot = bt_findnext (bt, set, slot) );
1939 bt_unlockpage (BtLockRead, set->latch);
1940 bt_unpinlatch (set->latch);
1941 bt_unpinpool (set->pool);
1945 // check page for space available,
1946 // clean if necessary and return
1947 // 0 - page needs splitting
1948 // >0 new slot value
1950 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1952 uint nxt = bt->mgr->page_size;
1953 uint cnt = 0, idx = 0;
1954 uint max = page->cnt;
1959 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1962 // skip cleanup and proceed to split
1963 // if there's not enough garbage
1966 if( page->garbage < nxt / 5 )
1969 memcpy (bt->frame, page, bt->mgr->page_size);
1971 // skip page info and set rest of page to zero
1973 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1977 // clean up page first by
1978 // removing deleted keys
1980 while( cnt++ < max ) {
1983 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1986 // copy the value across
1988 val = valptr(bt->frame, cnt);
1989 nxt -= val->len + 1;
1990 ((unsigned char *)page)[nxt] = val->len;
1991 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1993 // copy the key across
1995 key = keyptr(bt->frame, cnt);
1996 nxt -= key->len + 1;
1997 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1999 // make a librarian slot
2002 slotptr(page, ++idx)->off = nxt;
2003 slotptr(page, idx)->type = Librarian;
2004 slotptr(page, idx)->dead = 1;
2009 slotptr(page, ++idx)->off = nxt;
2010 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2012 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2019 // see if page has enough space now, or does it need splitting?
2021 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
2027 // split the root and raise the height of the btree
2029 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, BtPageSet *right)
2031 uint nxt = bt->mgr->page_size;
2032 unsigned char value[BtId];
2035 // Obtain an empty page to use, and copy the current
2036 // root contents into it, e.g. lower keys
2038 if( bt_newpage(bt, left) )
2041 memcpy(left->page, root->page, bt->mgr->page_size);
2042 bt_unpinpool (left->pool);
2044 // set left from higher to lower page
2046 bt_putid (right->page->left, left->page_no);
2047 bt_unpinpool (right->pool);
2049 // preserve the page info at the bottom
2050 // of higher keys and set rest to zero
2052 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2054 // insert lower keys page fence key on newroot page as first key
2057 bt_putid (value, left->page_no);
2058 ((unsigned char *)root->page)[nxt] = BtId;
2059 memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
2061 nxt -= *leftkey + 1;
2062 memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
2063 slotptr(root->page, 1)->off = nxt;
2065 // insert stopper key on newroot page
2066 // and increase the root height
2068 nxt -= 3 + BtId + 1;
2069 ((unsigned char *)root->page)[nxt] = 2;
2070 ((unsigned char *)root->page)[nxt+1] = 0xff;
2071 ((unsigned char *)root->page)[nxt+2] = 0xff;
2073 bt_putid (value, right->page_no);
2074 ((unsigned char *)root->page)[nxt+3] = BtId;
2075 memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
2076 slotptr(root->page, 2)->off = nxt;
2078 bt_putid(root->page->right, 0);
2079 root->page->min = nxt; // reset lowest used offset and key count
2080 root->page->cnt = 2;
2081 root->page->act = 2;
2084 // release and unpin root
2086 bt_unlockpage(BtLockWrite, root->latch);
2087 bt_unpinlatch (root->latch);
2088 bt_unpinpool (root->pool);
2092 // split already locked full node
2095 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2097 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2098 unsigned char fencekey[256], rightkey[256];
2099 unsigned char value[BtId];
2100 uint lvl = set->page->lvl;
2107 // split higher half of keys to bt->frame
2109 memset (bt->frame, 0, bt->mgr->page_size);
2110 max = set->page->cnt;
2114 while( cnt++ < max ) {
2115 if( slotptr(set->page, cnt)->dead && cnt < max )
2117 val = valptr(set->page, cnt);
2118 nxt -= val->len + 1;
2119 ((unsigned char *)bt->frame)[nxt] = val->len;
2120 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
2122 key = keyptr(set->page, cnt);
2123 nxt -= key->len + 1;
2124 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2126 // add librarian slot
2129 slotptr(bt->frame, ++idx)->off = nxt;
2130 slotptr(bt->frame, idx)->type = Librarian;
2131 slotptr(bt->frame, idx)->dead = 1;
2136 slotptr(bt->frame, ++idx)->off = nxt;
2137 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2139 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2143 // remember existing fence key for new page to the right
2145 memcpy (rightkey, key, key->len + 1);
2147 bt->frame->bits = bt->mgr->page_bits;
2148 bt->frame->min = nxt;
2149 bt->frame->cnt = idx;
2150 bt->frame->lvl = lvl;
2154 if( set->page_no > ROOT_page ) {
2155 bt_putid (bt->frame->right, bt_getid (set->page->right));
2156 bt_putid(bt->frame->left, set->page_no);
2159 // get new free page and write higher keys to it.
2161 if( bt_newpage(bt, right) )
2166 if( set->page_no > ROOT_page && !lvl )
2167 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2170 memcpy (right->page, bt->frame, bt->mgr->page_size);
2171 bt_unpinpool (right->pool);
2173 // update lower keys to continue in old page
2175 memcpy (bt->frame, set->page, bt->mgr->page_size);
2176 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2177 nxt = bt->mgr->page_size;
2178 set->page->garbage = 0;
2184 if( slotptr(bt->frame, max)->type == Librarian )
2187 // assemble page of smaller keys
2189 while( cnt++ < max ) {
2190 if( slotptr(bt->frame, cnt)->dead )
2192 val = valptr(bt->frame, cnt);
2193 nxt -= val->len + 1;
2194 ((unsigned char *)set->page)[nxt] = val->len;
2195 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2197 key = keyptr(bt->frame, cnt);
2198 nxt -= key->len + 1;
2199 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2201 // add librarian slot
2204 slotptr(set->page, ++idx)->off = nxt;
2205 slotptr(set->page, idx)->type = Librarian;
2206 slotptr(set->page, idx)->dead = 1;
2211 slotptr(set->page, ++idx)->off = nxt;
2212 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2216 // remember fence key for smaller page
2218 memcpy(fencekey, key, key->len + 1);
2220 bt_putid(set->page->right, right->page_no);
2221 set->page->min = nxt;
2222 set->page->cnt = idx;
2224 // if current page is the root page, split it
2226 if( set->page_no == ROOT_page )
2227 return bt_splitroot (bt, set, fencekey, right);
2229 // insert new fences in their parent pages
2231 right->latch = bt_pinlatch (bt, right->page_no);
2232 bt_lockpage (BtLockParent, right->latch);
2234 bt_lockpage (BtLockParent, set->latch);
2235 bt_unlockpage (BtLockWrite, set->latch);
2237 // insert new fence for reformulated left block of smaller keys
2239 bt_putid (value, set->page_no);
2241 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2244 // switch fence for right block of larger keys to new right page
2246 bt_putid (value, right->page_no);
2248 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2251 bt_unlockpage (BtLockParent, set->latch);
2252 bt_unpinlatch (set->latch);
2253 bt_unpinpool (set->pool);
2255 bt_unlockpage (BtLockParent, right->latch);
2256 bt_unpinlatch (right->latch);
2260 // install new key and value onto page
2261 // page must already be checked for
2264 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2266 uint idx, librarian;
2269 // if found slot > desired slot and previous slot
2270 // is a librarian slot, use it
2273 if( slotptr(set->page, slot-1)->type == Librarian )
2276 // copy value onto page
2278 set->page->min -= vallen + 1; // reset lowest used offset
2279 ((unsigned char *)set->page)[set->page->min] = vallen;
2280 memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2282 // copy key onto page
2284 set->page->min -= keylen + 1; // reset lowest used offset
2285 ((unsigned char *)set->page)[set->page->min] = keylen;
2286 memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2288 // find first empty slot
2290 for( idx = slot; idx < set->page->cnt; idx++ )
2291 if( slotptr(set->page, idx)->dead )
2294 // now insert key into array before slot
2296 if( idx == set->page->cnt )
2297 idx += 2, set->page->cnt += 2, librarian = 2;
2303 while( idx > slot + librarian - 1 )
2304 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2306 // add librarian slot
2308 if( librarian > 1 ) {
2309 node = slotptr(set->page, slot++);
2310 node->off = set->page->min;
2311 node->type = Librarian;
2317 node = slotptr(set->page, slot);
2318 node->off = set->page->min;
2322 bt_unlockpage (BtLockWrite, set->latch);
2323 bt_unpinlatch (set->latch);
2324 bt_unpinpool (set->pool);
2328 // Insert new key into the btree at given level.
2329 // either add a new key or update/add an existing one
2331 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2333 unsigned char newkey[256];
2334 uint slot, idx, len;
2341 // set up the key we're working on
2343 memcpy (newkey + 1, key, keylen);
2346 // is this a non-unique index value?
2352 sequence = bt_newdup (bt);
2353 bt_putid (newkey + *newkey + 1, sequence);
2357 while( 1 ) { // find the page and slot for the current key
2358 if( slot = bt_loadpage (bt, set, newkey + 1, *newkey, lvl, BtLockWrite) )
2359 ptr = keyptr(set->page, slot);
2362 bt->err = BTERR_ovflw;
2366 // if librarian slot == found slot, advance to real slot
2368 if( slotptr(set->page, slot)->type == Librarian )
2369 if( !keycmp (ptr, key, keylen) )
2370 ptr = keyptr(set->page, ++slot);
2374 if( slotptr(set->page, slot)->type == Duplicate )
2377 // if inserting a duplicate key or unique key
2378 // check for adequate space on the page
2379 // and insert the new key before slot.
2381 if( unique && (len != *newkey || memcmp (ptr->key, newkey+1, *newkey)) || !unique ) {
2382 if( !(slot = bt_cleanpage (bt, set->page, *newkey, slot, vallen)) )
2383 if( bt_splitpage (bt, set) )
2388 return bt_insertslot (bt, set, slot, newkey + 1, *newkey, value, vallen, type);
2391 // if key already exists, update value and return
2393 if( val = valptr(set->page, slot), val->len >= vallen ) {
2394 if( slotptr(set->page, slot)->dead )
2396 set->page->garbage += val->len - vallen;
2397 slotptr(set->page, slot)->dead = 0;
2399 memcpy (val->value, value, vallen);
2400 bt_unlockpage(BtLockWrite, set->latch);
2401 bt_unpinlatch (set->latch);
2402 bt_unpinpool (set->pool);
2406 // new update value doesn't fit in existing value area
2408 if( !slotptr(set->page, slot)->dead )
2409 set->page->garbage += val->len + ptr->len + 2;
2411 slotptr(set->page, slot)->dead = 0;
2415 if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2416 if( bt_splitpage (bt, set) )
2421 set->page->min -= vallen + 1;
2422 ((unsigned char *)set->page)[set->page->min] = vallen;
2423 memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
2425 set->page->min -= keylen + 1;
2426 ((unsigned char *)set->page)[set->page->min] = keylen;
2427 memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
2429 slotptr(set->page, slot)->off = set->page->min;
2430 bt_unlockpage(BtLockWrite, set->latch);
2431 bt_unpinlatch (set->latch);
2432 bt_unpinpool (set->pool);
2438 // set cursor to highest slot on highest page
2440 uint bt_lastkey (BtDb *bt)
2442 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2446 if( set->pool = bt_pinpool (bt, page_no) )
2447 set->page = bt_page (bt, set->pool, page_no);
2451 set->latch = bt_pinlatch (bt, page_no);
2452 bt_lockpage(BtLockRead, set->latch);
2454 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2455 slot = set->page->cnt;
2457 bt_unlockpage(BtLockRead, set->latch);
2458 bt_unpinlatch (set->latch);
2459 bt_unpinpool (set->pool);
2464 // return previous slot on cursor page
2466 uint bt_prevkey (BtDb *bt, uint slot)
2474 if( left = bt_getid(bt->cursor->left) )
2475 bt->cursor_page = left;
2479 if( set->pool = bt_pinpool (bt, left) )
2480 set->page = bt_page (bt, set->pool, left);
2484 set->latch = bt_pinlatch (bt, left);
2485 bt_lockpage(BtLockRead, set->latch);
2487 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2489 bt_unlockpage(BtLockRead, set->latch);
2490 bt_unpinlatch (set->latch);
2491 bt_unpinpool (set->pool);
2492 return bt->cursor->cnt;
2495 // return next slot on cursor page
2496 // or slide cursor right into next page
2498 uint bt_nextkey (BtDb *bt, uint slot)
2504 right = bt_getid(bt->cursor->right);
2506 while( slot++ < bt->cursor->cnt )
2507 if( slotptr(bt->cursor,slot)->dead )
2509 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2517 bt->cursor_page = right;
2519 if( set->pool = bt_pinpool (bt, right) )
2520 set->page = bt_page (bt, set->pool, right);
2524 set->latch = bt_pinlatch (bt, right);
2525 bt_lockpage(BtLockRead, set->latch);
2527 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2529 bt_unlockpage(BtLockRead, set->latch);
2530 bt_unpinlatch (set->latch);
2531 bt_unpinpool (set->pool);
2539 // cache page of keys into cursor and return starting slot for given key
2541 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2546 // cache page for retrieval
2548 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2549 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2553 bt->cursor_page = set->page_no;
2555 bt_unlockpage(BtLockRead, set->latch);
2556 bt_unpinlatch (set->latch);
2557 bt_unpinpool (set->pool);
2561 BtKey bt_key(BtDb *bt, uint slot)
2563 return keyptr(bt->cursor, slot);
2566 BtVal bt_val(BtDb *bt, uint slot)
2568 return valptr(bt->cursor,slot);
2574 double getCpuTime(int type)
2577 FILETIME xittime[1];
2578 FILETIME systime[1];
2579 FILETIME usrtime[1];
2580 SYSTEMTIME timeconv[1];
2583 memset (timeconv, 0, sizeof(SYSTEMTIME));
2587 GetSystemTimeAsFileTime (xittime);
2588 FileTimeToSystemTime (xittime, timeconv);
2589 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2592 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2593 FileTimeToSystemTime (usrtime, timeconv);
2596 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2597 FileTimeToSystemTime (systime, timeconv);
2601 ans += (double)timeconv->wHour * 3600;
2602 ans += (double)timeconv->wMinute * 60;
2603 ans += (double)timeconv->wSecond;
2604 ans += (double)timeconv->wMilliseconds / 1000;
2609 #include <sys/resource.h>
2611 double getCpuTime(int type)
2613 struct rusage used[1];
2614 struct timeval tv[1];
2618 gettimeofday(tv, NULL);
2619 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2622 getrusage(RUSAGE_SELF, used);
2623 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2626 getrusage(RUSAGE_SELF, used);
2627 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2634 uint bt_latchaudit (BtDb *bt)
2636 ushort idx, hashidx;
2643 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2645 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2646 fprintf(stderr, "Alloc page locked\n");
2647 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2649 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2650 latch = bt->mgr->latchsets + idx;
2651 if( *latch->readwr->rin & MASK )
2652 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2653 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2655 if( *latch->access->rin & MASK )
2656 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2657 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2659 if( *latch->parent->rin & MASK )
2660 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2661 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2664 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2669 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2670 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2671 fprintf(stderr, "hash entry %d locked\n", hashidx);
2673 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2675 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2676 latch = bt->mgr->latchsets + idx;
2677 if( *(ushort *)latch->busy )
2678 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2679 *(ushort *)latch->busy = 0;
2681 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2682 } while( idx = latch->next );
2685 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2686 page_no = LEAF_page;
2688 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2689 off64_t off = page_no << bt->mgr->page_bits;
2691 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2695 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2697 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2698 fprintf(stderr, "page %.8x unable to read\n", page_no);
2700 if( *amt < bt->mgr->page_size )
2701 fprintf(stderr, "page %.8x unable to read\n", page_no);
2703 if( !bt->frame->free ) {
2704 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2705 ptr = keyptr(bt->frame, idx+1);
2706 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2707 fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2709 if( !bt->frame->lvl )
2710 cnt += bt->frame->act;
2712 if( page_no > LEAF_page )
2727 // standalone program to index file of keys
2728 // then list them onto std-out
2731 void *index_file (void *arg)
2733 uint __stdcall index_file (void *arg)
2736 int line = 0, found = 0, cnt = 0;
2737 uid next, page_no = LEAF_page; // start on first page of leaves
2738 unsigned char key[256];
2739 ThreadArg *args = arg;
2740 int ch, len = 0, slot;
2746 bt = bt_open (args->mgr);
2748 switch(args->type[0] | 0x20)
2751 fprintf(stderr, "started latch mgr audit\n");
2752 cnt = bt_latchaudit (bt);
2753 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2757 fprintf(stderr, "started pennysort for %s\n", args->infile);
2758 if( in = fopen (args->infile, "rb") )
2759 while( ch = getc(in), ch != EOF )
2764 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 0) )
2765 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2768 else if( len < 255 )
2770 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2774 fprintf(stderr, "started indexing for %s\n", args->infile);
2775 if( in = fopen (args->infile, "rb") )
2776 while( ch = getc(in), ch != EOF )
2781 if( args->num == 1 )
2782 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2784 else if( args->num )
2785 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2787 if( bt_insertkey (bt, key, len, 0, NULL, 0, 0) )
2788 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2791 else if( len < 255 )
2793 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2797 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2798 if( in = fopen (args->infile, "rb") )
2799 while( ch = getc(in), ch != EOF )
2803 if( args->num == 1 )
2804 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2806 else if( args->num )
2807 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2809 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2810 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2811 ptr = (BtKey)(bt->key);
2814 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2815 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2818 else if( len < 255 )
2820 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2824 fprintf(stderr, "started finding keys for %s\n", args->infile);
2825 if( in = fopen (args->infile, "rb") )
2826 while( ch = getc(in), ch != EOF )
2830 if( args->num == 1 )
2831 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2833 else if( args->num )
2834 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2836 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2839 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2842 else if( len < 255 )
2844 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2848 fprintf(stderr, "started scanning\n");
2850 if( set->pool = bt_pinpool (bt, page_no) )
2851 set->page = bt_page (bt, set->pool, page_no);
2854 set->latch = bt_pinlatch (bt, page_no);
2855 bt_lockpage (BtLockRead, set->latch);
2856 next = bt_getid (set->page->right);
2858 for( slot = 0; slot++ < set->page->cnt; )
2859 if( next || slot < set->page->cnt )
2860 if( !slotptr(set->page, slot)->dead ) {
2861 ptr = keyptr(set->page, slot);
2864 if( slotptr(set->page, slot)->type == Duplicate )
2867 fwrite (ptr->key, len, 1, stdout);
2868 fputc ('\n', stdout);
2872 bt_unlockpage (BtLockRead, set->latch);
2873 bt_unpinlatch (set->latch);
2874 bt_unpinpool (set->pool);
2875 } while( page_no = next );
2877 fprintf(stderr, " Total keys read %d\n", cnt);
2881 fprintf(stderr, "started reverse scan\n");
2882 if( slot = bt_lastkey (bt) )
2883 while( slot = bt_prevkey (bt, slot) ) {
2884 if( slotptr(bt->cursor, slot)->dead )
2887 ptr = keyptr(bt->cursor, slot);
2890 if( slotptr(bt->cursor, slot)->type == Duplicate )
2893 fwrite (ptr->key, len, 1, stdout);
2894 fputc ('\n', stdout);
2898 fprintf(stderr, " Total keys read %d\n", cnt);
2903 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2905 fprintf(stderr, "started counting\n");
2906 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2907 page_no = LEAF_page;
2909 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2910 uid off = page_no << bt->mgr->page_bits;
2912 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2916 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2918 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2919 return bt->err = BTERR_map;
2921 if( *amt < bt->mgr->page_size )
2922 return bt->err = BTERR_map;
2924 if( !bt->frame->free && !bt->frame->lvl )
2925 cnt += bt->frame->act;
2926 if( page_no > LEAF_page )
2931 cnt--; // remove stopper key
2932 fprintf(stderr, " Total keys read %d\n", cnt);
2944 typedef struct timeval timer;
2946 int main (int argc, char **argv)
2948 int idx, cnt, len, slot, err;
2949 int segsize, bits = 16;
2966 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2967 fprintf (stderr, " where page_bits is the page size in bits\n");
2968 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
2969 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2970 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
2971 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
2975 start = getCpuTime(0);
2978 bits = atoi(argv[3]);
2981 poolsize = atoi(argv[4]);
2984 fprintf (stderr, "Warning: no mapped_pool\n");
2986 if( poolsize > 65535 )
2987 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2990 segsize = atoi(argv[5]);
2992 segsize = 4; // 16 pages per mmap segment
2995 num = atoi(argv[6]);
2999 threads = malloc (cnt * sizeof(pthread_t));
3001 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3003 args = malloc (cnt * sizeof(ThreadArg));
3005 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3008 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3014 for( idx = 0; idx < cnt; idx++ ) {
3015 args[idx].infile = argv[idx + 7];
3016 args[idx].type = argv[2];
3017 args[idx].mgr = mgr;
3018 args[idx].num = num;
3019 args[idx].idx = idx;
3021 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3022 fprintf(stderr, "Error creating thread %d\n", err);
3024 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3028 // wait for termination
3031 for( idx = 0; idx < cnt; idx++ )
3032 pthread_join (threads[idx], NULL);
3034 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3036 for( idx = 0; idx < cnt; idx++ )
3037 CloseHandle(threads[idx]);
3040 elapsed = getCpuTime(0) - start;
3041 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3042 elapsed = getCpuTime(1);
3043 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3044 elapsed = getCpuTime(2);
3045 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);