1 // btree version threadskv5 sched_yield version
2 // with reworked bt_deletekey code,
3 // phase-fair reader writer lock,
4 // librarian page split code,
5 // duplicate key management
6 // and bi-directional cursors
10 // author: karl malbrain, malbrain@cal.berkeley.edu
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
16 The original author is Karl Malbrain.
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
46 #define WIN32_LEAN_AND_MEAN
60 typedef unsigned long long uid;
63 typedef unsigned long long off64_t;
64 typedef unsigned short ushort;
65 typedef unsigned int uint;
68 #define BT_latchtable 128 // number of latch manager slots
70 #define BT_ro 0x6f72 // ro
71 #define BT_rw 0x7772 // rw
73 #define BT_maxbits 24 // maximum page size in bits
74 #define BT_minbits 9 // minimum page size in bits
75 #define BT_minpage (1 << BT_minbits) // minimum page size
76 #define BT_maxpage (1 << BT_maxbits) // maximum page size
79 There are five lock types for each node in three independent sets:
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete.
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent.
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock.
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks.
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification.
95 // definition for phase-fair reader/writer lock implementation
109 // definition for spin latch implementation
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
115 volatile typedef struct {
126 // hash table entries
129 BtSpinLatch latch[1];
130 volatile ushort slot; // Latch table entry at head of chain
133 // latch manager table structure
136 RWLock readwr[1]; // read/write page lock
137 RWLock access[1]; // Access Intent/Page delete
138 RWLock parent[1]; // Posting of fence key in parent
139 BtSpinLatch busy[1]; // slot is being moved between chains
140 volatile ushort next; // next entry in hash table chain
141 volatile ushort prev; // prev entry in hash table chain
142 volatile ushort pin; // number of outstanding locks
143 volatile ushort hash; // hash slot entry is under
144 volatile uid page_no; // latch set page number
147 // Define the length of the page record numbers
151 // Page key slot definition.
153 // Keys are marked dead, but remain on the page until
154 // cleanup is called. The fence key (highest key) for
155 // a leaf page is always present, even after cleanup.
159 // In addition to the Unique keys that occupy slots
160 // there are Librarian and Duplicate key
161 // slots occupying the key slot array.
163 // The Librarian slots are dead keys that
164 // serve as filler, available to add new Unique
165 // or Dup slots that are inserted into the B-tree.
167 // The Duplicate slots have had their key bytes extended
168 // by 6 bytes to contain a binary duplicate key uniqueifier.
177 uint off:BT_maxbits; // page offset for key start
178 uint type:3; // type of slot
179 uint dead:1; // set for deleted slot
182 // The key structure occupies space at the upper end of
183 // each page. It's a length byte followed by the key
187 unsigned char len; // this can be changed to a ushort or uint
188 unsigned char key[0];
191 // the value structure also occupies space at the upper
192 // end of the page. Each key is immediately followed by a value.
195 unsigned char len; // this can be changed to a ushort or uint
196 unsigned char value[0];
199 #define BT_maxkey 255 // maximum number of bytes in a key
200 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
202 // The first part of an index page.
203 // It is immediately followed
204 // by the BtSlot array of keys.
206 // note that this structure size
207 // must be a multiple of 8 bytes
208 // in order to place dups correctly.
210 typedef struct BtPage_ {
211 uint cnt; // count of keys in page
212 uint act; // count of active keys
213 uint min; // next key offset
214 uint garbage; // page garbage in bytes
215 unsigned char bits:7; // page size in bits
216 unsigned char free:1; // page is on free chain
217 unsigned char lvl:7; // level of page
218 unsigned char kill:1; // page is being deleted
219 unsigned char left[BtId]; // page number to left
220 unsigned char filler[2]; // padding to multiple of 8
221 unsigned char right[BtId]; // page number to right
224 // The memory mapping pool table buffer manager entry
227 uid basepage; // mapped base page number
228 char *map; // mapped memory pointer
229 ushort slot; // slot index in this array
230 ushort pin; // mapped page pin counter
231 void *hashprev; // previous pool entry for the same hash idx
232 void *hashnext; // next pool entry for the same hash idx
234 HANDLE hmap; // Windows memory mapping handle
238 #define CLOCK_bit 0x8000 // bit in pool->pin
240 // The loadpage interface object
243 uid page_no; // current page number
244 BtPage page; // current page pointer
245 BtPool *pool; // current page pool
246 BtLatchSet *latch; // current page latch set
249 // structure for latch manager on ALLOC_page
252 struct BtPage_ alloc[1]; // next page_no in right ptr
253 unsigned long long dups[1]; // global duplicate key uniqueifier
254 unsigned char chain[BtId]; // head of free page_nos chain
255 BtSpinLatch lock[1]; // allocation area lite latch
256 ushort latchdeployed; // highest number of latch entries deployed
257 ushort nlatchpage; // number of latch pages at BT_latch
258 ushort latchtotal; // number of page latch entries
259 ushort latchhash; // number of latch hash table slots
260 ushort latchvictim; // next latch entry to examine
261 BtHashEntry table[0]; // the hash table
264 // The object structure for Btree access
267 uint page_size; // page size
268 uint page_bits; // page size in bits
269 uint seg_bits; // seg size in pages in bits
270 uint mode; // read-write mode
276 ushort poolcnt; // highest page pool node in use
277 ushort poolmax; // highest page pool node allocated
278 ushort poolmask; // total number of pages in mmap segment - 1
279 ushort hashsize; // size of Hash Table for pool entries
280 volatile uint evicted; // last evicted hash table slot
281 ushort *hash; // pool index for hash entries
282 BtSpinLatch *latch; // latches for hash table slots
283 BtLatchMgr *latchmgr; // mapped latch page from allocation page
284 BtLatchSet *latchsets; // mapped latch set from latch pages
285 BtPool *pool; // memory pool page segments
287 HANDLE halloc; // allocation and latch table handle
292 BtMgr *mgr; // buffer manager for thread
293 BtPage cursor; // cached frame for start/next (never mapped)
294 BtPage frame; // spare frame for the page split (never mapped)
295 uid cursor_page; // current cursor page number
296 unsigned char *mem; // frame, cursor, page memory buffer
297 unsigned char key[BT_keyarray]; // last found complete key
298 int found; // last delete or insert was found
299 int err; // last error
313 extern void bt_close (BtDb *bt);
314 extern BtDb *bt_open (BtMgr *mgr);
315 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
316 extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
317 extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
318 extern BtKey *bt_foundkey (BtDb *bt);
319 extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
320 extern uint bt_nextkey (BtDb *bt, uint slot);
323 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
324 void bt_mgrclose (BtMgr *mgr);
326 // Helper functions to return slot values
327 // from the cursor page.
329 extern BtKey *bt_key (BtDb *bt, uint slot);
330 extern BtVal *bt_val (BtDb *bt, uint slot);
332 // BTree page number constants
333 #define ALLOC_page 0 // allocation & latch manager hash table
334 #define ROOT_page 1 // root of the btree
335 #define LEAF_page 2 // first page of leaves
336 #define LATCH_page 3 // pages for latch manager
338 // Number of levels to create in a new BTree
342 // The page is allocated from low and hi ends.
343 // The key slots are allocated from the bottom,
344 // while the text and value of the key
345 // are allocated from the top. When the two
346 // areas meet, the page is split into two.
348 // A key consists of a length byte, two bytes of
349 // index number (0 - 65534), and up to 253 bytes
352 // Associated with each key is a value byte string
353 // containing any value desired.
355 // The b-tree root is always located at page 1.
356 // The first leaf page of level zero is always
357 // located on page 2.
359 // The b-tree pages are linked with next
360 // pointers to facilitate enumerators,
361 // and provide for concurrency.
363 // When the root page fills, it is split in two and
364 // the tree height is raised by a new root at page
365 // one with two keys.
367 // Deleted keys are marked with a dead bit until
368 // page cleanup. The fence key for a leaf node is
371 // Groups of pages called segments from the btree are optionally
372 // cached with a memory mapped pool. A hash table is used to keep
373 // track of the cached segments. This behaviour is controlled
374 // by the segment size (segsize) parameter to bt_mgr.
376 // To achieve maximum concurrency one page is locked at a time
377 // as the tree is traversed to find leaf key in question. The right
378 // page numbers are used in cases where the page is being split,
381 // Page 0 is dedicated to lock for new page extensions,
382 // and chains empty pages together for reuse. It also
383 // contains the latch manager hash table.
385 // The ParentModification lock on a node is obtained to serialize posting
386 // or changing the fence key for a node.
388 // Empty pages are chained together through the ALLOC page and reused.
390 // Access macros to address slot and key values from the page
391 // Page slots use 1 based indexing.
// slotptr: address of entry `slot` in the BtSlot array that starts
//	immediately after the page header (page+1).
// keyptr: address of the key, located slot->off bytes from the page start.
// valptr: address of the value, which begins right after the key bytes
//	(key->key + key->len).
// The page and slot arguments are parenthesized inside slotptr so that
// expression arguments (a conditional, a sum, a cast) expand with the
// intended precedence.
393 #define slotptr(page, slot) (((BtSlot *)((page)+1)) + ((slot)-1))
394 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
395 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
// bt_putid: store page id `id` into the byte array at dest.
//	Each store writes the current low-order byte of id and then
//	shifts id right by 8 bits.
//	NOTE(review): the declaration of i and the loop header are not part
//	of this listing. bt_getid reads the most significant byte first, so
//	i presumably counts down from BtId - confirm.
397 void bt_putid(unsigned char *dest, uid id)
402 dest[i] = (unsigned char)id, id >>= 8;
// bt_getid: rebuild a page id from the BtId bytes at src.
//	Bytes are consumed in order; each one is shifted in from the right,
//	so the first byte read ends up most significant.
//	NOTE(review): the initial value of id and the return statement are
//	not part of this listing (presumably id starts at zero and is
//	returned) - confirm.
405 uid bt_getid(unsigned char *src)
410 for( i = 0; i < BtId; i++ )
411 id <<= 8, id |= *src++;
// bt_newdup: atomically advance the global duplicate key uniqueifier
//	kept in the latch manager (latchmgr->dups) and return the new value.
//	Two variants of the same step are listed:
//	- gcc builtin: fetch-and-add returns the previous value, hence + 1.
//	- MSVC intrinsic: _InterlockedIncrement64 takes only the address of
//	  the counter and already returns the incremented value, so no
//	  addend is passed.
416 uid bt_newdup (BtDb *bt)
419 return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
421 return _InterlockedIncrement64(bt->mgr->latchmgr->dups);
425 // Phase-Fair reader/writer lock implementation
// WriteLock: acquire the lock exclusively.
//	1. draw a ticket from lock->ticket (atomic fetch-and-add of 1),
//	2. spin until lock->serving[0] equals that ticket,
//	3. add the writer bits (PRES plus the ticket's PHID bit) to
//	   lock->rin, keeping the previous rin value in r,
//	4. spin while lock->rout still differs from r.
//	Each atomic step is listed twice: once with the gcc __sync builtin
//	and once with the MSVC _Interlocked intrinsic. The lines selecting
//	between them and the bodies of the two wait loops are not part of
//	this listing.
427 void WriteLock (RWLock *lock)
432 tix = __sync_fetch_and_add (lock->ticket, 1);
434 tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
436 // wait for our ticket to come up
438 while( tix != lock->serving[0] )
// publish writer presence in rin; r is rin as it was before the add
445 w = PRES | (tix & PHID);
447 r = __sync_fetch_and_add (lock->rin, w);
449 r = _InterlockedExchangeAdd16 (lock->rin, w);
// wait until rout matches the rin snapshot
451 while( r != *lock->rout )
// WriteRelease: release an exclusive hold.
//	Atomically clears the MASK bits in lock->rin (gcc builtin and MSVC
//	intrinsic variants of the same operation).
//	NOTE(review): whatever advances lock->serving for the next ticket
//	holder is not part of this listing - confirm it follows.
459 void WriteRelease (RWLock *lock)
462 __sync_fetch_and_and (lock->rin, ~MASK);
464 _InterlockedAnd16 (lock->rin, ~MASK);
// ReadLock: acquire the lock shared.
//	Atomically adds RINC to lock->rin and keeps the MASK bits that were
//	set at that moment in w. The loop then spins for as long as the MASK
//	bits of rin are unchanged from w.
//	NOTE(review): any guard between the add and the loop, and the loop
//	body itself, are not part of this listing.
469 void ReadLock (RWLock *lock)
473 w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
475 w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
478 while( w == (*lock->rin & MASK) )
// ReadRelease: release a shared hold by atomically adding RINC to
//	lock->rout (gcc builtin and MSVC intrinsic variants). WriteLock
//	compares rout against its rin snapshot while waiting.
486 void ReadRelease (RWLock *lock)
489 __sync_fetch_and_add (lock->rout, RINC);
491 _InterlockedExchangeAdd16 (lock->rout, RINC);
495 // Spin Latch Manager
497 // wait until write lock mode is clear
498 // and add 1 to the share count
//	Each attempt atomically adds SHARE to the latch word (accessed as a
//	ushort) and keeps the previous value in prev. When an attempt is
//	abandoned, SHARE is subtracted again and the thread yields
//	(sched_yield / SwitchToThread) before retrying.
//	NOTE(review): the tests on prev that decide between success and
//	retry are not part of this listing.
500 void bt_spinreadlock(BtSpinLatch *latch)
506 prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
508 prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
510 // see if exclusive request is granted or pending
// back out our share count before yielding
515 prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
517 prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
// the comma expression yields and then evaluates to 1: retry forever
520 } while( sched_yield(), 1 );
522 } while( SwitchToThread(), 1 );
526 // wait for other read and write latches to relinquish
//	Each attempt atomically ORs PEND | XCL into the latch word and keeps
//	the previous value in prev. The attempt is tested for success when
//	prev has no bits set outside BOTH. Otherwise XCL is cleared again and
//	the thread yields before retrying; PEND is not cleared by the lines
//	shown here.
//	NOTE(review): BOTH presumably combines XCL and PEND; its definition
//	and any further test on prev are not part of this listing.
528 void bt_spinwritelock(BtSpinLatch *latch)
534 prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
536 prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
// nothing but lock bits was set before our attempt
539 if( !(prev & ~BOTH) )
// attempt failed: give XCL back
543 __sync_fetch_and_and ((ushort *)latch, ~XCL);
545 _InterlockedAnd16((ushort *)latch, ~XCL);
548 } while( sched_yield(), 1 );
550 } while( SwitchToThread(), 1 );
554 // try to obtain write lock
556 // return 1 if obtained,
//	Single, non-blocking attempt: atomically ORs XCL into the latch word
//	and inspects the previous value. If the attempt is not successful,
//	XCL is cleared again.
//	NOTE(review): the return statements and any additional test on prev
//	are not part of this listing.
559 int bt_spinwritetry(BtSpinLatch *latch)
564 prev = __sync_fetch_and_or((ushort *)latch, XCL);
566 prev = _InterlockedOr16((ushort *)latch, XCL);
568 // take write access if all bits are clear
571 if( !(prev & ~BOTH) )
// attempt failed: give XCL back
575 __sync_fetch_and_and ((ushort *)latch, ~XCL);
577 _InterlockedAnd16((ushort *)latch, ~XCL);
// bt_spinreleasewrite: release a write latch by atomically clearing
//	the BOTH bits in the latch word (gcc builtin and MSVC intrinsic
//	variants of the same operation).
584 void bt_spinreleasewrite(BtSpinLatch *latch)
587 __sync_fetch_and_and((ushort *)latch, ~BOTH);
589 _InterlockedAnd16((ushort *)latch, ~BOTH);
593 // decrement reader count
//	Atomically subtracts SHARE from the latch word, undoing the add
//	performed by bt_spinreadlock.
595 void bt_spinreleaseread(BtSpinLatch *latch)
598 __sync_fetch_and_add((ushort *)latch, -SHARE);
600 _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
604 // link latch table entry into latch hash table
//	Inserts latch set `victim` at the head of hash chain `hashidx` and
//	records page_no in it. Both callers in bt_pinlatch hold the write
//	latch of table[hashidx] when calling.
//	NOTE(review): the stores to set->hash and set->prev are not part of
//	this listing - confirm they follow.
606 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
608 BtLatchSet *set = bt->mgr->latchsets + victim;
// take over the old chain head as our successor; a zero slot means the
// chain was empty, in which case there is no back pointer to fix
610 if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
611 bt->mgr->latchsets[set->next].prev = victim;
613 bt->mgr->latchmgr->table[hashidx].slot = victim;
614 set->page_no = page_no;
// bt_unpinlatch: drop one pin on a latch set by atomically
//	decrementing set->pin. bt_pinlatch only takes over latch sets whose
//	pin count is zero.
621 void bt_unpinlatch (BtLatchSet *set)
624 __sync_fetch_and_add(&set->pin, -1);
626 _InterlockedDecrement16 (&set->pin);
630 // find existing latchset or inspire new one
631 // return with latchset pinned
//	Overview of the phases visible in this listing:
//	1. search hash chain page_no % latchhash under a read latch; on a
//	   match pin the set,
//	2. search again under the write latch, remembering the first
//	   unpinned entry in the chain; reuse the match or that entry,
//	3. otherwise claim a never-used latch set by bumping latchdeployed,
//	4. otherwise walk latch sets round-robin via latchvictim, take an
//	   unpinned one out of its current chain and relink it here.
//	NOTE(review): return statements, break/continue statements and
//	several tests are not part of this listing, so exact control flow
//	between the phases cannot be confirmed from here.
633 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
635 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
636 ushort slot, avail = 0, victim, idx;
639 // obtain read lock on hash table entry
641 bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
// walk the chain; slot zero terminates it
643 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
645 set = bt->mgr->latchsets + slot;
646 if( page_no == set->page_no )
648 } while( slot = set->next );
// pin the entry
652 __sync_fetch_and_add(&set->pin, 1);
654 _InterlockedIncrement16 (&set->pin);
658 bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
663 // try again, this time with write lock
665 bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
667 if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
669 set = bt->mgr->latchsets + slot;
670 if( page_no == set->page_no )
// candidate for takeover: first entry in the chain with no pins
672 if( !set->pin && !avail )
674 } while( slot = set->next );
676 // found our entry, or take over an unpinned one
678 if( slot || (slot = avail) ) {
679 set = bt->mgr->latchsets + slot;
681 __sync_fetch_and_add(&set->pin, 1);
683 _InterlockedIncrement16 (&set->pin);
// the entry stays in this chain, only its page number changes
685 set->page_no = page_no;
686 bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
690 // see if there are any unused entries
// victim is the post-increment value of latchdeployed in both variants
692 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
694 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
697 if( victim < bt->mgr->latchmgr->latchtotal ) {
698 set = bt->mgr->latchsets + victim;
700 __sync_fetch_and_add(&set->pin, 1);
702 _InterlockedIncrement16 (&set->pin);
704 bt_latchlink (bt, hashidx, victim, page_no);
705 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
// table exhausted: undo the increment made above
710 victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
712 victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
714 // find and reuse previous lock entry
// victim is the pre-increment value of latchvictim in both variants
718 victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
720 victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
722 // we don't use slot zero
724 if( victim %= bt->mgr->latchmgr->latchtotal )
725 set = bt->mgr->latchsets + victim;
729 // take control of our slot
730 // from other threads
732 if( set->pin || !bt_spinwritetry (set->busy) )
737 // try to get write lock on hash chain
738 // skip entry if not obtained
739 // or has outstanding locks
// NOTE(review): the assignment of idx is not part of this listing;
// presumably it is the chain the victim currently lives in (set->hash)
741 if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
742 bt_spinreleasewrite (set->busy);
747 bt_spinreleasewrite (set->busy);
748 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
752 // unlink our available victim from its hash chain
755 bt->mgr->latchsets[set->prev].next = set->next;
757 bt->mgr->latchmgr->table[idx].slot = set->next;
760 bt->mgr->latchsets[set->next].prev = set->prev;
762 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
764 __sync_fetch_and_add(&set->pin, 1);
766 _InterlockedIncrement16 (&set->pin);
// relink under the new page number, then drop the chain latch and
// finally the busy latch on the set itself
768 bt_latchlink (bt, hashidx, victim, page_no);
769 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
770 bt_spinreleasewrite (set->busy);
// bt_mgrclose: tear down a buffer manager.
//	Unmaps every pool segment, unmaps the latch set pages and the latch
//	manager page, then releases the file handle and the pool / hash /
//	latch arrays. Unix (munmap/free) and Windows (UnmapViewOfFile /
//	CloseHandle / GlobalFree) variants of each step are listed; the
//	lines selecting between them are not part of this listing.
//	NOTE(review): the munmap of latchsets reads
//	mgr->latchmgr->nlatchpage. bt_mgr calls this function on several
//	error paths before mgr->latchmgr has been mapped - confirm a NULL
//	guard exists in the lines not shown.
775 void bt_mgrclose (BtMgr *mgr)
780 // release mapped pages
781 // note that slot zero is never used
783 for( slot = 1; slot < mgr->poolmax; slot++ ) {
784 pool = mgr->pool + slot;
// a segment spans poolmask+1 pages
787 munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
790 FlushViewOfFile(pool->map, 0);
791 UnmapViewOfFile(pool->map);
792 CloseHandle(pool->hmap);
// the sizes match the two mmap calls made in bt_mgr
798 munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
799 munmap (mgr->latchmgr, 2 * mgr->page_size);
801 FlushViewOfFile(mgr->latchmgr, 0);
802 UnmapViewOfFile(mgr->latchmgr);
803 CloseHandle(mgr->halloc);
809 free ((void *)mgr->latch);
812 FlushFileBuffers(mgr->idx);
813 CloseHandle(mgr->idx);
814 GlobalFree (mgr->pool);
815 GlobalFree (mgr->hash);
816 GlobalFree ((void *)mgr->latch);
821 // close and release memory
//	Only the Windows release of the frame/cursor buffer (bt->mem,
//	obtained with VirtualAlloc in bt_open) is visible in this listing.
//	NOTE(review): the matching free of bt->mem for the malloc variant
//	and the release of bt itself are not shown - confirm.
823 void bt_close (BtDb *bt)
830 VirtualFree (bt->mem, 0, MEM_RELEASE);
835 // open/create new btree buffer manager
837 // call with file_name, BT_openmode, bits in page size (e.g. 16),
838 // size of mapped page pool (e.g. 8192)
//	Parameters:
//	name     - path of the index file; it is created when absent
//	mode     - BT_ro or BT_rw
//	bits     - page size in bits, checked against BT_minbits/BT_maxbits;
//	           replaced by the value stored on disk for an existing file
//	poolmax  - number of pool segment entries
//	segsize  - pages per mapped segment, in bits
//	hashsize - number of pool hash table slots
//	Returns the new BtMgr, or NULL on failure.
//	Phases: open the file, read page 0 to learn the page size, size the
//	pool/hash/latch arrays, write the initial pages (allocation page,
//	root, first leaf, latch pages) when the file is new, then map the
//	latch manager page and the latch set pages.
//	Unix and Windows variants of each step are listed; the lines
//	selecting between them are not part of this listing.
//	NOTE(review): poolmax and hashsize are uint here but are stored in
//	ushort fields of BtMgr, so larger values are truncated.
//	NOTE(review): none of the calloc/malloc/GlobalAlloc/VirtualAlloc
//	results below is checked in the lines shown.
840 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
842 uint lvl, attr, cacheblk, last, slot, idx;
843 uint nlatchpage, latchhash;
844 unsigned char value[BtId];
845 int flag, initit = 0;
846 BtLatchMgr *latchmgr;
854 SYSTEM_INFO sysinfo[1];
857 // determine sanity of page size and buffer pool
859 if( bits > BT_maxbits )
861 else if( bits < BT_minbits )
865 return NULL; // must have buffer pool
868 mgr = calloc (1, sizeof(BtMgr));
870 mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
873 return free(mgr), NULL;
875 cacheblk = 4096; // minimum mmap segment size for unix
878 mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
879 attr = FILE_ATTRIBUTE_NORMAL;
880 mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
882 if( mgr->idx == INVALID_HANDLE_VALUE )
883 return GlobalFree(mgr), NULL;
885 // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
886 GetSystemInfo(sysinfo);
887 cacheblk = sysinfo->dwAllocationGranularity;
// scratch buffer used to read page 0 and to build the initial pages
891 latchmgr = malloc (BT_maxpage);
894 // read minimum page size to get root info
895 // to support raw disk partition files
896 // check if bits == 0 on the disk.
898 if( size = lseek (mgr->idx, 0L, 2) ) {
899 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
900 if( latchmgr->alloc->bits )
901 bits = latchmgr->alloc->bits;
// page 0 could not be read: close the descriptor opened above before
// freeing the manager, so that it is not leaked
905 return close (mgr->idx), free(mgr), free(latchmgr), NULL;
906 } else if( mode == BT_ro )
// empty file opened read-only: only the descriptor and the two buffers
// exist at this point, so release them directly. bt_mgrclose must not
// be used here because it reads mgr->latchmgr, which is still NULL.
907 return close (mgr->idx), free(latchmgr), free(mgr), NULL;
911 latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
912 size = GetFileSize(mgr->idx, amt);
915 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
916 return bt_mgrclose (mgr), NULL;
917 bits = latchmgr->alloc->bits;
918 } else if( mode == BT_ro )
919 return bt_mgrclose (mgr), NULL;
924 mgr->page_size = 1 << bits;
925 mgr->page_bits = bits;
927 mgr->poolmax = poolmax;
// a segment is never smaller than one page
930 if( cacheblk < mgr->page_size )
931 cacheblk = mgr->page_size;
933 // mask for partial memmaps
935 mgr->poolmask = (cacheblk >> bits) - 1;
937 // see if requested size of pages per memmap is greater
939 if( (1 << segsize) > mgr->poolmask )
940 mgr->poolmask = (1 << segsize) - 1;
// seg_bits ends up as the number of bits needed to cover poolmask
944 while( (1 << mgr->seg_bits) <= mgr->poolmask )
947 mgr->hashsize = hashsize;
950 mgr->pool = calloc (poolmax, sizeof(BtPool));
951 mgr->hash = calloc (hashsize, sizeof(ushort));
952 mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
954 mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
955 mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
956 mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
962 // initialize an empty b-tree with latch page, root page, page of leaves
963 // and page(s) of latches
// NOTE(review): the error returns from here up to the mmap calls invoke
// bt_mgrclose while mgr->latchmgr is still unmapped and without
// releasing the scratch buffer - confirm and fix together with
// bt_mgrclose.
965 memset (latchmgr, 0, 1 << bits);
966 nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1;
// alloc->right holds the next page number to hand out
967 bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
968 latchmgr->alloc->bits = mgr->page_bits;
970 latchmgr->nlatchpage = nlatchpage;
971 latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
973 // initialize latch manager
// the hash table occupies what is left of page 0 after the header
975 latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
977 // size of hash table = total number of latchsets
979 if( latchhash > latchmgr->latchtotal )
980 latchhash = latchmgr->latchtotal;
982 latchmgr->latchhash = latchhash;
984 // initialize left-most LEAF page in
987 bt_putid (latchmgr->alloc->left, LEAF_page);
990 if( pwrite (mgr->idx, latchmgr, mgr->page_size, 0) < mgr->page_size )
991 return bt_mgrclose (mgr), NULL;
993 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
994 return bt_mgrclose (mgr), NULL;
996 if( *amt < mgr->page_size )
997 return bt_mgrclose (mgr), NULL;
1000 memset (latchmgr, 0, 1 << bits);
1001 latchmgr->alloc->bits = mgr->page_bits;
// build one page per level, highest level first; each holds a single
// stopper key whose value (on non-leaf levels) is the child page number
1003 for( lvl=MIN_lvl; lvl--; ) {
1004 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1005 key = keyptr(latchmgr->alloc, 1);
1006 key->len = 2; // create stopper key
1010 bt_putid(value, MIN_lvl - lvl + 1);
1011 val = valptr(latchmgr->alloc, 1);
1012 val->len = lvl ? BtId : 0;
1013 memcpy (val->value, value, val->len);
1015 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
1016 latchmgr->alloc->lvl = lvl;
1017 latchmgr->alloc->cnt = 1;
1018 latchmgr->alloc->act = 1;
1020 if( pwrite (mgr->idx, latchmgr, mgr->page_size, (MIN_lvl - lvl) << bits) < mgr->page_size )
1021 return bt_mgrclose (mgr), NULL;
1023 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1024 return bt_mgrclose (mgr), NULL;
1026 if( *amt < mgr->page_size )
1027 return bt_mgrclose (mgr), NULL;
1031 // clear out latch manager locks
1032 // and rest of pages to round out segment
1034 memset(latchmgr, 0, mgr->page_size);
1037 while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
// NOTE(review): the result of this pwrite is not checked
1039 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1041 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1042 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1043 return bt_mgrclose (mgr), NULL;
1044 if( *amt < mgr->page_size )
1045 return bt_mgrclose (mgr), NULL;
1052 // mlock the root page and the latchmgr page
1054 flag = PROT_READ | PROT_WRITE;
1055 mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1056 if( mgr->latchmgr == MAP_FAILED )
1057 return bt_mgrclose (mgr), NULL;
1058 mlock (mgr->latchmgr, 2 * mgr->page_size);
1060 mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1061 if( mgr->latchsets == MAP_FAILED )
1062 return bt_mgrclose (mgr), NULL;
1063 mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
// Windows: one mapping covers page 0 through the last latch page
1065 flag = PAGE_READWRITE;
1066 mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1068 return bt_mgrclose (mgr), NULL;
1070 flag = FILE_MAP_WRITE;
1071 mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1072 if( !mgr->latchmgr )
1073 return GetLastError(), bt_mgrclose (mgr), NULL;
1075 mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
// the scratch buffer is no longer needed once the pages are mapped
1081 VirtualFree (latchmgr, 0, MEM_RELEASE);
1086 // open BTree access method
1087 // based on buffer manager
//	Allocates a zeroed BtDb plus one buffer of two pages: the first page
//	becomes bt->frame (spare frame for page splits), the second becomes
//	bt->cursor (cached cursor page). The buffer comes from malloc or
//	VirtualAlloc depending on the platform variant.
//	NOTE(review): neither allocation result is checked in the lines
//	shown; the memset would dereference NULL if the first malloc failed.
1089 BtDb *bt_open (BtMgr *mgr)
1091 BtDb *bt = malloc (sizeof(*bt));
1093 memset (bt, 0, sizeof(*bt));
1096 bt->mem = malloc (2 *mgr->page_size);
1098 bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1100 bt->frame = (BtPage)bt->mem;
1101 bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1105 // compare two keys, returning > 0, = 0, or < 0
1106 // as the comparison value
//	key1 is a length-prefixed BtKey; key2/len2 is a raw byte string.
//	The bytes are compared with memcmp over the shorter of the two
//	lengths; a non-zero memcmp result is tested first.
//	NOTE(review): the handling of equal prefixes with different lengths
//	and the return statements are not part of this listing.
1108 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1110 uint len1 = key1->len;
1113 if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1126 // find segment in pool
1127 // must be called with hashslot idx locked
1128 // return NULL if not there
1129 // otherwise return node
//	page_no is rounded down to its segment base (low poolmask bits
//	cleared) and the chain starting at hash[idx] is followed through
//	hashnext until an entry with that basepage is found.
1131 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1136 // compute start of hash chain in pool
// a zero hash entry means the chain is empty
1138 if( slot = bt->mgr->hash[idx] )
1139 pool = bt->mgr->pool + slot;
1143 page_no &= ~bt->mgr->poolmask;
1145 while( pool->basepage != page_no )
1146 if( pool = pool->hashnext )
1154 // add segment to hash table
//	Initialises the pool entry for the segment containing page_no
//	(basepage = page_no with the poolmask bits cleared), sets its pin
//	word to one pin plus CLOCK_bit, and pushes it at the head of hash
//	chain idx. bt_pinpool calls this while holding the write latch of
//	that hash slot.
1156 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1161 pool->hashprev = pool->hashnext = NULL;
1162 pool->basepage = page_no & ~bt->mgr->poolmask;
1163 pool->pin = CLOCK_bit + 1;
// link in front of the current chain head, if there is one
1165 if( slot = bt->mgr->hash[idx] ) {
1166 node = bt->mgr->pool + slot;
1167 pool->hashnext = node;
1168 node->hashprev = pool;
1171 bt->mgr->hash[idx] = pool->slot;
1174 // map new buffer pool segment to virtual memory
//	Maps the segment of poolmask+1 pages that contains page_no into
//	pool->map. Returns bt->err = BTERR_map when the mapping fails.
//	Unix (mmap) and Windows (CreateFileMapping / MapViewOfFile) variants
//	are listed; the lines selecting between them and the final return
//	are not part of this listing.
//	off   - byte offset of the segment start in the file
//	limit - byte offset just past the segment, used as the maximum size
//	        of the Windows file mapping
1176 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1178 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
// widen to uid before shifting, as the mmap and MapViewOfFile calls
// below do; an int-width shift can overflow for large segments
1179 off64_t limit = off + ((uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
// read-only managers get read-only mappings
1183 flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1184 pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1185 if( pool->map == MAP_FAILED )
1186 return bt->err = BTERR_map;
1189 flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1190 pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1192 return bt->err = BTERR_map;
1194 flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1195 pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1197 return bt->err = BTERR_map;
1202 // calculate page within pool
//	The low poolmask bits of page_no select the page inside the mapped
//	segment; the result is pool->map plus that index times the page size.
1204 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1206 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1209 page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1210 // madvise (page, bt->mgr->page_size, MADV_WILLNEED);
// bt_unpinpool: drop one pin on a pool segment by atomically
//	decrementing pool->pin (gcc builtin and MSVC intrinsic variants).
1216 void bt_unpinpool (BtPool *pool)
1219 __sync_fetch_and_add(&pool->pin, -1);
1221 _InterlockedDecrement16 (&pool->pin);
1225 // find or place requested page in segment-pool
1226 // return pool table entry, incrementing pin
//	Phases visible in this listing:
//	1. lock the hash slot for page_no's segment and look the segment up;
//	   on a hit set CLOCK_bit, add a pin and return,
//	2. otherwise claim an unused pool entry by bumping poolcnt, map the
//	   segment and link it into the hash chain,
//	3. otherwise walk pool entries round-robin via mgr->evicted, unlink
//	   a victim from its chain, unmap it and map the new segment into it.
//	NOTE(review): return statements, continue statements and the test
//	that skips pinned entries are not part of this listing. In
//	particular, whether the hashidx latch is released when bt_mapsegment
//	fails cannot be confirmed from here.
1228 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1230 uint slot, hashidx, idx, victim;
1231 BtPool *pool, *node, *next;
1233 // lock hash table chain
1235 hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1236 bt_spinwritelock (&bt->mgr->latch[hashidx]);
1238 // look up in hash table
1240 if( pool = bt_findpool(bt, page_no, hashidx) ) {
// mark as recently used, then pin
1242 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1243 __sync_fetch_and_add(&pool->pin, 1);
1245 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1246 _InterlockedIncrement16 (&pool->pin);
1248 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1252 // allocate a new pool node
1253 // and add to hash table
// slot is the pre-increment value of poolcnt in both variants
1256 slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1258 slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
// pool entry zero is never used, hence the pre-increment
1261 if( ++slot < bt->mgr->poolmax ) {
1262 pool = bt->mgr->pool + slot;
1265 if( bt_mapsegment(bt, pool, page_no) )
1268 bt_linkhash(bt, pool, page_no, hashidx);
1269 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1273 // pool table is full
1274 // find best pool entry to evict
// undo the increment made above
1277 __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1279 _InterlockedDecrement16 (&bt->mgr->poolcnt);
// next candidate in round-robin order
1284 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1286 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1288 victim %= bt->mgr->poolmax;
1289 pool = bt->mgr->pool + victim;
// hash slot the candidate is currently linked under
1290 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1295 // try to get write lock
1296 // skip entry if not obtained
1298 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1301 // skip this entry if
1303 // or clock bit is set
// candidate is skipped this round; clear its clock bit
1307 __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1309 _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1311 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1315 // unlink victim pool node from hash table
1317 if( node = pool->hashprev )
1318 node->hashnext = pool->hashnext;
1319 else if( node = pool->hashnext )
1320 bt->mgr->hash[idx] = node->slot;
1322 bt->mgr->hash[idx] = 0;
1324 if( node = pool->hashnext )
1325 node->hashprev = pool->hashprev;
1327 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1329 // remove old file mapping
1331 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1333 // FlushViewOfFile(pool->map, 0);
1334 UnmapViewOfFile(pool->map);
1335 CloseHandle(pool->hmap);
1339 // create new pool mapping
1340 // and link into hash table
1342 if( bt_mapsegment(bt, pool, page_no) )
1345 bt_linkhash(bt, pool, page_no, hashidx);
1346 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1351 // place write, read, or parent lock on requested page_no.
//	Dispatches on mode to one of the three RWLocks in the latch set:
//	readwr (shared or exclusive), access (shared or exclusive) and
//	parent (exclusive only).
//	NOTE(review): the switch and case labels are not part of this
//	listing; the order below presumably corresponds to read, write,
//	access, delete and parent modes - confirm against BtLock.
1353 void bt_lockpage(BtLock mode, BtLatchSet *set)
1357 ReadLock (set->readwr);
1360 WriteLock (set->readwr);
1363 ReadLock (set->access);
1366 WriteLock (set->access);
1369 WriteLock (set->parent);
1374 // remove write, read, or parent lock on requested page
//	Mirror image of bt_lockpage: releases the RWLock in the latch set
//	selected by mode, using ReadRelease for the shared modes and
//	WriteRelease for the exclusive ones.
//	NOTE(review): the switch and case labels are not part of this
//	listing.
1376 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1380 ReadRelease (set->readwr);
1383 WriteRelease (set->readwr);
1386 ReadRelease (set->access);
1389 WriteRelease (set->access);
1392 WriteRelease (set->parent);
1397 // allocate a new page
//	Fills in set->page_no, set->pool and set->page for a page taken
//	either from the free chain (latchmgr->chain) or from the end of the
//	file (latchmgr->alloc->right). The allocation latch is held for the
//	duration and must be released on every exit.
//	Returns a BTERR code; BTERR_struct when a free-chain page cannot be
//	pinned.
//	NOTE(review): the error return of the second bt_pinpool call and the
//	final return are not part of this listing - confirm the latch is
//	released there as well.
1399 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1403 // lock allocation page
1405 bt_spinwritelock(bt->mgr->latchmgr->lock);
1407 // use empty chain first
1408 // else allocate empty page
1410 if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1411 if( set->pool = bt_pinpool (bt, set->page_no) )
1412 set->page = bt_page (bt, set->pool, set->page_no);
// drop the allocation latch before reporting the error; leaving it
// held would block every later page allocation
1414 return bt_spinreleasewrite(bt->mgr->latchmgr->lock), bt->err = BTERR_struct;
// pop the page off the free chain: its right link is the next free page
1416 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
// no free page: hand out the next page number and advance the counter
1418 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1419 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1421 // if writing first page of pool block, set file length thru last page
1423 if( (set->page_no & bt->mgr->poolmask) == 0 )
// NOTE(review): the ftruncate result is not checked
1424 ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1426 if( set->pool = bt_pinpool (bt, set->page_no) )
1427 set->page = bt_page (bt, set->pool, set->page_no);
1432 // unlock allocation latch
1434 bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1438 // find slot in page for given key at a given level
//	Binary search over the 1-based slot array of set->page for the first
//	slot whose key is not less than key/len. Returns that slot number,
//	or 0 when no slot was confirmed (good stays zero), in which case the
//	key belongs on the right sibling page.
//	NOTE(review): the declaration and initial value of good, and the
//	statements guarded by the right-link test and by the "less than"
//	branch, are not part of this listing.
1440 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1442 uint diff, higher = set->page->cnt, low = 1, slot;
1445 // make stopper key an infinite fence value
// a non-zero right link means the page has a right sibling
1447 if( bt_getid (set->page->right) )
1452 // low is the lowest candidate.
1453 // loop ends when they meet
1455 // higher is already
1456 // tested as .ge. the passed key.
1458 while( diff = higher - low ) {
// probe the midpoint of [low, higher)
1459 slot = low + ( diff >> 1 );
1460 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
// slot's key is >= the search key: it becomes the new upper bound
1463 higher = slot, good++;
1466 // return zero if key is on right link page
1468 return good ? higher : 0;
1471 // find and load page at given level for given key
1472 // leave page rd or wr locked as requested
// Walk from the root toward level `lvl` looking for the page that covers
// `key`, locking each page visited and releasing its parent afterwards.
//   bt   - btree handle; bt->err is set to BTERR_struct on the error paths
//   set  - filled with latch, page_no, pool and page of each page visited
//   key, len - search key
//   lvl  - level at which the walk stops
//   lock - lock mode used at level `lvl`; other levels use BtLockRead
// Returns 0 on the error paths shown.  Callers in this file treat a
// non-zero result as a slot number on the page left locked in `set`.
// NOTE(review): the enclosing loop, the success return and several
// guarded statements are not shown in this listing.
1474 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1476 uid page_no = ROOT_page, prevpage = 0;
// drill tracks the level being visited; 0xff presumably means the root's
// level has not been read yet
1477 uint drill = 0xff, slot;
1478 BtLatchSet *prevlatch;
1479 uint mode, prevmode;
1482 // start at root of btree and drill down
1485 // determine lock mode of drill level
1486 mode = (drill == lvl) ? lock : BtLockRead;
1488 set->latch = bt_pinlatch (bt, page_no);
1489 set->page_no = page_no;
1491 // pin page contents
1493 if( set->pool = bt_pinpool (bt, page_no) )
1494 set->page = bt_page (bt, set->pool, page_no);
1498 // obtain access lock using lock chaining with Access mode
// the root is never access-locked; only pages below it are
1500 if( page_no > ROOT_page )
1501 bt_lockpage(BtLockAccess, set->latch);
1503 // release & unpin parent page
1506 bt_unlockpage(prevmode, prevlatch);
1507 bt_unpinlatch (prevlatch);
1508 bt_unpinpool (prevpool);
1512 // obtain read lock using lock chaining
1514 bt_lockpage(mode, set->latch);
// a page flagged free should not be reachable from the tree
// NOTE(review): no unlock or unpin is visible before this return
1516 if( set->page->free )
1517 return bt->err = BTERR_struct, 0;
// the access lock is only needed until the page lock is held
1519 if( page_no > ROOT_page )
1520 bt_unlockpage(BtLockAccess, set->latch);
1522 // re-read and re-lock root after determining actual level of root
1524 if( set->page->lvl != drill) {
// only the root may report a level different from the expected one
1525 if( set->page_no != ROOT_page )
1526 return bt->err = BTERR_struct, 0;
1528 drill = set->page->lvl;
// the root was locked with BtLockRead but is itself the requested level
// and a stronger lock was asked for: let go of it (presumably to retry)
1530 if( lock != BtLockRead && drill == lvl ) {
1531 bt_unlockpage(mode, set->latch);
1532 bt_unpinlatch (set->latch);
1533 bt_unpinpool (set->pool);
// remember this page so it can be released once its child is locked
1538 prevpage = set->page_no;
1539 prevlatch = set->latch;
1540 prevpool = set->pool;
1543 // find key on page at this level
1544 // and descend to requested level
1546 if( set->page->kill )
1549 if( slot = bt_findslot (set, key, len) ) {
// step over dead slots
1553 while( slotptr(set->page, slot)->dead )
1554 if( slot++ < set->page->cnt )
// the slot's value holds the page number of the child to descend into
1559 page_no = bt_getid(valptr(set->page, slot)->value);
1564 // or slide right into next page
1567 page_no = bt_getid(set->page->right);
1571 // return error on end of right chain
1573 bt->err = BTERR_struct;
1574 return 0; // return error
1577 // return page to free list
1578 // page must be delete & write locked
// Put the page in `set` on the head of the free chain and release it.
//   bt  - btree handle
//   set - page to free; its delete and write locks are released here and
//         its latch and pool pins are dropped
// The chain update and the releases happen under the allocation spin latch.
1580 void bt_freepage (BtDb *bt, BtPageSet *set)
1582 // lock allocation page
1584 bt_spinwritelock (bt->mgr->latchmgr->lock);
// the page's right link takes over the old chain head ...
1587 memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
// ... and the page itself becomes the new chain head
1588 bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1589 set->page->free = 1;
1591 // unlock released page
1593 bt_unlockpage (BtLockDelete, set->latch);
1594 bt_unlockpage (BtLockWrite, set->latch);
1595 bt_unpinlatch (set->latch);
1596 bt_unpinpool (set->pool);
1598 // unlock allocation page
1600 bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1603 // a fence key was deleted from a page
1604 // push new fence value upwards
// Replace a page's fence key in its parent after the fence was deleted.
//   bt  - btree handle
//   set - page whose last slot (the old fence) is removed; the BtLockWrite
//         on it is released here, and by the end its latch and pool are
//         unpinned
//   lvl - level of `set`; the parent entries are changed at lvl+1
// Returns a BTERR code.
// NOTE(review): the declaration of `ptr` and the statements guarded by the
// two `if` lines are not shown in this listing.
1606 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1608 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1609 unsigned char value[BtId];
1613 // remove the old fence value
// keep a copy of the old fence key, then clear its slot and shrink cnt
1615 ptr = keyptr(set->page, set->page->cnt);
1616 memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1617 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1619 // cache new fence value
// the slot that is now last supplies the new fence key
1621 ptr = keyptr(set->page, set->page->cnt);
1622 memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// take the parent lock before giving up the write lock
1624 bt_lockpage (BtLockParent, set->latch);
1625 bt_unlockpage (BtLockWrite, set->latch);
1627 // insert new (now smaller) fence key
// the parent entry's value is this page's number
1629 bt_putid (value, set->page_no);
1630 ptr = (BtKey*)leftkey;
1632 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1635 // now delete old fence key
1637 ptr = (BtKey*)rightkey;
1639 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1642 bt_unlockpage (BtLockParent, set->latch);
1643 bt_unpinlatch(set->latch);
1644 bt_unpinpool (set->pool);
1648 // root has a single child
1649 // collapse a level from the tree
// Remove a level from the tree by copying the root's only child over the
// root page and freeing the child; repeats while the root is above level 1
// and still has exactly one active key.
//   bt   - btree handle
//   root - root page set; its BtLockWrite is released and its latch and
//          pool are unpinned at the end
// Returns a BTERR code.
// NOTE(review): the declarations of `child` and `idx`, the opening of the
// do-loop and the statements guarded by some `if` lines are not shown in
// this listing.
1651 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1656 // find the child entry and promote as new root contents
// scan slots 1..cnt, testing each for a slot that is not dead
1659 for( idx = 0; idx++ < root->page->cnt; )
1660 if( !slotptr(root->page, idx)->dead )
// the slot's value holds the child's page number
1663 child->page_no = bt_getid (valptr(root->page, idx)->value);
// the child is about to be freed: take both its delete and write locks
1665 child->latch = bt_pinlatch (bt, child->page_no);
1666 bt_lockpage (BtLockDelete, child->latch);
1667 bt_lockpage (BtLockWrite, child->latch);
1669 if( child->pool = bt_pinpool (bt, child->page_no) )
1670 child->page = bt_page (bt, child->pool, child->page_no);
// the child's whole page image replaces the root's
1674 memcpy (root->page, child->page, bt->mgr->page_size);
// bt_freepage also releases the child's locks and pins
1675 bt_freepage (bt, child);
1677 } while( root->page->lvl > 1 && root->page->act == 1 );
1679 bt_unlockpage (BtLockWrite, root->latch);
1680 bt_unpinlatch (root->latch);
1681 bt_unpinpool (root->pool);
1685 // maintain reverse scan pointers by
1686 // linking left pointer of far right node
// Store `left_page_no` as the left link of page `right_page_no`.
//   bt            - btree handle
//   left_page_no  - page number to record as the left neighbour
//   right_page_no - page whose left link is updated; when 0 the value is
//                   recorded in alloc->left instead (the rightmost leaf,
//                   per the comment below)
// Returns a BTERR code.
// NOTE(review): the return statements and the failure branch of the pool
// pin are not shown in this listing.
1688 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1690 BtPageSet right2[1];
1692 // keep track of rightmost leaf page
1694 if( !right_page_no ) {
1695 bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1699 // link right page to left page
// the right page is write-locked for the duration of the update
1701 right2->latch = bt_pinlatch (bt, right_page_no);
1702 bt_lockpage (BtLockWrite, right2->latch);
1704 if( right2->pool = bt_pinpool (bt, right_page_no) )
1705 right2->page = bt_page (bt, right2->pool, right_page_no);
1709 bt_putid(right2->page->left, left_page_no);
1710 bt_unlockpage (BtLockWrite, right2->latch);
1711 bt_unpinlatch (right2->latch);
1712 bt_unpinpool (right2->pool);
1716 // find and delete key on page by marking delete flag bit
1717 // if page becomes empty, delete it from the btree
// Delete `key` at level `lvl` by marking its slot dead.  When the page is
// left with no active keys, the right sibling's contents are pulled into
// it, the parent entries are re-pointed and the sibling is freed.
//   bt  - btree handle; bt->found receives the found flag on the returns
//         shown, bt->err is set on the BTERR_struct path
//   key, len - key to delete
//   lvl - tree level holding the key (callers in this file pass 0 for
//         leaf keys and lvl+1 for parent entries)
// Returns a BTERR code (0 on the paths that also set bt->found).
// NOTE(review): the declarations of `ptr` and `val` and the statements
// guarded by several `if` lines are not shown in this listing.
1719 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1721 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1722 uint slot, idx, found, fence;
1723 BtPageSet set[1], right[1];
1724 unsigned char value[BtId];
// locate the page covering the key and hold it write-locked
1728 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1729 ptr = keyptr(set->page, slot);
1733 // if librarian slot, advance to real slot
1735 if( slotptr(set->page, slot)->type == Librarian )
1736 ptr = keyptr(set->page, ++slot);
// the last slot of a page is its fence key
1738 fence = slot == set->page->cnt;
1740 // if key is found delete it, otherwise ignore request
// found ends up non-zero only for an exact match on a slot that was not
// already dead
1742 if( found = !keycmp (ptr, key, len) )
1743 if( found = slotptr(set->page, slot)->dead == 0 ) {
1744 val = valptr(set->page,slot);
1745 slotptr(set->page, slot)->dead = 1;
// the key and value bytes (with their headers) become reclaimable
1746 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1749 // collapse empty slots beneath the fence
// while the slot just below the fence is dead, copy the fence slot down
// over it and clear the vacated last slot
1751 while( idx = set->page->cnt - 1 )
1752 if( slotptr(set->page, idx)->dead ) {
1753 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1754 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1759 // did we delete a fence key in an upper level?
1761 if( found && lvl && set->page->act && fence )
1762 if( bt_fixfence (bt, set, lvl) )
1765 return bt->found = found, 0;
1767 // do we need to collapse root?
1769 if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1770 if( bt_collapseroot (bt, set) )
1773 return bt->found = found, 0;
1775 // return if page is not empty
1777 if( set->page->act ) {
1778 bt_unlockpage(BtLockWrite, set->latch);
1779 bt_unpinlatch (set->latch);
1780 bt_unpinpool (set->pool);
1781 return bt->found = found, 0;
1784 // cache copy of fence key
1785 // to post in parent
1787 ptr = keyptr(set->page, set->page->cnt);
1788 memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1790 // obtain lock on right page
1792 right->page_no = bt_getid(set->page->right);
1793 right->latch = bt_pinlatch (bt, right->page_no);
1794 bt_lockpage (BtLockWrite, right->latch);
1796 // pin page contents
1798 if( right->pool = bt_pinpool (bt, right->page_no) )
1799 right->page = bt_page (bt, right->pool, right->page_no);
// NOTE(review): this return is reached with `set` and `right` both still
// write-locked and pinned as far as the visible lines show - confirm
1803 if( right->page->kill )
1804 return bt->err = BTERR_struct;
1806 // transfer left link
// done before the copy below so the merged page keeps our own left link
1808 memcpy (right->page->left, set->page->left, BtId);
1810 // pull contents of right peer into our empty page
1812 memcpy (set->page, right->page, bt->mgr->page_size);
// the page now to our right must name this page as its left neighbour
1817 if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1820 // cache copy of key to update
1822 ptr = keyptr(right->page, right->page->cnt);
1823 memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1825 // mark right page deleted and point it to left page
1826 // until we can post parent updates
1828 bt_putid (right->page->right, set->page_no);
1829 right->page->kill = 1;
// on both pages, swap the write lock for the parent lock while the
// parent entries are being changed
1831 bt_lockpage (BtLockParent, right->latch);
1832 bt_unlockpage (BtLockWrite, right->latch);
1834 bt_lockpage (BtLockParent, set->latch);
1835 bt_unlockpage (BtLockWrite, set->latch);
1837 // redirect higher key directly to our new node contents
1839 bt_putid (value, set->page_no);
1840 ptr = (BtKey*)higherfence;
1842 if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1845 // delete old lower key to our node
1847 ptr = (BtKey*)lowerfence;
// recursive call one level up
1849 if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1852 // obtain delete and write locks to right node
1854 bt_unlockpage (BtLockParent, right->latch);
1855 bt_lockpage (BtLockDelete, right->latch);
1856 bt_lockpage (BtLockWrite, right->latch);
// bt_freepage releases the locks and pins taken on the right page
1857 bt_freepage (bt, right);
1859 bt_unlockpage (BtLockParent, set->latch);
1860 bt_unpinlatch (set->latch);
1861 bt_unpinpool (set->pool);
// Return the handle's key buffer (bt->key) viewed as a BtKey; bt_findkey
// copies the key it located into that buffer.
1866 BtKey *bt_foundkey (BtDb *bt)
1868 return (BtKey*)(bt->key);
1871 // advance to next slot
// Advance past `slot`, moving `set` to the right sibling page when `slot`
// is not below the page's slot count.
//   bt   - btree handle; bt->err is set to BTERR_struct on the error shown
//   set  - current page set, read-locked on entry; when the move happens
//          it is re-pointed at the right sibling, which is left read-locked
//   slot - current slot number
// Returns 0 on the error path shown; the other return values are not
// visible in this listing.
// NOTE(review): the declarations of `prevpool` and `page_no` and the
// statement guarded by the first `if` are not shown.
1873 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1875 BtLatchSet *prevlatch;
// more slots remain on this page
1879 if( slot < set->page->cnt )
// remember the current page so it can be released after the move
1882 prevlatch = set->latch;
1883 prevpool = set->pool;
1885 if( page_no = bt_getid(set->page->right) )
1886 set->latch = bt_pinlatch (bt, page_no);
1888 return bt->err = BTERR_struct, 0;
1890 // pin page contents
1892 if( set->pool = bt_pinpool (bt, page_no) )
1893 set->page = bt_page (bt, set->pool, page_no);
1897 // obtain access lock using lock chaining with Access mode
1899 bt_lockpage(BtLockAccess, set->latch);
// the previous page is released only once the next one is access-locked
1901 bt_unlockpage(BtLockRead, prevlatch);
1902 bt_unpinlatch (prevlatch);
1903 bt_unpinpool (prevpool);
// swap the access lock for a read lock on the new page
1905 bt_lockpage(BtLockRead, set->latch);
1906 bt_unlockpage(BtLockAccess, set->latch);
1908 set->page_no = page_no;
1912 // find unique key or first duplicate key in
1913 // leaf level and return number of value bytes
1914 // or (-1) if not found. Setup key for bt_foundkey
// Look up `key` at the leaf level and copy its value into `value`.
//   bt     - btree handle; the key actually found is copied to bt->key
//   key, keylen - key to look for
//   value  - destination for the value bytes
//   valmax - capacity of `value`
// Per the comments in and above this function: returns the number of
// value bytes copied (>= 0), or -1 when the key is not present.
// NOTE(review): the declarations of set, slot, ptr, val and len, the
// do-loop opening, the computation of `len` and the return statements are
// not shown in this listing.
1916 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
// level 0 with a read lock: a leaf lookup
1924 if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1926 ptr = keyptr(set->page, slot);
1928 // skip librarian slot place holder
1930 if( slotptr(set->page, slot)->type == Librarian )
1931 ptr = keyptr(set->page, ++slot);
1933 // return actual key found
1935 memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1938 if( slotptr(set->page, slot)->type == Duplicate )
1941 // if key exists, return >= 0 value bytes copied
1942 // otherwise return (-1)
1944 if( slotptr(set->page, slot)->dead )
1948 if( !memcmp (ptr->key, key, len) ) {
1949 val = valptr (set->page,slot);
// presumably clamps valmax to the stored value length
1950 if( valmax > val->len )
// NOTE(review): index_file calls this with value == NULL and valmax == 0;
// passing a null pointer to memcpy is undefined even for a zero length
1952 memcpy (value, val->value, valmax);
// keep going through following slots, crossing to the right page if needed
1958 } while( slot = bt_findnext (bt, set, slot) );
1960 bt_unlockpage (BtLockRead, set->latch);
1961 bt_unpinlatch (set->latch);
1962 bt_unpinpool (set->pool);
1966 // check page for space available,
1967 // clean if necessary and return
1968 // 0 - page needs splitting
1969 // >0 new slot value
// Check whether `page` has room for one more key/value pair and, if not,
// compact it by rebuilding it from a copy held in bt->frame.
//   bt     - btree handle; bt->frame is used as scratch space
//   page   - page to check and possibly rebuild in place
//   keylen - length of the key about to be inserted
//   slot   - slot the caller intends to insert at
//   vallen - length of the value about to be inserted
// Per the comment above this function: returns 0 when the page needs
// splitting, otherwise the (possibly renumbered) slot to use.
// NOTE(review): the declarations of `key` and `val`, the return
// statements and the statements guarded by several `if` lines are not
// shown in this listing.
1971 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
// nxt is the write offset for the rebuild; data is packed downward from
// the end of the page
1973 uint nxt = bt->mgr->page_size;
1974 uint cnt = 0, idx = 0;
1975 uint max = page->cnt;
// room test: two further slot entries, the page header, and the new key
// and value with their headers must fit below page->min
1980 if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1983 // skip cleanup and proceed to split
1984 // if there's not enough garbage
// threshold is one fifth of the page size
1987 if( page->garbage < nxt / 5 )
// work from a snapshot so the page can be rebuilt in place
1990 memcpy (bt->frame, page, bt->mgr->page_size);
1992 // skip page info and set rest of page to zero
1994 memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1998 // clean up page first by
1999 // removing deleted keys
2001 while( cnt++ < max ) {
// the cnt < max test exempts the last slot (the fence) even when dead
2004 if( cnt < max && slotptr(bt->frame,cnt)->dead )
2007 // copy the value across
2009 val = valptr(bt->frame, cnt);
2010 nxt -= val->len + sizeof(BtVal);
// the value length is written as a single byte at the record start
// (assumes BtVal begins with a one-byte length - TODO confirm)
2011 ((unsigned char *)page)[nxt] = val->len;
2012 memcpy ((unsigned char *)page + nxt + sizeof(BtVal), val->value, val->len);
2014 // copy the key across
2016 key = keyptr(bt->frame, cnt);
2017 nxt -= key->len + sizeof(BtKey);
2018 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
2020 // make a librarian slot
// a dead placeholder slot sharing the offset of the key that follows
2023 slotptr(page, ++idx)->off = nxt;
2024 slotptr(page, idx)->type = Librarian;
2025 slotptr(page, idx)->dead = 1;
// the real slot for the copied key
2030 slotptr(page, ++idx)->off = nxt;
2031 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
// carry the dead flag over; the guarded statement runs for live slots
2033 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2040 // see if page has enough space now, or does it need splitting?
2042 if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2048 // split the root and raise the height of the btree
// Raise the tree height after the root page has been split: the root's
// current contents move to a newly allocated left page and the root is
// rebuilt with two entries, one for the left page and one for `right`.
//   bt      - btree handle
//   root    - root page set; its BtLockWrite is released and its latch and
//             pool are unpinned at the end
//   leftkey - fence key of the lower half, stored as the root's first key
//   right   - page set of the upper half produced by the split
// Returns a BTERR code.
// NOTE(review): the declarations of `left`, `ptr` and `val`, the length
// and content assignments for the stored records, and the return
// statements are not shown in this listing.
2050 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
// records are packed downward from the end of the page
2052 uint nxt = bt->mgr->page_size;
2053 unsigned char value[BtId];
2058 // Obtain an empty page to use, and copy the current
2059 // root contents into it, e.g. lower keys
2061 if( bt_newpage(bt, left) )
2064 memcpy(left->page, root->page, bt->mgr->page_size);
2065 bt_unpinpool (left->pool);
2067 // set left from higher to lower page
// NOTE(review): bt_splitpage, the caller in this file, has already called
// bt_unpinpool (right->pool) before invoking this function; right->page
// is written and right->pool unpinned again here - confirm the pin count
2069 bt_putid (right->page->left, left->page_no);
2070 bt_unpinpool (right->pool);
2072 // preserve the page info at the bottom
2073 // of higher keys and set rest to zero
2075 memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2077 // insert stopper key at top of newroot page
2078 // and increase the root height
// value record for slot 2: the right page's number
2080 nxt -= BtId + sizeof(BtVal);
2081 bt_putid (value, right->page_no);
2082 val = (BtVal *)((unsigned char *)root->page + nxt);
2083 memcpy (val->value, value, BtId);
// key record for slot 2: room for a two-byte stopper key
2086 nxt -= 2 + sizeof(BtKey);
2087 slotptr(root->page, 2)->off = nxt;
2088 ptr = (BtKey *)((unsigned char *)root->page + nxt);
2093 // insert lower keys page fence key on newroot page as first key
// value record for slot 1: the left page's number
2095 nxt -= BtId + sizeof(BtVal);
2096 bt_putid (value, left->page_no);
2097 val = (BtVal *)((unsigned char *)root->page + nxt);
2098 memcpy (val->value, value, BtId);
// key record for slot 1: a copy of leftkey including its header
2101 nxt -= leftkey->len + sizeof(BtKey);
2102 slotptr(root->page, 1)->off = nxt;
2103 memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
// the root has no right sibling
2105 bt_putid(root->page->right, 0);
2106 root->page->min = nxt; // reset lowest used offset and key count
2107 root->page->cnt = 2;
2108 root->page->act = 2;
2111 // release and unpin root
2113 bt_unlockpage(BtLockWrite, root->latch);
2114 bt_unpinlatch (root->latch);
2115 bt_unpinpool (root->pool);
2119 // split already locked full node
// Split a full, write-locked page: the upper part of its keys is built in
// bt->frame and copied to a newly allocated right page, the lower part is
// rebuilt in place, and fence keys for both pages are posted one level up
// (or bt_splitroot is called when the page is the root).
//   bt  - btree handle; bt->frame is used as scratch space
//   set - page to split, write-locked on entry; on the non-root path its
//         lock is released and its latch and pool are unpinned here
// Returns a BTERR code.
// NOTE(review): the declarations of right, key, ptr, val and src, the
// starting values of cnt/idx/max for each loop, the act counters and the
// statements guarded by several `if` lines are not shown in this listing.
2122 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2124 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
2125 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2126 unsigned char value[BtId];
2127 uint lvl = set->page->lvl;
2134 // split higher half of keys to bt->frame
2136 memset (bt->frame, 0, bt->mgr->page_size);
2137 max = set->page->cnt;
2141 while( cnt++ < max ) {
// the last slot (the fence) passes this test even when dead
2142 if( slotptr(set->page, cnt)->dead && cnt < max )
// copy the value record, packing downward from the end of the frame
2144 src = valptr(set->page, cnt);
2145 nxt -= src->len + sizeof(BtVal);
2146 val = (BtVal*)((unsigned char *)bt->frame + nxt);
2147 memcpy (val->value, src->value, src->len);
2148 val->len = src->len;
// copy the key record including its header
2150 key = keyptr(set->page, cnt);
2151 nxt -= key->len + sizeof(BtKey);
2152 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
2153 memcpy (ptr, key, key->len + sizeof(BtKey));
2155 // add librarian slot
// a dead placeholder slot sharing the offset of the key that follows
2158 slotptr(bt->frame, ++idx)->off = nxt;
2159 slotptr(bt->frame, idx)->type = Librarian;
2160 slotptr(bt->frame, idx)->dead = 1;
// the real slot for the copied key
2165 slotptr(bt->frame, ++idx)->off = nxt;
2166 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2168 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2172 // remember existing fence key for new page to the right
2174 memcpy (rightkey, key, key->len + sizeof(BtKey));
2176 bt->frame->bits = bt->mgr->page_bits;
2177 bt->frame->min = nxt;
2178 bt->frame->cnt = idx;
2179 bt->frame->lvl = lvl;
// sibling links are set only for non-root pages: the new page sits
// between this page and its former right neighbour
2183 if( set->page_no > ROOT_page ) {
2184 bt_putid (bt->frame->right, bt_getid (set->page->right));
2185 bt_putid(bt->frame->left, set->page_no);
2188 // get new free page and write higher keys to it.
2190 if( bt_newpage(bt, right) )
// leaf pages only: the old right neighbour's left link must name the
// new page
2195 if( set->page_no > ROOT_page && !lvl )
2196 if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2199 memcpy (right->page, bt->frame, bt->mgr->page_size);
2200 bt_unpinpool (right->pool);
2202 // update lower keys to continue in old page
// snapshot the old page, then rebuild its lower part in place
2204 memcpy (bt->frame, set->page, bt->mgr->page_size);
2205 memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2206 nxt = bt->mgr->page_size;
2207 set->page->garbage = 0;
2213 if( slotptr(bt->frame, max)->type == Librarian )
2216 // assemble page of smaller keys
2218 while( cnt++ < max ) {
2219 if( slotptr(bt->frame, cnt)->dead )
2221 val = valptr(bt->frame, cnt);
2222 nxt -= val->len + sizeof(BtVal);
// value length written as one byte at the record start (assumes BtVal
// begins with a one-byte length - TODO confirm)
2223 ((unsigned char *)set->page)[nxt] = val->len;
2224 memcpy ((unsigned char *)set->page + nxt + sizeof(BtVal), val->value, val->len);
2226 key = keyptr(bt->frame, cnt);
2227 nxt -= key->len + sizeof(BtKey);
2228 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
2230 // add librarian slot
2233 slotptr(set->page, ++idx)->off = nxt;
2234 slotptr(set->page, idx)->type = Librarian;
2235 slotptr(set->page, idx)->dead = 1;
2240 slotptr(set->page, ++idx)->off = nxt;
2241 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2245 // remember fence key for smaller page
// `key` still refers to the last key copied by the loop above
2247 memcpy(fencekey, key, key->len + sizeof(BtKey));
2249 bt_putid(set->page->right, right->page_no);
2250 set->page->min = nxt;
2251 set->page->cnt = idx;
2253 // if current page is the root page, split it
2255 if( set->page_no == ROOT_page )
2256 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
2258 // insert new fences in their parent pages
2260 right->latch = bt_pinlatch (bt, right->page_no);
2261 bt_lockpage (BtLockParent, right->latch);
// take the parent lock before giving up the write lock
2263 bt_lockpage (BtLockParent, set->latch);
2264 bt_unlockpage (BtLockWrite, set->latch);
2266 // insert new fence for reformulated left block of smaller keys
2268 bt_putid (value, set->page_no);
// byte 0 of the cached key is passed as its length and the bytes after
// it as the key (assumes BtKey is a length byte followed by the key)
2270 if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2273 // switch fence for right block of larger keys to new right page
2275 bt_putid (value, right->page_no);
2277 if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2280 bt_unlockpage (BtLockParent, set->latch);
2281 bt_unpinlatch (set->latch);
2282 bt_unpinpool (set->pool);
2284 bt_unlockpage (BtLockParent, right->latch);
2285 bt_unpinlatch (right->latch);
2289 // install new key and value onto page
2290 // page must already be checked for
// Write a new key and value onto the page and open a slot for them at
// `slot`, then release the page.
//   bt   - btree handle
//   set  - write-locked page set; its lock is released and its latch and
//          pool are unpinned at the end
//   slot - slot position the key is inserted before
//   key, keylen   - key bytes
//   value, vallen - value bytes
//   type - slot type for the new entry
// The caller must already have made sure the page has room (see the
// comment above this function).
// NOTE(review): the declarations of node, ptr and val, the length stores,
// the alternative values of `librarian`, the break out of the dead-slot
// scan and the final slot field assignments are not shown in this listing.
2293 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2295 uint idx, librarian;
2300 // if found slot > desired slot and previous slot
2301 // is a librarian slot, use it
2304 if( slotptr(set->page, slot-1)->type == Librarian )
2307 // copy value onto page
// page->min moves down to make room; the record is written at the new min
2309 set->page->min -= vallen + sizeof(BtVal);
2310 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2311 memcpy (val->value, value, vallen);
2314 // copy key onto page
2316 set->page->min -= keylen + sizeof(BtKey);
2317 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2318 memcpy (ptr->key, key, keylen);
2321 // find first empty slot
// scan forward from the insert position for a dead slot to reuse
2323 for( idx = slot; idx < set->page->cnt; idx++ )
2324 if( slotptr(set->page, idx)->dead )
2327 // now insert key into array before slot
// no dead slot found: grow the array by two entries, one for the key and
// one for a new librarian slot
2329 if( idx == set->page->cnt )
2330 idx += 2, set->page->cnt += 2, librarian = 2;
// shift the slots between the insert position and idx up by `librarian`
2336 while( idx > slot + librarian - 1 )
2337 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2339 // add librarian slot
2341 if( librarian > 1 ) {
2342 node = slotptr(set->page, slot++);
2343 node->off = set->page->min;
2344 node->type = Librarian;
// the slot for the new key itself
2350 node = slotptr(set->page, slot);
2351 node->off = set->page->min;
2355 bt_unlockpage (BtLockWrite, set->latch);
2356 bt_unpinlatch (set->latch);
2357 bt_unpinpool (set->pool);
2361 // Insert new key into the btree at given level.
2362 // either add a new key or update/add an existing one
// Insert `key` with `value` at level `lvl`, or update the value when the
// key is already present and `unique` is set.
//   bt     - btree handle; bt->err is set to BTERR_ovflw on the path shown
//   key, keylen   - key bytes
//   lvl    - tree level to insert at (0 is used for leaf keys by callers
//            in this file)
//   value, vallen - value bytes
//   unique - non-zero: an existing equal key is updated in place;
//            zero: a new entry is always inserted (a sequence number from
//            bt_newdup is stored after the key bytes)
// Returns a BTERR code.
// NOTE(review): the declarations of set, ins, ptr, val, type and sequence,
// the computation of `len`, the length stores and the statements guarded
// by several `if` lines are not shown in this listing.
2364 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2366 unsigned char newkey[BT_keyarray];
2367 uint slot, idx, len;
2374 // set up the key we're working on
// the key is staged in a local buffer viewed as a BtKey
2376 ins = (BtKey*)newkey;
2377 memcpy (ins->key, key, keylen);
2380 // is this a non-unique index value?
2386 sequence = bt_newdup (bt);
2387 bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2391 while( 1 ) { // find the page and slot for the current key
2392 if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2393 ptr = keyptr(set->page, slot);
2396 bt->err = BTERR_ovflw;
2400 // if librarian slot == found slot, advance to real slot
2402 if( slotptr(set->page, slot)->type == Librarian )
2403 if( !keycmp (ptr, key, keylen) )
2404 ptr = keyptr(set->page, ++slot);
2408 if( slotptr(set->page, slot)->type == Duplicate )
2411 // if inserting a duplicate key or unique key
2412 // check for adequate space on the page
2413 // and insert the new key before slot.
// true when the key at `slot` differs from the new key, or always when
// duplicates are allowed
2415 if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
// bt_cleanpage yields 0 when the page must be split first
2416 if( !(slot = bt_cleanpage (bt, set->page, ins->len, slot, vallen)) )
2417 if( bt_splitpage (bt, set) )
2422 return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2425 // if key already exists, update value and return
2427 val = valptr(set->page, slot);
// the new value fits in the existing value area: overwrite in place
2429 if( val->len >= vallen ) {
2430 if( slotptr(set->page, slot)->dead )
// the unused tail of the old value area is counted as garbage
2432 set->page->garbage += val->len - vallen;
2433 slotptr(set->page, slot)->dead = 0;
// NOTE(review): index_file passes value == NULL with vallen == 0; a null
// pointer argument to memcpy is undefined even for a zero length
2435 memcpy (val->value, value, vallen);
2436 bt_unlockpage(BtLockWrite, set->latch);
2437 bt_unpinlatch (set->latch);
2438 bt_unpinpool (set->pool);
2442 // new update value doesn't fit in existing value area
// the old key and value records become garbage unless the slot was
// already dead
2444 if( !slotptr(set->page, slot)->dead )
2445 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2447 slotptr(set->page, slot)->dead = 0;
2451 if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2452 if( bt_splitpage (bt, set) )
// write fresh value and key records at the new page->min and point the
// existing slot at them
2457 set->page->min -= vallen + sizeof(BtVal);
2458 val = (BtVal*)((unsigned char *)set->page + set->page->min);
2459 memcpy (val->value, value, vallen);
2462 set->page->min -= keylen + sizeof(BtKey);
2463 ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2464 memcpy (ptr->key, key, keylen);
2467 slotptr(set->page, slot)->off = set->page->min;
2468 bt_unlockpage(BtLockWrite, set->latch);
2469 bt_unpinlatch (set->latch);
2470 bt_unpinpool (set->pool);
2476 // set cursor to highest slot on highest page
// Load the page recorded in alloc->left (the rightmost leaf, as tracked
// by bt_linkleft) into the cursor buffer and pick up its slot count.
//   bt - btree handle; bt->cursor receives a copy of the page
// NOTE(review): the declarations of `set` and `slot`, any update of
// bt->cursor_page and the return statement are not shown in this listing;
// presumably the page's slot count is returned.
2478 uint bt_lastkey (BtDb *bt)
2480 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2484 if( set->pool = bt_pinpool (bt, page_no) )
2485 set->page = bt_page (bt, set->pool, page_no);
// the page is read-locked only for the duration of the copy
2489 set->latch = bt_pinlatch (bt, page_no);
2490 bt_lockpage(BtLockRead, set->latch);
2492 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2493 slot = set->page->cnt;
2495 bt_unlockpage(BtLockRead, set->latch);
2496 bt_unpinlatch (set->latch);
2497 bt_unpinpool (set->pool);
2502 // return previous slot on cursor page
// Step the cursor back; the visible lines cover the case where the cursor
// moves to the page named by the cursor page's left link.
//   bt   - btree handle; bt->cursor and bt->cursor_page are updated
//   slot - current slot on the cursor page
// On the path shown, returns the slot count of the newly loaded page.
// NOTE(review): the declarations of `set` and `left`, the handling of
// `slot` within the current page and the other return statements are not
// shown in this listing.
2504 uint bt_prevkey (BtDb *bt, uint slot)
2512 if( left = bt_getid(bt->cursor->left) )
2513 bt->cursor_page = left;
2517 if( set->pool = bt_pinpool (bt, left) )
2518 set->page = bt_page (bt, set->pool, left);
// the page is read-locked only for the duration of the copy
2522 set->latch = bt_pinlatch (bt, left);
2523 bt_lockpage(BtLockRead, set->latch);
2525 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2527 bt_unlockpage(BtLockRead, set->latch);
2528 bt_unpinlatch (set->latch);
2529 bt_unpinpool (set->pool);
2530 return bt->cursor->cnt;
2533 // return next slot on cursor page
2534 // or slide cursor right into next page
// Advance the cursor to the next slot on the cursor page, loading the
// right sibling page into the cursor when the current page is exhausted.
//   bt   - btree handle; bt->cursor and bt->cursor_page are updated
//   slot - current slot on the cursor page
// NOTE(review): the declarations of `set` and `right`, the enclosing loop
// and every return statement are not shown in this listing.
2536 uint bt_nextkey (BtDb *bt, uint slot)
2542 right = bt_getid(bt->cursor->right);
// examine the following slots on the cached page, testing each for dead
2544 while( slot++ < bt->cursor->cnt )
2545 if( slotptr(bt->cursor,slot)->dead )
// on a page with no right sibling the last slot is the stopper key and
// is not accepted
2547 else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
// move the cursor to the right sibling page
2555 bt->cursor_page = right;
2557 if( set->pool = bt_pinpool (bt, right) )
2558 set->page = bt_page (bt, set->pool, right);
// the page is read-locked only for the duration of the copy
2562 set->latch = bt_pinlatch (bt, right);
2563 bt_lockpage(BtLockRead, set->latch);
2565 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2567 bt_unlockpage(BtLockRead, set->latch);
2568 bt_unpinlatch (set->latch);
2569 bt_unpinpool (set->pool);
2577 // cache page of keys into cursor and return starting slot for given key
// Position the cursor for a scan starting at `key`: the leaf page covering
// the key is copied into bt->cursor and bt->cursor_page is set to it.
//   bt       - btree handle
//   key, len - key to start from
// Per the comment above this function: returns the starting slot.
// NOTE(review): the declarations of `set` and `slot` and the return
// statements are not shown in this listing.
2579 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2584 // cache page for retrieval
// level 0 with a read lock: a leaf lookup
2586 if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2587 memcpy (bt->cursor, set->page, bt->mgr->page_size);
2591 bt->cursor_page = set->page_no;
// the page is released once the copy is taken
2593 bt_unlockpage(BtLockRead, set->latch);
2594 bt_unpinlatch (set->latch);
2595 bt_unpinpool (set->pool);
// Return the key stored at `slot` of the cached cursor page (bt->cursor).
2599 BtKey *bt_key(BtDb *bt, uint slot)
2601 return keyptr(bt->cursor, slot);
// Return the value stored at `slot` of the cached cursor page (bt->cursor).
2604 BtVal *bt_val(BtDb *bt, uint slot)
2606 return valptr(bt->cursor,slot);
// Windows variant: return a time reading in seconds chosen by `type`.
// The visible lines read, in turn, the system clock, the process user
// time and the process system time, then add up the hour, minute, second
// and millisecond fields of the converted SYSTEMTIME.
// NOTE(review): the dispatch on `type`, the declarations of `ans` and
// `crtime` and the return statement are not shown in this listing.
2612 double getCpuTime(int type)
2615 FILETIME xittime[1];
2616 FILETIME systime[1];
2617 FILETIME usrtime[1];
2618 SYSTEMTIME timeconv[1];
2621 memset (timeconv, 0, sizeof(SYSTEMTIME));
// wall clock: the day of the week supplies the whole-day part
2625 GetSystemTimeAsFileTime (xittime);
2626 FileTimeToSystemTime (xittime, timeconv);
2627 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
// user CPU time of the current process
2630 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2631 FileTimeToSystemTime (usrtime, timeconv);
// kernel CPU time of the current process
2634 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2635 FileTimeToSystemTime (systime, timeconv);
// NOTE(review): only the time-of-day fields are summed, so for the CPU
// time readings anything beyond 24 hours appears to be dropped - confirm
2639 ans += (double)timeconv->wHour * 3600;
2640 ans += (double)timeconv->wMinute * 60;
2641 ans += (double)timeconv->wSecond;
2642 ans += (double)timeconv->wMilliseconds / 1000;
2647 #include <sys/resource.h>
// POSIX variant: return a time reading in seconds, as a double, chosen by
// `type`.  The three readings visible below are the wall clock
// (gettimeofday), the process user CPU time and the process system CPU
// time (getrusage with RUSAGE_SELF).
// NOTE(review): the dispatch on `type` that selects one reading, and the
// fall-through return, are not shown in this listing.  The results of
// gettimeofday and getrusage are not checked.
2649 double getCpuTime(int type)
2651 struct rusage used[1];
2652 struct timeval tv[1];
// wall-clock time: seconds plus microseconds scaled to seconds
2656 gettimeofday(tv, NULL);
2657 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
// user CPU time consumed by this process
2660 getrusage(RUSAGE_SELF, used);
2661 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
// system CPU time consumed by this process
2664 getrusage(RUSAGE_SELF, used);
2665 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
// Audit and reset the latch manager, then read every allocated page from
// the index file and check that its keys are in order.
//   bt - btree handle; bt->frame is used as the page read buffer
// Each lock word found set is reported on stderr and then cleared, so
// this is only safe when no other thread is using the tree (assumption
// from the unconditional resets - confirm with callers).
// index_file prints the result as the number of keys found.
// NOTE(review): the declarations of next, page_no, latch, ptr, cnt and
// amt, the conditions guarding the two "pinned" messages, the advance of
// page_no and the return statement are not shown in this listing.
2672 uint bt_latchaudit (BtDb *bt)
2674 ushort idx, hashidx;
// hint that the file will be read front to back
2681 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
// allocation latch: report if held, then force it clear
2683 if( *(ushort *)(bt->mgr->latchmgr->lock) )
2684 fprintf(stderr, "Alloc page locked\n");
2685 *(ushort *)(bt->mgr->latchmgr->lock) = 0;
// pass 1: every deployed latch set - report and zero its three RW locks
2687 for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2688 latch = bt->mgr->latchsets + idx;
// NOTE(review): %.8x expects an unsigned int; if page_no is a uid
// (unsigned long long) the format does not match - confirm (applies to
// every message below that prints a page number)
2689 if( *latch->readwr->rin & MASK )
2690 fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2691 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2693 if( *latch->access->rin & MASK )
2694 fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2695 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2697 if( *latch->parent->rin & MASK )
2698 fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2699 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2702 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
// pass 2: every hash bucket - clear the bucket latch and walk its chain
// of latch sets, clearing each busy latch
2707 for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2708 if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2709 fprintf(stderr, "hash entry %d locked\n", hashidx);
2711 *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
// slot 0 means an empty bucket; latch->next links the chain
2713 if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2714 latch = bt->mgr->latchsets + idx;
2715 if( *(ushort *)latch->busy )
2716 fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2717 *(ushort *)latch->busy = 0;
2719 fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2720 } while( idx = latch->next );
// pass 3: read pages from LEAF_page up to the allocation high-water mark
2723 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2724 page_no = LEAF_page;
2726 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
// byte offset of the page within the index file
2727 off64_t off = page_no << bt->mgr->page_bits;
// NOTE(review): the result of pread is not checked on this path
2729 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
// Windows path: seek with the offset split into low and high halves
2733 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2735 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2736 fprintf(stderr, "page %.8x unable to read\n", page_no);
2738 if( *amt < bt->mgr->page_size )
2739 fprintf(stderr, "page %.8x unable to read\n", page_no);
// pages on the free list are not checked
2741 if( !bt->frame->free ) {
// each key must compare below its successor
2742 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2743 ptr = keyptr(bt->frame, idx+1);
2744 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2745 fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
// only leaf pages (level 0) contribute to the key count
2747 if( !bt->frame->lvl )
2748 cnt += bt->frame->act;
2750 if( page_no > LEAF_page )
2765 // standalone program to index file of keys
2766 // then list them onto std-out
2769 void *index_file (void *arg)
2771 uint __stdcall index_file (void *arg)
2774 int line = 0, found = 0, cnt = 0, unique;
2775 uid next, page_no = LEAF_page; // start on first page of leaves
2776 unsigned char key[BT_maxkey];
2777 ThreadArg *args = arg;
2778 int ch, len = 0, slot;
2785 bt = bt_open (args->mgr);
2787 unique = (args->type[1] | 0x20) != 'd';
2789 switch(args->type[0] | 0x20)
2792 fprintf(stderr, "started latch mgr audit\n");
2793 cnt = bt_latchaudit (bt);
2794 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2798 fprintf(stderr, "started pennysort for %s\n", args->infile);
2799 if( in = fopen (args->infile, "rb") )
2800 while( ch = getc(in), ch != EOF )
2805 if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2806 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2809 else if( len < BT_maxkey )
2811 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2815 fprintf(stderr, "started indexing for %s\n", args->infile);
2816 if( in = fopen (args->infile, "rb") )
2817 while( ch = getc(in), ch != EOF )
2822 if( args->num == 1 )
2823 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2825 else if( args->num )
2826 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2828 if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2829 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2832 else if( len < BT_maxkey )
2834 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2838 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2839 if( in = fopen (args->infile, "rb") )
2840 while( ch = getc(in), ch != EOF )
2844 if( args->num == 1 )
2845 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2847 else if( args->num )
2848 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2850 if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2851 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2852 ptr = (BtKey*)(bt->key);
2855 if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2856 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2859 else if( len < BT_maxkey )
2861 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2865 fprintf(stderr, "started finding keys for %s\n", args->infile);
2866 if( in = fopen (args->infile, "rb") )
2867 while( ch = getc(in), ch != EOF )
2871 if( args->num == 1 )
2872 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2874 else if( args->num )
2875 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2877 if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2880 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2883 else if( len < BT_maxkey )
2885 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2889 fprintf(stderr, "started scanning\n");
2891 if( set->pool = bt_pinpool (bt, page_no) )
2892 set->page = bt_page (bt, set->pool, page_no);
2895 set->latch = bt_pinlatch (bt, page_no);
2896 bt_lockpage (BtLockRead, set->latch);
2897 next = bt_getid (set->page->right);
2899 for( slot = 0; slot++ < set->page->cnt; )
2900 if( next || slot < set->page->cnt )
2901 if( !slotptr(set->page, slot)->dead ) {
2902 ptr = keyptr(set->page, slot);
2905 if( slotptr(set->page, slot)->type == Duplicate )
2908 fwrite (ptr->key, len, 1, stdout);
2909 val = valptr(set->page, slot);
2910 fwrite (val->value, val->len, 1, stdout);
2911 fputc ('\n', stdout);
2915 bt_unlockpage (BtLockRead, set->latch);
2916 bt_unpinlatch (set->latch);
2917 bt_unpinpool (set->pool);
2918 } while( page_no = next );
2920 fprintf(stderr, " Total keys read %d\n", cnt);
2924 fprintf(stderr, "started reverse scan\n");
2925 if( slot = bt_lastkey (bt) )
2926 while( slot = bt_prevkey (bt, slot) ) {
2927 if( slotptr(bt->cursor, slot)->dead )
2930 ptr = keyptr(bt->cursor, slot);
2933 if( slotptr(bt->cursor, slot)->type == Duplicate )
2936 fwrite (ptr->key, len, 1, stdout);
2937 val = valptr(bt->cursor, slot);
2938 fwrite (val->value, val->len, 1, stdout);
2939 fputc ('\n', stdout);
2943 fprintf(stderr, " Total keys read %d\n", cnt);
2948 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2950 fprintf(stderr, "started counting\n");
2951 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2952 page_no = LEAF_page;
2954 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2955 uid off = page_no << bt->mgr->page_bits;
2957 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2961 SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2963 if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2964 return bt->err = BTERR_map;
2966 if( *amt < bt->mgr->page_size )
2967 return bt->err = BTERR_map;
2969 if( !bt->frame->free && !bt->frame->lvl )
2970 cnt += bt->frame->act;
2971 if( page_no > LEAF_page )
2976 cnt--; // remove stopper key
2977 fprintf(stderr, " Total keys read %d\n", cnt);
2989 typedef struct timeval timer; // timer: shorthand alias for struct timeval
2991 int main (int argc, char **argv)
2993 int idx, cnt, len, slot, err;
2994 int segsize, bits = 16;
3011 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3012 fprintf (stderr, " where page_bits is the page size in bits\n");
3013 fprintf (stderr, " mapped_segments is the number of mmap segments in buffer pool\n");
3014 fprintf (stderr, " seg_bits is the size of individual segments in buffer pool in pages in bits\n");
3015 fprintf (stderr, " line_numbers = 1 to append line numbers to keys\n");
3016 fprintf (stderr, " src_file1 thru src_filen are files of keys separated by newline\n");
3020 start = getCpuTime(0);
3023 bits = atoi(argv[3]);
3026 poolsize = atoi(argv[4]);
3029 fprintf (stderr, "Warning: no mapped_pool\n");
3031 if( poolsize > 65535 )
3032 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
3035 segsize = atoi(argv[5]);
3037 segsize = 4; // 16 pages per mmap segment
3040 num = atoi(argv[6]);
3044 threads = malloc (cnt * sizeof(pthread_t));
3046 threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3048 args = malloc (cnt * sizeof(ThreadArg));
3050 mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3053 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3059 for( idx = 0; idx < cnt; idx++ ) {
3060 args[idx].infile = argv[idx + 7];
3061 args[idx].type = argv[2];
3062 args[idx].mgr = mgr;
3063 args[idx].num = num;
3064 args[idx].idx = idx;
3066 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3067 fprintf(stderr, "Error creating thread %d\n", err);
3069 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3073 // wait for termination
3076 for( idx = 0; idx < cnt; idx++ )
3077 pthread_join (threads[idx], NULL);
3079 WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3081 for( idx = 0; idx < cnt; idx++ )
3082 CloseHandle(threads[idx]);
3085 elapsed = getCpuTime(0) - start;
3086 fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3087 elapsed = getCpuTime(1);
3088 fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3089 elapsed = getCpuTime(2);
3090 fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);