1 // btree version threadskv5 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // and bi-directional cursors
10 // author: karl malbrain, malbrain@cal.berkeley.edu
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
16 The orginal author is Karl Malbrain.
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
46 #define WIN32_LEAN_AND_MEAN
60 typedef unsigned long long uid;
63 typedef unsigned long long off64_t;
64 typedef unsigned short ushort;
65 typedef unsigned int uint;
68 #define BT_latchtable 128 // number of latch manager slots
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
79 There are five lock types for each node in three independent sets:
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
95 // definition for phase-fair reader/writer lock implementation
109 // definition for spin latch implementation
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
115 volatile typedef struct {
126 // hash table entries
129 BtSpinLatch latch[1];
130 volatile ushort slot; // Latch table entry at head of chain
133 // latch manager table structure
136 RWLock readwr[1]; // read/write page lock
137 RWLock access[1]; // Access Intent/Page delete
138 RWLock parent[1]; // Posting of fence key in parent
139 BtSpinLatch busy[1]; // slot is being moved between chains
140 volatile ushort next; // next entry in hash table chain
141 volatile ushort prev; // prev entry in hash table chain
142 volatile ushort pin; // number of outstanding locks
143 volatile ushort hash; // hash slot entry is under
144 volatile uid page_no; // latch set page number
147 // Define the length of the page and key pointers
151 // Page key slot definition.
153 // Keys are marked dead, but remain on the page until
154 // it cleanup is called. The fence key (highest key) for
155 // a leaf page is always present, even after cleanup.
159 // In addition to the Unique keys that occupy slots
160 // there are Librarian and Duplicate key
161 // slots occupying the key slot array.
163 // The Librarian slots are dead keys that
164 // serve as filler, available to add new Unique
165 // or Dup slots that are inserted into the B-tree.
167 // The Duplicate slots have had their key bytes extended
168 // by 6 bytes to contain a binary duplicate key uniqueifier.
177 uint off:BT_maxbits; // page offset for key start
178 uint type:3; // type of slot
179 uint dead:1; // set for deleted slot
182 // The key structure occupies space at the upper end of
183 // each page. It's a length byte followed by the key
188 unsigned char key[1];
191 // the value structure also occupies space at the upper
192 // end of the page. Each key is immediately followed by a value.
196 unsigned char value[1];
199 // The first part of an index page.
200 // It is immediately followed
201 // by the BtSlot array of keys.
203 // note that this structure size
204 // must be a multiple of 8 bytes
205 // in order to place dups correctly.
207 typedef struct BtPage_ {
208 uint cnt; // count of keys in page
209 uint act; // count of active keys
210 uint min; // next key offset
211 uint garbage; // page garbage in bytes
212 unsigned char bits:7; // page size in bits
213 unsigned char free:1; // page is on free chain
214 unsigned char lvl:7; // level of page
215 unsigned char kill:1; // page is being deleted
216 unsigned char left[BtId]; // page number to left
217 unsigend char filler[2]; // padding to multiple of 8
218 unsigned char right[BtId]; // page number to right
221 // The memory mapping pool table buffer manager entry
224 uid basepage; // mapped base page number
225 char *map; // mapped memory pointer
226 ushort slot; // slot index in this array
227 ushort pin; // mapped page pin counter
228 void *hashprev; // previous pool entry for the same hash idx
229 void *hashnext; // next pool entry for the same hash idx
231 HANDLE hmap; // Windows memory mapping handle
235 #define CLOCK_bit 0x8000 // bit in pool->pin
237 // The loadpage interface object
240 uid page_no; // current page number
241 BtPage page; // current page pointer
242 BtPool *pool; // current page pool
243 BtLatchSet *latch; // current page latch set
246 // structure for latch manager on ALLOC_page
249 struct BtPage_ alloc[1]; // next page_no in right ptr
250 unsigned long long dups[1]; // global duplicate key uniqueifier
251 unsigned char chain[BtId]; // head of free page_nos chain
252 BtSpinLatch lock[1]; // allocation area lite latch
253 ushort latchdeployed; // highest number of latch entries deployed
254 ushort nlatchpage; // number of latch pages at BT_latch
255 ushort latchtotal; // number of page latch entries
256 ushort latchhash; // number of latch hash table slots
257 ushort latchvictim; // next latch entry to examine
258 BtHashEntry table[0]; // the hash table
261 // The object structure for Btree access
264 uint page_size; // page size
265 uint page_bits; // page size in bits
266 uint seg_bits; // seg size in pages in bits
267 uint mode; // read-write mode
273 ushort poolcnt; // highest page pool node in use
274 ushort poolmax; // highest page pool node allocated
275 ushort poolmask; // total number of pages in mmap segment - 1
276 ushort hashsize; // size of Hash Table for pool entries
277 volatile uint evicted; // last evicted hash table slot
278 ushort *hash; // pool index for hash entries
279 BtSpinLatch *latch; // latches for hash table slots
280 BtLatchMgr *latchmgr; // mapped latch page from allocation page
281 BtLatchSet *latchsets; // mapped latch set from latch pages
282 BtPool *pool; // memory pool page segments
284 HANDLE halloc; // allocation and latch table handle
289 BtMgr *mgr; // buffer manager for thread
290 BtPage cursor; // cached frame for start/next (never mapped)
291 BtPage frame; // spare frame for the page split (never mapped)
292 uid cursor_page; // current cursor page number
293 unsigned char *mem; // frame, cursor, page memory buffer
294 unsigned char key[256]; // last found complete key
295 int found; // last delete or insert was found
296 int err; // last error
310 extern void bt_close (BtDb *bt);
311 extern BtDb *bt_open (BtMgr *mgr);
312 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
313 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
314 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
315 extern BtKey bt_foundkey (BtDb *bt);
316 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
317 extern uint bt_nextkey (BtDb *bt, uint slot);
320 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
321 void bt_mgrclose (BtMgr *mgr);
323 // Helper functions to return slot values
324 // from the cursor page.
326 extern BtKey bt_key (BtDb *bt, uint slot);
327 extern BtVal bt_val (BtDb *bt, uint slot);
329 // BTree page number constants
330 #define ALLOC_page 0 // allocation & latch manager hash table
331 #define ROOT_page 1 // root of the btree
332 #define LEAF_page 2 // first page of leaves
333 #define LATCH_page 3 // pages for latch manager
335 // Number of levels to create in a new BTree
339 // The page is allocated from low and hi ends.
340 // The key slots are allocated from the bottom,
341 // while the text and value of the key
342 // are allocated from the top. When the two
343 // areas meet, the page is split into two.
345 // A key consists of a length byte, two bytes of
346 // index number (0 - 65534), and up to 253 bytes
349 // Associated with each key is a value byte string
350 // containing any value desired.
352 // The b-tree root is always located at page 1.
353 // The first leaf page of level zero is always
354 // located on page 2.
356 // The b-tree pages are linked with next
357 // pointers to facilitate enumerators,
358 // and provide for concurrency.
360 // When to root page fills, it is split in two and
361 // the tree height is raised by a new root at page
362 // one with two keys.
364 // Deleted keys are marked with a dead bit until
365 // page cleanup. The fence key for a leaf node is
368 // Groups of pages called segments from the btree are optionally
369 // cached with a memory mapped pool. A hash table is used to keep
370 // track of the cached segments. This behaviour is controlled
371 // by the cache block size parameter to bt_open.
373 // To achieve maximum concurrency one page is locked at a time
374 // as the tree is traversed to find leaf key in question. The right
375 // page numbers are used in cases where the page is being split,
378 // Page 0 is dedicated to lock for new page extensions,
379 // and chains empty pages together for reuse. It also
380 // contains the latch manager hash table.
382 // The ParentModification lock on a node is obtained to serialize posting
383 // or changing the fence key for a node.
385 // Empty pages are chained together through the ALLOC page and reused.
387 // Access macros to address slot and key values from the page
388 // Page slots use 1 based indexing.
390 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
391 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
392 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
394 void bt_putid(unsigned char *dest, uid id)
399 dest[i] = (unsigned char)id, id >>= 8;
402 uid bt_getid(unsigned char *src)
407 for( i = 0; i < BtId; i++ )
408 id <<= 8, id |= *src++;
413 uid bt_newdup (BtDb *bt)
416 return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
418 return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
422 // Phase-Fair reader/writer lock implementation
424 void WriteLock (RWLock *lock)
429 tix = __sync_fetch_and_add (lock->ticket, 1);
431 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
433 // wait for our ticket to come up
435 while( tix != lock->serving[0] )
442 w = PRES | (tix & PHID);
444 r = __sync_fetch_and_add (lock->rin, w);
446 r = _InterlockedExchangeAdd16 (lock->rin, w);
448 while( r != *lock->rout )
456 void WriteRelease (RWLock *lock)
459 __sync_fetch_and_and (lock->rin, ~MASK);
461 _InterlockedAnd16 (lock->rin, ~MASK);
466 void ReadLock (RWLock *lock)
470 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
472 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
475 while( w == (*lock->rin & MASK) )
483 void ReadRelease (RWLock *lock)
486 __sync_fetch_and_add (lock->rout, RINC);
488 _InterlockedExchangeAdd16 (lock->rout, RINC);
492 // Spin Latch Manager
494 // wait until write lock mode is clear
495 // and add 1 to the share count
497 void bt_spinreadlock(BtSpinLatch *latch)
503 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
505 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
507 // see if exclusive request is granted or pending
512 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
514 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
517 } while( sched_yield(), 1 );
519 } while( SwitchToThread(), 1 );
523 // wait for other read and write latches to relinquish
525 void bt_spinwritelock(BtSpinLatch *latch)
531 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
533 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
536 if( !(prev & ~BOTH) )
540 __sync_fetch_and_and ((ushort *)latch, ~XCL);
542 _InterlockedAnd16((ushort *)latch, ~XCL);
545 } while( sched_yield(), 1 );
547 } while( SwitchToThread(), 1 );
551 // try to obtain write lock
553 // return 1 if obtained,
556 int bt_spinwritetry(BtSpinLatch *latch)
561 prev = __sync_fetch_and_or((ushort *)latch, XCL);
563 prev = _InterlockedOr16((ushort *)latch, XCL);
565 // take write access if all bits are clear
568 if( !(prev & ~BOTH) )
572 __sync_fetch_and_and ((ushort *)latch, ~XCL);
574 _InterlockedAnd16((ushort *)latch, ~XCL);
581 void bt_spinreleasewrite(BtSpinLatch *latch)
584 __sync_fetch_and_and((ushort *)latch, ~BOTH);
586 _InterlockedAnd16((ushort *)latch, ~BOTH);
590 // decrement reader count
592 void bt_spinreleaseread(BtSpinLatch *latch)
595 __sync_fetch_and_add((ushort *)latch, -SHARE);
597 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
601 // link latch table entry into latch hash table
603 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
605 BtLatchSet *set = bt->mgr->latchsets + victim;
607 if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
608 bt->mgr->latchsets[set->next].prev = victim;
610 bt->mgr->latchmgr->table[hashidx].slot = victim;
611 set->page_no = page_no;
618 void bt_unpinlatch (BtLatchSet *set)
621 __sync_fetch_and_add(&set->pin, -1);
623 _InterlockedDecrement16 (&set->pin);
627 // find existing latchset or inspire new one
628 // return with latchset pinned
630 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
632 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
633 ushort slot, avail = 0, victim, idx;
636 // obtain read lock on hash table entry
638 bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
640 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
642 set = bt->mgr->latchsets + slot;
643 if( page_no == set->page_no )
645 } while( slot = set->next );
649 __sync_fetch_and_add(&set->pin, 1);
651 _InterlockedIncrement16 (&set->pin);
655 bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
660 // try again, this time with write lock
662 bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
664 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
666 set = bt->mgr->latchsets + slot;
667 if( page_no == set->page_no )
669 if( !set->pin && !avail )
671 } while( slot = set->next );
673 // found our entry, or take over an unpinned one
675 if( slot || (slot = avail) ) {
676 set = bt->mgr->latchsets + slot;
678 __sync_fetch_and_add(&set->pin, 1);
680 _InterlockedIncrement16 (&set->pin);
682 set->page_no = page_no;
683 bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
687 // see if there are any unused entries
689 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
691 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
694 if( victim < bt->mgr->latchmgr->latchtotal ) {
695 set = bt->mgr->latchsets + victim;
697 __sync_fetch_and_add(&set->pin, 1);
699 _InterlockedIncrement16 (&set->pin);
701 bt_latchlink (bt, hashidx, victim, page_no);
702 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
707 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
709 victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
711 // find and reuse previous lock entry
715 victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
717 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
719 // we don't use slot zero
721 if( victim %= bt->mgr->latchmgr->latchtotal )
722 set = bt->mgr->latchsets + victim;
726 // take control of our slot
727 // from other threads
729 if( set->pin || !bt_spinwritetry (set->busy) )
734 // try to get write lock on hash chain
735 // skip entry if not obtained
736 // or has outstanding locks
738 if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
739 bt_spinreleasewrite (set->busy);
744 bt_spinreleasewrite (set->busy);
745 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
749 // unlink our available victim from its hash chain
752 bt->mgr->latchsets[set->prev].next = set->next;
754 bt->mgr->latchmgr->table[idx].slot = set->next;
757 bt->mgr->latchsets[set->next].prev = set->prev;
759 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
761 __sync_fetch_and_add(&set->pin, 1);
763 _InterlockedIncrement16 (&set->pin);
765 bt_latchlink (bt, hashidx, victim, page_no);
766 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
767 bt_spinreleasewrite (set->busy);
772 void bt_mgrclose (BtMgr *mgr)
777 // release mapped pages
778 // note that slot zero is never used
780 for( slot = 1; slot < mgr->poolmax; slot++ ) {
781 pool = mgr->pool + slot;
784 munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
787 FlushViewOfFile(pool->map, 0);
788 UnmapViewOfFile(pool->map);
789 CloseHandle(pool->hmap);
795 munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
796 munmap (mgr->latchmgr, 2 * mgr->page_size);
798 FlushViewOfFile(mgr->latchmgr, 0);
799 UnmapViewOfFile(mgr->latchmgr);
800 CloseHandle(mgr->halloc);
806 free ((void *)mgr->latch);
809 FlushFileBuffers(mgr->idx);
810 CloseHandle(mgr->idx);
811 GlobalFree (mgr->pool);
812 GlobalFree (mgr->hash);
813 GlobalFree ((void *)mgr->latch);
818 // close and release memory
820 void bt_close (BtDb *bt)
827 VirtualFree (bt->mem, 0, MEM_RELEASE);
832 // open/create new btree buffer manager
834 // call with file_name, BT_openmode, bits in page size (e.g. 16),
835 // size of mapped page pool (e.g. 8192)
837 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
839 uint lvl, attr, cacheblk, last, slot, idx;
840 uint nlatchpage, latchhash;
841 unsigned char value[BtId];
842 BtLatchMgr *latchmgr;
851 SYSTEM_INFO sysinfo[1];
854 // determine sanity of page size and buffer pool
856 if( bits > BT_maxbits )
858 else if( bits < BT_minbits )
862 return NULL; // must have buffer pool
865 mgr = calloc (1, sizeof(BtMgr));
867 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
870 return free(mgr), NULL;
872 cacheblk = 4096; // minimum mmap segment size for unix
875 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
876 attr = FILE_ATTRIBUTE_NORMAL;
877 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
879 if( mgr->idx == INVALID_HANDLE_VALUE )
880 return GlobalFree(mgr), NULL;
882 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
883 GetSystemInfo(sysinfo);
884 cacheblk = sysinfo->dwAllocationGranularity;
888 latchmgr = malloc (BT_maxpage);
891 // read minimum page size to get root info
893 if( size = lseek (mgr->idx, 0L, 2) ) {
894 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
895 bits = latchmgr->alloc->bits;
897 return free(mgr), free(latchmgr), NULL;
898 } else if( mode == BT_ro )
899 return free(latchmgr), bt_mgrclose (mgr), NULL;
901 latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
902 size = GetFileSize(mgr->idx, amt);
905 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
906 return bt_mgrclose (mgr), NULL;
907 bits = latchmgr->alloc->bits;
908 } else if( mode == BT_ro )
909 return bt_mgrclose (mgr), NULL;
912 mgr->page_size = 1 << bits;
913 mgr->page_bits = bits;
915 mgr->poolmax = poolmax;
918 if( cacheblk < mgr->page_size )
919 cacheblk = mgr->page_size;
921 // mask for partial memmaps
923 mgr->poolmask = (cacheblk >> bits) - 1;
925 // see if requested size of pages per memmap is greater
927 if( (1 << segsize) > mgr->poolmask )
928 mgr->poolmask = (1 << segsize) - 1;
932 while( (1 << mgr->seg_bits) <= mgr->poolmask )
935 mgr->hashsize = hashsize;
938 mgr->pool = calloc (poolmax, sizeof(BtPool));
939 mgr->hash = calloc (hashsize, sizeof(ushort));
940 mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
942 mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
943 mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
944 mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
950 // initialize an empty b-tree with latch page, root page, page of leaves
951 // and page(s) of latches
953 memset (latchmgr, 0, 1 << bits);
954 nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
955 bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
956 latchmgr->alloc->bits = mgr->page_bits;
958 latchmgr->nlatchpage = nlatchpage;
959 latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
961 // initialize latch manager
963 latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
965 // size of hash table = total number of latchsets
967 if( latchhash > latchmgr->latchtotal )
968 latchhash = latchmgr->latchtotal;
970 latchmgr->latchhash = latchhash;
973 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
974 return bt_mgrclose (mgr), NULL;
976 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
977 return bt_mgrclose (mgr), NULL;
979 if( *amt < mgr->page_size )
980 return bt_mgrclose (mgr), NULL;
983 memset (latchmgr, 0, 1 << bits);
984 latchmgr->alloc->bits = mgr->page_bits;
986 for( lvl=MIN_lvl; lvl--; ) {
987 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
988 key = keyptr(latchmgr->alloc, 1);
989 key->len = 2; // create stopper key
993 bt_putid(value, MIN_lvl - lvl + 1);
994 val = valptr(latchmgr->alloc, 1);
995 val->len = lvl ? BtId : 0;
996 memcpy (val->value, value, val->len);
998 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
999 latchmgr->alloc->lvl = lvl;
1000 latchmgr->alloc->cnt = 1;
1001 latchmgr->alloc->act = 1;
1003 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
1004 return bt_mgrclose (mgr), NULL;
1006 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1007 return bt_mgrclose (mgr), NULL;
1009 if( *amt < mgr->page_size )
1010 return bt_mgrclose (mgr), NULL;
1014 // clear out latch manager locks
1015 // and rest of pages to round out segment
1017 memset(latchmgr, 0, mgr->page_size);
1020 while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1022 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1024 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1025 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1026 return bt_mgrclose (mgr), NULL;
1027 if( *amt < mgr->page_size )
1028 return bt_mgrclose (mgr), NULL;
1035 // mlock the root page and the latchmgr page
1037 flag = PROT_READ | PROT_WRITE;
1038 mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1039 if( mgr->latchmgr == MAP_FAILED )
1040 return bt_mgrclose (mgr), NULL;
1041 mlock (mgr->latchmgr, 2 * mgr->page_size);
1043 mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1044 if( mgr->latchsets == MAP_FAILED )
1045 return bt_mgrclose (mgr), NULL;
1046 mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1048 flag = PAGE_READWRITE;
1049 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1051 return bt_mgrclose (mgr), NULL;
1053 flag = FILE_MAP_WRITE;
1054 mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1055 if( !mgr->latchmgr )
1056 return GetLastError(), bt_mgrclose (mgr), NULL;
1058 mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1064 VirtualFree (latchmgr, 0, MEM_RELEASE);
1069 // open BTree access method
1070 // based on buffer manager
1072 BtDb *bt_open (BtMgr *mgr)
1074 BtDb *bt = malloc (sizeof(*bt));
1076 memset (bt, 0, sizeof(*bt));
1079 bt->mem = malloc (2 *mgr->page_size);
1081 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1083 bt->frame = (BtPage)bt->mem;
1084 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1088 // compare two keys, returning > 0, = 0, or < 0
1089 // as the comparison value
1091 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1093 uint len1 = key1->len;
1096 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1109 // find segment in pool
1110 // must be called with hashslot idx locked
1111 // return NULL if not there
1112 // otherwise return node
1114 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1119 // compute start of hash chain in pool
1121 if( slot = bt->mgr->hash[idx] )
1122 pool = bt->mgr->pool + slot;
1126 page_no &= ~bt->mgr->poolmask;
1128 while( pool->basepage != page_no )
1129 if( pool = pool->hashnext )
1137 // add segment to hash table
1139 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1144 pool->hashprev = pool->hashnext = NULL;
1145 pool->basepage = page_no & ~bt->mgr->poolmask;
1146 pool->pin = CLOCK_bit + 1;
1148 if( slot = bt->mgr->hash[idx] ) {
1149 node = bt->mgr->pool + slot;
1150 pool->hashnext = node;
1151 node->hashprev = pool;
1154 bt->mgr->hash[idx] = pool->slot;
1157 // map new buffer pool segment to virtual memory
1159 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1161 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1162 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1166 flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1167 pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1168 if( pool->map == MAP_FAILED )
1169 return bt->err = BTERR_map;
1172 flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1173 pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1175 return bt->err = BTERR_map;
1177 flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1178 pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1180 return bt->err = BTERR_map;
1185 // calculate page within pool
1187 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1189 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1192 page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1193 // madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1199 void bt_unpinpool (BtPool *pool)
1202 __sync_fetch_and_add(&pool->pin, -1);
1204 _InterlockedDecrement16 (&pool->pin);
1208 // find or place requested page in segment-pool
1209 // return pool table entry, incrementing pin
1211 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1213 uint slot, hashidx, idx, victim;
1214 BtPool *pool, *node, *next;
1216 // lock hash table chain
1218 hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1219 bt_spinwritelock (&bt->mgr->latch[hashidx]);
1221 // look up in hash table
1223 if( pool = bt_findpool(bt, page_no, hashidx) ) {
1225 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1226 __sync_fetch_and_add(&pool->pin, 1);
1228 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1229 _InterlockedIncrement16 (&pool->pin);
1231 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1235 // allocate a new pool node
1236 // and add to hash table
1239 slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1241 slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1244 if( ++slot < bt->mgr->poolmax ) {
1245 pool = bt->mgr->pool + slot;
1248 if( bt_mapsegment(bt, pool, page_no) )
1251 bt_linkhash(bt, pool, page_no, hashidx);
1252 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1256 // pool table is full
1257 // find best pool entry to evict
1260 __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1262 _InterlockedDecrement16 (&bt->mgr->poolcnt);
1267 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1269 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1271 victim %= bt->mgr->poolmax;
1272 pool = bt->mgr->pool + victim;
1273 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1278 // try to get write lock
1279 // skip entry if not obtained
1281 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1284 // skip this entry if
1286 // or clock bit is set
1290 __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1292 _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1294 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1298 // unlink victim pool node from hash table
1300 if( node = pool->hashprev )
1301 node->hashnext = pool->hashnext;
1302 else if( node = pool->hashnext )
1303 bt->mgr->hash[idx] = node->slot;
1305 bt->mgr->hash[idx] = 0;
1307 if( node = pool->hashnext )
1308 node->hashprev = pool->hashprev;
1310 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1312 // remove old file mapping
1314 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1316 // FlushViewOfFile(pool->map, 0);
1317 UnmapViewOfFile(pool->map);
1318 CloseHandle(pool->hmap);
1322 // create new pool mapping
1323 // and link into hash table
1325 if( bt_mapsegment(bt, pool, page_no) )
1328 bt_linkhash(bt, pool, page_no, hashidx);
1329 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1334 // place write, read, or parent lock on requested page_no.
1336 void bt_lockpage(BtLock mode, BtLatchSet *set)
1340 ReadLock (set->readwr);
1343 WriteLock (set->readwr);
1346 ReadLock (set->access);
1349 WriteLock (set->access);
1352 WriteLock (set->parent);
1357 // remove write, read, or parent lock on requested page
1359 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1363 ReadRelease (set->readwr);
1366 WriteRelease (set->readwr);
1369 ReadRelease (set->access);
1372 WriteRelease (set->access);
1375 WriteRelease (set->parent);
1380 // allocate a new page
1382 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1386 // lock allocation page
1388 bt_spinwritelock(bt->mgr->latchmgr->lock);
1390 // use empty chain first
1391 // else allocate empty page
1393 if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1394 if( set->pool = bt_pinpool (bt, set->page_no) )
1395 set->page = bt_page (bt, set->pool, set->page_no);
1397 return bt->err = BTERR_struct;
1399 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1401 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1402 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1404 // if writing first page of pool block, set file length thru last page
1406 if( (set->page_no & bt->mgr->poolmask) == 0 )
1407 ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1409 if( set->pool = bt_pinpool (bt, set->page_no) )
1410 set->page = bt_page (bt, set->pool, set->page_no);
1415 // unlock allocation latch
1417 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1421 // find slot in page for given key at a given level
1423 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1425 uint diff, higher = set->page->cnt, low = 1, slot;
1428 // make stopper key an infinite fence value
1430 if( bt_getid (set->page->right) )
1435 // low is the lowest candidate.
1436 // loop ends when they meet
1438 // higher is already
1439 // tested as .ge. the passed key.
1441 while( diff = higher - low ) {
1442 slot = low + ( diff >> 1 );
1443 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1446 higher = slot, good++;
1449 // return zero if key is on right link page
1451 return good ? higher : 0;
1454 // find and load page at given level for given key
1455 // leave page rd or wr locked as requested
1457 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1459 uid page_no = ROOT_page, prevpage = 0;
1460 uint drill = 0xff, slot;
1461 BtLatchSet *prevlatch;
1462 uint mode, prevmode;
1465 // start at root of btree and drill down
1468 // determine lock mode of drill level
1469 mode = (drill == lvl) ? lock : BtLockRead;
1471 set->latch = bt_pinlatch (bt, page_no);
1472 set->page_no = page_no;
1474 // pin page contents
1476 if( set->pool = bt_pinpool (bt, page_no) )
1477 set->page = bt_page (bt, set->pool, page_no);
1481 // obtain access lock using lock chaining with Access mode
1483 if( page_no > ROOT_page )
1484 bt_lockpage(BtLockAccess, set->latch);
1486 // release & unpin parent page
1489 bt_unlockpage(prevmode, prevlatch);
1490 bt_unpinlatch (prevlatch);
1491 bt_unpinpool (prevpool);
1495 // obtain read lock using lock chaining
1497 bt_lockpage(mode, set->latch);
1499 if( set->page->free )
1500 return bt->err = BTERR_struct, 0;
1502 if( page_no > ROOT_page )
1503 bt_unlockpage(BtLockAccess, set->latch);
1505 // re-read and re-lock root after determining actual level of root
1507 if( set->page->lvl != drill) {
1508 if( set->page_no != ROOT_page )
1509 return bt->err = BTERR_struct, 0;
1511 drill = set->page->lvl;
1513 if( lock != BtLockRead && drill == lvl ) {
1514 bt_unlockpage(mode, set->latch);
1515 bt_unpinlatch (set->latch);
1516 bt_unpinpool (set->pool);
1521 prevpage = set->page_no;
1522 prevlatch = set->latch;
1523 prevpool = set->pool;
1526 // find key on page at this level
1527 // and descend to requested level
1529 if( set->page->kill )
1532 if( slot = bt_findslot (set, key, len) ) {
1536 while( slotptr(set->page, slot)->dead )
1537 if( slot++ < set->page->cnt )
1542 page_no = bt_getid(valptr(set->page, slot)->value);
1547 // or slide right into next page
1550 page_no = bt_getid(set->page->right);
1554 // return error on end of right chain
1556 bt->err = BTERR_struct;
1557 return 0; // return error
1560 // return page to free list
1561 // page must be delete & write locked
1563 void bt_freepage (BtDb *bt, BtPageSet *set)
1565 // lock allocation page
1567 bt_spinwritelock (bt->mgr->latchmgr->lock);
1570 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1571 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1572 set->page->free = 1;
1574 // unlock released page
1576 bt_unlockpage (BtLockDelete, set->latch);
1577 bt_unlockpage (BtLockWrite, set->latch);
1578 bt_unpinlatch (set->latch);
1579 bt_unpinpool (set->pool);
1581 // unlock allocation page
1583 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1586 // a fence key was deleted from a page
1587 // push new fence value upwards
1589 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1591 unsigned char leftkey[256], rightkey[256];
1592 unsigned char value[BtId];
1596 // collapse empty slots beneath our slot
1598 while( idx = set->page->cnt - 1 )
1599 if( slotptr(set->page, idx)->dead ) {
1600 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1601 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1605 // remove the old fence value
1607 ptr = keyptr(set->page, set->page->cnt);
1608 memcpy (rightkey, ptr, ptr->len + 1);
1609 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1611 // cache new fence value
1613 ptr = keyptr(set->page, set->page->cnt);
1614 memcpy (leftkey, ptr, ptr->len + 1);
1616 bt_lockpage (BtLockParent, set->latch);
1617 bt_unlockpage (BtLockWrite, set->latch);
1619 // insert new (now smaller) fence key
1621 bt_putid (value, set->page_no);
1623 if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId, 1) )
1626 // now delete old fence key
1628 if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1631 bt_unlockpage (BtLockParent, set->latch);
1632 bt_unpinlatch(set->latch);
1633 bt_unpinpool (set->pool);
1637 // root has a single child
1638 // collapse a level from the tree
1640 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1645 // find the child entry and promote as new root contents
1648 for( idx = 0; idx++ < root->page->cnt; )
1649 if( !slotptr(root->page, idx)->dead )
1652 child->page_no = bt_getid (valptr(root->page, idx)->value);
1654 child->latch = bt_pinlatch (bt, child->page_no);
1655 bt_lockpage (BtLockDelete, child->latch);
1656 bt_lockpage (BtLockWrite, child->latch);
1658 if( child->pool = bt_pinpool (bt, child->page_no) )
1659 child->page = bt_page (bt, child->pool, child->page_no);
1663 memcpy (root->page, child->page, bt->mgr->page_size);
1664 bt_freepage (bt, child);
1666 } while( root->page->lvl > 1 && root->page->act == 1 );
1668 bt_unlockpage (BtLockWrite, root->latch);
1669 bt_unpinlatch (root->latch);
1670 bt_unpinpool (root->pool);
1674 // maintain reverse scan pointers by
1675 // linking left pointer of far right node
1677 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1679 BtPageSet right2[1];
1681 // keep track of rightmost leaf page
1683 if( !right_page_no ) {
1684 bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1688 // link right page to left page
1690 right2->latch = bt_pinlatch (bt, right_page_no);
1691 bt_lockpage (BtLockWrite, right2->latch);
1693 if( right2->pool = bt_pinpool (bt, right_page_no) )
1694 right2->page = bt_page (bt, right2->pool, right_page_no);
1698 bt_putid(right2->page->left, left_page_no);
1699 bt_unlockpage (BtLockWrite, right2->latch);
1700 bt_unpinlatch (right2->latch);
1704 // find and delete key on page by marking delete flag bit
1705 // if page becomes empty, delete it from the btree
1707 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1709 unsigned char lowerfence[256], higherfence[256];
1710 BtPageSet set[1], right[1], right2[1];
1711 unsigned char value[BtId];
1712 uint slot, idx, found;
1716 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1717 ptr = keyptr(set->page, slot);
1721 // if librarian slot, advance to real slot
1723 if( slotptr(set->page, slot)->type == Librarian )
1724 ptr = keyptr(set->page, ++slot);
1726 // if key is found delete it, otherwise ignore request
1728 if( found = !keycmp (ptr, key, len) )
1729 if( found = slotptr(set->page, slot)->dead == 0 ) {
1730 val = valptr(set->page,slot);
1731 slotptr(set->page, slot)->dead = 1;
1732 set->page->garbage += ptr->len + val->len + 2;
1736 // did we delete a fence key in an upper level?
1738 if( found && lvl && set->page->act && slot == set->page->cnt )
1739 if( bt_fixfence (bt, set, lvl) )
1742 return bt->found = found, 0;
1744 // do we need to collapse root?
1746 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1747 if( bt_collapseroot (bt, set) )
1750 return bt->found = found, 0;
1752 // return if page is not empty
1754 if( set->page->act ) {
1755 bt_unlockpage(BtLockWrite, set->latch);
1756 bt_unpinlatch (set->latch);
1757 bt_unpinpool (set->pool);
1758 return bt->found = found, 0;
1761 // cache copy of fence key
1762 // to post in parent
1764 ptr = keyptr(set->page, set->page->cnt);
1765 memcpy (lowerfence, ptr, ptr->len + 1);
1767 // obtain lock on right page
1769 right->page_no = bt_getid(set->page->right);
1770 right->latch = bt_pinlatch (bt, right->page_no);
1771 bt_lockpage (BtLockWrite, right->latch);
1773 // pin page contents
1775 if( right->pool = bt_pinpool (bt, right->page_no) )
1776 right->page = bt_page (bt, right->pool, right->page_no);
1780 if( right->page->kill )
1781 return bt->err = BTERR_struct;
1783 // transfer left link
1785 memcpy (right->page->left, set->page->left, BtId);
1787 // pull contents of right peer into our empty page
1789 memcpy (set->page, right->page, bt->mgr->page_size);
1794 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1797 // cache copy of key to update
1799 ptr = keyptr(right->page, right->page->cnt);
1800 memcpy (higherfence, ptr, ptr->len + 1);
1802 // mark right page deleted and point it to left page
1803 // until we can post parent updates
1805 bt_putid (right->page->right, set->page_no);
1806 right->page->kill = 1;
1808 bt_lockpage (BtLockParent, right->latch);
1809 bt_unlockpage (BtLockWrite, right->latch);
1811 bt_lockpage (BtLockParent, set->latch);
1812 bt_unlockpage (BtLockWrite, set->latch);
1814 // redirect higher key directly to our new node contents
1816 bt_putid (value, set->page_no);
1818 if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId, 1) )
1821 // delete old lower key to our node
1823 if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1826 // obtain delete and write locks to right node
1828 bt_unlockpage (BtLockParent, right->latch);
1829 bt_lockpage (BtLockDelete, right->latch);
1830 bt_lockpage (BtLockWrite, right->latch);
1831 bt_freepage (bt, right);
1833 bt_unlockpage (BtLockParent, set->latch);
1834 bt_unpinlatch (set->latch);
1835 bt_unpinpool (set->pool);
1840 BtKey bt_foundkey (BtDb *bt)
1842 return (BtKey)(bt->key);
1845 // advance to next slot
1847 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1849 BtLatchSet *prevlatch;
1853 if( slot < set->page->cnt )
1856 prevlatch = set->latch;
1857 prevpool = set->pool;
1859 if( page_no = bt_getid(set->page->right) )
1860 set->latch = bt_pinlatch (bt, page_no);
1862 return bt->err = BTERR_struct, 0;
1864 // pin page contents
1866 if( set->pool = bt_pinpool (bt, page_no) )
1867 set->page = bt_page (bt, set->pool, page_no);
1871 // obtain access lock using lock chaining with Access mode
1873 bt_lockpage(BtLockAccess, set->latch);
1875 bt_unlockpage(BtLockRead, prevlatch);
1876 bt_unpinlatch (prevlatch);
1877 bt_unpinpool (prevpool);
1879 bt_lockpage(BtLockRead, set->latch);
1880 bt_unlockpage(BtLockAccess, set->latch);
1882 set->page_no = page_no;
1886 // find unique key or first duplicate key in
1887 // leaf level and return number of value bytes
1888 // or (-1) if not found. Setup key for bt_foundkey
1890 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1898 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1900 ptr = keyptr(set->page, slot);
1902 // skip librarian slot place holder
1904 if( slotptr(set->page, slot)->type == Librarian )
1905 ptr = keyptr(set->page, ++slot);
1907 // return actual key found
1909 memcpy (bt->key, ptr, ptr->len + 1);
1912 if( slotptr(set->page, slot)->type == Duplicate )
1915 // if key exists, return >= 0 value bytes copied
1916 // otherwise return (-1)
1918 if( slotptr(set->page, slot)->dead )
1922 if( !memcmp (ptr->key, key, len) ) {
1923 val = valptr (set->page,slot);
1924 if( valmax > val->len )
1926 memcpy (value, val->value, valmax);
1932 } while( slot = bt_findnext (bt, set, slot) );
1934 bt_unlockpage (BtLockRead, set->latch);
1935 bt_unpinlatch (set->latch);
1936 bt_unpinpool (set->pool);
1940 // check page for space available,
1941 // clean if necessary and return
1942 // 0 - page needs splitting
1943 // >0 new slot value
1945 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1947 uint nxt = bt->mgr->page_size;
1948 uint cnt = 0, idx = 0;
1949 uint max = page->cnt;
1954 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1957 // skip cleanup and proceed to split
1958 // if there's not enough garbage
1961 if( page->garbage < nxt / 5 )
1964 memcpy (bt->frame, page, bt->mgr->page_size);
1966 // skip page info and set rest of page to zero
1968 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1972 // clean up page first by
1973 // removing deleted keys
1975 while( cnt++ < max ) {
1978 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1981 // copy the value across
1983 val = valptr(bt->frame, cnt);
1984 nxt -= val->len + 1;
1985 ((unsigned char *)page)[nxt] = val->len;
1986 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1988 // copy the key across
1990 key = keyptr(bt->frame, cnt);
1991 nxt -= key->len + 1;
1992 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1994 // make a librarian slot
1997 slotptr(page, ++idx)->off = nxt;
1998 slotptr(page, idx)->type = Librarian;
1999 slotptr(page, idx)->dead = 1;
2004 slotptr(page, ++idx)->off = nxt;
2005 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2007 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2014 // see if page has enough space now, or does it need splitting?
2016 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
2022 // split the root and raise the height of the btree
2024 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, BtPageSet *right)
2026 uint nxt = bt->mgr->page_size;
2027 unsigned char value[BtId];
2030 // Obtain an empty page to use, and copy the current
2031 // root contents into it, e.g. lower keys
2033 if( bt_newpage(bt, left) )
2036 memcpy(left->page, root->page, bt->mgr->page_size);
2037 bt_unpinpool (left->pool);
2039 // set left from higher to lower page
2041 bt_putid (right->page->left, left->page_no);
2042 bt_unpinpool (right->pool);
2044 // preserve the page info at the bottom
2045 // of higher keys and set rest to zero
2047 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2049 // insert lower keys page fence key on newroot page as first key
2052 bt_putid (value, left->page_no);
2053 ((unsigned char *)root->page)[nxt] = BtId;
2054 memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
2056 nxt -= *leftkey + 1;
2057 memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
2058 slotptr(root->page, 1)->off = nxt;
2060 // insert stopper key on newroot page
2061 // and increase the root height
2063 nxt -= 3 + BtId + 1;
2064 ((unsigned char *)root->page)[nxt] = 2;
2065 ((unsigned char *)root->page)[nxt+1] = 0xff;
2066 ((unsigned char *)root->page)[nxt+2] = 0xff;
2068 bt_putid (value, right->page_no);
2069 ((unsigned char *)root->page)[nxt+3] = BtId;
2070 memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
2071 slotptr(root->page, 2)->off = nxt;
2073 bt_putid(root->page->right, 0);
2074 root->page->min = nxt; // reset lowest used offset and key count
2075 root->page->cnt = 2;
2076 root->page->act = 2;
2079 // release and unpin root
2081 bt_unlockpage(BtLockWrite, root->latch);
2082 bt_unpinlatch (root->latch);
2083 bt_unpinpool (root->pool);
2087 // split already locked full node
2090 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2092 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2093 unsigned char fencekey[256], rightkey[256];
2094 unsigned char value[BtId];
2095 uint lvl = set->page->lvl;
2102 // split higher half of keys to bt->frame
2104 memset (bt->frame, 0, bt->mgr->page_size);
2105 max = set->page->cnt;
2109 while( cnt++ < max ) {
2110 if( slotptr(set->page, cnt)->dead && cnt < max )
2112 val = valptr(set->page, cnt);
2113 nxt -= val->len + 1;
2114 ((unsigned char *)bt->frame)[nxt] = val->len;
2115 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
2117 key = keyptr(set->page, cnt);
2118 nxt -= key->len + 1;
2119 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2121 // add librarian slot
2124 slotptr(bt->frame, ++idx)->off = nxt;
2125 slotptr(bt->frame, idx)->type = Librarian;
2126 slotptr(bt->frame, idx)->dead = 1;
2131 slotptr(bt->frame, ++idx)->off = nxt;
2132 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2134 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2138 // remember existing fence key for new page to the right
2140 memcpy (rightkey, key, key->len + 1);
2142 bt->frame->bits = bt->mgr->page_bits;
2143 bt->frame->min = nxt;
2144 bt->frame->cnt = idx;
2145 bt->frame->lvl = lvl;
2149 if( set->page_no > ROOT_page ) {
2150 bt_putid (bt->frame->right, bt_getid (set->page->right));
2151 bt_putid(bt->frame->left, set->page_no);
2154 // get new free page and write higher keys to it.
2156 if( bt_newpage(bt, right) )
2161 if( set->page_no > ROOT_page && !lvl )
2162 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2165 memcpy (right->page, bt->frame, bt->mgr->page_size);
2166 bt_unpinpool (right->pool);
2168 // update lower keys to continue in old page
2170 memcpy (bt->frame, set->page, bt->mgr->page_size);
2171 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2172 nxt = bt->mgr->page_size;
2173 set->page->garbage = 0;
2179 if( slotptr(bt->frame, max)->type == Librarian )
2182 // assemble page of smaller keys
2184 while( cnt++ < max ) {
2185 if( slotptr(bt->frame, cnt)->dead )
2187 val = valptr(bt->frame, cnt);
2188 nxt -= val->len + 1;
2189 ((unsigned char *)set->page)[nxt] = val->len;
2190 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2192 key = keyptr(bt->frame, cnt);
2193 nxt -= key->len + 1;
2194 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2196 // add librarian slot
2199 slotptr(set->page, ++idx)->off = nxt;
2200 slotptr(set->page, idx)->type = Librarian;
2201 slotptr(set->page, idx)->dead = 1;
2206 slotptr(set->page, ++idx)->off = nxt;
2207 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2211 // remember fence key for smaller page
2213 memcpy(fencekey, key, key->len + 1);
2215 bt_putid(set->page->right, right->page_no);
2216 set->page->min = nxt;
2217 set->page->cnt = idx;
2219 // if current page is the root page, split it
2221 if( set->page_no == ROOT_page )
2222 return bt_splitroot (bt, set, fencekey, right);
2224 // insert new fences in their parent pages
2226 right->latch = bt_pinlatch (bt, right->page_no);
2227 bt_lockpage (BtLockParent, right->latch);
2229 bt_lockpage (BtLockParent, set->latch);
2230 bt_unlockpage (BtLockWrite, set->latch);
2232 // insert new fence for reformulated left block of smaller keys
2234 bt_putid (value, set->page_no);
2236 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2239 // switch fence for right block of larger keys to new right page
2241 bt_putid (value, right->page_no);
2243 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2246 bt_unlockpage (BtLockParent, set->latch);
2247 bt_unpinlatch (set->latch);
2248 bt_unpinpool (set->pool);
2250 bt_unlockpage (BtLockParent, right->latch);
2251 bt_unpinlatch (right->latch);
2255 // install new key and value onto page
2256 // page must already be checked for
2259 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2261 uint idx, librarian;
2264 // if found slot > desired slot and previous slot
2265 // is a librarian slot, use it
2268 if( slotptr(set->page, slot-1)->type == Librarian )
2271 // copy value onto page
2273 set->page->min -= vallen + 1; // reset lowest used offset
2274 ((unsigned char *)set->page)[set->page->min] = vallen;
2275 memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2277 // copy key onto page
2279 set->page->min -= keylen + 1; // reset lowest used offset
2280 ((unsigned char *)set->page)[set->page->min] = keylen;
2281 memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2283 // find first empty slot
2285 for( idx = slot; idx < set->page->cnt; idx++ )
2286 if( slotptr(set->page, idx)->dead )
2289 // now insert key into array before slot
2291 if( idx == set->page->cnt )
2292 idx += 2, set->page->cnt += 2, librarian = 2;
2298 while( idx > slot + librarian - 1 )
2299 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2301 // add librarian slot
2303 if( librarian > 1 ) {
2304 node = slotptr(set->page, slot++);
2305 node->off = set->page->min;
2306 node->type = Librarian;
2312 node = slotptr(set->page, slot);
2313 node->off = set->page->min;
2317 bt_unlockpage (BtLockWrite, set->latch);
2318 bt_unpinlatch (set->latch);
2319 bt_unpinpool (set->pool);
2323 // Insert new key into the btree at given level.
2324 // either add a new key or update/add an existing one
2326 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2328 unsigned char newkey[256];
2329 uint slot, idx, len;
2336 // set up the key we're working on
2338 memcpy (newkey + 1, key, keylen);
2341 // is this a non-unique index value?
2347 sequence = bt_newdup (bt);
2348 bt_putid (newkey + *newkey + 1, sequence);
2352 while( 1 ) { // find the page and slot for the current key
2353 if( slot = bt_loadpage (bt, set, newkey + 1, *newkey, lvl, BtLockWrite) )
2354 ptr = keyptr(set->page, slot);
2357 bt->err = BTERR_ovflw;
2361 // if librarian slot == found slot, advance to real slot
2363 if( slotptr(set->page, slot)->type == Librarian )
2364 if( !keycmp (ptr, key, keylen) )
2365 ptr = keyptr(set->page, ++slot);
2369 if( slotptr(set->page, slot)->type == Duplicate )
2372 // if inserting a duplicate key or unique key
2373 // check for adequate space on the page
2374 // and insert the new key before slot.
2376 if( unique && (len != *newkey || memcmp (ptr->key, newkey+1, *newkey)) || !unique ) {
2377 if( !(slot = bt_cleanpage (bt, set->page, *newkey, slot, vallen)) )
2378 if( bt_splitpage (bt, set) )
2383 return bt_insertslot (bt, set, slot, newkey + 1, *newkey, value, vallen, type);
2386 // if key already exists, update value and return
2388 if( val = valptr(set->page, slot), val->len >= vallen ) {
2389 if( slotptr(set->page, slot)->dead )
2391 set->page->garbage += val->len - vallen;
2392 slotptr(set->page, slot)->dead = 0;
2394 memcpy (val->value, value, vallen);
2395 bt_unlockpage(BtLockWrite, set->latch);
2396 bt_unpinlatch (set->latch);
2397 bt_unpinpool (set->pool);
2401 // new update value doesn't fit in existing value area
2403 if( !slotptr(set->page, slot)->dead )
2404 set->page->garbage += val->len + ptr->len + 2;
2406 slotptr(set->page, slot)->dead = 0;
2410 if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2411 if( bt_splitpage (bt, set) )
2416 set->page->min -= vallen + 1;
2417 ((unsigned char *)set->page)[set->page->min] = vallen;
2418 memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
2420 set->page->min -= keylen + 1;
2421 ((unsigned char *)set->page)[set->page->min] = keylen;
2422 memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
2424 slotptr(set->page, slot)->off = set->page->min;
2425 bt_unlockpage(BtLockWrite, set->latch);
2426 bt_unpinlatch (set->latch);
2427 bt_unpinpool (set->pool);
2433 // set cursor to highest slot on highest page
2435 uint bt_lastkey (BtDb *bt)
2437 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2441 if( set->pool = bt_pinpool (bt, page_no) )
2442 set->page = bt_page (bt, set->pool, page_no);
2446 set->latch = bt_pinlatch (bt, page_no);
2447 bt_lockpage(BtLockRead, set->latch);
2449 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2450 slot = set->page->cnt;
2452 bt_unlockpage(BtLockRead, set->latch);
2453 bt_unpinlatch (set->latch);
2454 bt_unpinpool (set->pool);
2459 // return previous slot on cursor page
2461 uint bt_prevkey (BtDb *bt, uint slot)
2469 if( left = bt_getid(bt->cursor->left) )
2470 bt->cursor_page = left;
2474 if( set->pool = bt_pinpool (bt, left) )
2475 set->page = bt_page (bt, set->pool, left);
2479 set->latch = bt_pinlatch (bt, left);
2480 bt_lockpage(BtLockRead, set->latch);
2482 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2484 bt_unlockpage(BtLockRead, set->latch);
2485 bt_unpinlatch (set->latch);
2486 bt_unpinpool (set->pool);
2487 return bt->cursor->cnt;
2490 // return next slot on cursor page
2491 // or slide cursor right into next page
2493 uint bt_nextkey (BtDb *bt, uint slot)
2499 right = bt_getid(bt->cursor->right);
2501 while( slot++ < bt->cursor->cnt )
2502 if( slotptr(bt->cursor,slot)->dead )
2504 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2512 bt->cursor_page = right;
2514 if( set->pool = bt_pinpool (bt, right) )
2515 set->page = bt_page (bt, set->pool, right);
2519 set->latch = bt_pinlatch (bt, right);
2520 bt_lockpage(BtLockRead, set->latch);
2522 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2524 bt_unlockpage(BtLockRead, set->latch);
2525 bt_unpinlatch (set->latch);
2526 bt_unpinpool (set->pool);
2534 // cache page of keys into cursor and return starting slot for given key
2536 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2541 // cache page for retrieval
2543 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2544 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2548 bt->cursor_page = set->page_no;
2550 bt_unlockpage(BtLockRead, set->latch);
2551 bt_unpinlatch (set->latch);
2552 bt_unpinpool (set->pool);
2556 BtKey bt_key(BtDb *bt, uint slot)
2558 return keyptr(bt->cursor, slot);
2561 BtVal bt_val(BtDb *bt, uint slot)
2563 return valptr(bt->cursor,slot);
2569 double getCpuTime(int type)
2572 FILETIME xittime[1];
2573 FILETIME systime[1];
2574 FILETIME usrtime[1];
2575 SYSTEMTIME timeconv[1];
2578 memset (timeconv, 0, sizeof(SYSTEMTIME));
2582 GetSystemTimeAsFileTime (xittime);
2583 FileTimeToSystemTime (xittime, timeconv);
2584 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2587 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2588 FileTimeToSystemTime (usrtime, timeconv);
2591 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2592 FileTimeToSystemTime (systime, timeconv);
2596 ans += (double)timeconv->wHour * 3600;
2597 ans += (double)timeconv->wMinute * 60;
2598 ans += (double)timeconv->wSecond;
2599 ans += (double)timeconv->wMilliseconds / 1000;
2604 #include <sys/resource.h>
2606 double getCpuTime(int type)
2608 struct rusage used[1];
2609 struct timeval tv[1];
2613 gettimeofday(tv, NULL);
2614 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2617 getrusage(RUSAGE_SELF, used);
2618 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2621 getrusage(RUSAGE_SELF, used);
2622 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2629 uint bt_latchaudit (BtDb *bt)
2631 ushort idx, hashidx;
2638 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2640 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2641 fprintf(stderr, "Alloc page locked\n");
2642 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2644 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2645 latch = bt->mgr->latchsets + idx;
2646 if( *latch->readwr->rin & MASK )
2647 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2648 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2650 if( *latch->access->rin & MASK )
2651 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2652 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2654 if( *latch->parent->rin & MASK )
2655 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2656 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2659 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2664 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2665 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2666 fprintf(stderr, "hash entry %d locked\n", hashidx);
2668 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2670 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2671 latch = bt->mgr->latchsets + idx;
2672 if( *(ushort *)latch->busy )
2673 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2674 *(ushort *)latch->busy = 0;
2676 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2677 } while( idx = latch->next );
2680 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2681 page_no = LEAF_page;
2683 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2684 off64_t off = page_no << bt->mgr->page_bits;
2686 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2690 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2692 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2693 fprintf(stderr, "page %.8x unable to read\n", page_no);
2695 if( *amt < bt->mgr->page_size )
2696 fprintf(stderr, "page %.8x unable to read\n", page_no);
2698 if( !bt->frame->free ) {
2699 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2700 ptr = keyptr(bt->frame, idx+1);
2701 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2702 fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2704 if( !bt->frame->lvl )
2705 cnt += bt->frame->act;
2707 if( page_no > LEAF_page )
2722 // standalone program to index file of keys
2723 // then list them onto std-out
2726 void *index_file (void *arg)
2728 uint __stdcall index_file (void *arg)
2731 int line = 0, found = 0, cnt = 0;
2732 uid next, page_no = LEAF_page; // start on first page of leaves
2733 unsigned char key[256];
2734 ThreadArg *args = arg;
2735 int ch, len = 0, slot;
2741 bt = bt_open (args->mgr);
2743 switch(args->type[0] | 0x20)
2746 fprintf(stderr, "started latch mgr audit\n");
2747 cnt = bt_latchaudit (bt);
2748 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2752 fprintf(stderr, "started pennysort for %s\n", args->infile);
2753 if( in = fopen (args->infile, "rb") )
2754 while( ch = getc(in), ch != EOF )
2759 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 0) )
2760 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2763 else if( len < 255 )
2765 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2769 fprintf(stderr, "started indexing for %s\n", args->infile);
2770 if( in = fopen (args->infile, "rb") )
2771 while( ch = getc(in), ch != EOF )
2776 if( args->num == 1 )
2777 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2779 else if( args->num )
2780 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2782 if( bt_insertkey (bt, key, len, 0, NULL, 0, 0) )
2783 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2786 else if( len < 255 )
2788 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2792 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2793 if( in = fopen (args->infile, "rb") )
2794 while( ch = getc(in), ch != EOF )
2798 if( args->num == 1 )
2799 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2801 else if( args->num )
2802 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2804 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2805 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2806 ptr = (BtKey)(bt->key);
2809 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2810 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2813 else if( len < 255 )
2815 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2819 fprintf(stderr, "started finding keys for %s\n", args->infile);
2820 if( in = fopen (args->infile, "rb") )
2821 while( ch = getc(in), ch != EOF )
2825 if( args->num == 1 )
2826 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2828 else if( args->num )
2829 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2831 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2834 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2837 else if( len < 255 )
2839 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2843 fprintf(stderr, "started scanning\n");
2845 if( set->pool = bt_pinpool (bt, page_no) )
2846 set->page = bt_page (bt, set->pool, page_no);
2849 set->latch = bt_pinlatch (bt, page_no);
2850 bt_lockpage (BtLockRead, set->latch);
2851 next = bt_getid (set->page->right);
2853 for( slot = 0; slot++ < set->page->cnt; )
2854 if( next || slot < set->page->cnt )
2855 if( !slotptr(set->page, slot)->dead ) {
2856 ptr = keyptr(set->page, slot);
2859 if( slotptr(set->page, slot)->type == Duplicate )
2862 fwrite (ptr->key, len, 1, stdout);
2863 fputc ('\n', stdout);
2867 bt_unlockpage (BtLockRead, set->latch);
2868 bt_unpinlatch (set->latch);
2869 bt_unpinpool (set->pool);
2870 } while( page_no = next );
2872 fprintf(stderr, " Total keys read %d\n", cnt);
2876 fprintf(stderr, "started reverse scan\n");
2877 if( slot = bt_lastkey (bt) )
2878 while( slot = bt_prevkey (bt, slot) ) {
2879 if( slotptr(bt->cursor, slot)->dead )
2882 ptr = keyptr(bt->cursor, slot);
2885 if( slotptr(bt->cursor, slot)->type == Duplicate )
2888 fwrite (ptr->key, len, 1, stdout);
2889 fputc ('\n', stdout);
2893 fprintf(stderr, " Total keys read %d\n", cnt);
2898 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2900 fprintf(stderr, "started counting\n");
2901 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2902 page_no = LEAF_page;
2904 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2905 uid off = page_no << bt->mgr->page_bits;
2907 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2911 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2913 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2914 return bt->err = BTERR_map;
2916 if( *amt < bt->mgr->page_size )
2917 return bt->err = BTERR_map;
2919 if( !bt->frame->free && !bt->frame->lvl )
2920 cnt += bt->frame->act;
2921 if( page_no > LEAF_page )
2926 cnt--; // remove stopper key
2927 fprintf(stderr, " Total keys read %d\n", cnt);
2939 typedef struct timeval timer;
2941 int main (int argc, char **argv)
2943 int idx, cnt, len, slot, err;
2944 int segsize, bits = 16;
2961 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2962 fprintf (stderr, " where page_bits is the page size in bits\n");
2963 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
2964 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2965 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
2966 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
2970 start = getCpuTime(0);
2973 bits = atoi(argv[3]);
2976 poolsize = atoi(argv[4]);
2979 fprintf (stderr, "Warning: no mapped_pool\n");
2981 if( poolsize > 65535 )
2982 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2985 segsize = atoi(argv[5]);
2987 segsize = 4; // 16 pages per mmap segment
2990 num = atoi(argv[6]);
2994 threads = malloc (cnt * sizeof(pthread_t));
2996 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2998 args = malloc (cnt * sizeof(ThreadArg));
3000 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3003 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3009 for( idx = 0; idx < cnt; idx++ ) {
3010 args[idx].infile = argv[idx + 7];
3011 args[idx].type = argv[2];
3012 args[idx].mgr = mgr;
3013 args[idx].num = num;
3014 args[idx].idx = idx;
3016 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3017 fprintf(stderr, "Error creating thread %d\n", err);
3019 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3023 // wait for termination
3026 for( idx = 0; idx < cnt; idx++ )
3027 pthread_join (threads[idx], NULL);
3029 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3031 for( idx = 0; idx < cnt; idx++ )
3032 CloseHandle(threads[idx]);
3035 elapsed = getCpuTime(0) - start;
3036 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3037 elapsed = getCpuTime(1);
3038 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3039 elapsed = getCpuTime(2);
3040 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);