1 // btree version threadskv5 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // and bi-directional cursors
10 // author: karl malbrain, malbrain@cal.berkeley.edu
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
16 The orginal author is Karl Malbrain.
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
46 #define WIN32_LEAN_AND_MEAN
60 typedef unsigned long long uid;
63 typedef unsigned long long off64_t;
64 typedef unsigned short ushort;
65 typedef unsigned int uint;
68 #define BT_latchtable 128 // number of latch manager slots
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
79 There are five lock types for each node in three independent sets:
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
95 // definition for phase-fair reader/writer lock implementation
109 // definition for spin latch implementation
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
115 volatile typedef struct {
126 // hash table entries
129 BtSpinLatch latch[1];
130 volatile ushort slot; // Latch table entry at head of chain
133 // latch manager table structure
136 RWLock readwr[1]; // read/write page lock
137 RWLock access[1]; // Access Intent/Page delete
138 RWLock parent[1]; // Posting of fence key in parent
139 BtSpinLatch busy[1]; // slot is being moved between chains
140 volatile ushort next; // next entry in hash table chain
141 volatile ushort prev; // prev entry in hash table chain
142 volatile ushort pin; // number of outstanding locks
143 volatile ushort hash; // hash slot entry is under
144 volatile uid page_no; // latch set page number
147 // Define the length of the page record numbers
151 // Page key slot definition.
153 // Keys are marked dead, but remain on the page until
154 // it cleanup is called. The fence key (highest key) for
155 // a leaf page is always present, even after cleanup.
159 // In addition to the Unique keys that occupy slots
160 // there are Librarian and Duplicate key
161 // slots occupying the key slot array.
163 // The Librarian slots are dead keys that
164 // serve as filler, available to add new Unique
165 // or Dup slots that are inserted into the B-tree.
167 // The Duplicate slots have had their key bytes extended
168 // by 6 bytes to contain a binary duplicate key uniqueifier.
177 uint off:BT_maxbits; // page offset for key start
178 uint type:3; // type of slot
179 uint dead:1; // set for deleted slot
182 // The key structure occupies space at the upper end of
183 // each page. It's a length byte followed by the key
187 unsigned char len; // this can be changed to a ushort or uint
188 unsigned char key[0];
191 // the value structure also occupies space at the upper
192 // end of the page. Each key is immediately followed by a value.
195 unsigned char len; // this can be changed to a ushort or uint
196 unsigned char value[0];
199 #define BT_maxkey 255 // maximum number of bytes in a key
200 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
202 // The first part of an index page.
203 // It is immediately followed
204 // by the BtSlot array of keys.
206 // note that this structure size
207 // must be a multiple of 8 bytes
208 // in order to place dups correctly.
210 typedef struct BtPage_ {
211 uint cnt; // count of keys in page
212 uint act; // count of active keys
213 uint min; // next key offset
214 uint garbage; // page garbage in bytes
215 unsigned char bits:7; // page size in bits
216 unsigned char free:1; // page is on free chain
217 unsigned char lvl:7; // level of page
218 unsigned char kill:1; // page is being deleted
219 unsigned char left[BtId]; // page number to left
220 unsigned char filler[2]; // padding to multiple of 8
221 unsigned char right[BtId]; // page number to right
224 // The memory mapping pool table buffer manager entry
227 uid basepage; // mapped base page number
228 char *map; // mapped memory pointer
229 ushort slot; // slot index in this array
230 ushort pin; // mapped page pin counter
231 void *hashprev; // previous pool entry for the same hash idx
232 void *hashnext; // next pool entry for the same hash idx
234 HANDLE hmap; // Windows memory mapping handle
238 #define CLOCK_bit 0x8000 // bit in pool->pin
240 // The loadpage interface object
243 uid page_no; // current page number
244 BtPage page; // current page pointer
245 BtPool *pool; // current page pool
246 BtLatchSet *latch; // current page latch set
249 // structure for latch manager on ALLOC_page
252 struct BtPage_ alloc[1]; // next page_no in right ptr
253 unsigned long long dups[1]; // global duplicate key uniqueifier
254 unsigned char chain[BtId]; // head of free page_nos chain
255 BtSpinLatch lock[1]; // allocation area lite latch
256 ushort latchdeployed; // highest number of latch entries deployed
257 ushort nlatchpage; // number of latch pages at BT_latch
258 ushort latchtotal; // number of page latch entries
259 ushort latchhash; // number of latch hash table slots
260 ushort latchvictim; // next latch entry to examine
261 BtHashEntry table[0]; // the hash table
264 // The object structure for Btree access
267 uint page_size; // page size
268 uint page_bits; // page size in bits
269 uint seg_bits; // seg size in pages in bits
270 uint mode; // read-write mode
276 ushort poolcnt; // highest page pool node in use
277 ushort poolmax; // highest page pool node allocated
278 ushort poolmask; // total number of pages in mmap segment - 1
279 ushort hashsize; // size of Hash Table for pool entries
280 volatile uint evicted; // last evicted hash table slot
281 ushort *hash; // pool index for hash entries
282 BtSpinLatch *latch; // latches for hash table slots
283 BtLatchMgr *latchmgr; // mapped latch page from allocation page
284 BtLatchSet *latchsets; // mapped latch set from latch pages
285 BtPool *pool; // memory pool page segments
287 HANDLE halloc; // allocation and latch table handle
292 BtMgr *mgr; // buffer manager for thread
293 BtPage cursor; // cached frame for start/next (never mapped)
294 BtPage frame; // spare frame for the page split (never mapped)
295 uid cursor_page; // current cursor page number
296 unsigned char *mem; // frame, cursor, page memory buffer
297 unsigned char key[BT_keyarray]; // last found complete key
298 int found; // last delete or insert was found
299 int err; // last error
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey (BtDb *bt, uint slot);
323 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
324 void bt_mgrclose (BtMgr *mgr);
326 // Helper functions to return slot values
327 // from the cursor page.
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
332 // BTree page number constants
333 #define ALLOC_page 0 // allocation & latch manager hash table
334 #define ROOT_page 1 // root of the btree
335 #define LEAF_page 2 // first page of leaves
336 #define LATCH_page 3 // pages for latch manager
338 // Number of levels to create in a new BTree
342 // The page is allocated from low and hi ends.
343 // The key slots are allocated from the bottom,
344 // while the text and value of the key
345 // are allocated from the top. When the two
346 // areas meet, the page is split into two.
348 // A key consists of a length byte, two bytes of
349 // index number (0 - 65534), and up to 253 bytes
352 // Associated with each key is a value byte string
353 // containing any value desired.
355 // The b-tree root is always located at page 1.
356 // The first leaf page of level zero is always
357 // located on page 2.
359 // The b-tree pages are linked with next
360 // pointers to facilitate enumerators,
361 // and provide for concurrency.
363 // When to root page fills, it is split in two and
364 // the tree height is raised by a new root at page
365 // one with two keys.
367 // Deleted keys are marked with a dead bit until
368 // page cleanup. The fence key for a leaf node is
371 // Groups of pages called segments from the btree are optionally
372 // cached with a memory mapped pool. A hash table is used to keep
373 // track of the cached segments. This behaviour is controlled
374 // by the cache block size parameter to bt_open.
376 // To achieve maximum concurrency one page is locked at a time
377 // as the tree is traversed to find leaf key in question. The right
378 // page numbers are used in cases where the page is being split,
381 // Page 0 is dedicated to lock for new page extensions,
382 // and chains empty pages together for reuse. It also
383 // contains the latch manager hash table.
385 // The ParentModification lock on a node is obtained to serialize posting
386 // or changing the fence key for a node.
388 // Empty pages are chained together through the ALLOC page and reused.
390 // Access macros to address slot and key values from the page
391 // Page slots use 1 based indexing.
393 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
394 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
395 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
397 void bt_putid(unsigned char *dest, uid id)
402 dest[i] = (unsigned char)id, id >>= 8;
405 uid bt_getid(unsigned char *src)
410 for( i = 0; i < BtId; i++ )
411 id <<= 8, id |= *src++;
416 uid bt_newdup (BtDb *bt)
419 return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
421 return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
425 // Phase-Fair reader/writer lock implementation
427 void WriteLock (RWLock *lock)
432 tix = __sync_fetch_and_add (lock->ticket, 1);
434 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
436 // wait for our ticket to come up
438 while( tix != lock->serving[0] )
445 w = PRES | (tix & PHID);
447 r = __sync_fetch_and_add (lock->rin, w);
449 r = _InterlockedExchangeAdd16 (lock->rin, w);
451 while( r != *lock->rout )
459 void WriteRelease (RWLock *lock)
462 __sync_fetch_and_and (lock->rin, ~MASK);
464 _InterlockedAnd16 (lock->rin, ~MASK);
469 void ReadLock (RWLock *lock)
473 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
475 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
478 while( w == (*lock->rin & MASK) )
486 void ReadRelease (RWLock *lock)
489 __sync_fetch_and_add (lock->rout, RINC);
491 _InterlockedExchangeAdd16 (lock->rout, RINC);
495 // Spin Latch Manager
497 // wait until write lock mode is clear
498 // and add 1 to the share count
500 void bt_spinreadlock(BtSpinLatch *latch)
506 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
508 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
510 // see if exclusive request is granted or pending
515 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
517 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
520 } while( sched_yield(), 1 );
522 } while( SwitchToThread(), 1 );
526 // wait for other read and write latches to relinquish
528 void bt_spinwritelock(BtSpinLatch *latch)
534 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
536 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
539 if( !(prev & ~BOTH) )
543 __sync_fetch_and_and ((ushort *)latch, ~XCL);
545 _InterlockedAnd16((ushort *)latch, ~XCL);
548 } while( sched_yield(), 1 );
550 } while( SwitchToThread(), 1 );
554 // try to obtain write lock
556 // return 1 if obtained,
559 int bt_spinwritetry(BtSpinLatch *latch)
564 prev = __sync_fetch_and_or((ushort *)latch, XCL);
566 prev = _InterlockedOr16((ushort *)latch, XCL);
568 // take write access if all bits are clear
571 if( !(prev & ~BOTH) )
575 __sync_fetch_and_and ((ushort *)latch, ~XCL);
577 _InterlockedAnd16((ushort *)latch, ~XCL);
584 void bt_spinreleasewrite(BtSpinLatch *latch)
587 __sync_fetch_and_and((ushort *)latch, ~BOTH);
589 _InterlockedAnd16((ushort *)latch, ~BOTH);
593 // decrement reader count
595 void bt_spinreleaseread(BtSpinLatch *latch)
598 __sync_fetch_and_add((ushort *)latch, -SHARE);
600 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
604 // link latch table entry into latch hash table
606 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
608 BtLatchSet *set = bt->mgr->latchsets + victim;
610 if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
611 bt->mgr->latchsets[set->next].prev = victim;
613 bt->mgr->latchmgr->table[hashidx].slot = victim;
614 set->page_no = page_no;
621 void bt_unpinlatch (BtLatchSet *set)
624 __sync_fetch_and_add(&set->pin, -1);
626 _InterlockedDecrement16 (&set->pin);
630 // find existing latchset or inspire new one
631 // return with latchset pinned
633 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
635 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
636 ushort slot, avail = 0, victim, idx;
639 // obtain read lock on hash table entry
641 bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
643 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
645 set = bt->mgr->latchsets + slot;
646 if( page_no == set->page_no )
648 } while( slot = set->next );
652 __sync_fetch_and_add(&set->pin, 1);
654 _InterlockedIncrement16 (&set->pin);
658 bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
663 // try again, this time with write lock
665 bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
667 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
669 set = bt->mgr->latchsets + slot;
670 if( page_no == set->page_no )
672 if( !set->pin && !avail )
674 } while( slot = set->next );
676 // found our entry, or take over an unpinned one
678 if( slot || (slot = avail) ) {
679 set = bt->mgr->latchsets + slot;
681 __sync_fetch_and_add(&set->pin, 1);
683 _InterlockedIncrement16 (&set->pin);
685 set->page_no = page_no;
686 bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
690 // see if there are any unused entries
692 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
694 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
697 if( victim < bt->mgr->latchmgr->latchtotal ) {
698 set = bt->mgr->latchsets + victim;
700 __sync_fetch_and_add(&set->pin, 1);
702 _InterlockedIncrement16 (&set->pin);
704 bt_latchlink (bt, hashidx, victim, page_no);
705 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
710 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
712 victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
714 // find and reuse previous lock entry
718 victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
720 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
722 // we don't use slot zero
724 if( victim %= bt->mgr->latchmgr->latchtotal )
725 set = bt->mgr->latchsets + victim;
729 // take control of our slot
730 // from other threads
732 if( set->pin || !bt_spinwritetry (set->busy) )
737 // try to get write lock on hash chain
738 // skip entry if not obtained
739 // or has outstanding locks
741 if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
742 bt_spinreleasewrite (set->busy);
747 bt_spinreleasewrite (set->busy);
748 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
752 // unlink our available victim from its hash chain
755 bt->mgr->latchsets[set->prev].next = set->next;
757 bt->mgr->latchmgr->table[idx].slot = set->next;
760 bt->mgr->latchsets[set->next].prev = set->prev;
762 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
764 __sync_fetch_and_add(&set->pin, 1);
766 _InterlockedIncrement16 (&set->pin);
768 bt_latchlink (bt, hashidx, victim, page_no);
769 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
770 bt_spinreleasewrite (set->busy);
775 void bt_mgrclose (BtMgr *mgr)
780 // release mapped pages
781 // note that slot zero is never used
783 for( slot = 1; slot < mgr->poolmax; slot++ ) {
784 pool = mgr->pool + slot;
787 munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
790 FlushViewOfFile(pool->map, 0);
791 UnmapViewOfFile(pool->map);
792 CloseHandle(pool->hmap);
798 munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
799 munmap (mgr->latchmgr, 2 * mgr->page_size);
801 FlushViewOfFile(mgr->latchmgr, 0);
802 UnmapViewOfFile(mgr->latchmgr);
803 CloseHandle(mgr->halloc);
809 free ((void *)mgr->latch);
812 FlushFileBuffers(mgr->idx);
813 CloseHandle(mgr->idx);
814 GlobalFree (mgr->pool);
815 GlobalFree (mgr->hash);
816 GlobalFree ((void *)mgr->latch);
821 // close and release memory
823 void bt_close (BtDb *bt)
830 VirtualFree (bt->mem, 0, MEM_RELEASE);
835 // open/create new btree buffer manager
837 // call with file_name, BT_openmode, bits in page size (e.g. 16),
838 // size of mapped page pool (e.g. 8192)
840 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
842 uint lvl, attr, cacheblk, last, slot, idx;
843 uint nlatchpage, latchhash;
844 unsigned char value[BtId];
845 int flag, initit = 0;
846 BtLatchMgr *latchmgr;
854 SYSTEM_INFO sysinfo[1];
857 // determine sanity of page size and buffer pool
859 if( bits > BT_maxbits )
861 else if( bits < BT_minbits )
865 return NULL; // must have buffer pool
868 mgr = calloc (1, sizeof(BtMgr));
870 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
873 return free(mgr), NULL;
875 cacheblk = 4096; // minimum mmap segment size for unix
878 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
879 attr = FILE_ATTRIBUTE_NORMAL;
880 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
882 if( mgr->idx == INVALID_HANDLE_VALUE )
883 return GlobalFree(mgr), NULL;
885 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
886 GetSystemInfo(sysinfo);
887 cacheblk = sysinfo->dwAllocationGranularity;
891 latchmgr = malloc (BT_maxpage);
894 // read minimum page size to get root info
895 // to support raw disk partition files
896 // check if bits == 0 on the disk.
898 if( size = lseek (mgr->idx, 0L, 2) ) {
899 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
900 if( latchmgr->alloc->bits )
901 bits = latchmgr->alloc->bits;
905 return free(mgr), free(latchmgr), NULL;
906 } else if( mode == BT_ro )
907 return free(latchmgr), bt_mgrclose (mgr), NULL;
911 latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
912 size = GetFileSize(mgr->idx, amt);
915 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
916 return bt_mgrclose (mgr), NULL;
917 bits = latchmgr->alloc->bits;
918 } else if( mode == BT_ro )
919 return bt_mgrclose (mgr), NULL;
924 mgr->page_size = 1 << bits;
925 mgr->page_bits = bits;
927 mgr->poolmax = poolmax;
930 if( cacheblk < mgr->page_size )
931 cacheblk = mgr->page_size;
933 // mask for partial memmaps
935 mgr->poolmask = (cacheblk >> bits) - 1;
937 // see if requested size of pages per memmap is greater
939 if( (1 << segsize) > mgr->poolmask )
940 mgr->poolmask = (1 << segsize) - 1;
944 while( (1 << mgr->seg_bits) <= mgr->poolmask )
947 mgr->hashsize = hashsize;
950 mgr->pool = calloc (poolmax, sizeof(BtPool));
951 mgr->hash = calloc (hashsize, sizeof(ushort));
952 mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
954 mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
955 mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
956 mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
962 // initialize an empty b-tree with latch page, root page, page of leaves
963 // and page(s) of latches
965 memset (latchmgr, 0, 1 << bits);
966 nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
967 bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
968 latchmgr->alloc->bits = mgr->page_bits;
970 latchmgr->nlatchpage = nlatchpage;
971 latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
973 // initialize latch manager
975 latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
977 // size of hash table = total number of latchsets
979 if( latchhash > latchmgr->latchtotal )
980 latchhash = latchmgr->latchtotal;
982 latchmgr->latchhash = latchhash;
984 // initialize left-most LEAF page in
987 bt_putid (latchmgr->alloc->left, LEAF_page);
990 if( pwrite (mgr->idx, latchmgr, mgr->page_size, 0) < mgr->page_size )
991 return bt_mgrclose (mgr), NULL;
993 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
994 return bt_mgrclose (mgr), NULL;
996 if( *amt < mgr->page_size )
997 return bt_mgrclose (mgr), NULL;
1000 memset (latchmgr, 0, 1 << bits);
1001 latchmgr->alloc->bits = mgr->page_bits;
1003 for( lvl=MIN_lvl; lvl--; ) {
1004 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1005 key = keyptr(latchmgr->alloc, 1);
1006 key->len = 2; // create stopper key
1010 bt_putid(value, MIN_lvl - lvl + 1);
1011 val = valptr(latchmgr->alloc, 1);
1012 val->len = lvl ? BtId : 0;
1013 memcpy (val->value, value, val->len);
1015 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
1016 latchmgr->alloc->lvl = lvl;
1017 latchmgr->alloc->cnt = 1;
1018 latchmgr->alloc->act = 1;
1020 if( pwrite (mgr->idx, latchmgr, mgr->page_size, (MIN_lvl - lvl) << bits) < mgr->page_size )
1021 return bt_mgrclose (mgr), NULL;
1023 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1024 return bt_mgrclose (mgr), NULL;
1026 if( *amt < mgr->page_size )
1027 return bt_mgrclose (mgr), NULL;
1031 // clear out latch manager locks
1032 // and rest of pages to round out segment
1034 memset(latchmgr, 0, mgr->page_size);
1037 while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1039 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1041 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1042 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1043 return bt_mgrclose (mgr), NULL;
1044 if( *amt < mgr->page_size )
1045 return bt_mgrclose (mgr), NULL;
1052 // mlock the root page and the latchmgr page
1054 flag = PROT_READ | PROT_WRITE;
1055 mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1056 if( mgr->latchmgr == MAP_FAILED )
1057 return bt_mgrclose (mgr), NULL;
1058 mlock (mgr->latchmgr, 2 * mgr->page_size);
1060 mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1061 if( mgr->latchsets == MAP_FAILED )
1062 return bt_mgrclose (mgr), NULL;
1063 mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1065 flag = PAGE_READWRITE;
1066 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1068 return bt_mgrclose (mgr), NULL;
1070 flag = FILE_MAP_WRITE;
1071 mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1072 if( !mgr->latchmgr )
1073 return GetLastError(), bt_mgrclose (mgr), NULL;
1075 mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1081 VirtualFree (latchmgr, 0, MEM_RELEASE);
1086 // open BTree access method
1087 // based on buffer manager
1089 BtDb *bt_open (BtMgr *mgr)
1091 BtDb *bt = malloc (sizeof(*bt));
1093 memset (bt, 0, sizeof(*bt));
1096 bt->mem = malloc (2 *mgr->page_size);
1098 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1100 bt->frame = (BtPage)bt->mem;
1101 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1105 // compare two keys, returning > 0, = 0, or < 0
1106 // as the comparison value
1108 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1110 uint len1 = key1->len;
1113 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1126 // find segment in pool
1127 // must be called with hashslot idx locked
1128 // return NULL if not there
1129 // otherwise return node
1131 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1136 // compute start of hash chain in pool
1138 if( slot = bt->mgr->hash[idx] )
1139 pool = bt->mgr->pool + slot;
1143 page_no &= ~bt->mgr->poolmask;
1145 while( pool->basepage != page_no )
1146 if( pool = pool->hashnext )
1154 // add segment to hash table
1156 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1161 pool->hashprev = pool->hashnext = NULL;
1162 pool->basepage = page_no & ~bt->mgr->poolmask;
1163 pool->pin = CLOCK_bit + 1;
1165 if( slot = bt->mgr->hash[idx] ) {
1166 node = bt->mgr->pool + slot;
1167 pool->hashnext = node;
1168 node->hashprev = pool;
1171 bt->mgr->hash[idx] = pool->slot;
1174 // map new buffer pool segment to virtual memory
1176 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1178 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1179 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1183 flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1184 pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1185 if( pool->map == MAP_FAILED )
1186 return bt->err = BTERR_map;
1189 flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1190 pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1192 return bt->err = BTERR_map;
1194 flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1195 pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1197 return bt->err = BTERR_map;
1202 // calculate page within pool
1204 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1206 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1209 page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1210 // madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1216 void bt_unpinpool (BtPool *pool)
1219 __sync_fetch_and_add(&pool->pin, -1);
1221 _InterlockedDecrement16 (&pool->pin);
1225 // find or place requested page in segment-pool
1226 // return pool table entry, incrementing pin
1228 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1230 uint slot, hashidx, idx, victim;
1231 BtPool *pool, *node, *next;
1233 // lock hash table chain
1235 hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1236 bt_spinwritelock (&bt->mgr->latch[hashidx]);
1238 // look up in hash table
1240 if( pool = bt_findpool(bt, page_no, hashidx) ) {
1242 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1243 __sync_fetch_and_add(&pool->pin, 1);
1245 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1246 _InterlockedIncrement16 (&pool->pin);
1248 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1252 // allocate a new pool node
1253 // and add to hash table
1256 slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1258 slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1261 if( ++slot < bt->mgr->poolmax ) {
1262 pool = bt->mgr->pool + slot;
1265 if( bt_mapsegment(bt, pool, page_no) )
1268 bt_linkhash(bt, pool, page_no, hashidx);
1269 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1273 // pool table is full
1274 // find best pool entry to evict
1277 __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1279 _InterlockedDecrement16 (&bt->mgr->poolcnt);
1284 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1286 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1288 victim %= bt->mgr->poolmax;
1289 pool = bt->mgr->pool + victim;
1290 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1295 // try to get write lock
1296 // skip entry if not obtained
1298 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1301 // skip this entry if
1303 // or clock bit is set
1307 __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1309 _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1311 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1315 // unlink victim pool node from hash table
1317 if( node = pool->hashprev )
1318 node->hashnext = pool->hashnext;
1319 else if( node = pool->hashnext )
1320 bt->mgr->hash[idx] = node->slot;
1322 bt->mgr->hash[idx] = 0;
1324 if( node = pool->hashnext )
1325 node->hashprev = pool->hashprev;
1327 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1329 // remove old file mapping
1331 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1333 // FlushViewOfFile(pool->map, 0);
1334 UnmapViewOfFile(pool->map);
1335 CloseHandle(pool->hmap);
1339 // create new pool mapping
1340 // and link into hash table
1342 if( bt_mapsegment(bt, pool, page_no) )
1345 bt_linkhash(bt, pool, page_no, hashidx);
1346 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1351 // place write, read, or parent lock on requested page_no.
1353 void bt_lockpage(BtLock mode, BtLatchSet *set)
1357 ReadLock (set->readwr);
1360 WriteLock (set->readwr);
1363 ReadLock (set->access);
1366 WriteLock (set->access);
1369 WriteLock (set->parent);
1374 // remove write, read, or parent lock on requested page
1376 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1380 ReadRelease (set->readwr);
1383 WriteRelease (set->readwr);
1386 ReadRelease (set->access);
1389 WriteRelease (set->access);
1392 WriteRelease (set->parent);
1397 // allocate a new page
1399 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1403 // lock allocation page
1405 bt_spinwritelock(bt->mgr->latchmgr->lock);
1407 // use empty chain first
1408 // else allocate empty page
1410 if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1411 if( set->pool = bt_pinpool (bt, set->page_no) )
1412 set->page = bt_page (bt, set->pool, set->page_no);
1414 return bt->err = BTERR_struct;
1416 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1418 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1419 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1421 // if writing first page of pool block, set file length thru last page
1423 if( (set->page_no & bt->mgr->poolmask) == 0 )
1424 ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1426 if( set->pool = bt_pinpool (bt, set->page_no) )
1427 set->page = bt_page (bt, set->pool, set->page_no);
1432 // unlock allocation latch
1434 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1438 // find slot in page for given key at a given level
1440 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1442 uint diff, higher = set->page->cnt, low = 1, slot;
1445 // make stopper key an infinite fence value
1447 if( bt_getid (set->page->right) )
1452 // low is the lowest candidate.
1453 // loop ends when they meet
1455 // higher is already
1456 // tested as .ge. the passed key.
1458 while( diff = higher - low ) {
1459 slot = low + ( diff >> 1 );
1460 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1463 higher = slot, good++;
1466 // return zero if key is on right link page
1468 return good ? higher : 0;
1471 // find and load page at given level for given key
1472 // leave page rd or wr locked as requested
1474 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1476 uid page_no = ROOT_page, prevpage = 0;
1477 uint drill = 0xff, slot;
1478 BtLatchSet *prevlatch;
1479 uint mode, prevmode;
1482 // start at root of btree and drill down
1485 // determine lock mode of drill level
1486 mode = (drill == lvl) ? lock : BtLockRead;
1488 set->latch = bt_pinlatch (bt, page_no);
1489 set->page_no = page_no;
1491 // pin page contents
1493 if( set->pool = bt_pinpool (bt, page_no) )
1494 set->page = bt_page (bt, set->pool, page_no);
1498 // obtain access lock using lock chaining with Access mode
1500 if( page_no > ROOT_page )
1501 bt_lockpage(BtLockAccess, set->latch);
1503 // release & unpin parent page
1506 bt_unlockpage(prevmode, prevlatch);
1507 bt_unpinlatch (prevlatch);
1508 bt_unpinpool (prevpool);
1512 // obtain read lock using lock chaining
1514 bt_lockpage(mode, set->latch);
1516 if( set->page->free )
1517 return bt->err = BTERR_struct, 0;
1519 if( page_no > ROOT_page )
1520 bt_unlockpage(BtLockAccess, set->latch);
1522 // re-read and re-lock root after determining actual level of root
1524 if( set->page->lvl != drill) {
1525 if( set->page_no != ROOT_page )
1526 return bt->err = BTERR_struct, 0;
1528 drill = set->page->lvl;
1530 if( lock != BtLockRead && drill == lvl ) {
1531 bt_unlockpage(mode, set->latch);
1532 bt_unpinlatch (set->latch);
1533 bt_unpinpool (set->pool);
1538 prevpage = set->page_no;
1539 prevlatch = set->latch;
1540 prevpool = set->pool;
1543 // find key on page at this level
1544 // and descend to requested level
1546 if( set->page->kill )
1549 if( slot = bt_findslot (set, key, len) ) {
1553 while( slotptr(set->page, slot)->dead )
1554 if( slot++ < set->page->cnt )
1559 page_no = bt_getid(valptr(set->page, slot)->value);
1564 // or slide right into next page
1567 page_no = bt_getid(set->page->right);
1571 // return error on end of right chain
1573 bt->err = BTERR_struct;
1574 return 0; // return error
1577 // return page to free list
1578 // page must be delete & write locked
1580 void bt_freepage (BtDb *bt, BtPageSet *set)
1582 // lock allocation page
1584 bt_spinwritelock (bt->mgr->latchmgr->lock);
1587 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1588 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1589 set->page->free = 1;
1591 // unlock released page
1593 bt_unlockpage (BtLockDelete, set->latch);
1594 bt_unlockpage (BtLockWrite, set->latch);
1595 bt_unpinlatch (set->latch);
1596 bt_unpinpool (set->pool);
1598 // unlock allocation page
1600 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1603 // a fence key was deleted from a page
1604 // push new fence value upwards
1606 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1608 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1609 unsigned char value[BtId];
1613 // remove the old fence value
1615 ptr = keyptr(set->page, set->page->cnt);
1616 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1617 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1619 // cache new fence value
1621 ptr = keyptr(set->page, set->page->cnt);
1622 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1624 bt_lockpage (BtLockParent, set->latch);
1625 bt_unlockpage (BtLockWrite, set->latch);
1627 // insert new (now smaller) fence key
1629 bt_putid (value, set->page_no);
1630 ptr = (BtKey*)leftkey;
1632 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1635 // now delete old fence key
1637 ptr = (BtKey*)rightkey;
1639 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1642 bt_unlockpage (BtLockParent, set->latch);
1643 bt_unpinlatch(set->latch);
1644 bt_unpinpool (set->pool);
1648 // root has a single child
1649 // collapse a level from the tree
1651 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1656 // find the child entry and promote as new root contents
1659 for( idx = 0; idx++ < root->page->cnt; )
1660 if( !slotptr(root->page, idx)->dead )
1663 child->page_no = bt_getid (valptr(root->page, idx)->value);
1665 child->latch = bt_pinlatch (bt, child->page_no);
1666 bt_lockpage (BtLockDelete, child->latch);
1667 bt_lockpage (BtLockWrite, child->latch);
1669 if( child->pool = bt_pinpool (bt, child->page_no) )
1670 child->page = bt_page (bt, child->pool, child->page_no);
1674 memcpy (root->page, child->page, bt->mgr->page_size);
1675 bt_freepage (bt, child);
1677 } while( root->page->lvl > 1 && root->page->act == 1 );
1679 bt_unlockpage (BtLockWrite, root->latch);
1680 bt_unpinlatch (root->latch);
1681 bt_unpinpool (root->pool);
1685 // maintain reverse scan pointers by
1686 // linking left pointer of far right node
1688 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1690 BtPageSet right2[1];
1692 // keep track of rightmost leaf page
1694 if( !right_page_no ) {
1695 bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1699 // link right page to left page
1701 right2->latch = bt_pinlatch (bt, right_page_no);
1702 bt_lockpage (BtLockWrite, right2->latch);
1704 if( right2->pool = bt_pinpool (bt, right_page_no) )
1705 right2->page = bt_page (bt, right2->pool, right_page_no);
1709 bt_putid(right2->page->left, left_page_no);
1710 bt_unlockpage (BtLockWrite, right2->latch);
1711 bt_unpinlatch (right2->latch);
1715 // find and delete key on page by marking delete flag bit
1716 // if page becomes empty, delete it from the btree
1718 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1720 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1721 BtPageSet set[1], right[1], right2[1];
1722 uint slot, idx, found, fence;
1723 unsigned char value[BtId];
1727 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1728 ptr = keyptr(set->page, slot);
1732 // if librarian slot, advance to real slot
1734 if( slotptr(set->page, slot)->type == Librarian )
1735 ptr = keyptr(set->page, ++slot);
1737 fence = slot == set->page->cnt;
1739 // if key is found delete it, otherwise ignore request
1741 if( found = !keycmp (ptr, key, len) )
1742 if( found = slotptr(set->page, slot)->dead == 0 ) {
1743 val = valptr(set->page,slot);
1744 slotptr(set->page, slot)->dead = 1;
1745 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1748 // collapse empty slots beneath the fence
1750 while( idx = set->page->cnt - 1 )
1751 if( slotptr(set->page, idx)->dead ) {
1752 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1753 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1758 // did we delete a fence key in an upper level?
1760 if( found && lvl && set->page->act && fence )
1761 if( bt_fixfence (bt, set, lvl) )
1764 return bt->found = found, 0;
1766 // do we need to collapse root?
1768 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1769 if( bt_collapseroot (bt, set) )
1772 return bt->found = found, 0;
1774 // return if page is not empty
1776 if( set->page->act ) {
1777 bt_unlockpage(BtLockWrite, set->latch);
1778 bt_unpinlatch (set->latch);
1779 bt_unpinpool (set->pool);
1780 return bt->found = found, 0;
1783 // cache copy of fence key
1784 // to post in parent
1786 ptr = keyptr(set->page, set->page->cnt);
1787 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1789 // obtain lock on right page
1791 right->page_no = bt_getid(set->page->right);
1792 right->latch = bt_pinlatch (bt, right->page_no);
1793 bt_lockpage (BtLockWrite, right->latch);
1795 // pin page contents
1797 if( right->pool = bt_pinpool (bt, right->page_no) )
1798 right->page = bt_page (bt, right->pool, right->page_no);
1802 if( right->page->kill )
1803 return bt->err = BTERR_struct;
1805 // transfer left link
1807 memcpy (right->page->left, set->page->left, BtId);
1809 // pull contents of right peer into our empty page
1811 memcpy (set->page, right->page, bt->mgr->page_size);
1816 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1819 // cache copy of key to update
1821 ptr = keyptr(right->page, right->page->cnt);
1822 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1824 // mark right page deleted and point it to left page
1825 // until we can post parent updates
1827 bt_putid (right->page->right, set->page_no);
1828 right->page->kill = 1;
1830 bt_lockpage (BtLockParent, right->latch);
1831 bt_unlockpage (BtLockWrite, right->latch);
1833 bt_lockpage (BtLockParent, set->latch);
1834 bt_unlockpage (BtLockWrite, set->latch);
1836 // redirect higher key directly to our new node contents
1838 bt_putid (value, set->page_no);
1839 ptr = (BtKey*)higherfence;
1841 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1844 // delete old lower key to our node
1846 ptr = (BtKey*)lowerfence;
1848 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1851 // obtain delete and write locks to right node
1853 bt_unlockpage (BtLockParent, right->latch);
1854 bt_lockpage (BtLockDelete, right->latch);
1855 bt_lockpage (BtLockWrite, right->latch);
1856 bt_freepage (bt, right);
1858 bt_unlockpage (BtLockParent, set->latch);
1859 bt_unpinlatch (set->latch);
1860 bt_unpinpool (set->pool);
1865 BtKey *bt_foundkey (BtDb *bt)
1867 return (BtKey*)(bt->key);
1870 // advance to next slot
1872 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1874 BtLatchSet *prevlatch;
1878 if( slot < set->page->cnt )
1881 prevlatch = set->latch;
1882 prevpool = set->pool;
1884 if( page_no = bt_getid(set->page->right) )
1885 set->latch = bt_pinlatch (bt, page_no);
1887 return bt->err = BTERR_struct, 0;
1889 // pin page contents
1891 if( set->pool = bt_pinpool (bt, page_no) )
1892 set->page = bt_page (bt, set->pool, page_no);
1896 // obtain access lock using lock chaining with Access mode
1898 bt_lockpage(BtLockAccess, set->latch);
1900 bt_unlockpage(BtLockRead, prevlatch);
1901 bt_unpinlatch (prevlatch);
1902 bt_unpinpool (prevpool);
1904 bt_lockpage(BtLockRead, set->latch);
1905 bt_unlockpage(BtLockAccess, set->latch);
1907 set->page_no = page_no;
1911 // find unique key or first duplicate key in
1912 // leaf level and return number of value bytes
1913 // or (-1) if not found. Setup key for bt_foundkey
1915 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1923 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1925 ptr = keyptr(set->page, slot);
1927 // skip librarian slot place holder
1929 if( slotptr(set->page, slot)->type == Librarian )
1930 ptr = keyptr(set->page, ++slot);
1932 // return actual key found
1934 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1937 if( slotptr(set->page, slot)->type == Duplicate )
1940 // if key exists, return >= 0 value bytes copied
1941 // otherwise return (-1)
1943 if( slotptr(set->page, slot)->dead )
1947 if( !memcmp (ptr->key, key, len) ) {
1948 val = valptr (set->page,slot);
1949 if( valmax > val->len )
1951 memcpy (value, val->value, valmax);
1957 } while( slot = bt_findnext (bt, set, slot) );
1959 bt_unlockpage (BtLockRead, set->latch);
1960 bt_unpinlatch (set->latch);
1961 bt_unpinpool (set->pool);
1965 // check page for space available,
1966 // clean if necessary and return
1967 // 0 - page needs splitting
1968 // >0 new slot value
1970 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1972 uint nxt = bt->mgr->page_size;
1973 uint cnt = 0, idx = 0;
1974 uint max = page->cnt;
1979 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1982 // skip cleanup and proceed to split
1983 // if there's not enough garbage
1986 if( page->garbage < nxt / 5 )
1989 memcpy (bt->frame, page, bt->mgr->page_size);
1991 // skip page info and set rest of page to zero
1993 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1997 // clean up page first by
1998 // removing deleted keys
2000 while( cnt++ < max ) {
2003 if( cnt < max && slotptr(bt->frame,cnt)->dead )
2006 // copy the value across
2008 val = valptr(bt->frame, cnt);
2009 nxt -= val->len + sizeof(BtVal);
2010 ((unsigned char *)page)[nxt] = val->len;
2011 memcpy ((unsigned char *)page + nxt + sizeof(BtVal), val->value, val->len);
2013 // copy the key across
2015 key = keyptr(bt->frame, cnt);
2016 nxt -= key->len + sizeof(BtKey);
2017 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2019 // make a librarian slot
2022 slotptr(page, ++idx)->off = nxt;
2023 slotptr(page, idx)->type = Librarian;
2024 slotptr(page, idx)->dead = 1;
2029 slotptr(page, ++idx)->off = nxt;
2030 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2032 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2039 // see if page has enough space now, or does it need splitting?
2041 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2047 // split the root and raise the height of the btree
2049 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
2051 uint nxt = bt->mgr->page_size;
2052 unsigned char value[BtId];
2057 // Obtain an empty page to use, and copy the current
2058 // root contents into it, e.g. lower keys
2060 if( bt_newpage(bt, left) )
2063 memcpy(left->page, root->page, bt->mgr->page_size);
2064 bt_unpinpool (left->pool);
2066 // set left from higher to lower page
2068 bt_putid (right->page->left, left->page_no);
2069 bt_unpinpool (right->pool);
2071 // preserve the page info at the bottom
2072 // of higher keys and set rest to zero
2074 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2076 // insert stopper key at top of newroot page
2077 // and increase the root height
2079 nxt -= BtId + sizeof(BtVal);
2080 bt_putid (value, right->page_no);
2081 val = (BtVal *)((unsigned char *)root->page + nxt);
2082 memcpy (val->value, value, BtId);
2085 nxt -= 2 + sizeof(BtKey);
2086 slotptr(root->page, 2)->off = nxt;
2087 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2092 // insert lower keys page fence key on newroot page as first key
2094 nxt -= BtId + sizeof(BtVal);
2095 bt_putid (value, left->page_no);
2096 val = (BtVal *)((unsigned char *)root->page + nxt);
2097 memcpy (val->value, value, BtId);
2100 nxt -= leftkey->len + sizeof(BtKey);
2101 slotptr(root->page, 1)->off = nxt;
2102 memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
2104 bt_putid(root->page->right, 0);
2105 root->page->min = nxt; // reset lowest used offset and key count
2106 root->page->cnt = 2;
2107 root->page->act = 2;
2110 // release and unpin root
2112 bt_unlockpage(BtLockWrite, root->latch);
2113 bt_unpinlatch (root->latch);
2114 bt_unpinpool (root->pool);
2118 // split already locked full node
2121 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2123 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
2124 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2125 unsigned char value[BtId];
2126 uint lvl = set->page->lvl;
2133 // split higher half of keys to bt->frame
2135 memset (bt->frame, 0, bt->mgr->page_size);
2136 max = set->page->cnt;
2140 while( cnt++ < max ) {
2141 if( slotptr(set->page, cnt)->dead && cnt < max )
2143 src = valptr(set->page, cnt);
2144 nxt -= src->len + sizeof(BtVal);
2145 val = (BtVal*)((unsigned char *)bt->frame + nxt);
2146 memcpy (val->value, src->value, src->len);
2147 val->len = src->len;
2149 key = keyptr(set->page, cnt);
2150 nxt -= key->len + sizeof(BtKey);
2151 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2152 memcpy (ptr, key, key->len + sizeof(BtKey));
2154 // add librarian slot
2157 slotptr(bt->frame, ++idx)->off = nxt;
2158 slotptr(bt->frame, idx)->type = Librarian;
2159 slotptr(bt->frame, idx)->dead = 1;
2164 slotptr(bt->frame, ++idx)->off = nxt;
2165 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2167 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2171 // remember existing fence key for new page to the right
2173 memcpy (rightkey, key, key->len + sizeof(BtKey));
2175 bt->frame->bits = bt->mgr->page_bits;
2176 bt->frame->min = nxt;
2177 bt->frame->cnt = idx;
2178 bt->frame->lvl = lvl;
2182 if( set->page_no > ROOT_page ) {
2183 bt_putid (bt->frame->right, bt_getid (set->page->right));
2184 bt_putid(bt->frame->left, set->page_no);
2187 // get new free page and write higher keys to it.
2189 if( bt_newpage(bt, right) )
2194 if( set->page_no > ROOT_page && !lvl )
2195 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2198 memcpy (right->page, bt->frame, bt->mgr->page_size);
2199 bt_unpinpool (right->pool);
2201 // update lower keys to continue in old page
2203 memcpy (bt->frame, set->page, bt->mgr->page_size);
2204 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2205 nxt = bt->mgr->page_size;
2206 set->page->garbage = 0;
2212 if( slotptr(bt->frame, max)->type == Librarian )
2215 // assemble page of smaller keys
2217 while( cnt++ < max ) {
2218 if( slotptr(bt->frame, cnt)->dead )
2220 val = valptr(bt->frame, cnt);
2221 nxt -= val->len + sizeof(BtVal);
2222 ((unsigned char *)set->page)[nxt] = val->len;
2223 memcpy ((unsigned char *)set->page + nxt + sizeof(BtVal), val->value, val->len);
2225 key = keyptr(bt->frame, cnt);
2226 nxt -= key->len + sizeof(BtKey);
2227 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2229 // add librarian slot
2232 slotptr(set->page, ++idx)->off = nxt;
2233 slotptr(set->page, idx)->type = Librarian;
2234 slotptr(set->page, idx)->dead = 1;
2239 slotptr(set->page, ++idx)->off = nxt;
2240 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2244 // remember fence key for smaller page
2246 memcpy(fencekey, key, key->len + sizeof(BtKey));
2248 bt_putid(set->page->right, right->page_no);
2249 set->page->min = nxt;
2250 set->page->cnt = idx;
2252 // if current page is the root page, split it
2254 if( set->page_no == ROOT_page )
2255 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2257 // insert new fences in their parent pages
2259 right->latch = bt_pinlatch (bt, right->page_no);
2260 bt_lockpage (BtLockParent, right->latch);
2262 bt_lockpage (BtLockParent, set->latch);
2263 bt_unlockpage (BtLockWrite, set->latch);
2265 // insert new fence for reformulated left block of smaller keys
2267 bt_putid (value, set->page_no);
2269 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2272 // switch fence for right block of larger keys to new right page
2274 bt_putid (value, right->page_no);
2276 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2279 bt_unlockpage (BtLockParent, set->latch);
2280 bt_unpinlatch (set->latch);
2281 bt_unpinpool (set->pool);
2283 bt_unlockpage (BtLockParent, right->latch);
2284 bt_unpinlatch (right->latch);
2288 // install new key and value onto page
2289 // page must already be checked for
2292 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2294 uint idx, librarian;
2299 // if found slot > desired slot and previous slot
2300 // is a librarian slot, use it
2303 if( slotptr(set->page, slot-1)->type == Librarian )
2306 // copy value onto page
2308 set->page->min -= vallen + sizeof(BtVal);
2309 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2310 memcpy (val->value, value, vallen);
2313 // copy key onto page
2315 set->page->min -= keylen + sizeof(BtKey);
2316 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2317 memcpy (ptr->key, key, keylen);
2320 // find first empty slot
2322 for( idx = slot; idx < set->page->cnt; idx++ )
2323 if( slotptr(set->page, idx)->dead )
2326 // now insert key into array before slot
2328 if( idx == set->page->cnt )
2329 idx += 2, set->page->cnt += 2, librarian = 2;
2335 while( idx > slot + librarian - 1 )
2336 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2338 // add librarian slot
2340 if( librarian > 1 ) {
2341 node = slotptr(set->page, slot++);
2342 node->off = set->page->min;
2343 node->type = Librarian;
2349 node = slotptr(set->page, slot);
2350 node->off = set->page->min;
2354 bt_unlockpage (BtLockWrite, set->latch);
2355 bt_unpinlatch (set->latch);
2356 bt_unpinpool (set->pool);
2360 // Insert new key into the btree at given level.
2361 // either add a new key or update/add an existing one
2363 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2365 unsigned char newkey[BT_keyarray];
2366 uint slot, idx, len;
2373 // set up the key we're working on
2375 ins = (BtKey*)newkey;
2376 memcpy (ins->key, key, keylen);
2379 // is this a non-unique index value?
2385 sequence = bt_newdup (bt);
2386 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2390 while( 1 ) { // find the page and slot for the current key
2391 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2392 ptr = keyptr(set->page, slot);
2395 bt->err = BTERR_ovflw;
2399 // if librarian slot == found slot, advance to real slot
2401 if( slotptr(set->page, slot)->type == Librarian )
2402 if( !keycmp (ptr, key, keylen) )
2403 ptr = keyptr(set->page, ++slot);
2407 if( slotptr(set->page, slot)->type == Duplicate )
2410 // if inserting a duplicate key or unique key
2411 // check for adequate space on the page
2412 // and insert the new key before slot.
2414 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2415 if( !(slot = bt_cleanpage (bt, set->page, ins->len, slot, vallen)) )
2416 if( bt_splitpage (bt, set) )
2421 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2424 // if key already exists, update value and return
2426 val = valptr(set->page, slot);
2428 if( val->len >= vallen ) {
2429 if( slotptr(set->page, slot)->dead )
2431 set->page->garbage += val->len - vallen;
2432 slotptr(set->page, slot)->dead = 0;
2434 memcpy (val->value, value, vallen);
2435 bt_unlockpage(BtLockWrite, set->latch);
2436 bt_unpinlatch (set->latch);
2437 bt_unpinpool (set->pool);
2441 // new update value doesn't fit in existing value area
2443 if( !slotptr(set->page, slot)->dead )
2444 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2446 slotptr(set->page, slot)->dead = 0;
2450 if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2451 if( bt_splitpage (bt, set) )
2456 set->page->min -= vallen + sizeof(BtVal);
2457 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2458 memcpy (val->value, value, vallen);
2461 set->page->min -= keylen + sizeof(BtKey);
2462 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2463 memcpy (ptr->key, key, keylen);
2466 slotptr(set->page, slot)->off = set->page->min;
2467 bt_unlockpage(BtLockWrite, set->latch);
2468 bt_unpinlatch (set->latch);
2469 bt_unpinpool (set->pool);
2475 // set cursor to highest slot on highest page
2477 uint bt_lastkey (BtDb *bt)
2479 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2483 if( set->pool = bt_pinpool (bt, page_no) )
2484 set->page = bt_page (bt, set->pool, page_no);
2488 set->latch = bt_pinlatch (bt, page_no);
2489 bt_lockpage(BtLockRead, set->latch);
2491 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2492 slot = set->page->cnt;
2494 bt_unlockpage(BtLockRead, set->latch);
2495 bt_unpinlatch (set->latch);
2496 bt_unpinpool (set->pool);
2501 // return previous slot on cursor page
2503 uint bt_prevkey (BtDb *bt, uint slot)
2511 if( left = bt_getid(bt->cursor->left) )
2512 bt->cursor_page = left;
2516 if( set->pool = bt_pinpool (bt, left) )
2517 set->page = bt_page (bt, set->pool, left);
2521 set->latch = bt_pinlatch (bt, left);
2522 bt_lockpage(BtLockRead, set->latch);
2524 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2526 bt_unlockpage(BtLockRead, set->latch);
2527 bt_unpinlatch (set->latch);
2528 bt_unpinpool (set->pool);
2529 return bt->cursor->cnt;
2532 // return next slot on cursor page
2533 // or slide cursor right into next page
2535 uint bt_nextkey (BtDb *bt, uint slot)
2541 right = bt_getid(bt->cursor->right);
2543 while( slot++ < bt->cursor->cnt )
2544 if( slotptr(bt->cursor,slot)->dead )
2546 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2554 bt->cursor_page = right;
2556 if( set->pool = bt_pinpool (bt, right) )
2557 set->page = bt_page (bt, set->pool, right);
2561 set->latch = bt_pinlatch (bt, right);
2562 bt_lockpage(BtLockRead, set->latch);
2564 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2566 bt_unlockpage(BtLockRead, set->latch);
2567 bt_unpinlatch (set->latch);
2568 bt_unpinpool (set->pool);
2576 // cache page of keys into cursor and return starting slot for given key
2578 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2583 // cache page for retrieval
2585 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2586 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2590 bt->cursor_page = set->page_no;
2592 bt_unlockpage(BtLockRead, set->latch);
2593 bt_unpinlatch (set->latch);
2594 bt_unpinpool (set->pool);
2598 BtKey *bt_key(BtDb *bt, uint slot)
2600 return keyptr(bt->cursor, slot);
2603 BtVal *bt_val(BtDb *bt, uint slot)
2605 return valptr(bt->cursor,slot);
2611 double getCpuTime(int type)
2614 FILETIME xittime[1];
2615 FILETIME systime[1];
2616 FILETIME usrtime[1];
2617 SYSTEMTIME timeconv[1];
2620 memset (timeconv, 0, sizeof(SYSTEMTIME));
2624 GetSystemTimeAsFileTime (xittime);
2625 FileTimeToSystemTime (xittime, timeconv);
2626 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2629 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2630 FileTimeToSystemTime (usrtime, timeconv);
2633 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2634 FileTimeToSystemTime (systime, timeconv);
2638 ans += (double)timeconv->wHour * 3600;
2639 ans += (double)timeconv->wMinute * 60;
2640 ans += (double)timeconv->wSecond;
2641 ans += (double)timeconv->wMilliseconds / 1000;
2646 #include <sys/resource.h>
2648 double getCpuTime(int type)
2650 struct rusage used[1];
2651 struct timeval tv[1];
2655 gettimeofday(tv, NULL);
2656 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2659 getrusage(RUSAGE_SELF, used);
2660 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2663 getrusage(RUSAGE_SELF, used);
2664 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2671 uint bt_latchaudit (BtDb *bt)
2673 ushort idx, hashidx;
2680 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2682 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2683 fprintf(stderr, "Alloc page locked\n");
2684 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2686 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2687 latch = bt->mgr->latchsets + idx;
2688 if( *latch->readwr->rin & MASK )
2689 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2690 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2692 if( *latch->access->rin & MASK )
2693 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2694 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2696 if( *latch->parent->rin & MASK )
2697 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2698 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2701 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2706 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2707 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2708 fprintf(stderr, "hash entry %d locked\n", hashidx);
2710 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2712 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2713 latch = bt->mgr->latchsets + idx;
2714 if( *(ushort *)latch->busy )
2715 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2716 *(ushort *)latch->busy = 0;
2718 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2719 } while( idx = latch->next );
2722 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2723 page_no = LEAF_page;
2725 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2726 off64_t off = page_no << bt->mgr->page_bits;
2728 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2732 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2734 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2735 fprintf(stderr, "page %.8x unable to read\n", page_no);
2737 if( *amt < bt->mgr->page_size )
2738 fprintf(stderr, "page %.8x unable to read\n", page_no);
2740 if( !bt->frame->free ) {
2741 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2742 ptr = keyptr(bt->frame, idx+1);
2743 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2744 fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2746 if( !bt->frame->lvl )
2747 cnt += bt->frame->act;
2749 if( page_no > LEAF_page )
2764 // standalone program to index file of keys
2765 // then list them onto std-out
2768 void *index_file (void *arg)
2770 uint __stdcall index_file (void *arg)
2773 int line = 0, found = 0, cnt = 0, unique;
2774 uid next, page_no = LEAF_page; // start on first page of leaves
2775 unsigned char key[BT_maxkey];
2776 ThreadArg *args = arg;
2777 int ch, len = 0, slot;
2784 bt = bt_open (args->mgr);
2786 unique = (args->type[1] | 0x20) != 'd';
2788 switch(args->type[0] | 0x20)
2791 fprintf(stderr, "started latch mgr audit\n");
2792 cnt = bt_latchaudit (bt);
2793 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2797 fprintf(stderr, "started pennysort for %s\n", args->infile);
2798 if( in = fopen (args->infile, "rb") )
2799 while( ch = getc(in), ch != EOF )
2804 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2805 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2808 else if( len < BT_maxkey )
2810 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2814 fprintf(stderr, "started indexing for %s\n", args->infile);
2815 if( in = fopen (args->infile, "rb") )
2816 while( ch = getc(in), ch != EOF )
2821 if( args->num == 1 )
2822 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2824 else if( args->num )
2825 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2827 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2828 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2831 else if( len < BT_maxkey )
2833 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2837 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2838 if( in = fopen (args->infile, "rb") )
2839 while( ch = getc(in), ch != EOF )
2843 if( args->num == 1 )
2844 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2846 else if( args->num )
2847 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2849 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2850 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2851 ptr = (BtKey*)(bt->key);
2854 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2855 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2858 else if( len < BT_maxkey )
2860 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2864 fprintf(stderr, "started finding keys for %s\n", args->infile);
2865 if( in = fopen (args->infile, "rb") )
2866 while( ch = getc(in), ch != EOF )
2870 if( args->num == 1 )
2871 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2873 else if( args->num )
2874 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2876 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2879 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2882 else if( len < BT_maxkey )
2884 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2888 fprintf(stderr, "started scanning\n");
2890 if( set->pool = bt_pinpool (bt, page_no) )
2891 set->page = bt_page (bt, set->pool, page_no);
2894 set->latch = bt_pinlatch (bt, page_no);
2895 bt_lockpage (BtLockRead, set->latch);
2896 next = bt_getid (set->page->right);
2898 for( slot = 0; slot++ < set->page->cnt; )
2899 if( next || slot < set->page->cnt )
2900 if( !slotptr(set->page, slot)->dead ) {
2901 ptr = keyptr(set->page, slot);
2904 if( slotptr(set->page, slot)->type == Duplicate )
2907 fwrite (ptr->key, len, 1, stdout);
2908 val = valptr(set->page, slot);
2909 fwrite (val->value, val->len, 1, stdout);
2910 fputc ('\n', stdout);
2914 bt_unlockpage (BtLockRead, set->latch);
2915 bt_unpinlatch (set->latch);
2916 bt_unpinpool (set->pool);
2917 } while( page_no = next );
2919 fprintf(stderr, " Total keys read %d\n", cnt);
2923 fprintf(stderr, "started reverse scan\n");
2924 if( slot = bt_lastkey (bt) )
2925 while( slot = bt_prevkey (bt, slot) ) {
2926 if( slotptr(bt->cursor, slot)->dead )
2929 ptr = keyptr(bt->cursor, slot);
2932 if( slotptr(bt->cursor, slot)->type == Duplicate )
2935 fwrite (ptr->key, len, 1, stdout);
2936 val = valptr(bt->cursor, slot);
2937 fwrite (val->value, val->len, 1, stdout);
2938 fputc ('\n', stdout);
2942 fprintf(stderr, " Total keys read %d\n", cnt);
2947 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2949 fprintf(stderr, "started counting\n");
2950 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2951 page_no = LEAF_page;
2953 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2954 uid off = page_no << bt->mgr->page_bits;
2956 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2960 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2962 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2963 return bt->err = BTERR_map;
2965 if( *amt < bt->mgr->page_size )
2966 return bt->err = BTERR_map;
2968 if( !bt->frame->free && !bt->frame->lvl )
2969 cnt += bt->frame->act;
2970 if( page_no > LEAF_page )
2975 cnt--; // remove stopper key
2976 fprintf(stderr, " Total keys read %d\n", cnt);
2988 typedef struct timeval timer;
2990 int main (int argc, char **argv)
2992 int idx, cnt, len, slot, err;
2993 int segsize, bits = 16;
3010 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3011 fprintf (stderr, " where page_bits is the page size in bits\n");
3012 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
3013 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
3014 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3015 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3019 start = getCpuTime(0);
3022 bits = atoi(argv[3]);
3025 poolsize = atoi(argv[4]);
3028 fprintf (stderr, "Warning: no mapped_pool\n");
3030 if( poolsize > 65535 )
3031 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
3034 segsize = atoi(argv[5]);
3036 segsize = 4; // 16 pages per mmap segment
3039 num = atoi(argv[6]);
3043 threads = malloc (cnt * sizeof(pthread_t));
3045 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3047 args = malloc (cnt * sizeof(ThreadArg));
3049 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3052 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3058 for( idx = 0; idx < cnt; idx++ ) {
3059 args[idx].infile = argv[idx + 7];
3060 args[idx].type = argv[2];
3061 args[idx].mgr = mgr;
3062 args[idx].num = num;
3063 args[idx].idx = idx;
3065 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3066 fprintf(stderr, "Error creating thread %d\n", err);
3068 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3072 // wait for termination
3075 for( idx = 0; idx < cnt; idx++ )
3076 pthread_join (threads[idx], NULL);
3078 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3080 for( idx = 0; idx < cnt; idx++ )
3081 CloseHandle(threads[idx]);
3084 elapsed = getCpuTime(0) - start;
3085 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3086 elapsed = getCpuTime(1);
3087 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3088 elapsed = getCpuTime(2);
3089 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);